author    Florin Coras <fcoras@cisco.com>    2021-01-15 13:49:33 -0800
committer Dave Barach <openvpp@barachs.net>  2021-03-29 20:20:03 +0000
commit    41d5f541d37dc564565b3b29eb370b65bb5a9036 (patch)
tree      49c80b5c140c0693c37a037ef513c62d92c74a7e
parent    a840db21e8cce5f27f2a41bd245d59e6aeb8a932 (diff)
svm session vcl: per app rx message queues
Add option to use per app private segments for app to vpp message queues,
as opposed to exposing the internal message queues segment.

When so configured, internal message queues are still polled by the
session queue node, but external app message queues are handled by a new
input node (appsl-rx-mqs-input) that runs in interrupt state. Signaling
of the node, when the mqs receive new messages, is done through eventfds
epolled by the workers' epoll input nodes.

Type: feature

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: Iffe8ce5a9944a56a14e6d0f492a850cb9e392d16
-rw-r--r--src/svm/message_queue.c2
-rw-r--r--src/vcl/test/test_vcl.py20
-rw-r--r--src/vcl/vcl_sapi.c3
-rw-r--r--src/vnet/session/application.c294
-rw-r--r--src/vnet/session/application.h56
-rw-r--r--src/vnet/session/session.c6
-rw-r--r--src/vnet/session/session.h5
-rw-r--r--src/vnet/session/session_api.c107
-rw-r--r--src/vnet/session/session_debug.h16
-rw-r--r--src/vnet/session/session_node.c50
-rw-r--r--src/vnet/session/session_types.h63
11 files changed, 518 insertions, 104 deletions
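
Before the hunks, a note on the mechanism: the patch hinges on the
classic eventfd/epoll wakeup pattern. The program below is a minimal
standalone sketch of that pattern only (hypothetical names, not VPP
code): the write models an external app bumping its private rx mq's
eventfd after enqueuing a message, and the epoll_wait/read pair models
the vpp worker's epoll input node draining the fd before setting the
appsl-rx-mqs-input interrupt.

/* Minimal sketch of the eventfd/epoll signaling this patch builds on.
 * Hypothetical, not VPP code. */
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>

int
main (void)
{
  int efd = eventfd (0, 0);	/* blocking counter mode, as in the patch */
  int epfd = epoll_create1 (0);
  struct epoll_event ev = { .events = EPOLLIN, .data.fd = efd };
  uint64_t val = 1;

  epoll_ctl (epfd, EPOLL_CTL_ADD, efd, &ev);

  /* "app" side: enqueue to the mq, then bump the eventfd counter */
  write (efd, &val, sizeof (val));

  /* "worker" side: epoll reports readiness, the read drains the
   * counter, and the real handler would then queue the mq element and
   * set the input node interrupt */
  epoll_wait (epfd, &ev, 1, -1);
  read (efd, &val, sizeof (val));
  printf ("signals coalesced since last read: %lu\n", (unsigned long) val);

  close (efd);
  close (epfd);
  return 0;
}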
diff --git a/src/svm/message_queue.c b/src/svm/message_queue.c
index 5c04b19e64c..e08ba06fb27 100644
--- a/src/svm/message_queue.c
+++ b/src/svm/message_queue.c
@@ -482,7 +482,7 @@ int
svm_msg_q_alloc_eventfd (svm_msg_q_t *mq)
{
int fd;
- if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
+ if ((fd = eventfd (0, 0)) < 0)
return -1;
svm_msg_q_set_eventfd (mq, fd);
return 0;
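
Context for this hunk: in its default (non-semaphore) mode an eventfd is
a 64-bit counter; writes add to it and a single read returns the
accumulated value and resets it to zero, so several enqueue
notifications coalesce into one wakeup. Dropping EFD_NONBLOCK fits the
new usage, where the fd is read either on epoll readiness or
deliberately blocking. A standalone demonstration of the counter
semantics (hypothetical, not VPP code):

#include <assert.h>
#include <stdint.h>
#include <unistd.h>
#include <sys/eventfd.h>

int
main (void)
{
  uint64_t v = 1, out = 0;
  int fd = eventfd (0, 0);	 /* blocking counter mode, as in the hunk */

  write (fd, &v, sizeof (v));	 /* counter = 1 */
  write (fd, &v, sizeof (v));	 /* counter = 2 */
  read (fd, &out, sizeof (out)); /* out = 2, counter resets to 0 */
  assert (out == 2);

  close (fd);
  return 0;
}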
diff --git a/src/vcl/test/test_vcl.py b/src/vcl/test/test_vcl.py
index f5a5bebdad8..80b9f2f0211 100644
--- a/src/vcl/test/test_vcl.py
+++ b/src/vcl/test/test_vcl.py
@@ -281,6 +281,7 @@ class LDPCutThruTestCase(VCLTestCase):
def show_commands_at_teardown(self):
self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_ldp_cut_thru_echo(self):
@@ -351,6 +352,7 @@ class VCLCutThruTestCase(VCLTestCase):
def show_commands_at_teardown(self):
self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
def test_vcl_cut_thru_echo(self):
""" run VCL cut thru echo test """
@@ -406,6 +408,7 @@ class VCLThruHostStackEcho(VCLTestCase):
def show_commands_at_teardown(self):
self.logger.debug(self.vapi.cli("show app server"))
self.logger.debug(self.vapi.cli("show session verbose"))
+ self.logger.debug(self.vapi.cli("show app mq"))
class VCLThruHostStackTLS(VCLTestCase):
@@ -444,6 +447,7 @@ class VCLThruHostStackTLS(VCLTestCase):
def show_commands_at_teardown(self):
self.logger.debug(self.vapi.cli("show app server"))
self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
class VCLThruHostStackBidirNsock(VCLTestCase):
@@ -476,6 +480,7 @@ class VCLThruHostStackBidirNsock(VCLTestCase):
def show_commands_at_teardown(self):
self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
def test_vcl_thru_host_stack_bi_dir_nsock(self):
""" run VCL thru host stack bi-directional (multiple sockets) test """
@@ -517,6 +522,7 @@ class LDPThruHostStackBidirNsock(VCLTestCase):
def show_commands_at_teardown(self):
self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
def test_ldp_thru_host_stack_bi_dir_nsock(self):
""" run LDP thru host stack bi-directional (multiple sockets) test """
@@ -632,6 +638,7 @@ class LDPThruHostStackIperf(VCLTestCase):
def show_commands_at_teardown(self):
self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
@unittest.skipUnless(_have_iperf3, "'%s' not found, Skipping.")
def test_ldp_thru_host_stack_iperf3(self):
@@ -668,6 +675,7 @@ class LDPThruHostStackIperfUdp(VCLTestCase):
def show_commands_at_teardown(self):
self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
@unittest.skipUnless(_have_iperf3, "'%s' not found, Skipping.")
def test_ldp_thru_host_stack_iperf3_udp(self):
@@ -689,6 +697,10 @@ class LDPIpv6CutThruTestCase(VCLTestCase):
def tearDownClass(cls):
super(LDPIpv6CutThruTestCase, cls).tearDownClass()
+ def show_commands_at_teardown(self):
+ self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
+
def setUp(self):
super(LDPIpv6CutThruTestCase, self).setUp()
@@ -765,6 +777,10 @@ class VCLIpv6CutThruTestCase(VCLTestCase):
def tearDownClass(cls):
super(VCLIpv6CutThruTestCase, cls).tearDownClass()
+ def show_commands_at_teardown(self):
+ self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
+
def setUp(self):
super(VCLIpv6CutThruTestCase, self).setUp()
@@ -789,6 +805,10 @@ class VCLIpv6CutThruTestCase(VCLTestCase):
super(VCLIpv6CutThruTestCase, self).tearDown()
self.cut_thru_tear_down()
+ def show_commands_at_teardown(self):
+ self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
+
def test_vcl_ipv6_cut_thru_echo(self):
""" run VCL IPv6 cut thru echo test """
diff --git a/src/vcl/vcl_sapi.c b/src/vcl/vcl_sapi.c
index 5258722a484..14401da9d57 100644
--- a/src/vcl/vcl_sapi.c
+++ b/src/vcl/vcl_sapi.c
@@ -94,7 +94,6 @@ vcl_api_attach_reply_handler (app_sapi_attach_reply_msg_t * mp, int *fds)
vcl_segment_attach_mq (vcl_vpp_worker_segment_handle (0), mp->vpp_ctrl_mq,
mp->vpp_ctrl_mq_thread, &wrk->ctrl_mq);
vcm->ctrl_mq = wrk->ctrl_mq;
-
vcm->app_index = mp->app_index;
return 0;
@@ -156,7 +155,7 @@ vcl_sapi_attach (void)
app_sapi_msg_t _rmp, *rmp = &_rmp;
clib_error_t *err;
clib_socket_t *cs;
- int fds[SESSION_N_FD_TYPE];
+ int fds[32];
/*
* Init client socket and send attach
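
The fds scratch array grows past SESSION_N_FD_TYPE because, with
private rx mqs, the attach reply can carry segment fds plus one mq
eventfd per vpp worker (the vpp side sizes its vector as 3 + n_workers
further down). All of them arrive as SCM_RIGHTS ancillary data on the
sapi unix socket. A receive-side fragment sketching that mechanism
(hypothetical helper, not VCL's actual socket code); a send-side
counterpart is sketched after the session_api.c hunks below:

#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>

/* Receive one payload byte plus up to max_fds SCM_RIGHTS fds from a
 * connected unix-domain socket; returns the number of fds copied out. */
static int
recv_fds (int sock, int *fds, int max_fds)
{
  char byte, cbuf[CMSG_SPACE (32 * sizeof (int))];
  struct iovec iov = { .iov_base = &byte, .iov_len = 1 };
  struct msghdr msg = { .msg_iov = &iov, .msg_iovlen = 1,
			.msg_control = cbuf,
			.msg_controllen = sizeof (cbuf) };
  struct cmsghdr *cmsg;
  int n = 0;

  if (recvmsg (sock, &msg, 0) < 0)
    return -1;

  for (cmsg = CMSG_FIRSTHDR (&msg); cmsg; cmsg = CMSG_NXTHDR (&msg, cmsg))
    if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS)
      {
	n = (cmsg->cmsg_len - CMSG_LEN (0)) / sizeof (int);
	n = n > max_fds ? max_fds : n;
	memcpy (fds, CMSG_DATA (cmsg), n * sizeof (int));
	break;
      }
  return n;
}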
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index eb8a7169d8a..b055ab49f3c 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -412,6 +412,262 @@ application_lookup_name (const u8 * name)
return 0;
}
+void
+appsl_pending_rx_mqs_add_tail (appsl_wrk_t *aw, app_rx_mq_elt_t *elt)
+{
+ app_rx_mq_elt_t *head;
+
+ if (!aw->pending_rx_mqs)
+ {
+ elt->next = elt->prev = elt;
+ aw->pending_rx_mqs = elt;
+ return;
+ }
+
+ head = aw->pending_rx_mqs;
+
+ ASSERT (head != elt);
+
+ elt->prev = head->prev;
+ elt->next = head;
+
+ head->prev->next = elt;
+ head->prev = elt;
+}
+
+void
+appsl_pending_rx_mqs_del (appsl_wrk_t *aw, app_rx_mq_elt_t *elt)
+{
+ if (elt->next == elt)
+ {
+ elt->next = elt->prev = 0;
+ aw->pending_rx_mqs = 0;
+ return;
+ }
+
+ if (elt == aw->pending_rx_mqs)
+ aw->pending_rx_mqs = elt->next;
+
+ elt->next->prev = elt->prev;
+ elt->prev->next = elt->next;
+ elt->next = elt->prev = 0;
+}
+
+vlib_node_registration_t appsl_rx_mqs_input_node;
+
+VLIB_NODE_FN (appsl_rx_mqs_input_node)
+(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
+{
+ u32 thread_index = vm->thread_index, n_msgs = 0;
+ app_rx_mq_elt_t *elt, *next;
+ app_main_t *am = &app_main;
+ session_worker_t *wrk;
+ int __clib_unused rv;
+ appsl_wrk_t *aw;
+ u64 buf;
+
+ aw = &am->wrk[thread_index];
+ elt = aw->pending_rx_mqs;
+ if (!elt)
+ return 0;
+
+ wrk = session_main_get_worker (thread_index);
+
+ do
+ {
+ if (!(elt->flags & APP_RX_MQ_F_POSTPONED))
+ rv = read (svm_msg_q_get_eventfd (elt->mq), &buf, sizeof (buf));
+ n_msgs += session_wrk_handle_mq (wrk, elt->mq);
+
+ next = elt->next;
+ appsl_pending_rx_mqs_del (aw, elt);
+ if (!svm_msg_q_is_empty (elt->mq))
+ {
+ elt->flags |= APP_RX_MQ_F_POSTPONED;
+ appsl_pending_rx_mqs_add_tail (aw, elt);
+ }
+ else
+ {
+ elt->flags = 0;
+ }
+ elt = next;
+ }
+ while (aw->pending_rx_mqs && elt != aw->pending_rx_mqs);
+
+ if (aw->pending_rx_mqs)
+ vlib_node_set_interrupt_pending (vm, appsl_rx_mqs_input_node.index);
+
+ return n_msgs;
+}
+
+VLIB_REGISTER_NODE (appsl_rx_mqs_input_node) = {
+ .name = "appsl-rx-mqs-input",
+ .type = VLIB_NODE_TYPE_INPUT,
+ .state = VLIB_NODE_STATE_DISABLED,
+};
+
+static clib_error_t *
+app_rx_mq_fd_read_ready (clib_file_t *cf)
+{
+ app_rx_mq_handle_t *handle = (app_rx_mq_handle_t *) &cf->private_data;
+ vlib_main_t *vm = vlib_get_main ();
+ app_main_t *am = &app_main;
+ app_rx_mq_elt_t *mqe;
+ application_t *app;
+ appsl_wrk_t *aw;
+
+ ASSERT (vlib_get_thread_index () == handle->thread_index);
+ app = application_get_if_valid (handle->app_index);
+ if (!app)
+ return 0;
+
+ mqe = &app->rx_mqs[handle->thread_index];
+ if ((mqe->flags & APP_RX_MQ_F_PENDING) || svm_msg_q_is_empty (mqe->mq))
+ return 0;
+
+ aw = &am->wrk[handle->thread_index];
+ appsl_pending_rx_mqs_add_tail (aw, mqe);
+ mqe->flags |= APP_RX_MQ_F_PENDING;
+
+ vlib_node_set_interrupt_pending (vm, appsl_rx_mqs_input_node.index);
+
+ return 0;
+}
+
+static clib_error_t *
+app_rx_mq_fd_write_ready (clib_file_t *cf)
+{
+ clib_warning ("should not be called");
+ return 0;
+}
+
+static void
+app_rx_mqs_epoll_add (application_t *app, app_rx_mq_elt_t *mqe)
+{
+ clib_file_t template = { 0 };
+ app_rx_mq_handle_t handle;
+ u32 thread_index;
+ int fd;
+
+ thread_index = mqe - app->rx_mqs;
+ fd = svm_msg_q_get_eventfd (mqe->mq);
+
+ handle.app_index = app->app_index;
+ handle.thread_index = thread_index;
+
+ template.read_function = app_rx_mq_fd_read_ready;
+ template.write_function = app_rx_mq_fd_write_ready;
+ template.file_descriptor = fd;
+ template.private_data = handle.as_u64;
+ template.polling_thread_index = thread_index;
+ template.description =
+ format (0, "app-%u-rx-mq-%u", app->app_index, thread_index);
+ mqe->file_index = clib_file_add (&file_main, &template);
+}
+
+static void
+app_rx_mqs_epoll_del (application_t *app, app_rx_mq_elt_t *mqe)
+{
+ u32 thread_index = mqe - app->rx_mqs;
+ app_main_t *am = &app_main;
+ appsl_wrk_t *aw;
+
+ aw = &am->wrk[thread_index];
+
+ if (mqe->flags & APP_RX_MQ_F_PENDING)
+ {
+ session_wrk_handle_mq (session_main_get_worker (thread_index), mqe->mq);
+ appsl_pending_rx_mqs_del (aw, mqe);
+ }
+
+ clib_file_del_by_index (&file_main, mqe->file_index);
+}
+
+svm_msg_q_t *
+application_rx_mq_get (application_t *app, u32 mq_index)
+{
+ if (!app->rx_mqs)
+ return 0;
+
+ return app->rx_mqs[mq_index].mq;
+}
+
+static int
+app_rx_mqs_alloc (application_t *app)
+{
+ u32 evt_q_length, evt_size = sizeof (session_event_t);
+ fifo_segment_t *eqs = &app->rx_mqs_segment;
+ u32 n_mqs = vlib_num_workers () + 1;
+ segment_manager_props_t *props;
+ int i;
+
+ props = application_segment_manager_properties (app);
+ evt_q_length = clib_max (props->evt_q_size, 128);
+
+ svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
+ svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
+ { evt_q_length, evt_size, 0 }, { evt_q_length >> 1, 256, 0 }
+ };
+ cfg->consumer_pid = 0;
+ cfg->n_rings = 2;
+ cfg->q_nitems = evt_q_length;
+ cfg->ring_cfgs = rc;
+
+ eqs->ssvm.ssvm_size = svm_msg_q_size_to_alloc (cfg) * n_mqs + (16 << 10);
+ eqs->ssvm.name = format (0, "%s-rx-mqs-seg%c", app->name, 0);
+
+ if (ssvm_server_init (&eqs->ssvm, SSVM_SEGMENT_MEMFD))
+ {
+ clib_warning ("failed to initialize queue segment");
+ return SESSION_E_SEG_CREATE;
+ }
+
+ fifo_segment_init (eqs);
+
+ /* Fifo segment filled only with mqs */
+ eqs->h->n_mqs = n_mqs;
+ vec_validate (app->rx_mqs, n_mqs - 1);
+
+ for (i = 0; i < n_mqs; i++)
+ {
+ app->rx_mqs[i].mq = fifo_segment_msg_q_alloc (eqs, i, cfg);
+ if (svm_msg_q_alloc_eventfd (app->rx_mqs[i].mq))
+ {
+ clib_warning ("eventfd returned");
+ fifo_segment_cleanup (eqs);
+ ssvm_delete (&eqs->ssvm);
+ return SESSION_E_EVENTFD_ALLOC;
+ }
+ app_rx_mqs_epoll_add (app, &app->rx_mqs[i]);
+ app->rx_mqs[i].app_index = app->app_index;
+ }
+
+ return 0;
+}
+
+u8
+application_use_private_rx_mqs (void)
+{
+ return session_main.use_private_rx_mqs;
+}
+
+fifo_segment_t *
+application_get_rx_mqs_segment (application_t *app)
+{
+ if (application_use_private_rx_mqs ())
+ return &app->rx_mqs_segment;
+ return session_main_get_evt_q_segment ();
+}
+
+void
+application_enable_rx_mqs_nodes (u8 is_en)
+{
+ u8 state = is_en ? VLIB_NODE_STATE_INTERRUPT : VLIB_NODE_STATE_DISABLED;
+
+ foreach_vlib_main ()
+ vlib_node_set_state (this_vlib_main, appsl_rx_mqs_input_node.index, state);
+}
+
static application_t *
application_alloc (void)
{
@@ -596,6 +852,20 @@ application_free (application_t * app)
pool_free (app->worker_maps);
/*
+ * Free rx mqs if allocated
+ */
+ if (app->rx_mqs)
+ {
+ int i;
+ for (i = 0; i < vec_len (app->rx_mqs); i++)
+ app_rx_mqs_epoll_del (app, &app->rx_mqs[i]);
+
+ fifo_segment_cleanup (&app->rx_mqs_segment);
+ ssvm_delete (&app->rx_mqs_segment.ssvm);
+ vec_free (app->rx_mqs);
+ }
+
+ /*
* Cleanup remaining state
*/
if (application_is_builtin (app))
@@ -875,8 +1145,12 @@ vnet_application_attach (vnet_app_attach_args_t * a)
a->segment_handle = segment_manager_segment_handle (sm, fs);
segment_manager_segment_reader_unlock (sm);
+
+ if (!application_is_builtin (app) && application_use_private_rx_mqs ())
+ rv = app_rx_mqs_alloc (app);
+
vec_free (app_name);
- return 0;
+ return rv;
}
/**
@@ -1537,6 +1811,8 @@ appliction_format_app_mq (vlib_main_t * vm, application_t * app)
{
app_worker_map_t *map;
app_worker_t *wrk;
+ int i;
+
/* *INDENT-OFF* */
pool_foreach (map, app->worker_maps) {
wrk = app_worker_get (map->wrk_index);
@@ -1545,6 +1821,10 @@ appliction_format_app_mq (vlib_main_t * vm, application_t * app)
wrk->event_queue);
}
/* *INDENT-ON* */
+
+ for (i = 0; i < vec_len (app->rx_mqs); i++)
+ vlib_cli_output (vm, "[A%d][R%d]%U", app->app_index, i, format_svm_msg_q,
+ app->rx_mqs[i].mq);
}
static clib_error_t *
@@ -1731,10 +2011,18 @@ vnet_app_del_cert_key_pair (u32 index)
clib_error_t *
application_init (vlib_main_t * vm)
{
+ app_main_t *am = &app_main;
+ u32 n_workers;
+
+ n_workers = vlib_num_workers ();
+
/* Index 0 was originally used by legacy apis, maintain as invalid */
(void) app_cert_key_pair_alloc ();
- app_main.last_crypto_engine = CRYPTO_ENGINE_LAST;
- app_main.app_by_name = hash_create_vec (0, sizeof (u8), sizeof (uword));
+ am->last_crypto_engine = CRYPTO_ENGINE_LAST;
+ am->app_by_name = hash_create_vec (0, sizeof (u8), sizeof (uword));
+
+ vec_validate (am->wrk, n_workers);
+
return 0;
}
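
A note on the data structure above: pending_rx_mqs is an intrusive
circular doubly-linked list whose head pointer doubles as the scan
cursor. appsl_pending_rx_mqs_add_tail inserts before the head (i.e., at
the tail), appsl_pending_rx_mqs_del unlinks an element and advances the
head when needed, and the APP_RX_MQ_F_PENDING/APP_RX_MQ_F_POSTPONED
flags ensure an mq is requeued at most once per pass while it still
holds messages. The standalone harness below (hypothetical names, not
VPP code) exercises the same list invariants:

#include <assert.h>
#include <stddef.h>

typedef struct elt_ { struct elt_ *next, *prev; int id; } elt_t;

static void
add_tail (elt_t **head, elt_t *e)
{
  if (!*head)
    {
      e->next = e->prev = e;
      *head = e;
      return;
    }
  e->prev = (*head)->prev;
  e->next = *head;
  (*head)->prev->next = e;
  (*head)->prev = e;
}

static void
del (elt_t **head, elt_t *e)
{
  if (e->next == e)		/* last element on the list */
    {
      e->next = e->prev = 0;
      *head = 0;
      return;
    }
  if (e == *head)		/* deleting the head advances it */
    *head = e->next;
  e->next->prev = e->prev;
  e->prev->next = e->next;
  e->next = e->prev = 0;
}

int
main (void)
{
  elt_t a = { .id = 0 }, b = { .id = 1 }, c = { .id = 2 };
  elt_t *head = 0;

  add_tail (&head, &a);
  add_tail (&head, &b);
  add_tail (&head, &c);
  assert (head == &a && head->prev == &c);	/* c is the tail */

  del (&head, &a);
  assert (head == &b && head->prev == &c);

  del (&head, &b);
  del (&head, &c);
  assert (head == 0);
  return 0;
}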
diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h
index b6f957ab871..0bfd4d1d813 100644
--- a/src/vnet/session/application.h
+++ b/src/vnet/session/application.h
@@ -92,6 +92,22 @@ typedef struct app_listener_
the app listener */
} app_listener_t;
+typedef enum app_rx_mq_flags_
+{
+ APP_RX_MQ_F_PENDING = 1 << 0,
+ APP_RX_MQ_F_POSTPONED = 1 << 1,
+} app_rx_mq_flags_t;
+
+typedef struct app_rx_mq_elt_
+{
+ struct app_rx_mq_elt_ *next;
+ struct app_rx_mq_elt_ *prev;
+ svm_msg_q_t *mq;
+ uword file_index;
+ u32 app_index;
+ u8 flags;
+} app_rx_mq_elt_t;
+
typedef struct application_
{
/** App index in app pool */
@@ -127,8 +143,38 @@ typedef struct application_
char quic_iv[17];
u8 quic_iv_set;
+ /** Segment where rx mqs were allocated */
+ fifo_segment_t rx_mqs_segment;
+
+ /**
+ * Fixed vector of rx mqs that can be a part of pending_rx_mqs
+ * linked list maintained by the app sublayer for each worker
+ */
+ app_rx_mq_elt_t *rx_mqs;
} application_t;
+typedef struct app_rx_mq_handle_
+{
+ union
+ {
+ struct
+ {
+ u32 app_index;
+ u32 thread_index;
+ };
+ u64 as_u64;
+ };
+} __attribute__ ((aligned (sizeof (u64)))) app_rx_mq_handle_t;
+
+/**
+ * App sublayer per vpp worker state
+ */
+typedef struct asl_wrk_
+{
+ /** Linked list of mqs with pending messages */
+ app_rx_mq_elt_t *pending_rx_mqs;
+} appsl_wrk_t;
+
typedef struct app_main_
{
/**
@@ -155,6 +201,11 @@ typedef struct app_main_
* Last registered crypto engine type
*/
crypto_engine_type_t last_crypto_engine;
+
+ /**
+ * App sublayer per-worker state
+ */
+ appsl_wrk_t *wrk;
} app_main_t;
typedef struct app_init_args_
@@ -239,6 +290,11 @@ segment_manager_props_t *application_get_segment_manager_properties (u32
segment_manager_props_t
* application_segment_manager_properties (application_t * app);
+svm_msg_q_t *application_rx_mq_get (application_t *app, u32 mq_index);
+u8 application_use_private_rx_mqs (void);
+fifo_segment_t *application_get_rx_mqs_segment (application_t *app);
+void application_enable_rx_mqs_nodes (u8 is_en);
+
/*
* App worker
*/
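
Design note on app_rx_mq_handle_t: clib_file offers a single u64 of
private_data, so the handle packs app_index and thread_index into one
word that survives the round trip from app_rx_mqs_epoll_add to
app_rx_mq_fd_read_ready. A standalone sketch of that round trip
(hypothetical, not VPP code):

#include <assert.h>
#include <stdint.h>

typedef struct handle_
{
  union
  {
    struct
    {
      uint32_t app_index;
      uint32_t thread_index;
    };
    uint64_t as_u64;
  };
} handle_t;

int
main (void)
{
  handle_t h = { .app_index = 7, .thread_index = 3 };
  uint64_t private_data = h.as_u64;	/* stored in the clib_file */

  /* read-ready callback side: reinterpret the u64 as a handle */
  handle_t *back = (handle_t *) &private_data;
  assert (back->app_index == 7 && back->thread_index == 3);
  return 0;
}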
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 2400a19a351..c447557f8b0 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -1776,6 +1776,9 @@ session_node_enable_disable (u8 is_en)
}
vlib_node_set_state (this_vlib_main, session_queue_node.index, state);
}
+
+ if (session_main.use_private_rx_mqs)
+ application_enable_rx_mqs_nodes (is_en);
}
clib_error_t *
@@ -1808,6 +1811,7 @@ session_main_init (vlib_main_t * vm)
smm->is_enabled = 0;
smm->session_enable_asap = 0;
smm->poll_main = 0;
+ smm->use_private_rx_mqs = 0;
smm->session_baseva = HIGH_SEGMENT_BASEVA;
#if (HIGH_SEGMENT_BASEVA > (4ULL << 30))
@@ -1927,6 +1931,8 @@ session_config_fn (vlib_main_t * vm, unformat_input_t * input)
appns_sapi_enable ();
else if (unformat (input, "poll-main"))
smm->poll_main = 1;
+ else if (unformat (input, "use-private-rx-mqs"))
+ smm->use_private_rx_mqs = 1;
else
return clib_error_return (0, "unknown input `%U'",
format_unformat_error, input);
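
With these hooks in place, the feature stays off by default
(use_private_rx_mqs = 0) and is opt-in at startup. A configuration
sketch, assuming the usual startup.conf session stanza; only the
use-private-rx-mqs keyword is added by this patch:

session {
  use-private-rx-mqs
}

Since session_node_enable_disable only toggles the appsl-rx-mqs-input
nodes when the option is set, the default configuration behaves exactly
as before.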
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index e8afcd03d69..aba8a1c3fd1 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -174,6 +174,9 @@ typedef struct session_main_
/** Poll session node in main thread */
u8 poll_main;
+ /** Allocate private rx mqs for external apps */
+ u8 use_private_rx_mqs;
+
/** vpp fifo event queue configured length */
u32 configured_event_queue_length;
@@ -296,6 +299,8 @@ session_evt_alloc_old (session_worker_t * wrk)
return elt;
}
+int session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq);
+
session_t *session_alloc (u32 thread_index);
void session_free (session_t * s);
void session_free_w_fifos (session_t * s);
diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c
index 3ca3c2a2692..5910cd366fb 100644
--- a/src/vnet/session/session_api.c
+++ b/src/vnet/session/session_api.c
@@ -143,7 +143,7 @@ mq_send_session_accepted_cb (session_t * s)
m.segment_handle = session_segment_handle (s);
m.flags = s->flags;
- eq_seg = session_main_get_evt_q_segment ();
+ eq_seg = application_get_rx_mqs_segment (app);
if (session_has_transport (s))
{
@@ -271,6 +271,9 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
fifo_segment_t *eq_seg;
app_worker_t *app_wrk;
session_event_t *evt;
+ application_t *app;
+
+ app_wrk = app_worker_get (app_wrk_index);
m.context = api_context;
m.retval = err;
@@ -278,7 +281,8 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
if (err)
goto snd_msg;
- eq_seg = session_main_get_evt_q_segment ();
+ app = application_get (app_wrk->app_index);
+ eq_seg = application_get_rx_mqs_segment (app);
if (session_has_transport (s))
{
@@ -322,7 +326,6 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
snd_msg:
- app_wrk = app_worker_get (app_wrk_index);
app_mq = app_wrk->event_queue;
if (mq_try_lock_and_alloc_msg (app_mq, msg))
@@ -348,9 +351,12 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
fifo_segment_t *eq_seg;
app_worker_t *app_wrk;
session_event_t *evt;
+ application_t *app;
app_listener_t *al;
session_t *ls = 0;
+ app_wrk = app_worker_get (app_wrk_index);
+
m.context = api_context;
m.retval = rv;
@@ -368,8 +374,8 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
m.lcl_port = tep.port;
m.lcl_is_ip4 = tep.is_ip4;
clib_memcpy_fast (m.lcl_ip, &tep.ip, sizeof (tep.ip));
-
- eq_seg = session_main_get_evt_q_segment ();
+ app = application_get (app_wrk->app_index);
+ eq_seg = application_get_rx_mqs_segment (app);
m.vpp_evt_q = fifo_segment_msg_q_offset (eq_seg, ls->thread_index);
if (session_transport_service_type (ls) == TRANSPORT_SERVICE_CL &&
@@ -382,7 +388,6 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
snd_msg:
- app_wrk = app_worker_get (app_wrk_index);
app_mq = app_wrk->event_queue;
if (mq_try_lock_and_alloc_msg (app_mq, msg))
@@ -429,10 +434,14 @@ mq_send_session_migrate_cb (session_t * s, session_handle_t new_sh)
app_worker_t *app_wrk;
session_event_t *evt;
svm_msg_q_t *app_mq;
+ application_t *app;
u32 thread_index;
thread_index = session_thread_from_handle (new_sh);
- eq_seg = session_main_get_evt_q_segment ();
+ app_wrk = app_worker_get (s->app_wrk_index);
+ app_mq = app_wrk->event_queue;
+ app = application_get (app_wrk->app_index);
+ eq_seg = application_get_rx_mqs_segment (app);
m.handle = session_handle (s);
m.new_handle = new_sh;
@@ -440,8 +449,6 @@ mq_send_session_migrate_cb (session_t * s, session_handle_t new_sh)
m.vpp_evt_q = fifo_segment_msg_q_offset (eq_seg, thread_index);
m.segment_handle = SESSION_INVALID_HANDLE;
- app_wrk = app_worker_get (s->app_wrk_index);
- app_mq = app_wrk->event_queue;
if (mq_try_lock_and_alloc_msg (app_mq, msg))
return;
@@ -604,17 +611,20 @@ vl_api_session_enable_disable_t_handler (vl_api_session_enable_disable_t * mp)
static void
vl_api_app_attach_t_handler (vl_api_app_attach_t * mp)
{
- int rv = 0, fds[SESSION_N_FD_TYPE], n_fds = 0;
- vl_api_app_attach_reply_t *rmp;
- fifo_segment_t *segp, *evt_q_segment = 0;
+ int rv = 0, *fds = 0, n_fds = 0, n_workers, i;
+ fifo_segment_t *segp, *rx_mqs_seg = 0;
vnet_app_attach_args_t _a, *a = &_a;
+ vl_api_app_attach_reply_t *rmp;
u8 fd_flags = 0, ctrl_thread;
vl_api_registration_t *reg;
+ svm_msg_q_t *rx_mq;
+ application_t *app;
reg = vl_api_client_index_to_registration (mp->client_index);
if (!reg)
return;
+ n_workers = vlib_num_workers ();
if (!session_main_is_enabled () || appns_sapi_enabled ())
{
rv = VNET_API_ERROR_FEATURE_DISABLED;
@@ -645,13 +655,16 @@ vl_api_app_attach_t_handler (vl_api_app_attach_t * mp)
}
vec_free (a->namespace_id);
- /* Send event queues segment */
- if ((evt_q_segment = session_main_get_evt_q_segment ()))
- {
- fd_flags |= SESSION_FD_F_VPP_MQ_SEGMENT;
- fds[n_fds] = evt_q_segment->ssvm.fd;
- n_fds += 1;
- }
+ vec_validate (fds, 3 /* segs + tx evtfd */ + n_workers);
+
+ /* Send rx mqs segment */
+ app = application_get (a->app_index);
+ rx_mqs_seg = application_get_rx_mqs_segment (app);
+
+ fd_flags |= SESSION_FD_F_VPP_MQ_SEGMENT;
+ fds[n_fds] = rx_mqs_seg->ssvm.fd;
+ n_fds += 1;
+
/* Send fifo segment fd if needed */
if (ssvm_type (a->segment) == SSVM_SEGMENT_MEMFD)
{
@@ -666,17 +679,27 @@ vl_api_app_attach_t_handler (vl_api_app_attach_t * mp)
n_fds += 1;
}
+ if (application_use_private_rx_mqs ())
+ {
+ fd_flags |= SESSION_FD_F_VPP_MQ_EVENTFD;
+ for (i = 0; i < n_workers + 1; i++)
+ {
+ rx_mq = application_rx_mq_get (app, i);
+ fds[n_fds] = svm_msg_q_get_eventfd (rx_mq);
+ n_fds += 1;
+ }
+ }
+
done:
/* *INDENT-OFF* */
REPLY_MACRO2 (VL_API_APP_ATTACH_REPLY, ({
if (!rv)
{
- ctrl_thread = vlib_num_workers () ? 1 : 0;
+ ctrl_thread = n_workers ? 1 : 0;
segp = (fifo_segment_t *) a->segment;
rmp->app_index = clib_host_to_net_u32 (a->app_index);
rmp->app_mq = fifo_segment_msg_q_offset (segp, 0);
- rmp->vpp_ctrl_mq =
- fifo_segment_msg_q_offset (evt_q_segment, ctrl_thread);
+ rmp->vpp_ctrl_mq = fifo_segment_msg_q_offset (rx_mqs_seg, ctrl_thread);
rmp->vpp_ctrl_mq_thread = ctrl_thread;
rmp->n_fds = n_fds;
rmp->fd_flags = fd_flags;
@@ -692,6 +715,7 @@ done:
if (n_fds)
session_send_fds (reg, fds, n_fds);
+ vec_free (fds);
}
static void
@@ -1268,15 +1292,16 @@ static void
session_api_attach_handler (app_namespace_t * app_ns, clib_socket_t * cs,
app_sapi_attach_msg_t * mp)
{
- int rv = 0, fds[SESSION_N_FD_TYPE], n_fds = 0;
+ int rv = 0, *fds = 0, n_fds = 0, i, n_workers;
vnet_app_attach_args_t _a, *a = &_a;
app_sapi_attach_reply_msg_t *rmp;
- fifo_segment_t *evt_q_segment;
u8 fd_flags = 0, ctrl_thread;
app_ns_api_handle_t *handle;
+ fifo_segment_t *rx_mqs_seg;
app_sapi_msg_t msg = { 0 };
app_worker_t *app_wrk;
application_t *app;
+ svm_msg_q_t *rx_mq;
/* Make sure name is null terminated */
mp->name[63] = 0;
@@ -1295,13 +1320,17 @@ session_api_attach_handler (app_namespace_t * app_ns, clib_socket_t * cs,
goto done;
}
+ n_workers = vlib_num_workers ();
+ vec_validate (fds, 3 /* segs + tx evtfd */ + n_workers);
+
/* Send event queues segment */
- if ((evt_q_segment = session_main_get_evt_q_segment ()))
- {
- fd_flags |= SESSION_FD_F_VPP_MQ_SEGMENT;
- fds[n_fds] = evt_q_segment->ssvm.fd;
- n_fds += 1;
- }
+ app = application_get (a->app_index);
+ rx_mqs_seg = application_get_rx_mqs_segment (app);
+
+ fd_flags |= SESSION_FD_F_VPP_MQ_SEGMENT;
+ fds[n_fds] = rx_mqs_seg->ssvm.fd;
+ n_fds += 1;
+
/* Send fifo segment fd if needed */
if (ssvm_type (a->segment) == SSVM_SEGMENT_MEMFD)
{
@@ -1316,6 +1345,17 @@ session_api_attach_handler (app_namespace_t * app_ns, clib_socket_t * cs,
n_fds += 1;
}
+ if (application_use_private_rx_mqs ())
+ {
+ fd_flags |= SESSION_FD_F_VPP_MQ_EVENTFD;
+ for (i = 0; i < n_workers + 1; i++)
+ {
+ rx_mq = application_rx_mq_get (app, i);
+ fds[n_fds] = svm_msg_q_get_eventfd (rx_mq);
+ n_fds += 1;
+ }
+ }
+
done:
msg.type = APP_SAPI_MSG_TYPE_ATTACH_REPLY;
@@ -1323,12 +1363,11 @@ done:
rmp->retval = rv;
if (!rv)
{
- ctrl_thread = vlib_num_workers ()? 1 : 0;
+ ctrl_thread = n_workers ? 1 : 0;
rmp->app_index = a->app_index;
rmp->app_mq =
fifo_segment_msg_q_offset ((fifo_segment_t *) a->segment, 0);
- rmp->vpp_ctrl_mq =
- fifo_segment_msg_q_offset (evt_q_segment, ctrl_thread);
+ rmp->vpp_ctrl_mq = fifo_segment_msg_q_offset (rx_mqs_seg, ctrl_thread);
rmp->vpp_ctrl_mq_thread = ctrl_thread;
rmp->n_fds = n_fds;
rmp->fd_flags = fd_flags;
@@ -1339,13 +1378,13 @@ done:
/* Update app index for socket */
handle = (app_ns_api_handle_t *) & cs->private_data;
- app = application_get (a->app_index);
app_wrk = application_get_worker (app, 0);
handle->aah_app_wrk_index = app_wrk->wrk_index;
}
clib_socket_sendmsg (cs, &msg, sizeof (msg), fds, n_fds);
vec_free (a->name);
+ vec_free (fds);
}
static void
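
On the vpp side, the fds collected above reach the client as SCM_RIGHTS
ancillary data (session_send_fds over the binary api socket,
clib_socket_sendmsg over the sapi socket). A minimal standalone
send-side sketch of that mechanism, pairing with the receive fragment
shown earlier (hypothetical names, not VPP code):

#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>

/* Send one payload byte plus a single fd as SCM_RIGHTS ancillary data
 * over a connected unix-domain socket. */
static long
send_fd (int sock, int fd)
{
  char byte = 0, cbuf[CMSG_SPACE (sizeof (int))];
  struct iovec iov = { .iov_base = &byte, .iov_len = 1 };
  struct msghdr msg = { .msg_iov = &iov, .msg_iovlen = 1,
			.msg_control = cbuf,
			.msg_controllen = sizeof (cbuf) };
  struct cmsghdr *cmsg;

  memset (cbuf, 0, sizeof (cbuf));
  cmsg = CMSG_FIRSTHDR (&msg);
  cmsg->cmsg_level = SOL_SOCKET;
  cmsg->cmsg_type = SCM_RIGHTS;
  cmsg->cmsg_len = CMSG_LEN (sizeof (int));
  memcpy (CMSG_DATA (cmsg), &fd, sizeof (int));
  return sendmsg (sock, &msg, 0);
}

int
main (void)
{
  int sv[2];

  socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
  send_fd (sv[0], 1 /* in vpp's case: a memfd segment or mq eventfd */);
  return 0;
}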
diff --git a/src/vnet/session/session_debug.h b/src/vnet/session/session_debug.h
index a42d90d471c..9e49a35dbe6 100644
--- a/src/vnet/session/session_debug.h
+++ b/src/vnet/session/session_debug.h
@@ -237,14 +237,14 @@ extern session_dbg_main_t session_dbg_main;
#if SESSION_CLOCKS_EVT_DBG
-#define SESSION_EVT_DSP_CNTRS_UPDATE_TIME_HANDLER(_wrk, _diff, _args...) \
- session_dbg_evts_t *sde = &session_dbg_main.wrk[_wrk->vm->thread_index]; \
- sde->counters[SESS_Q_CLK_UPDATE_TIME].f64 += _diff; \
-
-#define SESSION_EVT_DSP_CNTRS_MQ_DEQ_HANDLER(_wrk, _diff, _cnt, _dq, _args...) \
- session_dbg_evts_t *sde = &session_dbg_main.wrk[_wrk->vm->thread_index]; \
- sde->counters[SESS_Q_CNT_MQ_EVTS].u64 += _dq * _cnt; \
- sde->counters[SESS_Q_CLK_MQ_DEQ].f64 += _diff; \
+#define SESSION_EVT_DSP_CNTRS_UPDATE_TIME_HANDLER(_wrk, _diff, _args...) \
+ session_dbg_evts_t *sde = &session_dbg_main.wrk[_wrk->vm->thread_index]; \
+ sde->counters[SESS_Q_CLK_UPDATE_TIME].f64 += _diff;
+
+#define SESSION_EVT_DSP_CNTRS_MQ_DEQ_HANDLER(_wrk, _diff, _cnt, _args...) \
+ session_dbg_evts_t *sde = &session_dbg_main.wrk[_wrk->vm->thread_index]; \
+ sde->counters[SESS_Q_CNT_MQ_EVTS].u64 += _cnt; \
+ sde->counters[SESS_Q_CLK_MQ_DEQ].f64 += _diff;
#define SESSION_EVT_DSP_CNTRS_CTRL_EVTS_HANDLER(_wrk, _diff, _args...) \
session_dbg_evts_t *sde = &session_dbg_main.wrk[_wrk->vm->thread_index]; \
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index ccf93cbbf61..f3713d00cd1 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -1399,19 +1399,35 @@ session_flush_pending_tx_buffers (session_worker_t * wrk,
vec_reset_length (wrk->pending_tx_nexts);
}
+int
+session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq)
+{
+ svm_msg_q_msg_t _msg, *msg = &_msg;
+ u32 i, n_to_dequeue = 0;
+ session_event_t *evt;
+
+ n_to_dequeue = svm_msg_q_size (mq);
+ for (i = 0; i < n_to_dequeue; i++)
+ {
+ svm_msg_q_sub_raw (mq, msg);
+ evt = svm_msg_q_msg_data (mq, msg);
+ session_evt_add_to_list (wrk, evt);
+ svm_msg_q_free_msg (mq, msg);
+ }
+
+ return n_to_dequeue;
+}
+
static uword
session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_frame_t * frame)
{
+ u32 thread_index = vm->thread_index, __clib_unused n_evts;
+ session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
session_main_t *smm = vnet_get_session_main ();
- u32 thread_index = vm->thread_index, n_to_dequeue;
session_worker_t *wrk = &smm->wrk[thread_index];
- session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
clib_llist_index_t ei, next_ei, old_ti;
- svm_msg_q_msg_t _msg, *msg = &_msg;
- int i = 0, n_tx_packets;
- session_event_t *evt;
- svm_msg_q_t *mq;
+ int n_tx_packets;
SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk);
@@ -1426,25 +1442,11 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
SESSION_EVT (SESSION_EVT_DSP_CNTRS, UPDATE_TIME, wrk);
/*
- * Dequeue and handle new events
+ * Dequeue new internal mq events
*/
- /* Try to dequeue what is available. Don't wait for lock.
- * XXX: we may need priorities here */
- mq = wrk->vpp_event_queue;
- n_to_dequeue = svm_msg_q_size (mq);
- if (n_to_dequeue)
- {
- for (i = 0; i < n_to_dequeue; i++)
- {
- svm_msg_q_sub_raw (mq, msg);
- evt = svm_msg_q_msg_data (mq, msg);
- session_evt_add_to_list (wrk, evt);
- svm_msg_q_free_msg (mq, msg);
- }
- }
-
- SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_to_dequeue, !i);
+ n_evts = session_wrk_handle_mq (wrk, wrk->vpp_event_queue);
+ SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_evts);
/*
* Handle control events
@@ -1452,12 +1454,10 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
ctrl_he = pool_elt_at_index (wrk->event_elts, wrk->ctrl_head);
- /* *INDENT-OFF* */
clib_llist_foreach_safe (wrk->event_elts, evt_list, ctrl_he, elt, ({
clib_llist_remove (wrk->event_elts, evt_list, elt);
session_event_dispatch_ctrl (wrk, elt);
}));
- /* *INDENT-ON* */
SESSION_EVT (SESSION_EVT_DSP_CNTRS, CTRL_EVTS, wrk);
diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h
index 7a9f687e4ab..32cb9530bd9 100644
--- a/src/vnet/session/session_types.h
+++ b/src/vnet/session/session_types.h
@@ -449,37 +449,38 @@ typedef struct session_dgram_header_
STATIC_ASSERT (sizeof (session_dgram_hdr_t) == (SESSION_CONN_ID_LEN + 8),
"session conn id wrong length");
-#define foreach_session_error \
- _(NONE, "no error") \
- _(UNKNOWN, "generic/unknown error") \
- _(REFUSED, "refused") \
- _(TIMEDOUT, "timedout") \
- _(ALLOC, "obj/memory allocation error") \
- _(OWNER, "object not owned by application") \
- _(NOROUTE, "no route") \
- _(NOINTF, "no resolving interface") \
- _(NOIP, "no ip for lcl interface") \
- _(NOPORT, "no lcl port") \
- _(NOSUPPORT, "not supported") \
- _(NOLISTEN, "not listening") \
- _(NOSESSION, "session does not exist") \
- _(NOAPP, "app not attached") \
- _(PORTINUSE, "lcl port in use") \
- _(IPINUSE, "ip in use") \
- _(ALREADY_LISTENING, "ip port pair already listened on") \
- _(INVALID_RMT_IP, "invalid remote ip") \
- _(INVALID_APPWRK, "invalid app worker") \
- _(INVALID_NS, "invalid namespace") \
- _(SEG_NO_SPACE, "Couldn't allocate a fifo pair") \
- _(SEG_NO_SPACE2, "Created segment, couldn't allocate a fifo pair") \
- _(SEG_CREATE, "Couldn't create a new segment") \
- _(FILTERED, "session filtered") \
- _(SCOPE, "scope not supported") \
- _(BAPI_NO_FD, "bapi doesn't have a socket fd") \
- _(BAPI_SEND_FD, "couldn't send fd over bapi socket fd") \
- _(BAPI_NO_REG, "app bapi registration not found") \
- _(MQ_MSG_ALLOC, "failed to alloc mq msg") \
- _(TLS_HANDSHAKE, "failed tls handshake") \
+#define foreach_session_error \
+ _ (NONE, "no error") \
+ _ (UNKNOWN, "generic/unknown error") \
+ _ (REFUSED, "refused") \
+ _ (TIMEDOUT, "timedout") \
+ _ (ALLOC, "obj/memory allocation error") \
+ _ (OWNER, "object not owned by application") \
+ _ (NOROUTE, "no route") \
+ _ (NOINTF, "no resolving interface") \
+ _ (NOIP, "no ip for lcl interface") \
+ _ (NOPORT, "no lcl port") \
+ _ (NOSUPPORT, "not supported") \
+ _ (NOLISTEN, "not listening") \
+ _ (NOSESSION, "session does not exist") \
+ _ (NOAPP, "app not attached") \
+ _ (PORTINUSE, "lcl port in use") \
+ _ (IPINUSE, "ip in use") \
+ _ (ALREADY_LISTENING, "ip port pair already listened on") \
+ _ (INVALID_RMT_IP, "invalid remote ip") \
+ _ (INVALID_APPWRK, "invalid app worker") \
+ _ (INVALID_NS, "invalid namespace") \
+ _ (SEG_NO_SPACE, "Couldn't allocate a fifo pair") \
+ _ (SEG_NO_SPACE2, "Created segment, couldn't allocate a fifo pair") \
+ _ (SEG_CREATE, "Couldn't create a new segment") \
+ _ (FILTERED, "session filtered") \
+ _ (SCOPE, "scope not supported") \
+ _ (BAPI_NO_FD, "bapi doesn't have a socket fd") \
+ _ (BAPI_SEND_FD, "couldn't send fd over bapi socket fd") \
+ _ (BAPI_NO_REG, "app bapi registration not found") \
+ _ (MQ_MSG_ALLOC, "failed to alloc mq msg") \
+ _ (TLS_HANDSHAKE, "failed tls handshake") \
+ _ (EVENTFD_ALLOC, "failed to alloc eventfd")
typedef enum session_error_p_
{