summaryrefslogtreecommitdiffstats
path: root/src/vnet/session/session_node.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/vnet/session/session_node.c')
-rw-r--r--src/vnet/session/session_node.c157
1 files changed, 101 insertions, 56 deletions
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index 2916c640c58..89c8ab0c891 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -26,12 +26,28 @@
#include <svm/queue.h>
#include <sys/timerfd.h>
-#define app_check_thread_and_barrier(_fn, _arg) \
- if (!vlib_thread_is_main_w_barrier ()) \
- { \
- vlib_rpc_call_main_thread (_fn, (u8 *) _arg, sizeof(*_arg)); \
- return; \
- }
+static inline void
+session_wrk_send_evt_to_main (session_worker_t *wrk, session_evt_elt_t *elt)
+{
+ session_evt_elt_t *he;
+ u32 thread_index;
+ u8 is_empty;
+
+ thread_index = wrk->vm->thread_index;
+ he = clib_llist_elt (wrk->event_elts, wrk->evts_pending_main);
+ is_empty = clib_llist_is_empty (wrk->event_elts, evt_list, he);
+ clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
+ if (is_empty)
+ vlib_rpc_call_main_thread (session_wrk_handle_evts_main_rpc,
+ (u8 *) &thread_index, sizeof (thread_index));
+}
+
+#define app_check_thread_and_barrier(_wrk, _elt) \
+ if (!vlib_thread_is_main_w_barrier ()) \
+ { \
+ session_wrk_send_evt_to_main (wrk, elt); \
+ return; \
+ }
static void
session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns)
@@ -93,16 +109,17 @@ session_mq_free_ext_config (application_t *app, uword offset)
}
static void
-session_mq_listen_handler (void *data)
+session_mq_listen_handler (session_worker_t *wrk, session_evt_elt_t *elt)
{
- session_listen_msg_t *mp = (session_listen_msg_t *) data;
vnet_listen_args_t _a, *a = &_a;
+ session_listen_msg_t *mp;
app_worker_t *app_wrk;
application_t *app;
int rv;
- app_check_thread_and_barrier (session_mq_listen_handler, mp);
+ app_check_thread_and_barrier (wrk, elt);
+ mp = session_evt_ctrl_data (wrk, elt);
app = application_lookup (mp->client_index);
if (!app)
return;
@@ -132,16 +149,17 @@ session_mq_listen_handler (void *data)
}
static void
-session_mq_listen_uri_handler (void *data)
+session_mq_listen_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt)
{
- session_listen_uri_msg_t *mp = (session_listen_uri_msg_t *) data;
vnet_listen_args_t _a, *a = &_a;
+ session_listen_uri_msg_t *mp;
app_worker_t *app_wrk;
application_t *app;
int rv;
- app_check_thread_and_barrier (session_mq_listen_uri_handler, mp);
+ app_check_thread_and_barrier (wrk, elt);
+ mp = session_evt_ctrl_data (wrk, elt);
app = application_lookup (mp->client_index);
if (!app)
return;
@@ -312,16 +330,17 @@ session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt)
}
static void
-session_mq_connect_uri_handler (void *data)
+session_mq_connect_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt)
{
- session_connect_uri_msg_t *mp = (session_connect_uri_msg_t *) data;
vnet_connect_args_t _a, *a = &_a;
+ session_connect_uri_msg_t *mp;
app_worker_t *app_wrk;
application_t *app;
int rv;
- app_check_thread_and_barrier (session_mq_connect_uri_handler, mp);
+ app_check_thread_and_barrier (wrk, elt);
+ mp = session_evt_ctrl_data (wrk, elt);
app = application_lookup (mp->client_index);
if (!app)
return;
@@ -371,14 +390,15 @@ session_mq_disconnect_handler (void *data)
}
static void
-app_mq_detach_handler (void *data)
+app_mq_detach_handler (session_worker_t *wrk, session_evt_elt_t *elt)
{
- session_app_detach_msg_t *mp = (session_app_detach_msg_t *) data;
vnet_app_detach_args_t _a, *a = &_a;
+ session_app_detach_msg_t *mp;
application_t *app;
- app_check_thread_and_barrier (app_mq_detach_handler, mp);
+ app_check_thread_and_barrier (wrk, elt);
+ mp = session_evt_ctrl_data (wrk, elt);
app = application_lookup (mp->client_index);
if (!app)
return;
@@ -389,18 +409,19 @@ app_mq_detach_handler (void *data)
}
static void
-session_mq_unlisten_rpc (session_unlisten_msg_t *mp)
+session_mq_unlisten_handler (session_worker_t *wrk, session_evt_elt_t *elt)
{
- vlib_main_t *vm = vlib_get_main ();
vnet_unlisten_args_t _a, *a = &_a;
+ session_unlisten_msg_t *mp;
app_worker_t *app_wrk;
session_handle_t sh;
application_t *app;
- u32 context;
int rv;
+ app_check_thread_and_barrier (wrk, elt);
+
+ mp = session_evt_ctrl_data (wrk, elt);
sh = mp->handle;
- context = mp->context;
app = application_lookup (mp->client_index);
if (!app)
@@ -411,56 +432,34 @@ session_mq_unlisten_rpc (session_unlisten_msg_t *mp)
a->handle = sh;
a->wrk_map_index = mp->wrk_index;
- vlib_worker_thread_barrier_sync (vm);
-
if ((rv = vnet_unlisten (a)))
clib_warning ("unlisten returned: %d", rv);
- vlib_worker_thread_barrier_release (vm);
-
app_wrk = application_get_worker (app, a->wrk_map_index);
if (!app_wrk)
return;
- mq_send_unlisten_reply (app_wrk, sh, context, rv);
- clib_mem_free (mp);
-}
-
-static void
-session_mq_unlisten_handler (session_worker_t *wrk, session_evt_elt_t *elt)
-{
- u32 thread_index = wrk - session_main.wrk;
- session_unlisten_msg_t *mp, *arg;
-
- mp = session_evt_ctrl_data (wrk, elt);
- arg = clib_mem_alloc (sizeof (session_unlisten_msg_t));
- clib_memcpy_fast (arg, mp, sizeof (*arg));
-
- if (PREDICT_FALSE (!thread_index))
- {
- session_mq_unlisten_rpc (arg);
- return;
- }
-
- session_send_rpc_evt_to_thread_force (0, session_mq_unlisten_rpc, arg);
+ mq_send_unlisten_reply (app_wrk, sh, mp->context, rv);
}
static void
-session_mq_accepted_reply_handler (void *data)
+session_mq_accepted_reply_handler (session_worker_t *wrk,
+ session_evt_elt_t *elt)
{
- session_accepted_reply_msg_t *mp = (session_accepted_reply_msg_t *) data;
vnet_disconnect_args_t _a = { 0 }, *a = &_a;
+ session_accepted_reply_msg_t *mp;
session_state_t old_state;
app_worker_t *app_wrk;
session_t *s;
+ mp = session_evt_ctrl_data (wrk, elt);
+
/* Mail this back from the main thread. We're not polling in main
* thread so we're using other workers for notifications. */
if (session_thread_from_handle (mp->handle) == 0 && vlib_num_workers () &&
vlib_get_thread_index () != 0)
{
- vlib_rpc_call_main_thread (session_mq_accepted_reply_handler,
- (u8 *) mp, sizeof (*mp));
+ session_wrk_send_evt_to_main (wrk, elt);
return;
}
@@ -774,6 +773,52 @@ session_mq_transport_attr_handler (void *data)
svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
}
+void
+session_wrk_handle_evts_main_rpc (void *args)
+{
+ session_evt_elt_t *he, *elt, *next;
+ session_worker_t *fwrk;
+ u32 thread_index;
+
+ ASSERT (vlib_thread_is_main_w_barrier ());
+ thread_index = *(u32 *) args;
+ fwrk = session_main_get_worker (thread_index);
+
+ he = clib_llist_elt (fwrk->event_elts, fwrk->evts_pending_main);
+ elt = clib_llist_next (fwrk->event_elts, evt_list, he);
+
+ while (elt != he)
+ {
+ next = clib_llist_next (fwrk->event_elts, evt_list, elt);
+ clib_llist_remove (fwrk->event_elts, evt_list, elt);
+ switch (elt->evt.event_type)
+ {
+ case SESSION_CTRL_EVT_LISTEN:
+ session_mq_listen_handler (fwrk, elt);
+ break;
+ case SESSION_CTRL_EVT_UNLISTEN:
+ session_mq_unlisten_handler (fwrk, elt);
+ break;
+ case SESSION_CTRL_EVT_APP_DETACH:
+ app_mq_detach_handler (fwrk, elt);
+ break;
+ case SESSION_CTRL_EVT_CONNECT_URI:
+ session_mq_connect_uri_handler (fwrk, elt);
+ break;
+ case SESSION_CTRL_EVT_ACCEPTED_REPLY:
+ session_mq_accepted_reply_handler (fwrk, elt);
+ break;
+ default:
+ clib_warning ("unhandled %u", elt->evt.event_type);
+ ALWAYS_ASSERT (0);
+ break;
+ }
+ session_evt_ctrl_data_free (fwrk, elt);
+ clib_llist_put (fwrk->event_elts, elt);
+ elt = next;
+ }
+}
+
vlib_node_registration_t session_queue_node;
typedef struct
@@ -1514,10 +1559,10 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
session_transport_reset (s);
break;
case SESSION_CTRL_EVT_LISTEN:
- session_mq_listen_handler (session_evt_ctrl_data (wrk, elt));
+ session_mq_listen_handler (wrk, elt);
break;
case SESSION_CTRL_EVT_LISTEN_URI:
- session_mq_listen_uri_handler (session_evt_ctrl_data (wrk, elt));
+ session_mq_listen_uri_handler (wrk, elt);
break;
case SESSION_CTRL_EVT_UNLISTEN:
session_mq_unlisten_handler (wrk, elt);
@@ -1526,7 +1571,7 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
session_mq_connect_handler (wrk, elt);
break;
case SESSION_CTRL_EVT_CONNECT_URI:
- session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt));
+ session_mq_connect_uri_handler (wrk, elt);
break;
case SESSION_CTRL_EVT_SHUTDOWN:
session_mq_shutdown_handler (session_evt_ctrl_data (wrk, elt));
@@ -1538,7 +1583,7 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
session_mq_disconnected_handler (session_evt_ctrl_data (wrk, elt));
break;
case SESSION_CTRL_EVT_ACCEPTED_REPLY:
- session_mq_accepted_reply_handler (session_evt_ctrl_data (wrk, elt));
+ session_mq_accepted_reply_handler (wrk, elt);
break;
case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
session_mq_disconnected_reply_handler (session_evt_ctrl_data (wrk,
@@ -1551,7 +1596,7 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
session_mq_worker_update_handler (session_evt_ctrl_data (wrk, elt));
break;
case SESSION_CTRL_EVT_APP_DETACH:
- app_mq_detach_handler (session_evt_ctrl_data (wrk, elt));
+ app_mq_detach_handler (wrk, elt);
break;
case SESSION_CTRL_EVT_APP_WRK_RPC:
session_mq_app_wrk_rpc_handler (session_evt_ctrl_data (wrk, elt));