summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/vnet/session/session.c1
-rw-r--r--src/vnet/session/session.h6
-rw-r--r--src/vnet/session/session_node.c86
3 files changed, 88 insertions, 5 deletions
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index f6b61abc2e0..f38db777ae3 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -1818,6 +1818,7 @@ session_manager_main_enable (vlib_main_t * vm)
wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
+ wrk->pending_connects = clib_llist_make_head (wrk->event_elts, evt_list);
wrk->vm = vlib_get_main_by_index (i);
wrk->last_vlib_time = vlib_time_now (vm);
wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index bf326811b24..245ec25f135 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -145,6 +145,12 @@ typedef struct session_worker_
/** Clib file for timerfd. Used only if adaptive mode is on */
uword timerfd_file;
+ /** List of pending connects for first worker */
+ clib_llist_index_t pending_connects;
+
+ /** Flag that is set if main thread signaled to handle connects */
+ u32 pending_connects_ntf;
+
#if SESSION_DEBUG
/** last event poll time by thread */
clib_time_type_t last_event_poll;
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index b68ff53dd7a..bd60bd7056b 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -119,16 +119,13 @@ session_mq_listen_uri_handler (void *data)
}
static void
-session_mq_connect_handler (void *data)
+session_mq_connect_one (session_connect_msg_t *mp)
{
- session_connect_msg_t *mp = (session_connect_msg_t *) data;
vnet_connect_args_t _a, *a = &_a;
app_worker_t *app_wrk;
application_t *app;
int rv;
- app_check_thread_and_barrier (session_mq_connect_handler, mp);
-
app = application_lookup (mp->client_index);
if (!app)
return;
@@ -168,6 +165,85 @@ session_mq_connect_handler (void *data)
}
static void
+session_mq_handle_connects_rpc (void *arg)
+{
+ u32 max_connects = 32, n_connects = 0;
+ vlib_main_t *vm = vlib_get_main ();
+ session_evt_elt_t *he, *elt, *next;
+ session_worker_t *fwrk;
+ u8 need_reschedule = 1;
+
+ ASSERT (vlib_get_thread_index () == 0);
+
+ /* Pending connects on linked list pertaining to first worker */
+ fwrk = session_main_get_worker (1);
+
+ vlib_worker_thread_barrier_sync (vm);
+
+ he = pool_elt_at_index (fwrk->event_elts, fwrk->pending_connects);
+ elt = clib_llist_next (fwrk->event_elts, evt_list, he);
+
+ /* Avoid holding the barrier for too long */
+ while (n_connects < max_connects && elt != he)
+ {
+ next = clib_llist_next (fwrk->event_elts, evt_list, elt);
+ clib_llist_remove (fwrk->event_elts, evt_list, elt);
+ session_mq_connect_one (session_evt_ctrl_data (fwrk, elt));
+ session_evt_elt_free (fwrk, elt);
+ elt = next;
+ n_connects += 1;
+ }
+
+ if (clib_llist_is_empty (fwrk->event_elts, evt_list, he))
+ {
+ fwrk->pending_connects_ntf = 0;
+ need_reschedule = 0;
+ }
+
+ vlib_worker_thread_barrier_release (vm);
+
+ if (need_reschedule)
+ {
+ vlib_node_set_interrupt_pending (vm, session_queue_node.index);
+ elt = session_evt_alloc_ctrl (session_main_get_worker (0));
+ elt->evt.event_type = SESSION_CTRL_EVT_RPC;
+ elt->evt.rpc_args.fp = session_mq_handle_connects_rpc;
+ }
+}
+
+static void
+session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt)
+{
+ u32 thread_index = wrk - session_main.wrk;
+ session_evt_elt_t *he;
+
+ /* No workers, so just deal with the connect now */
+ if (PREDICT_FALSE (!thread_index))
+ {
+ session_mq_connect_one (session_evt_ctrl_data (wrk, elt));
+ return;
+ }
+
+ if (PREDICT_FALSE (thread_index != 1))
+ {
+ clib_warning ("Connect on wrong thread. Dropping");
+ return;
+ }
+
+ /* Add to pending list to be handled by main thread */
+ he = pool_elt_at_index (wrk->event_elts, wrk->pending_connects);
+ clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
+
+ if (!wrk->pending_connects_ntf)
+ {
+ vlib_node_set_interrupt_pending (vlib_get_main_by_index (0),
+ session_queue_node.index);
+ session_send_rpc_evt_to_thread (0, session_mq_handle_connects_rpc, 0);
+ wrk->pending_connects_ntf = 1;
+ }
+}
+
+static void
session_mq_connect_uri_handler (void *data)
{
session_connect_uri_msg_t *mp = (session_connect_uri_msg_t *) data;
@@ -1331,7 +1407,7 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
session_mq_unlisten_handler (session_evt_ctrl_data (wrk, elt));
break;
case SESSION_CTRL_EVT_CONNECT:
- session_mq_connect_handler (session_evt_ctrl_data (wrk, elt));
+ session_mq_connect_handler (wrk, elt);
break;
case SESSION_CTRL_EVT_CONNECT_URI:
session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt));