aboutsummaryrefslogtreecommitdiffstats
path: root/src/vnet/session/application.c
diff options
context:
space:
mode:
authorFlorin Coras <fcoras@cisco.com>2021-01-15 13:49:33 -0800
committerDave Barach <openvpp@barachs.net>2021-03-29 20:20:03 +0000
commit41d5f541d37dc564565b3b29eb370b65bb5a9036 (patch)
tree49c80b5c140c0693c37a037ef513c62d92c74a7e /src/vnet/session/application.c
parenta840db21e8cce5f27f2a41bd245d59e6aeb8a932 (diff)
svm session vcl: per app rx message queues
Add option to use per app private segments for app to vpp message queues, as opposed to exposing internal message queues segment. When so configured, internal message queues are still polled by the session queue node but external app message queues are handled by a new input node (appsl-rx-mqs-input) that runs in interrupt state. Signaling of the node, when mqs receive new messages, is done through eventfds epolled by worker epoll input nodes. Type: feature Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: Iffe8ce5a9944a56a14e6d0f492a850cb9e392d16
Diffstat (limited to 'src/vnet/session/application.c')
-rw-r--r--src/vnet/session/application.c294
1 files changed, 291 insertions, 3 deletions
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index eb8a7169d8a..b055ab49f3c 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -412,6 +412,262 @@ application_lookup_name (const u8 * name)
return 0;
}
+void
+appsl_pending_rx_mqs_add_tail (appsl_wrk_t *aw, app_rx_mq_elt_t *elt)
+{
+ app_rx_mq_elt_t *head;
+
+ if (!aw->pending_rx_mqs)
+ {
+ elt->next = elt->prev = elt;
+ aw->pending_rx_mqs = elt;
+ return;
+ }
+
+ head = aw->pending_rx_mqs;
+
+ ASSERT (head != elt);
+
+ elt->prev = head->prev;
+ elt->next = head;
+
+ head->prev->next = elt;
+ head->prev = elt;
+}
+
+void
+appsl_pending_rx_mqs_del (appsl_wrk_t *aw, app_rx_mq_elt_t *elt)
+{
+ if (elt->next == elt)
+ {
+ elt->next = elt->prev = 0;
+ aw->pending_rx_mqs = 0;
+ return;
+ }
+
+ if (elt == aw->pending_rx_mqs)
+ aw->pending_rx_mqs = elt->next;
+
+ elt->next->prev = elt->prev;
+ elt->prev->next = elt->next;
+ elt->next = elt->prev = 0;
+}
+
+vlib_node_registration_t appsl_rx_mqs_input_node;
+
+VLIB_NODE_FN (appsl_rx_mqs_input_node)
+(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
+{
+ u32 thread_index = vm->thread_index, n_msgs = 0;
+ app_rx_mq_elt_t *elt, *next;
+ app_main_t *am = &app_main;
+ session_worker_t *wrk;
+ int __clib_unused rv;
+ appsl_wrk_t *aw;
+ u64 buf;
+
+ aw = &am->wrk[thread_index];
+ elt = aw->pending_rx_mqs;
+ if (!elt)
+ return 0;
+
+ wrk = session_main_get_worker (thread_index);
+
+ do
+ {
+ if (!(elt->flags & APP_RX_MQ_F_POSTPONED))
+ rv = read (svm_msg_q_get_eventfd (elt->mq), &buf, sizeof (buf));
+ n_msgs += session_wrk_handle_mq (wrk, elt->mq);
+
+ next = elt->next;
+ appsl_pending_rx_mqs_del (aw, elt);
+ if (!svm_msg_q_is_empty (elt->mq))
+ {
+ elt->flags |= APP_RX_MQ_F_POSTPONED;
+ appsl_pending_rx_mqs_add_tail (aw, elt);
+ }
+ else
+ {
+ elt->flags = 0;
+ }
+ elt = next;
+ }
+ while (aw->pending_rx_mqs && elt != aw->pending_rx_mqs);
+
+ if (aw->pending_rx_mqs)
+ vlib_node_set_interrupt_pending (vm, appsl_rx_mqs_input_node.index);
+
+ return n_msgs;
+}
+
+VLIB_REGISTER_NODE (appsl_rx_mqs_input_node) = {
+ .name = "appsl-rx-mqs-input",
+ .type = VLIB_NODE_TYPE_INPUT,
+ .state = VLIB_NODE_STATE_DISABLED,
+};
+
+static clib_error_t *
+app_rx_mq_fd_read_ready (clib_file_t *cf)
+{
+ app_rx_mq_handle_t *handle = (app_rx_mq_handle_t *) &cf->private_data;
+ vlib_main_t *vm = vlib_get_main ();
+ app_main_t *am = &app_main;
+ app_rx_mq_elt_t *mqe;
+ application_t *app;
+ appsl_wrk_t *aw;
+
+ ASSERT (vlib_get_thread_index () == handle->thread_index);
+ app = application_get_if_valid (handle->app_index);
+ if (!app)
+ return 0;
+
+ mqe = &app->rx_mqs[handle->thread_index];
+ if ((mqe->flags & APP_RX_MQ_F_PENDING) || svm_msg_q_is_empty (mqe->mq))
+ return 0;
+
+ aw = &am->wrk[handle->thread_index];
+ appsl_pending_rx_mqs_add_tail (aw, mqe);
+ mqe->flags |= APP_RX_MQ_F_PENDING;
+
+ vlib_node_set_interrupt_pending (vm, appsl_rx_mqs_input_node.index);
+
+ return 0;
+}
+
+static clib_error_t *
+app_rx_mq_fd_write_ready (clib_file_t *cf)
+{
+ clib_warning ("should not be called");
+ return 0;
+}
+
+static void
+app_rx_mqs_epoll_add (application_t *app, app_rx_mq_elt_t *mqe)
+{
+ clib_file_t template = { 0 };
+ app_rx_mq_handle_t handle;
+ u32 thread_index;
+ int fd;
+
+ thread_index = mqe - app->rx_mqs;
+ fd = svm_msg_q_get_eventfd (mqe->mq);
+
+ handle.app_index = app->app_index;
+ handle.thread_index = thread_index;
+
+ template.read_function = app_rx_mq_fd_read_ready;
+ template.write_function = app_rx_mq_fd_write_ready;
+ template.file_descriptor = fd;
+ template.private_data = handle.as_u64;
+ template.polling_thread_index = thread_index;
+ template.description =
+ format (0, "app-%u-rx-mq-%u", app->app_index, thread_index);
+ mqe->file_index = clib_file_add (&file_main, &template);
+}
+
+static void
+app_rx_mqs_epoll_del (application_t *app, app_rx_mq_elt_t *mqe)
+{
+ u32 thread_index = mqe - app->rx_mqs;
+ app_main_t *am = &app_main;
+ appsl_wrk_t *aw;
+
+ aw = &am->wrk[thread_index];
+
+ if (mqe->flags & APP_RX_MQ_F_PENDING)
+ {
+ session_wrk_handle_mq (session_main_get_worker (thread_index), mqe->mq);
+ appsl_pending_rx_mqs_del (aw, mqe);
+ }
+
+ clib_file_del_by_index (&file_main, mqe->file_index);
+}
+
+svm_msg_q_t *
+application_rx_mq_get (application_t *app, u32 mq_index)
+{
+ if (!app->rx_mqs)
+ return 0;
+
+ return app->rx_mqs[mq_index].mq;
+}
+
+static int
+app_rx_mqs_alloc (application_t *app)
+{
+ u32 evt_q_length, evt_size = sizeof (session_event_t);
+ fifo_segment_t *eqs = &app->rx_mqs_segment;
+ u32 n_mqs = vlib_num_workers () + 1;
+ segment_manager_props_t *props;
+ int i;
+
+ props = application_segment_manager_properties (app);
+ evt_q_length = clib_max (props->evt_q_size, 128);
+
+ svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
+ svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
+ { evt_q_length, evt_size, 0 }, { evt_q_length >> 1, 256, 0 }
+ };
+ cfg->consumer_pid = 0;
+ cfg->n_rings = 2;
+ cfg->q_nitems = evt_q_length;
+ cfg->ring_cfgs = rc;
+
+ eqs->ssvm.ssvm_size = svm_msg_q_size_to_alloc (cfg) * n_mqs + (16 << 10);
+ eqs->ssvm.name = format (0, "%s-rx-mqs-seg%c", app->name, 0);
+
+ if (ssvm_server_init (&eqs->ssvm, SSVM_SEGMENT_MEMFD))
+ {
+ clib_warning ("failed to initialize queue segment");
+ return SESSION_E_SEG_CREATE;
+ }
+
+ fifo_segment_init (eqs);
+
+ /* Fifo segment filled only with mqs */
+ eqs->h->n_mqs = n_mqs;
+ vec_validate (app->rx_mqs, n_mqs - 1);
+
+ for (i = 0; i < n_mqs; i++)
+ {
+ app->rx_mqs[i].mq = fifo_segment_msg_q_alloc (eqs, i, cfg);
+ if (svm_msg_q_alloc_eventfd (app->rx_mqs[i].mq))
+ {
+ clib_warning ("eventfd returned");
+ fifo_segment_cleanup (eqs);
+ ssvm_delete (&eqs->ssvm);
+ return SESSION_E_EVENTFD_ALLOC;
+ }
+ app_rx_mqs_epoll_add (app, &app->rx_mqs[i]);
+ app->rx_mqs[i].app_index = app->app_index;
+ }
+
+ return 0;
+}
+
+u8
+application_use_private_rx_mqs (void)
+{
+ return session_main.use_private_rx_mqs;
+}
+
+fifo_segment_t *
+application_get_rx_mqs_segment (application_t *app)
+{
+ if (application_use_private_rx_mqs ())
+ return &app->rx_mqs_segment;
+ return session_main_get_evt_q_segment ();
+}
+
+void
+application_enable_rx_mqs_nodes (u8 is_en)
+{
+ u8 state = is_en ? VLIB_NODE_STATE_INTERRUPT : VLIB_NODE_STATE_DISABLED;
+
+ foreach_vlib_main ()
+ vlib_node_set_state (this_vlib_main, appsl_rx_mqs_input_node.index, state);
+}
+
static application_t *
application_alloc (void)
{
@@ -596,6 +852,20 @@ application_free (application_t * app)
pool_free (app->worker_maps);
/*
+ * Free rx mqs if allocated
+ */
+ if (app->rx_mqs)
+ {
+ int i;
+ for (i = 0; i < vec_len (app->rx_mqs); i++)
+ app_rx_mqs_epoll_del (app, &app->rx_mqs[i]);
+
+ fifo_segment_cleanup (&app->rx_mqs_segment);
+ ssvm_delete (&app->rx_mqs_segment.ssvm);
+ vec_free (app->rx_mqs);
+ }
+
+ /*
* Cleanup remaining state
*/
if (application_is_builtin (app))
@@ -875,8 +1145,12 @@ vnet_application_attach (vnet_app_attach_args_t * a)
a->segment_handle = segment_manager_segment_handle (sm, fs);
segment_manager_segment_reader_unlock (sm);
+
+ if (!application_is_builtin (app) && application_use_private_rx_mqs ())
+ rv = app_rx_mqs_alloc (app);
+
vec_free (app_name);
- return 0;
+ return rv;
}
/**
@@ -1537,6 +1811,8 @@ appliction_format_app_mq (vlib_main_t * vm, application_t * app)
{
app_worker_map_t *map;
app_worker_t *wrk;
+ int i;
+
/* *INDENT-OFF* */
pool_foreach (map, app->worker_maps) {
wrk = app_worker_get (map->wrk_index);
@@ -1545,6 +1821,10 @@ appliction_format_app_mq (vlib_main_t * vm, application_t * app)
wrk->event_queue);
}
/* *INDENT-ON* */
+
+ for (i = 0; i < vec_len (app->rx_mqs); i++)
+ vlib_cli_output (vm, "[A%d][R%d]%U", app->app_index, i, format_svm_msg_q,
+ app->rx_mqs[i].mq);
}
static clib_error_t *
@@ -1731,10 +2011,18 @@ vnet_app_del_cert_key_pair (u32 index)
clib_error_t *
application_init (vlib_main_t * vm)
{
+ app_main_t *am = &app_main;
+ u32 n_workers;
+
+ n_workers = vlib_num_workers ();
+
/* Index 0 was originally used by legacy apis, maintain as invalid */
(void) app_cert_key_pair_alloc ();
- app_main.last_crypto_engine = CRYPTO_ENGINE_LAST;
- app_main.app_by_name = hash_create_vec (0, sizeof (u8), sizeof (uword));
+ am->last_crypto_engine = CRYPTO_ENGINE_LAST;
+ am->app_by_name = hash_create_vec (0, sizeof (u8), sizeof (uword));
+
+ vec_validate (am->wrk, n_workers);
+
return 0;
}