aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorFlorin Coras <fcoras@cisco.com>2021-03-18 15:04:34 -0700
committerDamjan Marion <dmarion@me.com>2021-04-06 11:15:38 +0000
commit7da8829d8152ef5105a57231fd1d91700e9b4f6c (patch)
tree56d4014ec34ec377ae06a08150a962d855c3a170 /src
parentd82349ecad23bfd80e9a518f595adf78f363c68d (diff)
session: basic support for interrupt mode
Experimental support for session layer interrupt mode. When enabled (use-private-rx-mqs must be set) session queue node switches to interrupt state when lightly loaded, i.e., no events and less than 1 vector/dispatch. Because transport protocols require a periodic time update, when in interrupt state the session queue node workers register a timerfd with the unix-epoll-input node that when triggered signals, i.e., wakes up, the queue node. Under light load, the timer is set to trigger every 1ms whereas if no session is allocated, the worker moves to idle state and the timeout is set to 100ms. Type: feature Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I905b00777fbc025faf9c4074fce4c516cd139387
Diffstat (limited to 'src')
-rw-r--r--src/vnet/session/application.c3
-rw-r--r--src/vnet/session/session.c30
-rw-r--r--src/vnet/session/session.h30
-rw-r--r--src/vnet/session/session_node.c117
4 files changed, 176 insertions, 4 deletions
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index 16acc9c997d..a93e4b9dbbf 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -497,6 +497,9 @@ VLIB_NODE_FN (appsl_rx_mqs_input_node)
if (aw->pending_rx_mqs)
vlib_node_set_interrupt_pending (vm, appsl_rx_mqs_input_node.index);
+ if (n_msgs && wrk->state == SESSION_WRK_INTERRUPT)
+ vlib_node_set_interrupt_pending (vm, session_queue_node.index);
+
return n_msgs;
}
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index c24a95fd9a6..7513aa32ed8 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -28,11 +28,12 @@ static inline int
session_send_evt_to_thread (void *data, void *args, u32 thread_index,
session_evt_type_t evt_type)
{
+ session_worker_t *wrk = session_main_get_worker (thread_index);
session_event_t *evt;
svm_msg_q_msg_t msg;
svm_msg_q_t *mq;
- mq = session_main_get_vpp_event_queue (thread_index);
+ mq = wrk->vpp_event_queue;
if (PREDICT_FALSE (svm_msg_q_lock (mq)))
return -1;
if (PREDICT_FALSE (svm_msg_q_is_full (mq)
@@ -72,6 +73,10 @@ session_send_evt_to_thread (void *data, void *args, u32 thread_index,
evt->event_type = evt_type;
svm_msg_q_add_and_unlock (mq, &msg);
+
+ if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
+ vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
+
return 0;
}
@@ -121,19 +126,20 @@ session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
void
session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
{
- session_t *s;
+ session_t *s = session_get (tc->s_index, tc->thread_index);
- s = session_get (tc->s_index, tc->thread_index);
ASSERT (s->thread_index == vlib_get_thread_index ());
ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
+
if (!(s->flags & SESSION_F_CUSTOM_TX))
{
s->flags |= SESSION_F_CUSTOM_TX;
if (svm_fifo_set_event (s->tx_fifo)
|| transport_connection_is_descheduled (tc))
{
- session_worker_t *wrk;
session_evt_elt_t *elt;
+ session_worker_t *wrk;
+
wrk = session_main_get_worker (tc->thread_index);
if (has_prio)
elt = session_evt_alloc_new (wrk);
@@ -142,6 +148,10 @@ session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
elt->evt.session_index = tc->s_index;
elt->evt.event_type = SESSION_IO_EVT_TX;
tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;
+
+ if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
+ vlib_node_set_interrupt_pending (wrk->vm,
+ session_queue_node.index);
}
}
}
@@ -157,6 +167,9 @@ sesssion_reschedule_tx (transport_connection_t * tc)
elt = session_evt_alloc_new (wrk);
elt->evt.session_index = tc->s_index;
elt->evt.event_type = SESSION_IO_EVT_TX;
+
+ if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
+ vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
}
static void
@@ -175,6 +188,9 @@ session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
clib_memset (&elt->evt, 0, sizeof (session_event_t));
elt->evt.session_handle = session_handle (s);
elt->evt.event_type = evt;
+
+ if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
+ vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
}
else
session_send_ctrl_evt_to_thread (s, evt);
@@ -1693,6 +1709,9 @@ session_manager_main_enable (vlib_main_t * vm)
if (num_threads > 1)
clib_rwlock_init (&smm->wrk[i].peekers_rw_locks);
+
+ if (!smm->no_adaptive && smm->use_private_rx_mqs)
+ session_wrk_enable_adaptive_mode (wrk);
}
/* Allocate vpp event queues segment and queue */
@@ -1817,6 +1836,7 @@ session_main_init (vlib_main_t * vm)
smm->session_enable_asap = 0;
smm->poll_main = 0;
smm->use_private_rx_mqs = 0;
+ smm->no_adaptive = 0;
smm->session_baseva = HIGH_SEGMENT_BASEVA;
#if (HIGH_SEGMENT_BASEVA > (4ULL << 30))
@@ -1938,6 +1958,8 @@ session_config_fn (vlib_main_t * vm, unformat_input_t * input)
smm->poll_main = 1;
else if (unformat (input, "use-private-rx-mqs"))
smm->use_private_rx_mqs = 1;
+ else if (unformat (input, "no-adaptive"))
+ smm->no_adaptive = 1;
else
return clib_error_return (0, "unknown input `%U'",
format_unformat_error, input);
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index 55863163a89..93278d6c3e1 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -69,6 +69,18 @@ typedef struct session_ctrl_evt_data_
u8 data[SESSION_CTRL_MSG_MAX_SIZE];
} session_evt_ctrl_data_t;
+typedef enum session_wrk_state_
+{
+ SESSION_WRK_POLLING,
+ SESSION_WRK_INTERRUPT,
+ SESSION_WRK_IDLE,
+} __clib_packed session_wrk_state_t;
+
+typedef enum session_wrk_flags_
+{
+ SESSION_WRK_F_ADAPTIVE = 1 << 0,
+} __clib_packed session_wrk_flag_t;
+
typedef struct session_worker_
{
CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
@@ -91,6 +103,15 @@ typedef struct session_worker_
/** Per-proto vector of sessions to enqueue */
u32 **session_to_enqueue;
+ /** Timerfd used to periodically signal wrk session queue node */
+ u32 timerfd;
+
+ /** Worker flags */
+ session_wrk_flag_t flags;
+
+ /** Worker state */
+ session_wrk_state_t state;
+
/** Context for session tx */
session_tx_context_t ctx;
@@ -121,6 +142,9 @@ typedef struct session_worker_
/** Vector of nexts for the pending tx buffers */
u16 *pending_tx_nexts;
+ /** Clib file for timerfd. Used only if adaptive mode is on */
+ uword timerfd_file;
+
#if SESSION_DEBUG
/** last event poll time by thread */
clib_time_type_t last_event_poll;
@@ -177,6 +201,9 @@ typedef struct session_main_
/** Allocate private rx mqs for external apps */
u8 use_private_rx_mqs;
+ /** Do not enable session queue node adaptive mode */
+ u8 no_adaptive;
+
/** vpp fifo event queue configured length */
u32 configured_event_queue_length;
@@ -682,6 +709,8 @@ session_add_pending_tx_buffer (u32 thread_index, u32 bi, u32 next_node)
session_worker_t *wrk = session_main_get_worker (thread_index);
vec_add1 (wrk->pending_tx_buffers, bi);
vec_add1 (wrk->pending_tx_nexts, next_node);
+ if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
+ vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
}
always_inline void
@@ -691,6 +720,7 @@ session_wrk_update_time (session_worker_t *wrk, f64 now)
wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
}
+void session_wrk_enable_adaptive_mode (session_worker_t *wrk);
fifo_segment_t *session_main_get_evt_q_segment (void);
void session_node_enable_disable (u8 is_en);
clib_error_t *vnet_session_enable_disable (vlib_main_t * vm, u8 is_en);
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index f8157cc6214..d7adbb5fd06 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -24,6 +24,7 @@
#include <vnet/session/application_local.h>
#include <vnet/session/session_debug.h>
#include <svm/queue.h>
+#include <sys/timerfd.h>
#define app_check_thread_and_barrier(_fn, _arg) \
if (!vlib_thread_is_main_w_barrier ()) \
@@ -1418,6 +1419,79 @@ session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq)
return n_to_dequeue;
}
+static void
+session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns)
+{
+ struct itimerspec its;
+
+ its.it_value.tv_sec = 0;
+ its.it_value.tv_nsec = time_ns;
+ its.it_interval.tv_sec = 0;
+ its.it_interval.tv_nsec = its.it_value.tv_nsec;
+
+ if (timerfd_settime (wrk->timerfd, 0, &its, NULL) == -1)
+ clib_warning ("timerfd_settime");
+}
+
+always_inline u64
+session_wrk_tfd_timeout (session_wrk_state_t state, u32 thread_index)
+{
+ if (state == SESSION_WRK_INTERRUPT)
+ return thread_index ? 1e6 : vlib_num_workers () ? 5e8 : 1e6;
+ else if (state == SESSION_WRK_IDLE)
+ return thread_index ? 1e8 : vlib_num_workers () ? 5e8 : 1e8;
+ else
+ return 0;
+}
+
+static inline void
+session_wrk_state_update (session_worker_t *wrk, session_wrk_state_t state)
+{
+ u64 time_ns;
+
+ wrk->state = state;
+ time_ns = session_wrk_tfd_timeout (state, wrk->vm->thread_index);
+ session_wrk_timerfd_update (wrk, time_ns);
+}
+
+static void
+session_wrk_update_state (session_worker_t *wrk)
+{
+ vlib_main_t *vm = wrk->vm;
+
+ if (wrk->state == SESSION_WRK_POLLING)
+ {
+ if (pool_elts (wrk->event_elts) == 3 &&
+ vlib_last_vectors_per_main_loop (vm) < 1)
+ {
+ session_wrk_state_update (wrk, SESSION_WRK_INTERRUPT);
+ vlib_node_set_state (vm, session_queue_node.index,
+ VLIB_NODE_STATE_INTERRUPT);
+ }
+ }
+ else if (wrk->state == SESSION_WRK_INTERRUPT)
+ {
+ if (pool_elts (wrk->event_elts) > 3 ||
+ vlib_last_vectors_per_main_loop (vm) > 1)
+ {
+ session_wrk_state_update (wrk, SESSION_WRK_POLLING);
+ vlib_node_set_state (vm, session_queue_node.index,
+ VLIB_NODE_STATE_POLLING);
+ }
+ else if (PREDICT_FALSE (!pool_elts (wrk->sessions)))
+ {
+ session_wrk_state_update (wrk, SESSION_WRK_IDLE);
+ }
+ }
+ else
+ {
+ if (pool_elts (wrk->event_elts))
+ {
+ session_wrk_state_update (wrk, SESSION_WRK_INTERRUPT);
+ }
+ }
+}
+
static uword
session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_frame_t * frame)
@@ -1513,6 +1587,9 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
SESSION_EVT (SESSION_EVT_DISPATCH_END, wrk, n_tx_packets);
+ if (wrk->flags & SESSION_WRK_F_ADAPTIVE)
+ session_wrk_update_state (wrk);
+
return n_tx_packets;
}
@@ -1531,6 +1608,46 @@ VLIB_REGISTER_NODE (session_queue_node) =
/* *INDENT-ON* */
static clib_error_t *
+session_wrk_tfd_read_ready (clib_file_t *cf)
+{
+ session_worker_t *wrk = session_main_get_worker (cf->private_data);
+ u64 buf;
+ int rv;
+
+ vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
+ rv = read (wrk->timerfd, &buf, sizeof (buf));
+ if (rv < 0 && errno != EAGAIN)
+ clib_unix_warning ("failed");
+ return 0;
+}
+
+static clib_error_t *
+session_wrk_tfd_write_ready (clib_file_t *cf)
+{
+ return 0;
+}
+
+void
+session_wrk_enable_adaptive_mode (session_worker_t *wrk)
+{
+ u32 thread_index = wrk->vm->thread_index;
+ clib_file_t template = { 0 };
+
+ if ((wrk->timerfd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
+ clib_warning ("timerfd_create");
+
+ template.read_function = session_wrk_tfd_read_ready;
+ template.write_function = session_wrk_tfd_write_ready;
+ template.file_descriptor = wrk->timerfd;
+ template.private_data = thread_index;
+ template.polling_thread_index = thread_index;
+ template.description = format (0, "session-wrk-tfd-%u", thread_index);
+
+ wrk->timerfd_file = clib_file_add (&file_main, &template);
+ wrk->flags |= SESSION_WRK_F_ADAPTIVE;
+}
+
+static clib_error_t *
session_queue_exit (vlib_main_t * vm)
{
if (vlib_get_n_threads () < 2)