aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/svm/message_queue.c2
-rw-r--r--src/vcl/test/test_vcl.py20
-rw-r--r--src/vcl/vcl_sapi.c3
-rw-r--r--src/vnet/session/application.c294
-rw-r--r--src/vnet/session/application.h56
-rw-r--r--src/vnet/session/session.c6
-rw-r--r--src/vnet/session/session.h5
-rw-r--r--src/vnet/session/session_api.c107
-rw-r--r--src/vnet/session/session_debug.h16
-rw-r--r--src/vnet/session/session_node.c50
-rw-r--r--src/vnet/session/session_types.h63
11 files changed, 518 insertions, 104 deletions
diff --git a/src/svm/message_queue.c b/src/svm/message_queue.c
index 5c04b19e64c..e08ba06fb27 100644
--- a/src/svm/message_queue.c
+++ b/src/svm/message_queue.c
@@ -482,7 +482,7 @@ int
svm_msg_q_alloc_eventfd (svm_msg_q_t *mq)
{
int fd;
- if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
+ if ((fd = eventfd (0, 0)) < 0)
return -1;
svm_msg_q_set_eventfd (mq, fd);
return 0;
diff --git a/src/vcl/test/test_vcl.py b/src/vcl/test/test_vcl.py
index f5a5bebdad8..80b9f2f0211 100644
--- a/src/vcl/test/test_vcl.py
+++ b/src/vcl/test/test_vcl.py
@@ -281,6 +281,7 @@ class LDPCutThruTestCase(VCLTestCase):
def show_commands_at_teardown(self):
self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_ldp_cut_thru_echo(self):
@@ -351,6 +352,7 @@ class VCLCutThruTestCase(VCLTestCase):
def show_commands_at_teardown(self):
self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
def test_vcl_cut_thru_echo(self):
""" run VCL cut thru echo test """
@@ -406,6 +408,7 @@ class VCLThruHostStackEcho(VCLTestCase):
def show_commands_at_teardown(self):
self.logger.debug(self.vapi.cli("show app server"))
self.logger.debug(self.vapi.cli("show session verbose"))
+ self.logger.debug(self.vapi.cli("show app mq"))
class VCLThruHostStackTLS(VCLTestCase):
@@ -444,6 +447,7 @@ class VCLThruHostStackTLS(VCLTestCase):
def show_commands_at_teardown(self):
self.logger.debug(self.vapi.cli("show app server"))
self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
class VCLThruHostStackBidirNsock(VCLTestCase):
@@ -476,6 +480,7 @@ class VCLThruHostStackBidirNsock(VCLTestCase):
def show_commands_at_teardown(self):
self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
def test_vcl_thru_host_stack_bi_dir_nsock(self):
""" run VCL thru host stack bi-directional (multiple sockets) test """
@@ -517,6 +522,7 @@ class LDPThruHostStackBidirNsock(VCLTestCase):
def show_commands_at_teardown(self):
self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
def test_ldp_thru_host_stack_bi_dir_nsock(self):
""" run LDP thru host stack bi-directional (multiple sockets) test """
@@ -632,6 +638,7 @@ class LDPThruHostStackIperf(VCLTestCase):
def show_commands_at_teardown(self):
self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
@unittest.skipUnless(_have_iperf3, "'%s' not found, Skipping.")
def test_ldp_thru_host_stack_iperf3(self):
@@ -668,6 +675,7 @@ class LDPThruHostStackIperfUdp(VCLTestCase):
def show_commands_at_teardown(self):
self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
@unittest.skipUnless(_have_iperf3, "'%s' not found, Skipping.")
def test_ldp_thru_host_stack_iperf3_udp(self):
@@ -689,6 +697,10 @@ class LDPIpv6CutThruTestCase(VCLTestCase):
def tearDownClass(cls):
super(LDPIpv6CutThruTestCase, cls).tearDownClass()
+ def show_commands_at_teardown(self):
+ self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
+
def setUp(self):
super(LDPIpv6CutThruTestCase, self).setUp()
@@ -765,6 +777,10 @@ class VCLIpv6CutThruTestCase(VCLTestCase):
def tearDownClass(cls):
super(VCLIpv6CutThruTestCase, cls).tearDownClass()
+ def show_commands_at_teardown(self):
+ self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
+
def setUp(self):
super(VCLIpv6CutThruTestCase, self).setUp()
@@ -789,6 +805,10 @@ class VCLIpv6CutThruTestCase(VCLTestCase):
super(VCLIpv6CutThruTestCase, self).tearDown()
self.cut_thru_tear_down()
+ def show_commands_at_teardown(self):
+ self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
+
def test_vcl_ipv6_cut_thru_echo(self):
""" run VCL IPv6 cut thru echo test """
diff --git a/src/vcl/vcl_sapi.c b/src/vcl/vcl_sapi.c
index 5258722a484..14401da9d57 100644
--- a/src/vcl/vcl_sapi.c
+++ b/src/vcl/vcl_sapi.c
@@ -94,7 +94,6 @@ vcl_api_attach_reply_handler (app_sapi_attach_reply_msg_t * mp, int *fds)
vcl_segment_attach_mq (vcl_vpp_worker_segment_handle (0), mp->vpp_ctrl_mq,
mp->vpp_ctrl_mq_thread, &wrk->ctrl_mq);
vcm->ctrl_mq = wrk->ctrl_mq;
-
vcm->app_index = mp->app_index;
return 0;
@@ -156,7 +155,7 @@ vcl_sapi_attach (void)
app_sapi_msg_t _rmp, *rmp = &_rmp;
clib_error_t *err;
clib_socket_t *cs;
- int fds[SESSION_N_FD_TYPE];
+ int fds[32];
/*
* Init client socket and send attach
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index eb8a7169d8a..b055ab49f3c 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -412,6 +412,262 @@ application_lookup_name (const u8 * name)
return 0;
}
+void
+appsl_pending_rx_mqs_add_tail (appsl_wrk_t *aw, app_rx_mq_elt_t *elt)
+{
+ app_rx_mq_elt_t *head;
+
+ if (!aw->pending_rx_mqs)
+ {
+ elt->next = elt->prev = elt;
+ aw->pending_rx_mqs = elt;
+ return;
+ }
+
+ head = aw->pending_rx_mqs;
+
+ ASSERT (head != elt);
+
+ elt->prev = head->prev;
+ elt->next = head;
+
+ head->prev->next = elt;
+ head->prev = elt;
+}
+
+void
+appsl_pending_rx_mqs_del (appsl_wrk_t *aw, app_rx_mq_elt_t *elt)
+{
+ if (elt->next == elt)
+ {
+ elt->next = elt->prev = 0;
+ aw->pending_rx_mqs = 0;
+ return;
+ }
+
+ if (elt == aw->pending_rx_mqs)
+ aw->pending_rx_mqs = elt->next;
+
+ elt->next->prev = elt->prev;
+ elt->prev->next = elt->next;
+ elt->next = elt->prev = 0;
+}
+
+vlib_node_registration_t appsl_rx_mqs_input_node;
+
+VLIB_NODE_FN (appsl_rx_mqs_input_node)
+(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
+{
+ u32 thread_index = vm->thread_index, n_msgs = 0;
+ app_rx_mq_elt_t *elt, *next;
+ app_main_t *am = &app_main;
+ session_worker_t *wrk;
+ int __clib_unused rv;
+ appsl_wrk_t *aw;
+ u64 buf;
+
+ aw = &am->wrk[thread_index];
+ elt = aw->pending_rx_mqs;
+ if (!elt)
+ return 0;
+
+ wrk = session_main_get_worker (thread_index);
+
+ do
+ {
+ if (!(elt->flags & APP_RX_MQ_F_POSTPONED))
+ rv = read (svm_msg_q_get_eventfd (elt->mq), &buf, sizeof (buf));
+ n_msgs += session_wrk_handle_mq (wrk, elt->mq);
+
+ next = elt->next;
+ appsl_pending_rx_mqs_del (aw, elt);
+ if (!svm_msg_q_is_empty (elt->mq))
+ {
+ elt->flags |= APP_RX_MQ_F_POSTPONED;
+ appsl_pending_rx_mqs_add_tail (aw, elt);
+ }
+ else
+ {
+ elt->flags = 0;
+ }
+ elt = next;
+ }
+ while (aw->pending_rx_mqs && elt != aw->pending_rx_mqs);
+
+ if (aw->pending_rx_mqs)
+ vlib_node_set_interrupt_pending (vm, appsl_rx_mqs_input_node.index);
+
+ return n_msgs;
+}
+
+VLIB_REGISTER_NODE (appsl_rx_mqs_input_node) = {
+ .name = "appsl-rx-mqs-input",
+ .type = VLIB_NODE_TYPE_INPUT,
+ .state = VLIB_NODE_STATE_DISABLED,
+};
+
+static clib_error_t *
+app_rx_mq_fd_read_ready (clib_file_t *cf)
+{
+ app_rx_mq_handle_t *handle = (app_rx_mq_handle_t *) &cf->private_data;
+ vlib_main_t *vm = vlib_get_main ();
+ app_main_t *am = &app_main;
+ app_rx_mq_elt_t *mqe;
+ application_t *app;
+ appsl_wrk_t *aw;
+
+ ASSERT (vlib_get_thread_index () == handle->thread_index);
+ app = application_get_if_valid (handle->app_index);
+ if (!app)
+ return 0;
+
+ mqe = &app->rx_mqs[handle->thread_index];
+ if ((mqe->flags & APP_RX_MQ_F_PENDING) || svm_msg_q_is_empty (mqe->mq))
+ return 0;
+
+ aw = &am->wrk[handle->thread_index];
+ appsl_pending_rx_mqs_add_tail (aw, mqe);
+ mqe->flags |= APP_RX_MQ_F_PENDING;
+
+ vlib_node_set_interrupt_pending (vm, appsl_rx_mqs_input_node.index);
+
+ return 0;
+}
+
+static clib_error_t *
+app_rx_mq_fd_write_ready (clib_file_t *cf)
+{
+ clib_warning ("should not be called");
+ return 0;
+}
+
+static void
+app_rx_mqs_epoll_add (application_t *app, app_rx_mq_elt_t *mqe)
+{
+ clib_file_t template = { 0 };
+ app_rx_mq_handle_t handle;
+ u32 thread_index;
+ int fd;
+
+ thread_index = mqe - app->rx_mqs;
+ fd = svm_msg_q_get_eventfd (mqe->mq);
+
+ handle.app_index = app->app_index;
+ handle.thread_index = thread_index;
+
+ template.read_function = app_rx_mq_fd_read_ready;
+ template.write_function = app_rx_mq_fd_write_ready;
+ template.file_descriptor = fd;
+ template.private_data = handle.as_u64;
+ template.polling_thread_index = thread_index;
+ template.description =
+ format (0, "app-%u-rx-mq-%u", app->app_index, thread_index);
+ mqe->file_index = clib_file_add (&file_main, &template);
+}
+
+static void
+app_rx_mqs_epoll_del (application_t *app, app_rx_mq_elt_t *mqe)
+{
+ u32 thread_index = mqe - app->rx_mqs;
+ app_main_t *am = &app_main;
+ appsl_wrk_t *aw;
+
+ aw = &am->wrk[thread_index];
+
+ if (mqe->flags & APP_RX_MQ_F_PENDING)
+ {
+ session_wrk_handle_mq (session_main_get_worker (thread_index), mqe->mq);
+ appsl_pending_rx_mqs_del (aw, mqe);
+ }
+
+ clib_file_del_by_index (&file_main, mqe->file_index);
+}
+
+svm_msg_q_t *
+application_rx_mq_get (application_t *app, u32 mq_index)
+{
+ if (!app->rx_mqs)
+ return 0;
+
+ return app->rx_mqs[mq_index].mq;
+}
+
+static int
+app_rx_mqs_alloc (application_t *app)
+{
+ u32 evt_q_length, evt_size = sizeof (session_event_t);
+ fifo_segment_t *eqs = &app->rx_mqs_segment;
+ u32 n_mqs = vlib_num_workers () + 1;
+ segment_manager_props_t *props;
+ int i;
+
+ props = application_segment_manager_properties (app);
+ evt_q_length = clib_max (props->evt_q_size, 128);
+
+ svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
+ svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
+ { evt_q_length, evt_size, 0 }, { evt_q_length >> 1, 256, 0 }
+ };
+ cfg->consumer_pid = 0;
+ cfg->n_rings = 2;
+ cfg->q_nitems = evt_q_length;
+ cfg->ring_cfgs = rc;
+
+ eqs->ssvm.ssvm_size = svm_msg_q_size_to_alloc (cfg) * n_mqs + (16 << 10);
+ eqs->ssvm.name = format (0, "%s-rx-mqs-seg%c", app->name, 0);
+
+ if (ssvm_server_init (&eqs->ssvm, SSVM_SEGMENT_MEMFD))
+ {
+ clib_warning ("failed to initialize queue segment");
+ return SESSION_E_SEG_CREATE;
+ }
+
+ fifo_segment_init (eqs);
+
+ /* Fifo segment filled only with mqs */
+ eqs->h->n_mqs = n_mqs;
+ vec_validate (app->rx_mqs, n_mqs - 1);
+
+ for (i = 0; i < n_mqs; i++)
+ {
+ app->rx_mqs[i].mq = fifo_segment_msg_q_alloc (eqs, i, cfg);
+ if (svm_msg_q_alloc_eventfd (app->rx_mqs[i].mq))
+ {
+ clib_warning ("eventfd returned");
+ fifo_segment_cleanup (eqs);
+ ssvm_delete (&eqs->ssvm);
+ return SESSION_E_EVENTFD_ALLOC;
+ }
+ app_rx_mqs_epoll_add (app, &app->rx_mqs[i]);
+ app->rx_mqs[i].app_index = app->app_index;
+ }
+
+ return 0;
+}
+
+u8
+application_use_private_rx_mqs (void)
+{
+ return session_main.use_private_rx_mqs;
+}
+
+fifo_segment_t *
+application_get_rx_mqs_segment (application_t *app)
+{
+ if (application_use_private_rx_mqs ())
+ return &app->rx_mqs_segment;
+ return session_main_get_evt_q_segment ();
+}
+
+void
+application_enable_rx_mqs_nodes (u8 is_en)
+{
+ u8 state = is_en ? VLIB_NODE_STATE_INTERRUPT : VLIB_NODE_STATE_DISABLED;
+
+ foreach_vlib_main ()
+ vlib_node_set_state (this_vlib_main, appsl_rx_mqs_input_node.index, state);
+}
+
static application_t *
application_alloc (void)
{
@@ -596,6 +852,20 @@ application_free (application_t * app)
pool_free (app->worker_maps);
/*
+ * Free rx mqs if allocated
+ */
+ if (app->rx_mqs)
+ {
+ int i;
+ for (i = 0; i < vec_len (app->rx_mqs); i++)
+ app_rx_mqs_epoll_del (app, &app->rx_mqs[i]);
+
+ fifo_segment_cleanup (&app->rx_mqs_segment);
+ ssvm_delete (&app->rx_mqs_segment.ssvm);
+ vec_free (app->rx_mqs);
+ }
+
+ /*
* Cleanup remaining state
*/
if (application_is_builtin (app))
@@ -875,8 +1145,12 @@ vnet_application_attach (vnet_app_attach_args_t * a)
a->segment_handle = segment_manager_segment_handle (sm, fs);
segment_manager_segment_reader_unlock (sm);
+
+ if (!application_is_builtin (app) && application_use_private_rx_mqs ())
+ rv = app_rx_mqs_alloc (app);
+
vec_free (app_name);
- return 0;
+ return rv;
}
/**
@@ -1537,6 +1811,8 @@ appliction_format_app_mq (vlib_main_t * vm, application_t * app)
{
app_worker_map_t *map;
app_worker_t *wrk;
+ int i;
+
/* *INDENT-OFF* */
pool_foreach (map, app->worker_maps) {
wrk = app_worker_get (map->wrk_index);
@@ -1545,6 +1821,10 @@ appliction_format_app_mq (vlib_main_t * vm, application_t * app)
wrk->event_queue);
}
/* *INDENT-ON* */
+
+ for (i = 0; i < vec_len (app->rx_mqs); i++)
+ vlib_cli_output (vm, "[A%d][R%d]%U", app->app_index, i, format_svm_msg_q,
+ app->rx_mqs[i].mq);
}
static clib_error_t *
@@ -1731,10 +2011,18 @@ vnet_app_del_cert_key_pair (u32 index)
clib_error_t *
application_init (vlib_main_t * vm)
{
+ app_main_t *am = &app_main;
+ u32 n_workers;
+
+ n_workers = vlib_num_workers ();
+
/* Index 0 was originally used by legacy apis, maintain as invalid */
(void) app_cert_key_pair_alloc ();
- app_main.last_crypto_engine = CRYPTO_ENGINE_LAST;
- app_main.app_by_name = hash_create_vec (0, sizeof (u8), sizeof (uword));
+ am->last_crypto_engine = CRYPTO_ENGINE_LAST;
+ am->app_by_name = hash_create_vec (0, sizeof (u8), sizeof (uword));
+
+ vec_validate (am->wrk, n_workers);
+
return 0;
}
diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h
index b6f957ab871..0bfd4d1d813 100644
--- a/src/vnet/session/application.h
+++ b/src/vnet/session/application.h
@@ -92,6 +92,22 @@ typedef struct app_listener_
the app listener */
} app_listener_t;
+typedef enum app_rx_mq_flags_
+{
+ APP_RX_MQ_F_PENDING = 1 << 0,
+ APP_RX_MQ_F_POSTPONED = 1 << 1,
+} app_rx_mq_flags_t;
+
+typedef struct app_rx_mq_elt_
+{
+ struct app_rx_mq_elt_ *next;
+ struct app_rx_mq_elt_ *prev;
+ svm_msg_q_t *mq;
+ uword file_index;
+ u32 app_index;
+ u8 flags;
+} app_rx_mq_elt_t;
+
typedef struct application_
{
/** App index in app pool */
@@ -127,8 +143,38 @@ typedef struct application_
char quic_iv[17];
u8 quic_iv_set;
+ /** Segment where rx mqs were allocated */
+ fifo_segment_t rx_mqs_segment;
+
+ /**
+ * Fixed vector of rx mqs that can be a part of pending_rx_mqs
+ * linked list maintained by the app sublayer for each worker
+ */
+ app_rx_mq_elt_t *rx_mqs;
} application_t;
+typedef struct app_rx_mq_handle_
+{
+ union
+ {
+ struct
+ {
+ u32 app_index;
+ u32 thread_index;
+ };
+ u64 as_u64;
+ };
+} __attribute__ ((aligned (sizeof (u64)))) app_rx_mq_handle_t;
+
+/**
+ * App sublayer per vpp worker state
+ */
+typedef struct asl_wrk_
+{
+ /** Linked list of mqs with pending messages */
+ app_rx_mq_elt_t *pending_rx_mqs;
+} appsl_wrk_t;
+
typedef struct app_main_
{
/**
@@ -155,6 +201,11 @@ typedef struct app_main_
* Last registered crypto engine type
*/
crypto_engine_type_t last_crypto_engine;
+
+ /**
+ * App sublayer per-worker state
+ */
+ appsl_wrk_t *wrk;
} app_main_t;
typedef struct app_init_args_
@@ -239,6 +290,11 @@ segment_manager_props_t *application_get_segment_manager_properties (u32
segment_manager_props_t
* application_segment_manager_properties (application_t * app);
+svm_msg_q_t *application_rx_mq_get (application_t *app, u32 mq_index);
+u8 application_use_private_rx_mqs (void);
+fifo_segment_t *application_get_rx_mqs_segment (application_t *app);
+void application_enable_rx_mqs_nodes (u8 is_en);
+
/*
* App worker
*/
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 2400a19a351..c447557f8b0 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -1776,6 +1776,9 @@ session_node_enable_disable (u8 is_en)
}
vlib_node_set_state (this_vlib_main, session_queue_node.index, state);
}
+
+ if (session_main.use_private_rx_mqs)
+ application_enable_rx_mqs_nodes (is_en);
}
clib_error_t *
@@ -1808,6 +1811,7 @@ session_main_init (vlib_main_t * vm)
smm->is_enabled = 0;
smm->session_enable_asap = 0;
smm->poll_main = 0;
+ smm->use_private_rx_mqs = 0;
smm->session_baseva = HIGH_SEGMENT_BASEVA;
#if (HIGH_SEGMENT_BASEVA > (4ULL << 30))
@@ -1927,6 +1931,8 @@ session_config_fn (vlib_main_t * vm, unformat_input_t * input)
appns_sapi_enable ();
else if (unformat (input, "poll-main"))
smm->poll_main = 1;
+ else if (unformat (input, "use-private-rx-mqs"))
+ smm->use_private_rx_mqs = 1;
else
return clib_error_return (0, "unknown input `%U'",
format_unformat_error, input);
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index e8afcd03d69..aba8a1c3fd1 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -174,6 +174,9 @@ typedef struct session_main_
/** Poll session node in main thread */
u8 poll_main;
+ /** Allocate private rx mqs for external apps */
+ u8 use_private_rx_mqs;
+
/** vpp fifo event queue configured length */
u32 configured_event_queue_length;
@@ -296,6 +299,8 @@ session_evt_alloc_old (session_worker_t * wrk)
return elt;
}
+int session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq);
+
session_t *session_alloc (u32 thread_index);
void session_free (session_t * s);
void session_free_w_fifos (session_t * s);
diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c
index 3ca3c2a2692..5910cd366fb 100644
--- a/src/vnet/session/session_api.c
+++ b/src/vnet/session/session_api.c
@@ -143,7 +143,7 @@ mq_send_session_accepted_cb (session_t * s)
m.segment_handle = session_segment_handle (s);
m.flags = s->flags;
- eq_seg = session_main_get_evt_q_segment ();
+ eq_seg = application_get_rx_mqs_segment (app);
if (session_has_transport (s))
{
@@ -271,6 +271,9 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
fifo_segment_t *eq_seg;
app_worker_t *app_wrk;
session_event_t *evt;
+ application_t *app;
+
+ app_wrk = app_worker_get (app_wrk_index);
m.context = api_context;
m.retval = err;
@@ -278,7 +281,8 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
if (err)
goto snd_msg;
- eq_seg = session_main_get_evt_q_segment ();
+ app = application_get (app_wrk->app_index);
+ eq_seg = application_get_rx_mqs_segment (app);
if (session_has_transport (s))
{
@@ -322,7 +326,6 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
snd_msg:
- app_wrk = app_worker_get (app_wrk_index);
app_mq = app_wrk->event_queue;
if (mq_try_lock_and_alloc_msg (app_mq, msg))
@@ -348,9 +351,12 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
fifo_segment_t *eq_seg;
app_worker_t *app_wrk;
session_event_t *evt;
+ application_t *app;
app_listener_t *al;
session_t *ls = 0;
+ app_wrk = app_worker_get (app_wrk_index);
+
m.context = api_context;
m.retval = rv;
@@ -368,8 +374,8 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
m.lcl_port = tep.port;
m.lcl_is_ip4 = tep.is_ip4;
clib_memcpy_fast (m.lcl_ip, &tep.ip, sizeof (tep.ip));
-
- eq_seg = session_main_get_evt_q_segment ();
+ app = application_get (app_wrk->app_index);
+ eq_seg = application_get_rx_mqs_segment (app);
m.vpp_evt_q = fifo_segment_msg_q_offset (eq_seg, ls->thread_index);
if (session_transport_service_type (ls) == TRANSPORT_SERVICE_CL &&
@@ -382,7 +388,6 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
snd_msg:
- app_wrk = app_worker_get (app_wrk_index);
app_mq = app_wrk->event_queue;
if (mq_try_lock_and_alloc_msg (app_mq, msg))
@@ -429,10 +434,14 @@ mq_send_session_migrate_cb (session_t * s, session_handle_t new_sh)
app_worker_t *app_wrk;
session_event_t *evt;
svm_msg_q_t *app_mq;
+ application_t *app;
u32 thread_index;
thread_index = session_thread_from_handle (new_sh);
- eq_seg = session_main_get_evt_q_segment ();
+ app_wrk = app_worker_get (s->app_wrk_index);
+ app_mq = app_wrk->event_queue;
+ app = application_get (app_wrk->app_index);
+ eq_seg = application_get_rx_mqs_segment (app);
m.handle = session_handle (s);
m.new_handle = new_sh;
@@ -440,8 +449,6 @@ mq_send_session_migrate_cb (session_t * s, session_handle_t new_sh)
m.vpp_evt_q = fifo_segment_msg_q_offset (eq_seg, thread_index);
m.segment_handle = SESSION_INVALID_HANDLE;
- app_wrk = app_worker_get (s->app_wrk_index);
- app_mq = app_wrk->event_queue;
if (mq_try_lock_and_alloc_msg (app_mq, msg))
return;
@@ -604,17 +611,20 @@ vl_api_session_enable_disable_t_handler (vl_api_session_enable_disable_t * mp)
static void
vl_api_app_attach_t_handler (vl_api_app_attach_t * mp)
{
- int rv = 0, fds[SESSION_N_FD_TYPE], n_fds = 0;
- vl_api_app_attach_reply_t *rmp;
- fifo_segment_t *segp, *evt_q_segment = 0;
+ int rv = 0, *fds = 0, n_fds = 0, n_workers, i;
+ fifo_segment_t *segp, *rx_mqs_seg = 0;
vnet_app_attach_args_t _a, *a = &_a;
+ vl_api_app_attach_reply_t *rmp;
u8 fd_flags = 0, ctrl_thread;
vl_api_registration_t *reg;
+ svm_msg_q_t *rx_mq;
+ application_t *app;
reg = vl_api_client_index_to_registration (mp->client_index);
if (!reg)
return;
+ n_workers = vlib_num_workers ();
if (!session_main_is_enabled () || appns_sapi_enabled ())
{
rv = VNET_API_ERROR_FEATURE_DISABLED;
@@ -645,13 +655,16 @@ vl_api_app_attach_t_handler (vl_api_app_attach_t * mp)
}
vec_free (a->namespace_id);
- /* Send event queues segment */
- if ((evt_q_segment = session_main_get_evt_q_segment ()))
- {
- fd_flags |= SESSION_FD_F_VPP_MQ_SEGMENT;
- fds[n_fds] = evt_q_segment->ssvm.fd;
- n_fds += 1;
- }
+ vec_validate (fds, 3 /* segs + tx evtfd */ + n_workers);
+
+ /* Send rx mqs segment */
+ app = application_get (a->app_index);
+ rx_mqs_seg = application_get_rx_mqs_segment (app);
+
+ fd_flags |= SESSION_FD_F_VPP_MQ_SEGMENT;
+ fds[n_fds] = rx_mqs_seg->ssvm.fd;
+ n_fds += 1;
+
/* Send fifo segment fd if needed */
if (ssvm_type (a->segment) == SSVM_SEGMENT_MEMFD)
{
@@ -666,17 +679,27 @@ vl_api_app_attach_t_handler (vl_api_app_attach_t * mp)
n_fds += 1;
}
+ if (application_use_private_rx_mqs ())
+ {
+ fd_flags |= SESSION_FD_F_VPP_MQ_EVENTFD;
+ for (i = 0; i < n_workers + 1; i++)
+ {
+ rx_mq = application_rx_mq_get (app, i);
+ fds[n_fds] = svm_msg_q_get_eventfd (rx_mq);
+ n_fds += 1;
+ }
+ }
+
done:
/* *INDENT-OFF* */
REPLY_MACRO2 (VL_API_APP_ATTACH_REPLY, ({
if (!rv)
{
- ctrl_thread = vlib_num_workers () ? 1 : 0;
+ ctrl_thread = n_workers ? 1 : 0;
segp = (fifo_segment_t *) a->segment;
rmp->app_index = clib_host_to_net_u32 (a->app_index);
rmp->app_mq = fifo_segment_msg_q_offset (segp, 0);
- rmp->vpp_ctrl_mq =
- fifo_segment_msg_q_offset (evt_q_segment, ctrl_thread);
+ rmp->vpp_ctrl_mq = fifo_segment_msg_q_offset (rx_mqs_seg, ctrl_thread);
rmp->vpp_ctrl_mq_thread = ctrl_thread;
rmp->n_fds = n_fds;
rmp->fd_flags = fd_flags;
@@ -692,6 +715,7 @@ done:
if (n_fds)
session_send_fds (reg, fds, n_fds);
+ vec_free (fds);
}
static void
@@ -1268,15 +1292,16 @@ static void
session_api_attach_handler (app_namespace_t * app_ns, clib_socket_t * cs,
app_sapi_attach_msg_t * mp)
{
- int rv = 0, fds[SESSION_N_FD_TYPE], n_fds = 0;
+ int rv = 0, *fds = 0, n_fds = 0, i, n_workers;
vnet_app_attach_args_t _a, *a = &_a;
app_sapi_attach_reply_msg_t *rmp;
- fifo_segment_t *evt_q_segment;
u8 fd_flags = 0, ctrl_thread;
app_ns_api_handle_t *handle;
+ fifo_segment_t *rx_mqs_seg;
app_sapi_msg_t msg = { 0 };
app_worker_t *app_wrk;
application_t *app;
+ svm_msg_q_t *rx_mq;
/* Make sure name is null terminated */
mp->name[63] = 0;
@@ -1295,13 +1320,17 @@ session_api_attach_handler (app_namespace_t * app_ns, clib_socket_t * cs,
goto done;
}
+ n_workers = vlib_num_workers ();
+ vec_validate (fds, 3 /* segs + tx evtfd */ + n_workers);
+
/* Send event queues segment */
- if ((evt_q_segment = session_main_get_evt_q_segment ()))
- {
- fd_flags |= SESSION_FD_F_VPP_MQ_SEGMENT;
- fds[n_fds] = evt_q_segment->ssvm.fd;
- n_fds += 1;
- }
+ app = application_get (a->app_index);
+ rx_mqs_seg = application_get_rx_mqs_segment (app);
+
+ fd_flags |= SESSION_FD_F_VPP_MQ_SEGMENT;
+ fds[n_fds] = rx_mqs_seg->ssvm.fd;
+ n_fds += 1;
+
/* Send fifo segment fd if needed */
if (ssvm_type (a->segment) == SSVM_SEGMENT_MEMFD)
{
@@ -1316,6 +1345,17 @@ session_api_attach_handler (app_namespace_t * app_ns, clib_socket_t * cs,
n_fds += 1;
}
+ if (application_use_private_rx_mqs ())
+ {
+ fd_flags |= SESSION_FD_F_VPP_MQ_EVENTFD;
+ for (i = 0; i < n_workers + 1; i++)
+ {
+ rx_mq = application_rx_mq_get (app, i);
+ fds[n_fds] = svm_msg_q_get_eventfd (rx_mq);
+ n_fds += 1;
+ }
+ }
+
done:
msg.type = APP_SAPI_MSG_TYPE_ATTACH_REPLY;
@@ -1323,12 +1363,11 @@ done:
rmp->retval = rv;
if (!rv)
{
- ctrl_thread = vlib_num_workers ()? 1 : 0;
+ ctrl_thread = n_workers ? 1 : 0;
rmp->app_index = a->app_index;
rmp->app_mq =
fifo_segment_msg_q_offset ((fifo_segment_t *) a->segment, 0);
- rmp->vpp_ctrl_mq =
- fifo_segment_msg_q_offset (evt_q_segment, ctrl_thread);
+ rmp->vpp_ctrl_mq = fifo_segment_msg_q_offset (rx_mqs_seg, ctrl_thread);
rmp->vpp_ctrl_mq_thread = ctrl_thread;
rmp->n_fds = n_fds;
rmp->fd_flags = fd_flags;
@@ -1339,13 +1378,13 @@ done:
/* Update app index for socket */
handle = (app_ns_api_handle_t *) & cs->private_data;
- app = application_get (a->app_index);
app_wrk = application_get_worker (app, 0);
handle->aah_app_wrk_index = app_wrk->wrk_index;
}
clib_socket_sendmsg (cs, &msg, sizeof (msg), fds, n_fds);
vec_free (a->name);
+ vec_free (fds);
}
static void
diff --git a/src/vnet/session/session_debug.h b/src/vnet/session/session_debug.h
index a42d90d471c..9e49a35dbe6 100644
--- a/src/vnet/session/session_debug.h
+++ b/src/vnet/session/session_debug.h
@@ -237,14 +237,14 @@ extern session_dbg_main_t session_dbg_main;
#if SESSION_CLOCKS_EVT_DBG
-#define SESSION_EVT_DSP_CNTRS_UPDATE_TIME_HANDLER(_wrk, _diff, _args...) \
- session_dbg_evts_t *sde = &session_dbg_main.wrk[_wrk->vm->thread_index]; \
- sde->counters[SESS_Q_CLK_UPDATE_TIME].f64 += _diff; \
-
-#define SESSION_EVT_DSP_CNTRS_MQ_DEQ_HANDLER(_wrk, _diff, _cnt, _dq, _args...) \
- session_dbg_evts_t *sde = &session_dbg_main.wrk[_wrk->vm->thread_index]; \
- sde->counters[SESS_Q_CNT_MQ_EVTS].u64 += _dq * _cnt; \
- sde->counters[SESS_Q_CLK_MQ_DEQ].f64 += _diff; \
+#define SESSION_EVT_DSP_CNTRS_UPDATE_TIME_HANDLER(_wrk, _diff, _args...) \
+ session_dbg_evts_t *sde = &session_dbg_main.wrk[_wrk->vm->thread_index]; \
+ sde->counters[SESS_Q_CLK_UPDATE_TIME].f64 += _diff;
+
+#define SESSION_EVT_DSP_CNTRS_MQ_DEQ_HANDLER(_wrk, _diff, _cnt, _args...) \
+ session_dbg_evts_t *sde = &session_dbg_main.wrk[_wrk->vm->thread_index]; \
+ sde->counters[SESS_Q_CNT_MQ_EVTS].u64 += _cnt; \
+ sde->counters[SESS_Q_CLK_MQ_DEQ].f64 += _diff;
#define SESSION_EVT_DSP_CNTRS_CTRL_EVTS_HANDLER(_wrk, _diff, _args...) \
session_dbg_evts_t *sde = &session_dbg_main.wrk[_wrk->vm->thread_index]; \
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index ccf93cbbf61..f3713d00cd1 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -1399,19 +1399,35 @@ session_flush_pending_tx_buffers (session_worker_t * wrk,
vec_reset_length (wrk->pending_tx_nexts);
}
+int
+session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq)
+{
+ svm_msg_q_msg_t _msg, *msg = &_msg;
+ u32 i, n_to_dequeue = 0;
+ session_event_t *evt;
+
+ n_to_dequeue = svm_msg_q_size (mq);
+ for (i = 0; i < n_to_dequeue; i++)
+ {
+ svm_msg_q_sub_raw (mq, msg);
+ evt = svm_msg_q_msg_data (mq, msg);
+ session_evt_add_to_list (wrk, evt);
+ svm_msg_q_free_msg (mq, msg);
+ }
+
+ return n_to_dequeue;
+}
+
static uword
session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_frame_t * frame)
{
+ u32 thread_index = vm->thread_index, __clib_unused n_evts;
+ session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
session_main_t *smm = vnet_get_session_main ();
- u32 thread_index = vm->thread_index, n_to_dequeue;
session_worker_t *wrk = &smm->wrk[thread_index];
- session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
clib_llist_index_t ei, next_ei, old_ti;
- svm_msg_q_msg_t _msg, *msg = &_msg;
- int i = 0, n_tx_packets;
- session_event_t *evt;
- svm_msg_q_t *mq;
+ int n_tx_packets;
SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk);
@@ -1426,25 +1442,11 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
SESSION_EVT (SESSION_EVT_DSP_CNTRS, UPDATE_TIME, wrk);
/*
- * Dequeue and handle new events
+ * Dequeue new internal mq events
*/
- /* Try to dequeue what is available. Don't wait for lock.
- * XXX: we may need priorities here */
- mq = wrk->vpp_event_queue;
- n_to_dequeue = svm_msg_q_size (mq);
- if (n_to_dequeue)
- {
- for (i = 0; i < n_to_dequeue; i++)
- {
- svm_msg_q_sub_raw (mq, msg);
- evt = svm_msg_q_msg_data (mq, msg);
- session_evt_add_to_list (wrk, evt);
- svm_msg_q_free_msg (mq, msg);
- }
- }
-
- SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_to_dequeue, !i);
+ n_evts = session_wrk_handle_mq (wrk, wrk->vpp_event_queue);
+ SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_evts);
/*
* Handle control events
@@ -1452,12 +1454,10 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
ctrl_he = pool_elt_at_index (wrk->event_elts, wrk->ctrl_head);
- /* *INDENT-OFF* */
clib_llist_foreach_safe (wrk->event_elts, evt_list, ctrl_he, elt, ({
clib_llist_remove (wrk->event_elts, evt_list, elt);
session_event_dispatch_ctrl (wrk, elt);
}));
- /* *INDENT-ON* */
SESSION_EVT (SESSION_EVT_DSP_CNTRS, CTRL_EVTS, wrk);
diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h
index 7a9f687e4ab..32cb9530bd9 100644
--- a/src/vnet/session/session_types.h
+++ b/src/vnet/session/session_types.h
@@ -449,37 +449,38 @@ typedef struct session_dgram_header_
STATIC_ASSERT (sizeof (session_dgram_hdr_t) == (SESSION_CONN_ID_LEN + 8),
"session conn id wrong length");
-#define foreach_session_error \
- _(NONE, "no error") \
- _(UNKNOWN, "generic/unknown error") \
- _(REFUSED, "refused") \
- _(TIMEDOUT, "timedout") \
- _(ALLOC, "obj/memory allocation error") \
- _(OWNER, "object not owned by application") \
- _(NOROUTE, "no route") \
- _(NOINTF, "no resolving interface") \
- _(NOIP, "no ip for lcl interface") \
- _(NOPORT, "no lcl port") \
- _(NOSUPPORT, "not supported") \
- _(NOLISTEN, "not listening") \
- _(NOSESSION, "session does not exist") \
- _(NOAPP, "app not attached") \
- _(PORTINUSE, "lcl port in use") \
- _(IPINUSE, "ip in use") \
- _(ALREADY_LISTENING, "ip port pair already listened on") \
- _(INVALID_RMT_IP, "invalid remote ip") \
- _(INVALID_APPWRK, "invalid app worker") \
- _(INVALID_NS, "invalid namespace") \
- _(SEG_NO_SPACE, "Couldn't allocate a fifo pair") \
- _(SEG_NO_SPACE2, "Created segment, couldn't allocate a fifo pair") \
- _(SEG_CREATE, "Couldn't create a new segment") \
- _(FILTERED, "session filtered") \
- _(SCOPE, "scope not supported") \
- _(BAPI_NO_FD, "bapi doesn't have a socket fd") \
- _(BAPI_SEND_FD, "couldn't send fd over bapi socket fd") \
- _(BAPI_NO_REG, "app bapi registration not found") \
- _(MQ_MSG_ALLOC, "failed to alloc mq msg") \
- _(TLS_HANDSHAKE, "failed tls handshake") \
+#define foreach_session_error \
+ _ (NONE, "no error") \
+ _ (UNKNOWN, "generic/unknown error") \
+ _ (REFUSED, "refused") \
+ _ (TIMEDOUT, "timedout") \
+ _ (ALLOC, "obj/memory allocation error") \
+ _ (OWNER, "object not owned by application") \
+ _ (NOROUTE, "no route") \
+ _ (NOINTF, "no resolving interface") \
+ _ (NOIP, "no ip for lcl interface") \
+ _ (NOPORT, "no lcl port") \
+ _ (NOSUPPORT, "not supported") \
+ _ (NOLISTEN, "not listening") \
+ _ (NOSESSION, "session does not exist") \
+ _ (NOAPP, "app not attached") \
+ _ (PORTINUSE, "lcl port in use") \
+ _ (IPINUSE, "ip in use") \
+ _ (ALREADY_LISTENING, "ip port pair already listened on") \
+ _ (INVALID_RMT_IP, "invalid remote ip") \
+ _ (INVALID_APPWRK, "invalid app worker") \
+ _ (INVALID_NS, "invalid namespace") \
+ _ (SEG_NO_SPACE, "Couldn't allocate a fifo pair") \
+ _ (SEG_NO_SPACE2, "Created segment, couldn't allocate a fifo pair") \
+ _ (SEG_CREATE, "Couldn't create a new segment") \
+ _ (FILTERED, "session filtered") \
+ _ (SCOPE, "scope not supported") \
+ _ (BAPI_NO_FD, "bapi doesn't have a socket fd") \
+ _ (BAPI_SEND_FD, "couldn't send fd over bapi socket fd") \
+ _ (BAPI_NO_REG, "app bapi registration not found") \
+ _ (MQ_MSG_ALLOC, "failed to alloc mq msg") \
+ _ (TLS_HANDSHAKE, "failed tls handshake") \
+ _ (EVENTFD_ALLOC, "failed to alloc eventfd")
typedef enum session_error_p_
{