aboutsummaryrefslogtreecommitdiffstats
path: root/src/vnet/session
diff options
context:
space:
mode:
authorFlorin Coras <fcoras@cisco.com>2021-11-22 21:19:01 -0800
committerDave Barach <openvpp@barachs.net>2021-12-16 20:53:53 +0000
commit20c242316cbbfa353d4f338ebee7ac715956b6aa (patch)
tree4eb3688d5a66cd9a675404915e55b7d56d765a48 /src/vnet/session
parent3c9dee27ca33ab88080a03db75d83f09fcc5c589 (diff)
session: app mq congestion detection
Detect mq congestion and handle it by queueing messages in a fifo and postponing handling via rpcs. App workers with congested mqs cannot accept nor connect additional sessions. Type: feature Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I401d971a1a53896758b88fc60f158cbc31e0c7cb
Diffstat (limited to 'src/vnet/session')
-rw-r--r--src/vnet/session/application.h23
-rw-r--r--src/vnet/session/application_worker.c325
-rw-r--r--src/vnet/session/session_api.c267
3 files changed, 349 insertions, 266 deletions
diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h
index b3201b9833e..54a616a99b1 100644
--- a/src/vnet/session/application.h
+++ b/src/vnet/session/application.h
@@ -29,6 +29,16 @@
#define APP_DBG(_fmt, _args...)
#endif
+typedef struct app_wrk_postponed_msg_
+{
+ u32 len;
+ u8 event_type;
+ u8 ring;
+ u8 is_sapi;
+ int fd;
+ u8 data[SESSION_CTRL_MSG_MAX_SIZE];
+} app_wrk_postponed_msg_t;
+
typedef struct app_worker_
{
CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
@@ -59,6 +69,9 @@ typedef struct app_worker_
/** API index for the worker. Needed for multi-process apps */
u32 api_client_index;
+ /** Set if mq is congested */
+ u8 mq_congested;
+
u8 app_is_builtin;
/** Pool of half-open session handles. Tracked in case worker detaches */
@@ -69,6 +82,12 @@ typedef struct app_worker_
/** Vector of detached listener segment managers */
u32 *detached_seg_managers;
+
+ /** Fifo of messages postponed because of mq congestion */
+ app_wrk_postponed_msg_t *postponed_mq_msgs;
+
+ /** Lock to add/sub message from ref @postponed_mq_msgs */
+ clib_spinlock_t postponed_mq_msgs_lock;
} app_worker_t;
typedef struct app_worker_map_
@@ -339,6 +358,10 @@ int app_worker_del_segment_notify (app_worker_t * app_wrk,
u32 app_worker_n_listeners (app_worker_t * app);
session_t *app_worker_first_listener (app_worker_t * app,
u8 fib_proto, u8 transport_proto);
+void app_wrk_send_ctrl_evt_fd (app_worker_t *app_wrk, u8 evt_type, void *msg,
+ u32 msg_len, int fd);
+void app_wrk_send_ctrl_evt (app_worker_t *app_wrk, u8 evt_type, void *msg,
+ u32 msg_len);
int app_worker_send_event (app_worker_t * app, session_t * s, u8 evt);
int app_worker_lock_and_send_event (app_worker_t * app, session_t * s,
u8 evt_type);
diff --git a/src/vnet/session/application_worker.c b/src/vnet/session/application_worker.c
index 54377604803..6bbad045d0d 100644
--- a/src/vnet/session/application_worker.c
+++ b/src/vnet/session/application_worker.c
@@ -33,6 +33,7 @@ app_worker_alloc (application_t * app)
app_wrk->wrk_map_index = ~0;
app_wrk->connects_seg_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
clib_spinlock_init (&app_wrk->detached_seg_managers_lock);
+ clib_spinlock_init (&app_wrk->postponed_mq_msgs_lock);
APP_DBG ("New app %v worker %u", app->name, app_wrk->wrk_index);
return app_wrk;
}
@@ -126,6 +127,7 @@ app_worker_free (app_worker_t * app_wrk)
}
vec_free (app_wrk->detached_seg_managers);
clib_spinlock_free (&app_wrk->detached_seg_managers_lock);
+ clib_spinlock_free (&app_wrk->postponed_mq_msgs_lock);
if (CLIB_DEBUG)
clib_memset (app_wrk, 0xfe, sizeof (*app_wrk));
@@ -338,8 +340,10 @@ app_worker_init_accepted (session_t * s)
listener = listen_session_get_from_handle (s->listener_handle);
app_wrk = application_listener_select_worker (listener);
- s->app_wrk_index = app_wrk->wrk_index;
+ if (PREDICT_FALSE (app_wrk->mq_congested))
+ return -1;
+ s->app_wrk_index = app_wrk->wrk_index;
app = application_get (app_wrk->app_index);
if (app->cb_fns.fifo_tuning_callback)
s->flags |= SESSION_F_CUSTOM_FIFO_TUNING;
@@ -510,6 +514,9 @@ int
app_worker_connect_session (app_worker_t *app_wrk, session_endpoint_cfg_t *sep,
session_handle_t *rsh)
{
+ if (PREDICT_FALSE (app_wrk->mq_congested))
+ return SESSION_E_REFUSED;
+
sep->app_wrk_index = app_wrk->wrk_index;
return session_open (sep, rsh);
@@ -614,12 +621,240 @@ app_worker_application_is_builtin (app_worker_t * app_wrk)
return app_wrk->app_is_builtin;
}
+static int
+app_wrk_send_fd (app_worker_t *app_wrk, int fd)
+{
+ if (!appns_sapi_enabled ())
+ {
+ vl_api_registration_t *reg;
+ clib_error_t *error;
+
+ reg =
+ vl_mem_api_client_index_to_registration (app_wrk->api_client_index);
+ if (!reg)
+ {
+ clib_warning ("no api registration for client: %u",
+ app_wrk->api_client_index);
+ return -1;
+ }
+
+ if (vl_api_registration_file_index (reg) == VL_API_INVALID_FI)
+ return -1;
+
+ error = vl_api_send_fd_msg (reg, &fd, 1);
+ if (error)
+ {
+ clib_error_report (error);
+ return -1;
+ }
+
+ return 0;
+ }
+
+ app_sapi_msg_t smsg = { 0 };
+ app_namespace_t *app_ns;
+ clib_error_t *error;
+ application_t *app;
+ clib_socket_t *cs;
+ u32 cs_index;
+
+ app = application_get (app_wrk->app_index);
+ app_ns = app_namespace_get (app->ns_index);
+ cs_index = appns_sapi_handle_sock_index (app_wrk->api_client_index);
+ cs = appns_sapi_get_socket (app_ns, cs_index);
+ if (PREDICT_FALSE (!cs))
+ return -1;
+
+ /* There's no payload for the message only the type */
+ smsg.type = APP_SAPI_MSG_TYPE_SEND_FDS;
+ error = clib_socket_sendmsg (cs, &smsg, sizeof (smsg), &fd, 1);
+ if (error)
+ {
+ clib_error_report (error);
+ return -1;
+ }
+
+ return 0;
+}
+
+static int
+mq_try_lock_and_alloc_msg (svm_msg_q_t *mq, session_mq_rings_e ring,
+ svm_msg_q_msg_t *msg)
+{
+ int rv, n_try = 0;
+
+ while (n_try < 5)
+ {
+ rv = svm_msg_q_lock_and_alloc_msg_w_ring (mq, ring, SVM_Q_NOWAIT, msg);
+ if (!rv)
+ return 0;
+ /*
+ * Break the loop if mq is full, usually this is because the
+ * app has crashed or is hanging on somewhere.
+ */
+ if (rv != -1)
+ break;
+ n_try += 1;
+ usleep (1);
+ }
+
+ return -1;
+}
+
+typedef union app_wrk_mq_rpc_args_
+{
+ struct
+ {
+ u32 thread_index;
+ u32 app_wrk_index;
+ };
+ uword as_uword;
+} app_wrk_mq_rpc_ags_t;
+
+static int
+app_wrk_handle_mq_postponed_msgs (void *arg)
+{
+ svm_msg_q_msg_t _mq_msg, *mq_msg = &_mq_msg;
+ app_wrk_postponed_msg_t *pm;
+ app_wrk_mq_rpc_ags_t args;
+ u32 max_msg, n_msg = 0;
+ app_worker_t *app_wrk;
+ session_event_t *evt;
+ svm_msg_q_t *mq;
+
+ args.as_uword = pointer_to_uword (arg);
+ app_wrk = app_worker_get_if_valid (args.app_wrk_index);
+ if (!app_wrk)
+ return 0;
+
+ mq = app_wrk->event_queue;
+
+ clib_spinlock_lock (&app_wrk->postponed_mq_msgs_lock);
+
+ max_msg = clib_min (32, clib_fifo_elts (app_wrk->postponed_mq_msgs));
+
+ while (n_msg < max_msg)
+ {
+ pm = clib_fifo_head (app_wrk->postponed_mq_msgs);
+ if (mq_try_lock_and_alloc_msg (mq, pm->ring, mq_msg))
+ break;
+
+ evt = svm_msg_q_msg_data (mq, mq_msg);
+ clib_memset (evt, 0, sizeof (*evt));
+ evt->event_type = pm->event_type;
+ clib_memcpy_fast (evt->data, pm->data, pm->len);
+
+ if (pm->fd != -1)
+ app_wrk_send_fd (app_wrk, pm->fd);
+
+ svm_msg_q_add_and_unlock (mq, mq_msg);
+
+ clib_fifo_advance_head (app_wrk->postponed_mq_msgs, 1);
+ n_msg += 1;
+ }
+
+ if (!clib_fifo_elts (app_wrk->postponed_mq_msgs))
+ {
+ app_wrk->mq_congested = 0;
+ }
+ else
+ {
+ session_send_rpc_evt_to_thread_force (
+ args.thread_index, app_wrk_handle_mq_postponed_msgs,
+ uword_to_pointer (args.as_uword, void *));
+ }
+
+ clib_spinlock_unlock (&app_wrk->postponed_mq_msgs_lock);
+
+ return 0;
+}
+
+static void
+app_wrk_add_mq_postponed_msg (app_worker_t *app_wrk, session_mq_rings_e ring,
+ u8 evt_type, void *msg, u32 msg_len, int fd)
+{
+ app_wrk_postponed_msg_t *pm;
+
+ clib_spinlock_lock (&app_wrk->postponed_mq_msgs_lock);
+
+ app_wrk->mq_congested = 1;
+
+ clib_fifo_add2 (app_wrk->postponed_mq_msgs, pm);
+ clib_memcpy_fast (pm->data, msg, msg_len);
+ pm->event_type = evt_type;
+ pm->ring = ring;
+ pm->len = msg_len;
+ pm->fd = fd;
+
+ if (clib_fifo_elts (app_wrk->postponed_mq_msgs) == 1)
+ {
+ app_wrk_mq_rpc_ags_t args = { .thread_index = vlib_get_thread_index (),
+ .app_wrk_index = app_wrk->wrk_index };
+
+ session_send_rpc_evt_to_thread_force (
+ args.thread_index, app_wrk_handle_mq_postponed_msgs,
+ uword_to_pointer (args.as_uword, void *));
+ }
+
+ clib_spinlock_unlock (&app_wrk->postponed_mq_msgs_lock);
+}
+
+always_inline void
+app_wrk_send_ctrl_evt_inline (app_worker_t *app_wrk, u8 evt_type, void *msg,
+ u32 msg_len, int fd)
+{
+ svm_msg_q_msg_t _mq_msg, *mq_msg = &_mq_msg;
+ svm_msg_q_t *mq = app_wrk->event_queue;
+ session_event_t *evt;
+ int rv;
+
+ if (PREDICT_FALSE (app_wrk->mq_congested))
+ goto handle_congestion;
+
+ rv = mq_try_lock_and_alloc_msg (mq, SESSION_MQ_CTRL_EVT_RING, mq_msg);
+ if (PREDICT_FALSE (rv))
+ goto handle_congestion;
+
+ evt = svm_msg_q_msg_data (mq, mq_msg);
+ clib_memset (evt, 0, sizeof (*evt));
+ evt->event_type = evt_type;
+ clib_memcpy_fast (evt->data, msg, msg_len);
+
+ if (fd != -1)
+ app_wrk_send_fd (app_wrk, fd);
+
+ svm_msg_q_add_and_unlock (mq, mq_msg);
+
+ return;
+
+handle_congestion:
+
+ app_wrk_add_mq_postponed_msg (app_wrk, SESSION_MQ_CTRL_EVT_RING, evt_type,
+ msg, msg_len, fd);
+}
+
+void
+app_wrk_send_ctrl_evt_fd (app_worker_t *app_wrk, u8 evt_type, void *msg,
+ u32 msg_len, int fd)
+{
+ app_wrk_send_ctrl_evt_inline (app_wrk, evt_type, msg, msg_len, fd);
+}
+
+void
+app_wrk_send_ctrl_evt (app_worker_t *app_wrk, u8 evt_type, void *msg,
+ u32 msg_len)
+{
+ app_wrk_send_ctrl_evt_inline (app_wrk, evt_type, msg, msg_len, -1);
+}
+
static inline int
app_send_io_evt_rx (app_worker_t * app_wrk, session_t * s)
{
+ svm_msg_q_msg_t _mq_msg = { 0 }, *mq_msg = &_mq_msg;
session_event_t *evt;
- svm_msg_q_msg_t msg;
svm_msg_q_t *mq;
+ u32 app_session;
+ int rv;
if (app_worker_application_is_builtin (app_wrk))
return app_worker_builtin_rx (app_wrk, s);
@@ -627,68 +862,72 @@ app_send_io_evt_rx (app_worker_t * app_wrk, session_t * s)
if (svm_fifo_has_event (s->rx_fifo))
return 0;
+ app_session = s->rx_fifo->shr->client_session_index;
mq = app_wrk->event_queue;
- svm_msg_q_lock (mq);
- if (PREDICT_FALSE (svm_msg_q_is_full (mq)))
- {
- clib_warning ("evt q full");
- svm_msg_q_unlock (mq);
- return -1;
- }
+ if (PREDICT_FALSE (app_wrk->mq_congested))
+ goto handle_congestion;
- if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
- {
- clib_warning ("evt q rings full");
- svm_msg_q_unlock (mq);
- return -1;
- }
+ rv = mq_try_lock_and_alloc_msg (mq, SESSION_MQ_IO_EVT_RING, mq_msg);
- msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
- evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
- evt->session_index = s->rx_fifo->shr->client_session_index;
+ if (PREDICT_FALSE (rv))
+ goto handle_congestion;
+
+ evt = svm_msg_q_msg_data (mq, mq_msg);
evt->event_type = SESSION_IO_EVT_RX;
+ evt->session_index = app_session;
(void) svm_fifo_set_event (s->rx_fifo);
- svm_msg_q_add_and_unlock (mq, &msg);
+
+ svm_msg_q_add_and_unlock (mq, mq_msg);
return 0;
+
+handle_congestion:
+
+ app_wrk_add_mq_postponed_msg (app_wrk, SESSION_MQ_IO_EVT_RING,
+ SESSION_IO_EVT_RX, &app_session,
+ sizeof (app_session), -1);
+ return -1;
}
static inline int
app_send_io_evt_tx (app_worker_t * app_wrk, session_t * s)
{
- svm_msg_q_t *mq;
+ svm_msg_q_msg_t _mq_msg = { 0 }, *mq_msg = &_mq_msg;
session_event_t *evt;
- svm_msg_q_msg_t msg;
+ svm_msg_q_t *mq;
+ u32 app_session;
+ int rv;
if (app_worker_application_is_builtin (app_wrk))
return app_worker_builtin_tx (app_wrk, s);
+ app_session = s->tx_fifo->shr->client_session_index;
mq = app_wrk->event_queue;
- svm_msg_q_lock (mq);
- if (PREDICT_FALSE (svm_msg_q_is_full (mq)))
- {
- clib_warning ("evt q full");
- svm_msg_q_unlock (mq);
- return -1;
- }
+ if (PREDICT_FALSE (app_wrk->mq_congested))
+ goto handle_congestion;
- if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
- {
- clib_warning ("evt q rings full");
- svm_msg_q_unlock (mq);
- return -1;
- }
+ rv = mq_try_lock_and_alloc_msg (mq, SESSION_MQ_IO_EVT_RING, mq_msg);
+
+ if (PREDICT_FALSE (rv))
+ goto handle_congestion;
- msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
- evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
+ evt = svm_msg_q_msg_data (mq, mq_msg);
evt->event_type = SESSION_IO_EVT_TX;
- evt->session_index = s->tx_fifo->shr->client_session_index;
+ evt->session_index = app_session;
+
+ svm_msg_q_add_and_unlock (mq, mq_msg);
- svm_msg_q_add_and_unlock (mq, &msg);
return 0;
+
+handle_congestion:
+
+ app_wrk_add_mq_postponed_msg (app_wrk, SESSION_MQ_IO_EVT_RING,
+ SESSION_IO_EVT_TX, &app_session,
+ sizeof (app_session), -1);
+ return -1;
}
/* *INDENT-OFF* */
@@ -764,10 +1003,12 @@ format_app_worker (u8 * s, va_list * args)
app_worker_t *app_wrk = va_arg (*args, app_worker_t *);
u32 indent = 1;
- s = format (s, "%U wrk-index %u app-index %u map-index %u "
- "api-client-index %d\n", format_white_space, indent,
- app_wrk->wrk_index, app_wrk->app_index, app_wrk->wrk_map_index,
- app_wrk->api_client_index);
+ s = format (s,
+ "%U wrk-index %u app-index %u map-index %u "
+ "api-client-index %d mq-cong %u\n",
+ format_white_space, indent, app_wrk->wrk_index,
+ app_wrk->app_index, app_wrk->wrk_map_index,
+ app_wrk->api_client_index, app_wrk->mq_congested);
return s;
}
diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c
index 767a24aa170..f6170debba7 100644
--- a/src/vnet/session/session_api.c
+++ b/src/vnet/session/session_api.c
@@ -82,40 +82,12 @@ session_send_fds (vl_api_registration_t * reg, int fds[], int n_fds)
}
static int
-mq_try_lock_and_alloc_msg (svm_msg_q_t * app_mq, svm_msg_q_msg_t * msg)
-{
- int rv;
- u8 try = 0;
- while (try < 100)
- {
- rv = svm_msg_q_lock_and_alloc_msg_w_ring (app_mq,
- SESSION_MQ_CTRL_EVT_RING,
- SVM_Q_NOWAIT, msg);
- if (!rv)
- return 0;
- /*
- * Break the loop if mq is full, usually this is because the
- * app has crashed or is hanging on somewhere.
- */
- if (rv != -1)
- break;
- try++;
- usleep (1);
- }
- clib_warning ("failed to alloc msg");
- return -1;
-}
-
-static int
mq_send_session_accepted_cb (session_t * s)
{
app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
- svm_msg_q_msg_t _msg, *msg = &_msg;
session_accepted_msg_t m = { 0 };
- svm_msg_q_t *app_mq;
fifo_segment_t *eq_seg;
session_t *listener;
- session_event_t *evt;
application_t *app;
app = application_get (app_wrk->app_index);
@@ -164,15 +136,7 @@ mq_send_session_accepted_cb (session_t * s)
m.mq_index = s->thread_index;
}
- app_mq = app_wrk->event_queue;
- if (mq_try_lock_and_alloc_msg (app_mq, msg))
- return SESSION_E_MQ_MSG_ALLOC;
-
- evt = svm_msg_q_msg_data (app_mq, msg);
- clib_memset (evt, 0, sizeof (*evt));
- evt->event_type = SESSION_CTRL_EVT_ACCEPTED;
- clib_memcpy_fast (evt->data, &m, sizeof (m));
- svm_msg_q_add_and_unlock (app_mq, msg);
+ app_wrk_send_ctrl_evt (app_wrk, SESSION_CTRL_EVT_ACCEPTED, &m, sizeof (m));
return 0;
}
@@ -181,21 +145,12 @@ static inline void
mq_send_session_close_evt (app_worker_t * app_wrk, session_handle_t sh,
session_evt_type_t evt_type)
{
- svm_msg_q_msg_t _msg, *msg = &_msg;
- session_disconnected_msg_t *mp;
- svm_msg_q_t *app_mq;
- session_event_t *evt;
+ session_disconnected_msg_t m = { 0 };
- app_mq = app_wrk->event_queue;
- if (mq_try_lock_and_alloc_msg (app_mq, msg))
- return;
- evt = svm_msg_q_msg_data (app_mq, msg);
- clib_memset (evt, 0, sizeof (*evt));
- evt->event_type = evt_type;
- mp = (session_disconnected_msg_t *) evt->data;
- mp->handle = sh;
- mp->context = app_wrk->api_client_index;
- svm_msg_q_add_and_unlock (app_mq, msg);
+ m.handle = sh;
+ m.context = app_wrk->api_client_index;
+
+ app_wrk_send_ctrl_evt (app_wrk, evt_type, &m, sizeof (m));
}
static inline void
@@ -249,13 +204,10 @@ int
mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
session_t * s, session_error_t err)
{
- svm_msg_q_msg_t _msg, *msg = &_msg;
session_connected_msg_t m = { 0 };
- svm_msg_q_t *app_mq;
transport_connection_t *tc;
fifo_segment_t *eq_seg;
app_worker_t *app_wrk;
- session_event_t *evt;
application_t *app;
app_wrk = app_worker_get (app_wrk_index);
@@ -318,17 +270,8 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
snd_msg:
- app_mq = app_wrk->event_queue;
-
- if (mq_try_lock_and_alloc_msg (app_mq, msg))
- return SESSION_E_MQ_MSG_ALLOC;
-
- evt = svm_msg_q_msg_data (app_mq, msg);
- clib_memset (evt, 0, sizeof (*evt));
- evt->event_type = SESSION_CTRL_EVT_CONNECTED;
- clib_memcpy_fast (evt->data, &m, sizeof (m));
+ app_wrk_send_ctrl_evt (app_wrk, SESSION_CTRL_EVT_CONNECTED, &m, sizeof (m));
- svm_msg_q_add_and_unlock (app_mq, msg);
return 0;
}
@@ -336,13 +279,10 @@ int
mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
session_handle_t handle, int rv)
{
- svm_msg_q_msg_t _msg, *msg = &_msg;
session_bound_msg_t m = { 0 };
- svm_msg_q_t *app_mq;
transport_endpoint_t tep;
fifo_segment_t *eq_seg;
app_worker_t *app_wrk;
- session_event_t *evt;
application_t *app;
app_listener_t *al;
session_t *ls = 0;
@@ -381,17 +321,8 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
snd_msg:
- app_mq = app_wrk->event_queue;
+ app_wrk_send_ctrl_evt (app_wrk, SESSION_CTRL_EVT_BOUND, &m, sizeof (m));
- if (mq_try_lock_and_alloc_msg (app_mq, msg))
- return SESSION_E_MQ_MSG_ALLOC;
-
- evt = svm_msg_q_msg_data (app_mq, msg);
- clib_memset (evt, 0, sizeof (*evt));
- evt->event_type = SESSION_CTRL_EVT_BOUND;
- clib_memcpy_fast (evt->data, &m, sizeof (m));
-
- svm_msg_q_add_and_unlock (app_mq, msg);
return 0;
}
@@ -399,40 +330,26 @@ void
mq_send_unlisten_reply (app_worker_t * app_wrk, session_handle_t sh,
u32 context, int rv)
{
- svm_msg_q_msg_t _msg, *msg = &_msg;
- session_unlisten_reply_msg_t *ump;
- svm_msg_q_t *app_mq;
- session_event_t *evt;
-
- app_mq = app_wrk->event_queue;
- if (mq_try_lock_and_alloc_msg (app_mq, msg))
- return;
+ session_unlisten_reply_msg_t m = { 0 };
- evt = svm_msg_q_msg_data (app_mq, msg);
- clib_memset (evt, 0, sizeof (*evt));
- evt->event_type = SESSION_CTRL_EVT_UNLISTEN_REPLY;
- ump = (session_unlisten_reply_msg_t *) evt->data;
- ump->context = context;
- ump->handle = sh;
- ump->retval = rv;
- svm_msg_q_add_and_unlock (app_mq, msg);
+ m.context = context;
+ m.handle = sh;
+ m.retval = rv;
+ app_wrk_send_ctrl_evt (app_wrk, SESSION_CTRL_EVT_UNLISTEN_REPLY, &m,
+ sizeof (m));
}
static void
mq_send_session_migrate_cb (session_t * s, session_handle_t new_sh)
{
- svm_msg_q_msg_t _msg, *msg = &_msg;
session_migrated_msg_t m = { 0 };
fifo_segment_t *eq_seg;
app_worker_t *app_wrk;
- session_event_t *evt;
- svm_msg_q_t *app_mq;
application_t *app;
u32 thread_index;
thread_index = session_thread_from_handle (new_sh);
app_wrk = app_worker_get (s->app_wrk_index);
- app_mq = app_wrk->event_queue;
app = application_get (app_wrk->app_index);
eq_seg = application_get_rx_mqs_segment (app);
@@ -442,27 +359,15 @@ mq_send_session_migrate_cb (session_t * s, session_handle_t new_sh)
m.vpp_evt_q = fifo_segment_msg_q_offset (eq_seg, thread_index);
m.segment_handle = SESSION_INVALID_HANDLE;
- if (mq_try_lock_and_alloc_msg (app_mq, msg))
- return;
-
- evt = svm_msg_q_msg_data (app_mq, msg);
- clib_memset (evt, 0, sizeof (*evt));
- evt->event_type = SESSION_CTRL_EVT_MIGRATED;
- clib_memcpy_fast (evt->data, &m, sizeof (m));
-
- svm_msg_q_add_and_unlock (app_mq, msg);
+ app_wrk_send_ctrl_evt (app_wrk, SESSION_CTRL_EVT_MIGRATED, &m, sizeof (m));
}
static int
mq_send_add_segment_cb (u32 app_wrk_index, u64 segment_handle)
{
- int fds[SESSION_N_FD_TYPE], n_fds = 0;
- svm_msg_q_msg_t _msg, *msg = &_msg;
- session_app_add_segment_msg_t *mp;
+ session_app_add_segment_msg_t m = { 0 };
vl_api_registration_t *reg;
app_worker_t *app_wrk;
- session_event_t *evt;
- svm_msg_q_t *app_mq;
fifo_segment_t *fs;
ssvm_private_t *sp;
u8 fd_flags = 0;
@@ -488,29 +393,16 @@ mq_send_add_segment_cb (u32 app_wrk_index, u64 segment_handle)
}
fd_flags |= SESSION_FD_F_MEMFD_SEGMENT;
- fds[n_fds] = sp->fd;
- n_fds += 1;
}
- app_mq = app_wrk->event_queue;
- if (mq_try_lock_and_alloc_msg (app_mq, msg))
- return -1;
-
- if (n_fds)
- session_send_fds (reg, fds, n_fds);
-
- evt = svm_msg_q_msg_data (app_mq, msg);
- clib_memset (evt, 0, sizeof (*evt));
- evt->event_type = SESSION_CTRL_EVT_APP_ADD_SEGMENT;
- mp = (session_app_add_segment_msg_t *) evt->data;
- clib_memset (mp, 0, sizeof (*mp));
- mp->segment_size = sp->ssvm_size;
- mp->fd_flags = fd_flags;
- mp->segment_handle = segment_handle;
- strncpy ((char *) mp->segment_name, (char *) sp->name,
- sizeof (mp->segment_name) - 1);
+ m.segment_size = sp->ssvm_size;
+ m.fd_flags = fd_flags;
+ m.segment_handle = segment_handle;
+ strncpy ((char *) m.segment_name, (char *) sp->name,
+ sizeof (m.segment_name) - 1);
- svm_msg_q_add_and_unlock (app_mq, msg);
+ app_wrk_send_ctrl_evt_fd (app_wrk, SESSION_CTRL_EVT_APP_ADD_SEGMENT, &m,
+ sizeof (m), sp->fd);
return 0;
}
@@ -518,12 +410,9 @@ mq_send_add_segment_cb (u32 app_wrk_index, u64 segment_handle)
static int
mq_send_del_segment_cb (u32 app_wrk_index, u64 segment_handle)
{
- svm_msg_q_msg_t _msg, *msg = &_msg;
- session_app_del_segment_msg_t *mp;
+ session_app_del_segment_msg_t m = { 0 };
vl_api_registration_t *reg;
app_worker_t *app_wrk;
- session_event_t *evt;
- svm_msg_q_t *app_mq;
app_wrk = app_worker_get (app_wrk_index);
reg = vl_mem_api_client_index_to_registration (app_wrk->api_client_index);
@@ -533,17 +422,10 @@ mq_send_del_segment_cb (u32 app_wrk_index, u64 segment_handle)
return -1;
}
- app_mq = app_wrk->event_queue;
- if (mq_try_lock_and_alloc_msg (app_mq, msg))
- return -1;
+ m.segment_handle = segment_handle;
- evt = svm_msg_q_msg_data (app_mq, msg);
- clib_memset (evt, 0, sizeof (*evt));
- evt->event_type = SESSION_CTRL_EVT_APP_DEL_SEGMENT;
- mp = (session_app_del_segment_msg_t *) evt->data;
- clib_memset (mp, 0, sizeof (*mp));
- mp->segment_handle = segment_handle;
- svm_msg_q_add_and_unlock (app_mq, msg);
+ app_wrk_send_ctrl_evt (app_wrk, SESSION_CTRL_EVT_APP_DEL_SEGMENT, &m,
+ sizeof (m));
return 0;
}
@@ -551,10 +433,7 @@ mq_send_del_segment_cb (u32 app_wrk_index, u64 segment_handle)
static void
mq_send_session_cleanup_cb (session_t * s, session_cleanup_ntf_t ntf)
{
- svm_msg_q_msg_t _msg, *msg = &_msg;
- session_cleanup_msg_t *mp;
- svm_msg_q_t *app_mq;
- session_event_t *evt;
+ session_cleanup_msg_t m = { 0 };
app_worker_t *app_wrk;
/* Propagate transport cleanup notifications only if app didn't close */
@@ -566,17 +445,10 @@ mq_send_session_cleanup_cb (session_t * s, session_cleanup_ntf_t ntf)
if (!app_wrk)
return;
- app_mq = app_wrk->event_queue;
- if (mq_try_lock_and_alloc_msg (app_mq, msg))
- return;
+ m.handle = session_handle (s);
+ m.type = ntf;
- evt = svm_msg_q_msg_data (app_mq, msg);
- clib_memset (evt, 0, sizeof (*evt));
- evt->event_type = SESSION_CTRL_EVT_CLEANUP;
- mp = (session_cleanup_msg_t *) evt->data;
- mp->handle = session_handle (s);
- mp->type = ntf;
- svm_msg_q_add_and_unlock (app_mq, msg);
+ app_wrk_send_ctrl_evt (app_wrk, SESSION_CTRL_EVT_CLEANUP, &m, sizeof (m));
}
static session_cb_vft_t session_mq_cb_vft = {
@@ -1245,36 +1117,11 @@ VL_MSG_API_REAPER_FUNCTION (application_reaper_cb);
* Socket api functions
*/
-static void
-sapi_send_fds (app_worker_t * app_wrk, int *fds, int n_fds)
-{
- app_sapi_msg_t smsg = { 0 };
- app_namespace_t *app_ns;
- application_t *app;
- clib_socket_t *cs;
- u32 cs_index;
-
- app = application_get (app_wrk->app_index);
- app_ns = app_namespace_get (app->ns_index);
- cs_index = appns_sapi_handle_sock_index (app_wrk->api_client_index);
- cs = appns_sapi_get_socket (app_ns, cs_index);
- if (PREDICT_FALSE (!cs))
- return;
-
- /* There's no payload for the message only the type */
- smsg.type = APP_SAPI_MSG_TYPE_SEND_FDS;
- clib_socket_sendmsg (cs, &smsg, sizeof (smsg), fds, n_fds);
-}
-
static int
mq_send_add_segment_sapi_cb (u32 app_wrk_index, u64 segment_handle)
{
- int fds[SESSION_N_FD_TYPE], n_fds = 0;
- svm_msg_q_msg_t _msg, *msg = &_msg;
- session_app_add_segment_msg_t *mp;
+ session_app_add_segment_msg_t m = { 0 };
app_worker_t *app_wrk;
- session_event_t *evt;
- svm_msg_q_t *app_mq;
fifo_segment_t *fs;
ssvm_private_t *sp;
u8 fd_flags = 0;
@@ -1286,33 +1133,15 @@ mq_send_add_segment_sapi_cb (u32 app_wrk_index, u64 segment_handle)
ASSERT (ssvm_type (sp) == SSVM_SEGMENT_MEMFD);
fd_flags |= SESSION_FD_F_MEMFD_SEGMENT;
- fds[n_fds] = sp->fd;
- n_fds += 1;
-
- app_mq = app_wrk->event_queue;
- if (mq_try_lock_and_alloc_msg (app_mq, msg))
- return -1;
- /*
- * Send the fd over api socket
- */
- sapi_send_fds (app_wrk, fds, n_fds);
+ m.segment_size = sp->ssvm_size;
+ m.fd_flags = fd_flags;
+ m.segment_handle = segment_handle;
+ strncpy ((char *) m.segment_name, (char *) sp->name,
+ sizeof (m.segment_name) - 1);
- /*
- * Send the actual message over mq
- */
- evt = svm_msg_q_msg_data (app_mq, msg);
- clib_memset (evt, 0, sizeof (*evt));
- evt->event_type = SESSION_CTRL_EVT_APP_ADD_SEGMENT;
- mp = (session_app_add_segment_msg_t *) evt->data;
- clib_memset (mp, 0, sizeof (*mp));
- mp->segment_size = sp->ssvm_size;
- mp->fd_flags = fd_flags;
- mp->segment_handle = segment_handle;
- strncpy ((char *) mp->segment_name, (char *) sp->name,
- sizeof (mp->segment_name) - 1);
-
- svm_msg_q_add_and_unlock (app_mq, msg);
+ app_wrk_send_ctrl_evt_fd (app_wrk, SESSION_CTRL_EVT_APP_ADD_SEGMENT, &m,
+ sizeof (m), sp->fd);
return 0;
}
@@ -1320,25 +1149,15 @@ mq_send_add_segment_sapi_cb (u32 app_wrk_index, u64 segment_handle)
static int
mq_send_del_segment_sapi_cb (u32 app_wrk_index, u64 segment_handle)
{
- svm_msg_q_msg_t _msg, *msg = &_msg;
- session_app_del_segment_msg_t *mp;
+ session_app_del_segment_msg_t m = { 0 };
app_worker_t *app_wrk;
- session_event_t *evt;
- svm_msg_q_t *app_mq;
app_wrk = app_worker_get (app_wrk_index);
- app_mq = app_wrk->event_queue;
- if (mq_try_lock_and_alloc_msg (app_mq, msg))
- return -1;
+ m.segment_handle = segment_handle;
- evt = svm_msg_q_msg_data (app_mq, msg);
- clib_memset (evt, 0, sizeof (*evt));
- evt->event_type = SESSION_CTRL_EVT_APP_DEL_SEGMENT;
- mp = (session_app_del_segment_msg_t *) evt->data;
- clib_memset (mp, 0, sizeof (*mp));
- mp->segment_handle = segment_handle;
- svm_msg_q_add_and_unlock (app_mq, msg);
+ app_wrk_send_ctrl_evt (app_wrk, SESSION_CTRL_EVT_APP_DEL_SEGMENT, &m,
+ sizeof (m));
return 0;
}