aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/vnet/session/application.c3
-rw-r--r--src/vnet/session/session.c30
-rw-r--r--src/vnet/session/session.h30
-rw-r--r--src/vnet/session/session_node.c117
4 files changed, 176 insertions, 4 deletions
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index 16acc9c997d..a93e4b9dbbf 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -497,6 +497,9 @@ VLIB_NODE_FN (appsl_rx_mqs_input_node)
if (aw->pending_rx_mqs)
vlib_node_set_interrupt_pending (vm, appsl_rx_mqs_input_node.index);
+ if (n_msgs && wrk->state == SESSION_WRK_INTERRUPT)
+ vlib_node_set_interrupt_pending (vm, session_queue_node.index);
+
return n_msgs;
}
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index c24a95fd9a6..7513aa32ed8 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -28,11 +28,12 @@ static inline int
session_send_evt_to_thread (void *data, void *args, u32 thread_index,
session_evt_type_t evt_type)
{
+ session_worker_t *wrk = session_main_get_worker (thread_index);
session_event_t *evt;
svm_msg_q_msg_t msg;
svm_msg_q_t *mq;
- mq = session_main_get_vpp_event_queue (thread_index);
+ mq = wrk->vpp_event_queue;
if (PREDICT_FALSE (svm_msg_q_lock (mq)))
return -1;
if (PREDICT_FALSE (svm_msg_q_is_full (mq)
@@ -72,6 +73,10 @@ session_send_evt_to_thread (void *data, void *args, u32 thread_index,
evt->event_type = evt_type;
svm_msg_q_add_and_unlock (mq, &msg);
+
+ if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
+ vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
+
return 0;
}
@@ -121,19 +126,20 @@ session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
void
session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
{
- session_t *s;
+ session_t *s = session_get (tc->s_index, tc->thread_index);
- s = session_get (tc->s_index, tc->thread_index);
ASSERT (s->thread_index == vlib_get_thread_index ());
ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
+
if (!(s->flags & SESSION_F_CUSTOM_TX))
{
s->flags |= SESSION_F_CUSTOM_TX;
if (svm_fifo_set_event (s->tx_fifo)
|| transport_connection_is_descheduled (tc))
{
- session_worker_t *wrk;
session_evt_elt_t *elt;
+ session_worker_t *wrk;
+
wrk = session_main_get_worker (tc->thread_index);
if (has_prio)
elt = session_evt_alloc_new (wrk);
@@ -142,6 +148,10 @@ session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
elt->evt.session_index = tc->s_index;
elt->evt.event_type = SESSION_IO_EVT_TX;
tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;
+
+ if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
+ vlib_node_set_interrupt_pending (wrk->vm,
+ session_queue_node.index);
}
}
}
@@ -157,6 +167,9 @@ sesssion_reschedule_tx (transport_connection_t * tc)
elt = session_evt_alloc_new (wrk);
elt->evt.session_index = tc->s_index;
elt->evt.event_type = SESSION_IO_EVT_TX;
+
+ if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
+ vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
}
static void
@@ -175,6 +188,9 @@ session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
clib_memset (&elt->evt, 0, sizeof (session_event_t));
elt->evt.session_handle = session_handle (s);
elt->evt.event_type = evt;
+
+ if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
+ vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
}
else
session_send_ctrl_evt_to_thread (s, evt);
@@ -1693,6 +1709,9 @@ session_manager_main_enable (vlib_main_t * vm)
if (num_threads > 1)
clib_rwlock_init (&smm->wrk[i].peekers_rw_locks);
+
+ if (!smm->no_adaptive && smm->use_private_rx_mqs)
+ session_wrk_enable_adaptive_mode (wrk);
}
/* Allocate vpp event queues segment and queue */
@@ -1817,6 +1836,7 @@ session_main_init (vlib_main_t * vm)
smm->session_enable_asap = 0;
smm->poll_main = 0;
smm->use_private_rx_mqs = 0;
+ smm->no_adaptive = 0;
smm->session_baseva = HIGH_SEGMENT_BASEVA;
#if (HIGH_SEGMENT_BASEVA > (4ULL << 30))
@@ -1938,6 +1958,8 @@ session_config_fn (vlib_main_t * vm, unformat_input_t * input)
smm->poll_main = 1;
else if (unformat (input, "use-private-rx-mqs"))
smm->use_private_rx_mqs = 1;
+ else if (unformat (input, "no-adaptive"))
+ smm->no_adaptive = 1;
else
return clib_error_return (0, "unknown input `%U'",
format_unformat_error, input);
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index 55863163a89..93278d6c3e1 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -69,6 +69,18 @@ typedef struct session_ctrl_evt_data_
u8 data[SESSION_CTRL_MSG_MAX_SIZE];
} session_evt_ctrl_data_t;
+typedef enum session_wrk_state_
+{
+ SESSION_WRK_POLLING,
+ SESSION_WRK_INTERRUPT,
+ SESSION_WRK_IDLE,
+} __clib_packed session_wrk_state_t;
+
+typedef enum session_wrk_flags_
+{
+ SESSION_WRK_F_ADAPTIVE = 1 << 0,
+} __clib_packed session_wrk_flag_t;
+
typedef struct session_worker_
{
CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
@@ -91,6 +103,15 @@ typedef struct session_worker_
/** Per-proto vector of sessions to enqueue */
u32 **session_to_enqueue;
+ /** Timerfd used to periodically signal wrk session queue node */
+ u32 timerfd;
+
+ /** Worker flags */
+ session_wrk_flag_t flags;
+
+ /** Worker state */
+ session_wrk_state_t state;
+
/** Context for session tx */
session_tx_context_t ctx;
@@ -121,6 +142,9 @@ typedef struct session_worker_
/** Vector of nexts for the pending tx buffers */
u16 *pending_tx_nexts;
+ /** Clib file for timerfd. Used only if adaptive mode is on */
+ uword timerfd_file;
+
#if SESSION_DEBUG
/** last event poll time by thread */
clib_time_type_t last_event_poll;
@@ -177,6 +201,9 @@ typedef struct session_main_
/** Allocate private rx mqs for external apps */
u8 use_private_rx_mqs;
+ /** Do not enable session queue node adaptive mode */
+ u8 no_adaptive;
+
/** vpp fifo event queue configured length */
u32 configured_event_queue_length;
@@ -682,6 +709,8 @@ session_add_pending_tx_buffer (u32 thread_index, u32 bi, u32 next_node)
session_worker_t *wrk = session_main_get_worker (thread_index);
vec_add1 (wrk->pending_tx_buffers, bi);
vec_add1 (wrk->pending_tx_nexts, next_node);
+ if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
+ vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
}
always_inline void
@@ -691,6 +720,7 @@ session_wrk_update_time (session_worker_t *wrk, f64 now)
wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
}
+void session_wrk_enable_adaptive_mode (session_worker_t *wrk);
fifo_segment_t *session_main_get_evt_q_segment (void);
void session_node_enable_disable (u8 is_en);
clib_error_t *vnet_session_enable_disable (vlib_main_t * vm, u8 is_en);
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index f8157cc6214..d7adbb5fd06 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -24,6 +24,7 @@
#include <vnet/session/application_local.h>
#include <vnet/session/session_debug.h>
#include <svm/queue.h>
+#include <sys/timerfd.h>
#define app_check_thread_and_barrier(_fn, _arg) \
if (!vlib_thread_is_main_w_barrier ()) \
@@ -1418,6 +1419,79 @@ session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq)
return n_to_dequeue;
}
+static void
+session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns)
+{
+ struct itimerspec its;
+
+ its.it_value.tv_sec = 0;
+ its.it_value.tv_nsec = time_ns;
+ its.it_interval.tv_sec = 0;
+ its.it_interval.tv_nsec = its.it_value.tv_nsec;
+
+ if (timerfd_settime (wrk->timerfd, 0, &its, NULL) == -1)
+ clib_warning ("timerfd_settime");
+}
+
+always_inline u64
+session_wrk_tfd_timeout (session_wrk_state_t state, u32 thread_index)
+{
+ if (state == SESSION_WRK_INTERRUPT)
+ return thread_index ? 1e6 : vlib_num_workers () ? 5e8 : 1e6;
+ else if (state == SESSION_WRK_IDLE)
+ return thread_index ? 1e8 : vlib_num_workers () ? 5e8 : 1e8;
+ else
+ return 0;
+}
+
+static inline void
+session_wrk_state_update (session_worker_t *wrk, session_wrk_state_t state)
+{
+ u64 time_ns;
+
+ wrk->state = state;
+ time_ns = session_wrk_tfd_timeout (state, wrk->vm->thread_index);
+ session_wrk_timerfd_update (wrk, time_ns);
+}
+
+static void
+session_wrk_update_state (session_worker_t *wrk)
+{
+ vlib_main_t *vm = wrk->vm;
+
+ if (wrk->state == SESSION_WRK_POLLING)
+ {
+ if (pool_elts (wrk->event_elts) == 3 &&
+ vlib_last_vectors_per_main_loop (vm) < 1)
+ {
+ session_wrk_state_update (wrk, SESSION_WRK_INTERRUPT);
+ vlib_node_set_state (vm, session_queue_node.index,
+ VLIB_NODE_STATE_INTERRUPT);
+ }
+ }
+ else if (wrk->state == SESSION_WRK_INTERRUPT)
+ {
+ if (pool_elts (wrk->event_elts) > 3 ||
+ vlib_last_vectors_per_main_loop (vm) > 1)
+ {
+ session_wrk_state_update (wrk, SESSION_WRK_POLLING);
+ vlib_node_set_state (vm, session_queue_node.index,
+ VLIB_NODE_STATE_POLLING);
+ }
+ else if (PREDICT_FALSE (!pool_elts (wrk->sessions)))
+ {
+ session_wrk_state_update (wrk, SESSION_WRK_IDLE);
+ }
+ }
+ else
+ {
+ if (pool_elts (wrk->event_elts))
+ {
+ session_wrk_state_update (wrk, SESSION_WRK_INTERRUPT);
+ }
+ }
+}
+
static uword
session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_frame_t * frame)
@@ -1513,6 +1587,9 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
SESSION_EVT (SESSION_EVT_DISPATCH_END, wrk, n_tx_packets);
+ if (wrk->flags & SESSION_WRK_F_ADAPTIVE)
+ session_wrk_update_state (wrk);
+
return n_tx_packets;
}
@@ -1531,6 +1608,46 @@ VLIB_REGISTER_NODE (session_queue_node) =
/* *INDENT-ON* */
static clib_error_t *
+session_wrk_tfd_read_ready (clib_file_t *cf)
+{
+ session_worker_t *wrk = session_main_get_worker (cf->private_data);
+ u64 buf;
+ int rv;
+
+ vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
+ rv = read (wrk->timerfd, &buf, sizeof (buf));
+ if (rv < 0 && errno != EAGAIN)
+ clib_unix_warning ("failed");
+ return 0;
+}
+
+static clib_error_t *
+session_wrk_tfd_write_ready (clib_file_t *cf)
+{
+ return 0;
+}
+
+void
+session_wrk_enable_adaptive_mode (session_worker_t *wrk)
+{
+ u32 thread_index = wrk->vm->thread_index;
+ clib_file_t template = { 0 };
+
+ if ((wrk->timerfd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
+ clib_warning ("timerfd_create");
+
+ template.read_function = session_wrk_tfd_read_ready;
+ template.write_function = session_wrk_tfd_write_ready;
+ template.file_descriptor = wrk->timerfd;
+ template.private_data = thread_index;
+ template.polling_thread_index = thread_index;
+ template.description = format (0, "session-wrk-tfd-%u", thread_index);
+
+ wrk->timerfd_file = clib_file_add (&file_main, &template);
+ wrk->flags |= SESSION_WRK_F_ADAPTIVE;
+}
+
+static clib_error_t *
session_queue_exit (vlib_main_t * vm)
{
if (vlib_get_n_threads () < 2)