summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFlorin Coras <fcoras@cisco.com>2022-03-18 08:33:08 -0700
committerDave Barach <vpp@barachs.net>2022-12-02 22:59:13 +0000
commit309f7aac170767028a2e6e7e9424ec3d13304aff (patch)
tree84b4e26408e994f196c4e0029709c806be5bfef6
parent06bbab0c45c805544c981b8765ea3d85760d66a8 (diff)
session: move connects to first worker
Type: improvement Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I035e3fdbb52eca010ad7b2c20ca2930cb1645978
-rw-r--r--src/plugins/hs_apps/echo_client.c8
-rw-r--r--src/plugins/unittest/session_test.c8
-rw-r--r--src/vnet/session/application.c2
-rw-r--r--src/vnet/session/application_worker.c4
-rw-r--r--src/vnet/session/session.c6
-rw-r--r--src/vnet/session/session.h32
-rw-r--r--src/vnet/session/session_node.c91
-rw-r--r--src/vnet/session/transport.c3
-rw-r--r--src/vnet/tcp/tcp.c6
-rw-r--r--src/vnet/tcp/tcp_inlines.h2
-rw-r--r--src/vnet/tcp/tcp_timer.h15
-rw-r--r--src/vnet/tls/tls.c1
12 files changed, 72 insertions, 106 deletions
diff --git a/src/plugins/hs_apps/echo_client.c b/src/plugins/hs_apps/echo_client.c
index c6c7ede2f92..14d47be7422 100644
--- a/src/plugins/hs_apps/echo_client.c
+++ b/src/plugins/hs_apps/echo_client.c
@@ -826,7 +826,6 @@ ec_connect_rpc (void *args)
{
ec_main_t *ecm = &ec_main;
vnet_connect_args_t _a = {}, *a = &_a;
- vlib_main_t *vm = vlib_get_main ();
int rv, needs_crypto;
u32 n_clients, ci;
@@ -838,8 +837,6 @@ ec_connect_rpc (void *args)
ci = ecm->connect_conn_index;
- vlib_worker_thread_barrier_sync (vm);
-
while (ci < n_clients)
{
/* Crude pacing for call setups */
@@ -873,8 +870,6 @@ ec_connect_rpc (void *args)
ci += 1;
}
- vlib_worker_thread_barrier_release (vm);
-
if (ci < ecm->expected_connections && ecm->run_test != EC_EXITING)
ec_program_connects ();
@@ -884,7 +879,8 @@ ec_connect_rpc (void *args)
void
ec_program_connects (void)
{
- session_send_rpc_evt_to_thread_force (0, ec_connect_rpc, 0);
+ session_send_rpc_evt_to_thread_force (transport_cl_thread (), ec_connect_rpc,
+ 0);
}
#define ec_cli(_fmt, _args...) \
diff --git a/src/plugins/unittest/session_test.c b/src/plugins/unittest/session_test.c
index 6a292c783eb..70d9fd02048 100644
--- a/src/plugins/unittest/session_test.c
+++ b/src/plugins/unittest/session_test.c
@@ -404,14 +404,6 @@ session_test_endpoint_cfg (vlib_main_t * vm, unformat_input_t * input)
SESSION_TEST ((tc->lcl_port == placeholder_client_port),
"ports should be equal");
- /* These sessions, because of the way they're established are pinned to
- * main thread, even when we have workers and we avoid polling main thread,
- * i.e., we can't cleanup pending disconnects, so force cleanup for both
- */
- session_transport_cleanup (s);
- s = session_get (accepted_session_index, accepted_session_thread);
- session_transport_cleanup (s);
-
vnet_app_detach_args_t detach_args = {
.app_index = server_index,
.api_client_index = ~0,
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index 3b2c7cdb35a..ad4d447c543 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -1357,7 +1357,7 @@ vnet_connect (vnet_connect_args_t * a)
app_worker_t *client_wrk;
application_t *client;
- ASSERT (vlib_thread_is_main_w_barrier ());
+ ASSERT (session_vlib_thread_is_cl_thread ());
if (session_endpoint_is_zero (&a->sep))
return SESSION_E_INVALID_RMT_IP;
diff --git a/src/vnet/session/application_worker.c b/src/vnet/session/application_worker.c
index 844e78f7fa9..0cb179161b8 100644
--- a/src/vnet/session/application_worker.c
+++ b/src/vnet/session/application_worker.c
@@ -393,7 +393,7 @@ app_worker_add_half_open (app_worker_t *app_wrk, session_handle_t sh)
{
session_handle_t *shp;
- ASSERT (vlib_get_thread_index () == 0);
+ ASSERT (session_vlib_thread_is_cl_thread ());
pool_get (app_wrk->half_open_table, shp);
*shp = sh;
@@ -404,7 +404,7 @@ int
app_worker_del_half_open (app_worker_t *app_wrk, session_t *s)
{
application_t *app = application_get (app_wrk->app_index);
- ASSERT (vlib_get_thread_index () <= 1);
+ ASSERT (session_vlib_thread_is_cl_thread ());
pool_put_index (app_wrk->half_open_table, s->ho_index);
if (app->cb_fns.half_open_cleanup_callback)
app->cb_fns.half_open_cleanup_callback (s);
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 91e9ed5451d..eaba80f24e2 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -334,15 +334,15 @@ void
session_half_open_delete_notify (transport_connection_t *tc)
{
/* Notification from ctrl thread accepted without rpc */
- if (!tc->thread_index)
+ if (tc->thread_index == transport_cl_thread ())
{
session_half_open_free (ho_session_get (tc->s_index));
}
else
{
void *args = uword_to_pointer ((uword) tc->s_index, void *);
- session_send_rpc_evt_to_thread_force (0, session_half_open_free_rpc,
- args);
+ session_send_rpc_evt_to_thread_force (transport_cl_thread (),
+ session_half_open_free_rpc, args);
}
}
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index 16000e62fa8..b8cc1c383c9 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -157,9 +157,6 @@ typedef struct session_worker_
/** Flag that is set if main thread signaled to handle connects */
u32 n_pending_connects;
- /** Main thread loops in poll mode without a connect */
- u32 no_connect_loops;
-
/** List head for first worker evts pending handling on main */
clib_llist_index_t evts_pending_main;
@@ -212,7 +209,9 @@ typedef struct session_main_
* Trade memory for speed, for now */
u32 *session_type_to_next;
- /** Thread for cl and ho that rely on cl allocs */
+ /** Thread used for allocating active open connections, i.e., half-opens
+ * for transports like tcp, and sessions that will be migrated for cl
+ * transports like udp. If vpp has workers, this will be first worker. */
u32 transport_cl_thread;
transport_proto_t last_transport_proto_type;
@@ -616,6 +615,13 @@ transport_cl_thread (void)
return session_main.transport_cl_thread;
}
+always_inline u32
+session_vlib_thread_is_cl_thread (void)
+{
+ return (vlib_get_thread_index () == transport_cl_thread () ||
+ vlib_thread_is_main_w_barrier ());
+}
+
/*
* Listen sessions
*/
@@ -668,29 +674,17 @@ always_inline session_t *
ho_session_alloc (void)
{
session_t *s;
- ASSERT (vlib_get_thread_index () == 0);
- s = session_alloc (0);
+ ASSERT (session_vlib_thread_is_cl_thread ());
+ s = session_alloc (transport_cl_thread ());
s->session_state = SESSION_STATE_CONNECTING;
s->flags |= SESSION_F_HALF_OPEN;
- /* Not ideal. Half-opens are only allocated from main with worker barrier
- * but can be cleaned up, i.e., session_half_open_free, from main without
- * a barrier. In debug images, the free_bitmap can grow while workers peek
- * the sessions pool, e.g., session_half_open_migrate_notify, and as a
- * result crash while validating the session. To avoid this, grow the bitmap
- * now. */
- if (CLIB_DEBUG)
- {
- session_t *sp = session_main.wrk[0].sessions;
- clib_bitmap_validate (pool_header (sp)->free_bitmap,
- s->session_index + 1);
- }
return s;
}
always_inline session_t *
ho_session_get (u32 ho_index)
{
- return session_get (ho_index, 0 /* half-open thread */);
+ return session_get (ho_index, transport_cl_thread ());
}
always_inline void
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index 8f6503d2806..be00925bb00 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -224,23 +224,20 @@ static void
session_mq_handle_connects_rpc (void *arg)
{
u32 max_connects = 32, n_connects = 0;
- vlib_main_t *vm = vlib_get_main ();
session_evt_elt_t *he, *elt, *next;
- session_worker_t *fwrk, *wrk;
+ session_worker_t *fwrk;
- ASSERT (vlib_get_thread_index () == 0);
+ ASSERT (session_vlib_thread_is_cl_thread ());
/* Pending connects on linked list pertaining to first worker */
- fwrk = session_main_get_worker (1);
+ fwrk = session_main_get_worker (transport_cl_thread ());
if (!fwrk->n_pending_connects)
- goto update_state;
-
- vlib_worker_thread_barrier_sync (vm);
+ return;
he = clib_llist_elt (fwrk->event_elts, fwrk->pending_connects);
elt = clib_llist_next (fwrk->event_elts, evt_list, he);
- /* Avoid holding the barrier for too long */
+ /* Avoid holding the worker for too long */
while (n_connects < max_connects && elt != he)
{
next = clib_llist_next (fwrk->event_elts, evt_list, elt);
@@ -254,45 +251,10 @@ session_mq_handle_connects_rpc (void *arg)
/* Decrement with worker barrier */
fwrk->n_pending_connects -= n_connects;
-
- vlib_worker_thread_barrier_release (vm);
-
-update_state:
-
- /* Switch worker to poll mode if it was in interrupt mode and had work or
- * back to interrupt if threshold of loops without a connect is passed.
- * While in poll mode, reprogram connects rpc */
- wrk = session_main_get_worker (0);
- if (wrk->state != SESSION_WRK_POLLING)
- {
- if (n_connects)
- {
- session_wrk_set_state (wrk, SESSION_WRK_POLLING);
- vlib_node_set_state (vm, session_queue_node.index,
- VLIB_NODE_STATE_POLLING);
- wrk->no_connect_loops = 0;
- }
- }
- else
- {
- if (!n_connects)
- {
- if (++wrk->no_connect_loops > 1e5)
- {
- session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
- vlib_node_set_state (vm, session_queue_node.index,
- VLIB_NODE_STATE_INTERRUPT);
- }
- }
- else
- wrk->no_connect_loops = 0;
- }
-
- if (wrk->state == SESSION_WRK_POLLING)
+ if (fwrk->n_pending_connects > 0)
{
- elt = session_evt_alloc_ctrl (wrk);
- elt->evt.event_type = SESSION_CTRL_EVT_RPC;
- elt->evt.rpc_args.fp = session_mq_handle_connects_rpc;
+ session_send_rpc_evt_to_thread_force (fwrk->vm->thread_index,
+ session_mq_handle_connects_rpc, 0);
}
}
@@ -302,20 +264,28 @@ session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt)
u32 thread_index = wrk - session_main.wrk;
session_evt_elt_t *he;
- /* No workers, so just deal with the connect now */
- if (PREDICT_FALSE (!thread_index))
+ if (PREDICT_FALSE (thread_index > transport_cl_thread ()))
{
- session_mq_connect_one (session_evt_ctrl_data (wrk, elt));
+ clib_warning ("Connect on wrong thread. Dropping");
return;
}
- if (PREDICT_FALSE (thread_index != 1))
+ /* If on worker, check if main has any pending messages. Avoids reordering
+ * with other control messages that need to be handled by main
+ */
+ if (thread_index)
{
- clib_warning ("Connect on wrong thread. Dropping");
- return;
+ he = clib_llist_elt (wrk->event_elts, wrk->evts_pending_main);
+
+ /* Events pending on main, postpone to avoid reordering */
+ if (!clib_llist_is_empty (wrk->event_elts, evt_list, he))
+ {
+ clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
+ return;
+ }
}
- /* Add to pending list to be handled by main thread */
+ /* Add to pending list to be handled by first worker */
he = clib_llist_elt (wrk->event_elts, wrk->pending_connects);
clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
@@ -323,9 +293,8 @@ session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt)
wrk->n_pending_connects += 1;
if (wrk->n_pending_connects == 1)
{
- vlib_node_set_interrupt_pending (vlib_get_main_by_index (0),
- session_queue_node.index);
- session_send_rpc_evt_to_thread (0, session_mq_handle_connects_rpc, 0);
+ session_send_rpc_evt_to_thread_force (thread_index,
+ session_mq_handle_connects_rpc, 0);
}
}
@@ -812,6 +781,9 @@ session_wrk_handle_evts_main_rpc (void *args)
case SESSION_CTRL_EVT_ACCEPTED_REPLY:
session_mq_accepted_reply_handler (fwrk, elt);
break;
+ case SESSION_CTRL_EVT_CONNECT:
+ session_mq_connect_handler (fwrk, elt);
+ break;
default:
clib_warning ("unhandled %u", elt->evt.event_type);
ALWAYS_ASSERT (0);
@@ -820,8 +792,11 @@ session_wrk_handle_evts_main_rpc (void *args)
/* Regrab element in case pool moved */
elt = clib_llist_elt (fwrk->event_elts, ei);
- session_evt_ctrl_data_free (fwrk, elt);
- clib_llist_put (fwrk->event_elts, elt);
+ if (!clib_llist_elt_is_linked (elt, evt_list))
+ {
+ session_evt_ctrl_data_free (fwrk, elt);
+ clib_llist_put (fwrk->event_elts, elt);
+ }
ei = next_ei;
}
diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c
index b13370bcffd..8554c730bda 100644
--- a/src/vnet/session/transport.c
+++ b/src/vnet/session/transport.c
@@ -486,7 +486,8 @@ transport_program_endpoint_cleanup (u32 lepi)
clib_spinlock_unlock (&tm->local_endpoints_lock);
if (flush_fl)
- session_send_rpc_evt_to_thread_force (0, transport_cleanup_freelist, 0);
+ session_send_rpc_evt_to_thread_force (transport_cl_thread (),
+ transport_cleanup_freelist, 0);
}
int
diff --git a/src/vnet/tcp/tcp.c b/src/vnet/tcp/tcp.c
index bdf1751ff14..c0445b9048b 100644
--- a/src/vnet/tcp/tcp.c
+++ b/src/vnet/tcp/tcp.c
@@ -188,8 +188,7 @@ tcp_session_get_listener (u32 listener_index)
static tcp_connection_t *
tcp_half_open_connection_alloc (void)
{
- ASSERT (vlib_get_thread_index () == 0);
- return tcp_connection_alloc (0);
+ return tcp_connection_alloc (transport_cl_thread ());
}
/**
@@ -199,7 +198,8 @@ tcp_half_open_connection_alloc (void)
static void
tcp_half_open_connection_free (tcp_connection_t * tc)
{
- ASSERT (vlib_get_thread_index () == 0);
+ ASSERT (vlib_get_thread_index () == tc->c_thread_index ||
+ vlib_thread_is_main_w_barrier ());
return tcp_connection_free (tc);
}
diff --git a/src/vnet/tcp/tcp_inlines.h b/src/vnet/tcp/tcp_inlines.h
index 69f8ce7ff27..91c57577067 100644
--- a/src/vnet/tcp/tcp_inlines.h
+++ b/src/vnet/tcp/tcp_inlines.h
@@ -66,7 +66,7 @@ tcp_listener_get (u32 tli)
always_inline tcp_connection_t *
tcp_half_open_connection_get (u32 conn_index)
{
- return tcp_connection_get (conn_index, 0);
+ return tcp_connection_get (conn_index, transport_cl_thread ());
}
/**
diff --git a/src/vnet/tcp/tcp_timer.h b/src/vnet/tcp/tcp_timer.h
index 7f7dbf193eb..c0907cae1cc 100644
--- a/src/vnet/tcp/tcp_timer.h
+++ b/src/vnet/tcp/tcp_timer.h
@@ -17,11 +17,18 @@
#include <vnet/tcp/tcp_types.h>
+static inline u8
+tcp_timer_thread_is_valid (tcp_connection_t *tc)
+{
+ return ((tc->c_thread_index == vlib_get_thread_index ()) ||
+ vlib_thread_is_main_w_barrier ());
+}
+
always_inline void
-tcp_timer_set (tcp_timer_wheel_t * tw, tcp_connection_t * tc, u8 timer_id,
+tcp_timer_set (tcp_timer_wheel_t *tw, tcp_connection_t *tc, u8 timer_id,
u32 interval)
{
- ASSERT (tc->c_thread_index == vlib_get_thread_index ());
+ ASSERT (tcp_timer_thread_is_valid (tc));
ASSERT (tc->timers[timer_id] == TCP_TIMER_HANDLE_INVALID);
tc->timers[timer_id] = tw_timer_start_tcp_twsl (tw, tc->c_c_index,
timer_id, interval);
@@ -30,7 +37,7 @@ tcp_timer_set (tcp_timer_wheel_t * tw, tcp_connection_t * tc, u8 timer_id,
always_inline void
tcp_timer_reset (tcp_timer_wheel_t * tw, tcp_connection_t * tc, u8 timer_id)
{
- ASSERT (tc->c_thread_index == vlib_get_thread_index ());
+ ASSERT (tcp_timer_thread_is_valid (tc));
tc->pending_timers &= ~(1 << timer_id);
if (tc->timers[timer_id] == TCP_TIMER_HANDLE_INVALID)
return;
@@ -43,7 +50,7 @@ always_inline void
tcp_timer_update (tcp_timer_wheel_t * tw, tcp_connection_t * tc, u8 timer_id,
u32 interval)
{
- ASSERT (tc->c_thread_index == vlib_get_thread_index ());
+ ASSERT (tcp_timer_thread_is_valid (tc));
if (tc->timers[timer_id] != TCP_TIMER_HANDLE_INVALID)
tw_timer_update_tcp_twsl (tw, tc->timers[timer_id], interval);
else
diff --git a/src/vnet/tls/tls.c b/src/vnet/tls/tls.c
index 85ac7f8022c..b082467c95f 100644
--- a/src/vnet/tls/tls.c
+++ b/src/vnet/tls/tls.c
@@ -121,6 +121,7 @@ tls_ctx_half_open_alloc (void)
clib_memset (ctx, 0, sizeof (*ctx));
ctx->c_c_index = ctx - tm->half_open_ctx_pool;
+ ctx->c_thread_index = transport_cl_thread ();
return ctx->c_c_index;
}