summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/vlib/main.c4
-rw-r--r--src/vnet/sctp/sctp_output.c2
-rw-r--r--src/vnet/session/session.c30
-rw-r--r--src/vnet/session/session.h6
-rw-r--r--src/vnet/session/session_node.c45
-rw-r--r--src/vnet/tcp/tcp_output.c2
6 files changed, 88 insertions, 1 deletions
diff --git a/src/vlib/main.c b/src/vlib/main.c
index d1f7592e502..7da519241bb 100644
--- a/src/vlib/main.c
+++ b/src/vlib/main.c
@@ -1276,6 +1276,7 @@ dispatch_process (vlib_main_t * vm,
vlib_node_main_t *nm = &vm->node_main;
vlib_node_runtime_t *node_runtime = &p->node_runtime;
vlib_node_t *node = vlib_get_node (vm, node_runtime->node_index);
+ u32 old_process_index;
u64 t;
uword n_vectors, is_suspend;
@@ -1291,11 +1292,12 @@ dispatch_process (vlib_main_t * vm,
f ? f->n_vectors : 0, /* is_after */ 0);
/* Save away current process for suspend. */
+ old_process_index = nm->current_process_index;
nm->current_process_index = node->runtime_index;
n_vectors = vlib_process_startup (vm, p, f);
- nm->current_process_index = ~0;
+ nm->current_process_index = old_process_index;
ASSERT (n_vectors != VLIB_PROCESS_RETURN_LONGJMP_RETURN);
is_suspend = n_vectors == VLIB_PROCESS_RETURN_LONGJMP_SUSPEND;
diff --git a/src/vnet/sctp/sctp_output.c b/src/vnet/sctp/sctp_output.c
index a65f166a9c6..3c83f68dc29 100644
--- a/src/vnet/sctp/sctp_output.c
+++ b/src/vnet/sctp/sctp_output.c
@@ -406,6 +406,8 @@ sctp_enqueue_to_ip_lookup (vlib_main_t * vm, vlib_buffer_t * b, u32 bi,
u8 is_ip4, u32 fib_index)
{
sctp_enqueue_to_ip_lookup_i (vm, b, bi, is_ip4, fib_index, 0);
+ if (vm->thread_index == 0 && vlib_num_workers ())
+ session_flush_frames_main_thread (vm);
}
/**
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 66cad2acff3..2697c26381e 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -1272,6 +1272,14 @@ listen_session_get_local_session_endpoint (stream_session_t * listener,
return 0;
}
+void
+session_flush_frames_main_thread (vlib_main_t * vm)
+{
+ ASSERT (vlib_get_thread_index () == 0);
+ vlib_process_signal_event_mt (vm, session_queue_process_node.index,
+ SESSION_Q_PROCESS_FLUSH_FRAMES, 0);
+}
+
static clib_error_t *
session_manager_main_enable (vlib_main_t * vm)
{
@@ -1366,8 +1374,30 @@ void
session_node_enable_disable (u8 is_en)
{
u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
+ vlib_thread_main_t *vtm = vlib_get_thread_main ();
+ u8 have_workers = vtm->n_threads != 0;
+
/* *INDENT-OFF* */
foreach_vlib_main (({
+ if (have_workers && ii == 0)
+ {
+ vlib_node_set_state (this_vlib_main, session_queue_process_node.index,
+ state);
+ if (is_en)
+ {
+ vlib_node_t *n = vlib_get_node (this_vlib_main,
+ session_queue_process_node.index);
+ vlib_start_process (this_vlib_main, n->runtime_index);
+ }
+ else
+ {
+ vlib_process_signal_event_mt (this_vlib_main,
+ session_queue_process_node.index,
+ SESSION_Q_PROCESS_STOP, 0);
+ }
+
+ continue;
+ }
vlib_node_set_state (this_vlib_main, session_queue_node.index,
state);
}));
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index d6cc2cb3327..b24e22923f1 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -254,6 +254,10 @@ struct _session_manager_main
extern session_manager_main_t session_manager_main;
extern vlib_node_registration_t session_queue_node;
+extern vlib_node_registration_t session_queue_process_node;
+
+#define SESSION_Q_PROCESS_FLUSH_FRAMES 1
+#define SESSION_Q_PROCESS_STOP 2
/*
* Session manager function
@@ -594,6 +598,8 @@ int
listen_session_get_local_session_endpoint (stream_session_t * listener,
session_endpoint_t * sep);
+void session_flush_frames_main_thread (vlib_main_t * vm);
+
always_inline u8
session_manager_is_enabled ()
{
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index 07cca6d46e7..46fc4dc8745 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -869,6 +869,51 @@ session_queue_exit (vlib_main_t * vm)
VLIB_MAIN_LOOP_EXIT_FUNCTION (session_queue_exit);
+static uword
+session_queue_process (vlib_main_t * vm, vlib_node_runtime_t * rt,
+ vlib_frame_t * f)
+{
+ f64 now, timeout = 1.0;
+ uword *event_data = 0;
+ uword event_type;
+
+ while (1)
+ {
+ vlib_process_wait_for_event_or_clock (vm, timeout);
+ now = vlib_time_now (vm);
+ event_type = vlib_process_get_events (vm, (uword **) & event_data);
+
+ switch (event_type)
+ {
+ case SESSION_Q_PROCESS_FLUSH_FRAMES:
+ /* Flush the frames by updating all transports times */
+ transport_update_time (now, 0);
+ break;
+ case SESSION_Q_PROCESS_STOP:
+ timeout = 100000.0;
+ break;
+ case ~0:
+ /* Timed out. Update time for all transports to trigger all
+ * outstanding retransmits. */
+ transport_update_time (now, 0);
+ break;
+ }
+ vec_reset_length (event_data);
+ }
+ return 0;
+}
+
+/* *INDENT-OFF* */
+VLIB_REGISTER_NODE (session_queue_process_node) =
+{
+ .function = session_queue_process,
+ .type = VLIB_NODE_TYPE_PROCESS,
+ .name = "session-queue-process",
+ .state = VLIB_NODE_STATE_DISABLED,
+};
+/* *INDENT-ON* */
+
+
/*
* fd.io coding-style-patch-verification: ON
*
diff --git a/src/vnet/tcp/tcp_output.c b/src/vnet/tcp/tcp_output.c
index fe6d09c2c0e..0d938cae372 100644
--- a/src/vnet/tcp/tcp_output.c
+++ b/src/vnet/tcp/tcp_output.c
@@ -675,6 +675,8 @@ tcp_enqueue_to_ip_lookup (vlib_main_t * vm, vlib_buffer_t * b, u32 bi,
u8 is_ip4, u32 fib_index)
{
tcp_enqueue_to_ip_lookup_i (vm, b, bi, is_ip4, fib_index, 0);
+ if (vm->thread_index == 0 && vlib_num_workers ())
+ session_flush_frames_main_thread (vm);
}
always_inline void