aboutsummaryrefslogtreecommitdiffstats
path: root/src/vnet/session
diff options
context:
space:
mode:
Diffstat (limited to 'src/vnet/session')
-rw-r--r--src/vnet/session/application.c141
-rw-r--r--src/vnet/session/application.h7
-rw-r--r--src/vnet/session/application_interface.h83
-rw-r--r--src/vnet/session/segment_manager.c43
-rw-r--r--src/vnet/session/segment_manager.h9
-rw-r--r--src/vnet/session/session.c192
-rw-r--r--src/vnet/session/session.h47
-rwxr-xr-xsrc/vnet/session/session_api.c8
-rw-r--r--src/vnet/session/session_node.c73
9 files changed, 424 insertions, 179 deletions
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index 60019dd7961..6041b49712d 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -807,6 +807,143 @@ application_get_segment_manager_properties (u32 app_index)
return &app->sm_properties;
}
+static inline int
+app_enqueue_evt (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, u8 lock)
+{
+ if (PREDICT_TRUE (!svm_msg_q_is_full (mq)))
+ {
+ if (lock)
+ {
+ svm_msg_q_add_w_lock (mq, msg);
+ svm_msg_q_unlock (mq);
+ }
+ else if (svm_msg_q_add (mq, msg, SVM_Q_WAIT))
+ {
+ clib_warning ("msg q add returned");
+ if (lock)
+ svm_msg_q_unlock (mq);
+ return -1;
+ }
+ }
+ else
+ {
+ clib_warning ("evt q full");
+ svm_msg_q_free_msg (mq, msg);
+ if (lock)
+ svm_msg_q_unlock (mq);
+ return -1;
+ }
+ return 0;
+}
+
+static inline int
+app_send_io_evt_rx (application_t * app, stream_session_t * s, u8 lock)
+{
+ session_fifo_event_t *evt;
+ svm_msg_q_msg_t msg;
+ svm_msg_q_t *mq;
+
+ if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
+ {
+ /* Session is closed so app will never clean up. Flush rx fifo */
+ svm_fifo_dequeue_drop_all (s->server_rx_fifo);
+ return 0;
+ }
+
+ /* Built-in app? Hand event to the callback... */
+ if (app->cb_fns.builtin_app_rx_callback)
+ return app->cb_fns.builtin_app_rx_callback (s);
+
+ /* If no need for event, return */
+ if (!svm_fifo_set_event (s->server_rx_fifo))
+ return 0;
+
+ mq = app->event_queue;
+ if (lock)
+ svm_msg_q_lock (mq);
+
+ if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
+ {
+ clib_warning ("evt q rings full");
+ if (lock)
+ svm_msg_q_unlock (mq);
+ return -1;
+ }
+
+ msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+ ASSERT (!svm_msg_q_msg_is_invalid (&msg));
+
+ evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+ evt->fifo = s->server_rx_fifo;
+ evt->event_type = FIFO_EVENT_APP_RX;
+
+ return app_enqueue_evt (mq, &msg, lock);
+}
+
+static inline int
+app_send_io_evt_tx (application_t * app, stream_session_t * s, u8 lock)
+{
+ svm_msg_q_t *mq;
+ session_fifo_event_t *evt;
+ svm_msg_q_msg_t msg;
+
+ if (application_is_builtin (app))
+ return 0;
+
+ mq = app->event_queue;
+ if (lock)
+ svm_msg_q_lock (mq);
+
+ if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
+ {
+ clib_warning ("evt q rings full");
+ if (lock)
+ svm_msg_q_unlock (mq);
+ return -1;
+ }
+
+ msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+ ASSERT (!svm_msg_q_msg_is_invalid (&msg));
+
+ evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+ evt->event_type = FIFO_EVENT_APP_TX;
+ evt->fifo = s->server_tx_fifo;
+
+ return app_enqueue_evt (mq, &msg, lock);
+}
+
+/* *INDENT-OFF* */
+typedef int (app_send_evt_handler_fn) (application_t *app,
+ stream_session_t *s,
+ u8 lock);
+static app_send_evt_handler_fn * const app_send_evt_handler_fns[2] = {
+ app_send_io_evt_rx,
+ app_send_io_evt_tx,
+};
+/* *INDENT-ON* */
+
+/**
+ * Send event to application
+ *
+ * Logic from queue perspective is non-blocking. That is, if there's
+ * not enough space to enqueue a message, we return. However, if the lock
+ * flag is set, we do wait for queue mutex.
+ */
+int
+application_send_event (application_t * app, stream_session_t * s,
+ u8 evt_type)
+{
+ ASSERT (app && evt_type <= FIFO_EVENT_APP_TX);
+ return app_send_evt_handler_fns[evt_type] (app, s, 0 /* lock */ );
+}
+
+int
+application_lock_and_send_event (application_t * app, stream_session_t * s,
+ u8 evt_type)
+{
+ return app_send_evt_handler_fns[evt_type] (app, s, 1 /* lock */ );
+}
+
local_session_t *
application_alloc_local_session (application_t * app)
{
@@ -949,14 +1086,14 @@ application_local_session_connect (u32 table_index, application_t * client,
svm_fifo_segment_private_t *seg;
segment_manager_t *sm;
local_session_t *ls;
- svm_queue_t *sq, *cq;
+ svm_msg_q_t *sq, *cq;
ls = application_alloc_local_session (server);
props = application_segment_manager_properties (server);
cprops = application_segment_manager_properties (client);
evt_q_elts = props->evt_q_size + cprops->evt_q_size;
- evt_q_sz = evt_q_elts * sizeof (session_fifo_event_t);
+ evt_q_sz = segment_manager_evt_q_expected_size (evt_q_elts);
seg_size = props->rx_fifo_size + props->tx_fifo_size + evt_q_sz + margin;
has_transport = session_has_transport ((stream_session_t *) ll);
diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h
index aad7089ba6f..f6c81275826 100644
--- a/src/vnet/session/application.h
+++ b/src/vnet/session/application.h
@@ -72,7 +72,7 @@ typedef struct _application
u32 ns_index;
/** Application listens for events on this svm queue */
- svm_queue_t *event_queue;
+ svm_msg_q_t *event_queue;
/*
* Callbacks: shoulder-taps for the server/client
@@ -207,6 +207,11 @@ int application_local_session_disconnect_w_index (u32 app_index,
u32 ls_index);
void application_local_sessions_del (application_t * app);
+int application_send_event (application_t * app, stream_session_t * s,
+ u8 evt);
+int application_lock_and_send_event (application_t * app,
+ stream_session_t * s, u8 evt_type);
+
always_inline u32
local_session_id (local_session_t * ll)
{
diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h
index 2c171487e70..50c043493f2 100644
--- a/src/vnet/session/application_interface.h
+++ b/src/vnet/session/application_interface.h
@@ -136,7 +136,7 @@ typedef enum
_(IS_BUILTIN, "Application is builtin") \
_(IS_PROXY, "Application is proxying") \
_(USE_GLOBAL_SCOPE, "App can use global session scope") \
- _(USE_LOCAL_SCOPE, "App can use local session scope")
+ _(USE_LOCAL_SCOPE, "App can use local session scope") \
typedef enum _app_options
{
@@ -187,7 +187,7 @@ typedef struct app_session_transport_
_(volatile u8, session_state) /**< session state */ \
_(u32, session_index) /**< index in owning pool */ \
_(app_session_transport_t, transport) /**< transport info */ \
- _(svm_queue_t, *vpp_evt_q) /**< vpp event queue */ \
+ _(svm_msg_q_t, *vpp_evt_q) /**< vpp event queue */ \
_(u8, is_dgram) /**< flag for dgram mode */ \
typedef struct
@@ -197,13 +197,73 @@ typedef struct
#undef _
} app_session_t;
+/**
+ * Send fifo io event to vpp worker thread
+ *
+ * Because there may be multiple writers to one of vpp's queues, this
+ * protects message allocation and enqueueing.
+ *
+ * @param mq vpp message queue
+ * @param f fifo for which the event is sent
+ * @param evt_type type of event
+ * @param noblock flag to indicate is request is blocking or not
+ * @return 0 if success, negative integer otherwise
+ */
+static inline int
+app_send_io_evt_to_vpp (svm_msg_q_t * mq, svm_fifo_t * f, u8 evt_type,
+ u8 noblock)
+{
+ session_fifo_event_t *evt;
+ svm_msg_q_msg_t msg;
+
+ if (noblock)
+ {
+ if (svm_msg_q_try_lock (mq))
+ return -1;
+ if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
+ {
+ svm_msg_q_unlock (mq);
+ return -2;
+ }
+ msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+ if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (&msg)))
+ {
+ svm_msg_q_unlock (mq);
+ return -2;
+ }
+ evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+ evt->fifo = f;
+ evt->event_type = evt_type;
+ svm_msg_q_add_w_lock (mq, &msg);
+ svm_msg_q_unlock (mq);
+ return 0;
+ }
+ else
+ {
+ svm_msg_q_lock (mq);
+ msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+ while (svm_msg_q_msg_is_invalid (&msg))
+ {
+ svm_msg_q_wait (mq);
+ msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+ }
+ evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+ evt->fifo = f;
+ evt->event_type = evt_type;
+ if (svm_msg_q_is_full (mq))
+ svm_msg_q_wait (mq);
+ svm_msg_q_add_w_lock (mq, &msg);
+ svm_msg_q_unlock (mq);
+ return 0;
+ }
+}
+
always_inline int
app_send_dgram_raw (svm_fifo_t * f, app_session_transport_t * at,
- svm_queue_t * vpp_evt_q, u8 * data, u32 len, u8 noblock)
+ svm_msg_q_t * vpp_evt_q, u8 * data, u32 len, u8 noblock)
{
u32 max_enqueue, actual_write;
session_dgram_hdr_t hdr;
- session_fifo_event_t evt;
int rv;
max_enqueue = svm_fifo_max_enqueue (f);
@@ -225,11 +285,7 @@ app_send_dgram_raw (svm_fifo_t * f, app_session_transport_t * at,
if ((rv = svm_fifo_enqueue_nowait (f, actual_write, data)) > 0)
{
if (svm_fifo_set_event (f))
- {
- evt.fifo = f;
- evt.event_type = FIFO_EVENT_APP_TX;
- svm_queue_add (vpp_evt_q, (u8 *) & evt, noblock);
- }
+ app_send_io_evt_to_vpp (vpp_evt_q, f, FIFO_EVENT_APP_TX, noblock);
}
ASSERT (rv);
return rv;
@@ -243,20 +299,15 @@ app_send_dgram (app_session_t * s, u8 * data, u32 len, u8 noblock)
}
always_inline int
-app_send_stream_raw (svm_fifo_t * f, svm_queue_t * vpp_evt_q, u8 * data,
+app_send_stream_raw (svm_fifo_t * f, svm_msg_q_t * vpp_evt_q, u8 * data,
u32 len, u8 noblock)
{
- session_fifo_event_t evt;
int rv;
if ((rv = svm_fifo_enqueue_nowait (f, len, data)) > 0)
{
if (svm_fifo_set_event (f))
- {
- evt.fifo = f;
- evt.event_type = FIFO_EVENT_APP_TX;
- svm_queue_add (vpp_evt_q, (u8 *) & evt, noblock);
- }
+ app_send_io_evt_to_vpp (vpp_evt_q, f, FIFO_EVENT_APP_TX, noblock);
}
return rv;
}
diff --git a/src/vnet/session/segment_manager.c b/src/vnet/session/segment_manager.c
index 31bb0c3aab5..b00bcd5bbc8 100644
--- a/src/vnet/session/segment_manager.c
+++ b/src/vnet/session/segment_manager.c
@@ -599,25 +599,52 @@ segment_manager_dealloc_fifos (u32 segment_index, svm_fifo_t * rx_fifo,
segment_manager_segment_reader_unlock (sm);
}
+u32
+segment_manager_evt_q_expected_size (u32 q_len)
+{
+ u32 fifo_evt_size, notif_q_size, q_hdrs;
+ u32 msg_q_sz, fifo_evt_ring_sz, session_ntf_ring_sz;
+
+ fifo_evt_size = 1 << max_log2 (sizeof (session_fifo_event_t));
+ notif_q_size = clib_max (16, q_len >> 4);
+
+ msg_q_sz = q_len * sizeof (svm_msg_q_msg_t);
+ fifo_evt_ring_sz = q_len * fifo_evt_size;
+ session_ntf_ring_sz = notif_q_size * 256;
+ q_hdrs = sizeof (svm_queue_t) + sizeof (svm_msg_q_t);
+
+ return (msg_q_sz + fifo_evt_ring_sz + session_ntf_ring_sz + q_hdrs);
+}
+
/**
* Allocates shm queue in the first segment
*
* Must be called with lock held
*/
-svm_queue_t *
+svm_msg_q_t *
segment_manager_alloc_queue (svm_fifo_segment_private_t * segment,
u32 queue_size)
{
- ssvm_shared_header_t *sh;
- svm_queue_t *q;
+ u32 fifo_evt_size, session_evt_size = 256, notif_q_size;
+ svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
+ svm_msg_q_t *q;
void *oldheap;
- sh = segment->ssvm.sh;
+ fifo_evt_size = sizeof (session_fifo_event_t);
+ notif_q_size = clib_max (16, queue_size >> 4);
+ /* *INDENT-OFF* */
+ svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
+ {queue_size, fifo_evt_size, 0},
+ {notif_q_size, session_evt_size, 0}
+ };
+ /* *INDENT-ON* */
+ cfg->consumer_pid = 0;
+ cfg->n_rings = 2;
+ cfg->q_nitems = queue_size;
+ cfg->ring_cfgs = rc;
- oldheap = ssvm_push_heap (sh);
- q = svm_queue_init (queue_size, sizeof (session_fifo_event_t),
- 0 /* consumer pid */ ,
- 0 /* signal when queue non-empty */ );
+ oldheap = ssvm_push_heap (segment->ssvm.sh);
+ q = svm_msg_q_alloc (cfg);
ssvm_pop_heap (oldheap);
return q;
}
diff --git a/src/vnet/session/segment_manager.h b/src/vnet/session/segment_manager.h
index 62e5e97e703..73cb4827a8b 100644
--- a/src/vnet/session/segment_manager.h
+++ b/src/vnet/session/segment_manager.h
@@ -17,7 +17,7 @@
#include <vnet/vnet.h>
#include <svm/svm_fifo_segment.h>
-#include <svm/queue.h>
+#include <svm/message_queue.h>
#include <vlibmemory/api.h>
#include <vppinfra/lock.h>
#include <vppinfra/valloc.h>
@@ -61,7 +61,7 @@ typedef struct _segment_manager
/**
* App event queue allocated in first segment
*/
- svm_queue_t *event_queue;
+ svm_msg_q_t *event_queue;
} segment_manager_t;
#define segment_manager_foreach_segment_w_lock(VAR, SM, BODY) \
@@ -114,7 +114,7 @@ segment_manager_index (segment_manager_t * sm)
return sm - segment_manager_main.segment_managers;
}
-always_inline svm_queue_t *
+always_inline svm_msg_q_t *
segment_manager_event_queue (segment_manager_t * sm)
{
return sm->event_queue;
@@ -152,7 +152,8 @@ int segment_manager_try_alloc_fifos (svm_fifo_segment_private_t * fs,
svm_fifo_t ** tx_fifo);
void segment_manager_dealloc_fifos (u32 segment_index, svm_fifo_t * rx_fifo,
svm_fifo_t * tx_fifo);
-svm_queue_t *segment_manager_alloc_queue (svm_fifo_segment_private_t * fs,
+u32 segment_manager_evt_q_expected_size (u32 q_size);
+svm_msg_q_t *segment_manager_alloc_queue (svm_fifo_segment_private_t * fs,
u32 queue_size);
void segment_manager_dealloc_queue (segment_manager_t * sm, svm_queue_t * q);
void segment_manager_app_detach (segment_manager_t * sm);
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 26bc70e6fd7..38a0521af94 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -27,49 +27,90 @@
session_manager_main_t session_manager_main;
extern transport_proto_vft_t *tp_vfts;
-static void
-session_send_evt_to_thread (u64 session_handle, fifo_event_type_t evt_type,
- u32 thread_index, void *fp, void *rpc_args)
+static inline int
+session_send_evt_to_thread (void *data, void *args, u32 thread_index,
+ session_evt_type_t evt_type)
{
- session_fifo_event_t evt = { {0}, };
- svm_queue_t *q;
+ session_fifo_event_t *evt;
+ svm_msg_q_msg_t msg;
+ svm_msg_q_t *mq;
u32 tries = 0, max_tries;
- evt.event_type = evt_type;
- if (evt_type == FIFO_EVENT_RPC)
- {
- evt.rpc_args.fp = fp;
- evt.rpc_args.arg = rpc_args;
- }
- else
- evt.session_handle = session_handle;
-
- q = session_manager_get_vpp_event_queue (thread_index);
- while (svm_queue_add (q, (u8 *) & evt, 1))
+ mq = session_manager_get_vpp_event_queue (thread_index);
+ while (svm_msg_q_try_lock (mq))
{
max_tries = vlib_get_current_process (vlib_get_main ())? 1e6 : 3;
if (tries++ == max_tries)
{
SESSION_DBG ("failed to enqueue evt");
- break;
+ return -1;
}
}
+ if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
+ {
+ svm_msg_q_unlock (mq);
+ return -2;
+ }
+ msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+ if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (&msg)))
+ {
+ svm_msg_q_unlock (mq);
+ return -2;
+ }
+ evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+ evt->event_type = evt_type;
+ switch (evt_type)
+ {
+ case FIFO_EVENT_RPC:
+ evt->rpc_args.fp = data;
+ evt->rpc_args.arg = args;
+ break;
+ case FIFO_EVENT_APP_TX:
+ case FIFO_EVENT_BUILTIN_RX:
+ evt->fifo = data;
+ break;
+ case FIFO_EVENT_DISCONNECT:
+ evt->session_handle = session_handle ((stream_session_t *) data);
+ break;
+ default:
+ clib_warning ("evt unhandled!");
+ svm_msg_q_unlock (mq);
+ return -1;
+ }
+
+ svm_msg_q_add_w_lock (mq, &msg);
+ svm_msg_q_unlock (mq);
+ return 0;
}
-void
-session_send_session_evt_to_thread (u64 session_handle,
- fifo_event_type_t evt_type,
- u32 thread_index)
+int
+session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
+{
+ return session_send_evt_to_thread (f, 0, f->master_thread_index, evt_type);
+}
+
+int
+session_send_io_evt_to_thread_custom (svm_fifo_t * f, u32 thread_index,
+ session_evt_type_t evt_type)
{
- session_send_evt_to_thread (session_handle, evt_type, thread_index, 0, 0);
+ return session_send_evt_to_thread (f, 0, thread_index, evt_type);
+}
+
+int
+session_send_ctrl_evt_to_thread (stream_session_t * s,
+ session_evt_type_t evt_type)
+{
+ /* only event supported for now is disconnect */
+ ASSERT (evt_type == FIFO_EVENT_DISCONNECT);
+ return session_send_evt_to_thread (s, 0, s->thread_index,
+ FIFO_EVENT_DISCONNECT);
}
void
session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
{
if (thread_index != vlib_get_thread_index ())
- session_send_evt_to_thread (0, FIFO_EVENT_RPC, thread_index, fp,
- rpc_args);
+ session_send_evt_to_thread (fp, rpc_args, thread_index, FIFO_EVENT_RPC);
else
{
void (*fnp) (void *) = fp;
@@ -440,24 +481,15 @@ stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
/**
* Notify session peer that new data has been enqueued.
*
- * @param s Stream session for which the event is to be generated.
- * @param block Flag to indicate if call should block if event queue is full.
+ * @param s Stream session for which the event is to be generated.
+ * @param lock Flag to indicate if call should lock message queue.
*
- * @return 0 on succes or negative number if failed to send notification.
+ * @return 0 on success or negative number if failed to send notification.
*/
-static int
-session_enqueue_notify (stream_session_t * s, u8 block)
+static inline int
+session_enqueue_notify (stream_session_t * s, u8 lock)
{
application_t *app;
- session_fifo_event_t evt;
- svm_queue_t *q;
-
- if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
- {
- /* Session is closed so app will never clean up. Flush rx fifo */
- svm_fifo_dequeue_drop_all (s->server_rx_fifo);
- return 0;
- }
app = application_get_if_valid (s->app_index);
if (PREDICT_FALSE (app == 0))
@@ -466,68 +498,32 @@ session_enqueue_notify (stream_session_t * s, u8 block)
return 0;
}
- /* Built-in app? Hand event to the callback... */
- if (app->cb_fns.builtin_app_rx_callback)
- return app->cb_fns.builtin_app_rx_callback (s);
-
- /* If no event, send one */
- if (svm_fifo_set_event (s->server_rx_fifo))
- {
- /* Fabricate event */
- evt.fifo = s->server_rx_fifo;
- evt.event_type = FIFO_EVENT_APP_RX;
-
- /* Add event to server's event queue */
- q = app->event_queue;
-
- /* Based on request block (or not) for lack of space */
- if (block || PREDICT_TRUE (q->cursize < q->maxsize))
- svm_queue_add (app->event_queue, (u8 *) & evt,
- 0 /* do wait for mutex */ );
- else
- {
- clib_warning ("fifo full");
- return -1;
- }
- }
-
/* *INDENT-OFF* */
SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
- ed->data[0] = evt.event_type;
+ ed->data[0] = FIFO_EVENT_APP_RX;
ed->data[1] = svm_fifo_max_dequeue (s->server_rx_fifo);
}));
/* *INDENT-ON* */
- return 0;
+ if (lock)
+ return application_lock_and_send_event (app, s, FIFO_EVENT_APP_RX);
+
+ return application_send_event (app, s, FIFO_EVENT_APP_RX);
}
int
session_dequeue_notify (stream_session_t * s)
{
application_t *app;
- svm_queue_t *q;
app = application_get_if_valid (s->app_index);
if (PREDICT_FALSE (!app))
return -1;
- if (application_is_builtin (app))
- return 0;
+ if (session_transport_service_type (s) == TRANSPORT_SERVICE_CL)
+ return application_lock_and_send_event (app, s, FIFO_EVENT_APP_RX);
- q = app->event_queue;
- if (PREDICT_TRUE (q->cursize < q->maxsize))
- {
- session_fifo_event_t evt = {
- .event_type = FIFO_EVENT_APP_TX,
- .fifo = s->server_tx_fifo
- };
- svm_queue_add (app->event_queue, (u8 *) & evt, SVM_Q_WAIT);
- }
- else
- {
- return -1;
- }
- return 0;
+ return application_send_event (app, s, FIFO_EVENT_APP_TX);
}
/**
@@ -542,16 +538,24 @@ int
session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index)
{
session_manager_main_t *smm = &session_manager_main;
- u32 *indices;
+ transport_service_type_t tp_service;
+ int i, errors = 0, lock;
stream_session_t *s;
- int i, errors = 0;
+ u32 *indices;
indices = smm->session_to_enqueue[transport_proto][thread_index];
+ tp_service = transport_protocol_service_type (transport_proto);
+ lock = tp_service == TRANSPORT_SERVICE_CL;
for (i = 0; i < vec_len (indices); i++)
{
s = session_get_if_valid (indices[i], thread_index);
- if (s == 0 || session_enqueue_notify (s, 0 /* don't block */ ))
+ if (PREDICT_FALSE (!s))
+ {
+ errors++;
+ continue;
+ }
+ if (PREDICT_FALSE (session_enqueue_notify (s, lock)))
errors++;
}
@@ -1118,9 +1122,7 @@ stream_session_disconnect (stream_session_t * s)
evt->event_type = FIFO_EVENT_DISCONNECT;
}
else
- session_send_session_evt_to_thread (session_handle (s),
- FIFO_EVENT_DISCONNECT,
- s->thread_index);
+ session_send_ctrl_evt_to_thread (s, FIFO_EVENT_DISCONNECT);
}
/**
@@ -1231,8 +1233,18 @@ session_vpp_event_queues_allocate (session_manager_main_t * smm)
for (i = 0; i < vec_len (smm->vpp_event_queues); i++)
{
- smm->vpp_event_queues[i] = svm_queue_init (evt_q_length, evt_size,
- vpp_pid, 0);
+ svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
+ u32 notif_q_size = clib_max (16, evt_q_length >> 4);
+ svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
+ {evt_q_length, evt_size, 0}
+ ,
+ {notif_q_size, 256, 0}
+ };
+ cfg->consumer_pid = 0;
+ cfg->n_rings = 2;
+ cfg->q_nitems = evt_q_length;
+ cfg->ring_cfgs = rc;
+ smm->vpp_event_queues[i] = svm_msg_q_alloc (cfg);
}
if (smm->evt_qs_use_memfd_seg)
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index fe3477b63e3..879b3823e5d 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -38,10 +38,10 @@ typedef enum
FIFO_EVENT_DISCONNECT,
FIFO_EVENT_BUILTIN_RX,
FIFO_EVENT_RPC,
-} fifo_event_type_t;
+} session_evt_type_t;
static inline const char *
-fifo_event_type_str (fifo_event_type_t et)
+fifo_event_type_str (session_evt_type_t et)
{
switch (et)
{
@@ -62,6 +62,13 @@ fifo_event_type_str (fifo_event_type_t et)
}
}
+typedef enum
+{
+ SESSION_MQ_IO_EVT_RING,
+ SESSION_MQ_CTRL_EVT_RING,
+ SESSION_MQ_N_RINGS
+} session_mq_rings_e;
+
#define foreach_session_input_error \
_(NO_SESSION, "No session drops") \
_(NO_LISTENER, "No listener for dst port drops") \
@@ -86,23 +93,30 @@ typedef struct
{
void *fp;
void *arg;
-} rpc_args_t;
+} session_rpc_args_t;
typedef u64 session_handle_t;
/* *INDENT-OFF* */
-typedef CLIB_PACKED (struct {
+typedef struct
+{
+ u8 event_type;
+ u8 postponed;
union
+ {
+ svm_fifo_t *fifo;
+ session_handle_t session_handle;
+ session_rpc_args_t rpc_args;
+ struct
{
- svm_fifo_t * fifo;
- session_handle_t session_handle;
- rpc_args_t rpc_args;
+ u8 data[0];
};
- u8 event_type;
- u8 postponed;
-}) session_fifo_event_t;
+ };
+} __clib_packed session_fifo_event_t;
/* *INDENT-ON* */
+#define SESSION_MSG_NULL { }
+
typedef struct session_dgram_pre_hdr_
{
u32 data_length;
@@ -193,7 +207,7 @@ struct _session_manager_main
session_tx_context_t *ctx;
/** vpp fifo event queue */
- svm_queue_t **vpp_event_queues;
+ svm_msg_q_t **vpp_event_queues;
/** Event queues memfd segment initialized only if so configured */
ssvm_private_t evt_qs_segment;
@@ -533,9 +547,12 @@ int stream_session_stop_listen (stream_session_t * s);
void stream_session_disconnect (stream_session_t * s);
void stream_session_disconnect_transport (stream_session_t * s);
void stream_session_cleanup (stream_session_t * s);
-void session_send_session_evt_to_thread (u64 session_handle,
- fifo_event_type_t evt_type,
- u32 thread_index);
+int session_send_io_evt_to_thread (svm_fifo_t * f,
+ session_evt_type_t evt_type);
+int session_send_io_evt_to_thread_custom (svm_fifo_t * f, u32 thread_index,
+ session_evt_type_t evt_type);
+void session_send_rpc_evt_to_thread (u32 thread_index, void *fp,
+ void *rpc_args);
ssvm_private_t *session_manager_get_evt_q_segment (void);
u8 *format_stream_session (u8 * s, va_list * args);
@@ -549,7 +566,7 @@ void session_register_transport (transport_proto_t transport_proto,
clib_error_t *vnet_session_enable_disable (vlib_main_t * vm, u8 is_en);
-always_inline svm_queue_t *
+always_inline svm_msg_q_t *
session_manager_get_vpp_event_queue (u32 thread_index)
{
return session_manager_main.vpp_event_queues[thread_index];
diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c
index 1a41dbd9e08..f9fddea6148 100755
--- a/src/vnet/session/session_api.c
+++ b/src/vnet/session/session_api.c
@@ -155,7 +155,7 @@ send_session_accept_callback (stream_session_t * s)
vl_api_registration_t *reg;
transport_connection_t *tc;
stream_session_t *listener;
- svm_queue_t *vpp_queue;
+ svm_msg_q_t *vpp_queue;
reg = vl_mem_api_client_index_to_registration (server->api_client_index);
if (!reg)
@@ -300,7 +300,7 @@ send_session_connected_callback (u32 app_index, u32 api_context,
vl_api_connect_session_reply_t *mp;
transport_connection_t *tc;
vl_api_registration_t *reg;
- svm_queue_t *vpp_queue;
+ svm_msg_q_t *vpp_queue;
application_t *app;
app = application_get (app_index);
@@ -485,7 +485,7 @@ vl_api_bind_uri_t_handler (vl_api_bind_uri_t * mp)
vl_api_bind_uri_reply_t *rmp;
stream_session_t *s;
application_t *app = 0;
- svm_queue_t *vpp_evt_q;
+ svm_msg_q_t *vpp_evt_q;
int rv;
if (session_manager_is_enabled () == 0)
@@ -759,7 +759,7 @@ vl_api_bind_sock_t_handler (vl_api_bind_sock_t * mp)
stream_session_t *s;
transport_connection_t *tc = 0;
ip46_address_t *ip46;
- svm_queue_t *vpp_evt_q;
+ svm_msg_q_t *vpp_evt_q;
if (session_manager_is_enabled () == 0)
{
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index 85fd28db7a9..350282bd902 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -575,15 +575,14 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_frame_t * frame)
{
session_manager_main_t *smm = vnet_get_session_manager_main ();
+ u32 thread_index = vm->thread_index, n_to_dequeue, n_events;
session_fifo_event_t *pending_events, *e;
session_fifo_event_t *fifo_events;
- u32 n_to_dequeue, n_events;
- svm_queue_t *q;
- application_t *app;
- int n_tx_packets = 0;
- u32 thread_index = vm->thread_index;
- int i, rv;
+ svm_msg_q_msg_t _msg, *msg = &_msg;
f64 now = vlib_time_now (vm);
+ int n_tx_packets = 0, i, rv;
+ application_t *app;
+ svm_msg_q_t *mq;
void (*fp) (void *);
SESSION_EVT_DBG (SESSION_EVT_POLL_GAP_TRACK, smm, thread_index);
@@ -594,16 +593,11 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
transport_update_time (now, thread_index);
/*
- * Get vpp queue events
+ * Get vpp queue events that we can dequeue without blocking
*/
- q = smm->vpp_event_queues[thread_index];
- if (PREDICT_FALSE (q == 0))
- return 0;
-
+ mq = smm->vpp_event_queues[thread_index];
fifo_events = smm->free_event_vector[thread_index];
-
- /* min number of events we can dequeue without blocking */
- n_to_dequeue = q->cursize;
+ n_to_dequeue = svm_msg_q_size (mq);
pending_events = smm->pending_event_vector[thread_index];
if (!n_to_dequeue && !vec_len (pending_events)
@@ -624,21 +618,19 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
}
/* See you in the next life, don't be late
- * XXX: we may need priorities here
- */
- if (pthread_mutex_trylock (&q->mutex))
+ * XXX: we may need priorities here */
+ if (svm_msg_q_try_lock (mq))
return 0;
for (i = 0; i < n_to_dequeue; i++)
{
vec_add2 (fifo_events, e, 1);
- svm_queue_sub_raw (q, (u8 *) e);
+ svm_msg_q_sub_w_lock (mq, msg);
+ clib_memcpy (e, svm_msg_q_msg_data (mq, msg), sizeof (*e));
+ svm_msg_q_free_msg (mq, msg);
}
- /* The other side of the connection is not polling */
- if (q->cursize < (q->maxsize / 8))
- (void) pthread_cond_broadcast (&q->condvar);
- pthread_mutex_unlock (&q->mutex);
+ svm_msg_q_unlock (mq);
vec_append (fifo_events, pending_events);
vec_append (fifo_events, smm->pending_disconnects[thread_index]);
@@ -760,19 +752,20 @@ dump_thread_0_event_queue (void)
vlib_main_t *vm = &vlib_global_main;
u32 my_thread_index = vm->thread_index;
session_fifo_event_t _e, *e = &_e;
+ svm_msg_q_ring_t *ring;
stream_session_t *s0;
+ svm_msg_q_msg_t *msg;
+ svm_msg_q_t *mq;
int i, index;
- i8 *headp;
-
- svm_queue_t *q;
- q = smm->vpp_event_queues[my_thread_index];
- index = q->head;
+ mq = smm->vpp_event_queues[my_thread_index];
+ index = mq->q->head;
- for (i = 0; i < q->cursize; i++)
+ for (i = 0; i < mq->q->cursize; i++)
{
- headp = (i8 *) (&q->data[0] + q->elsize * index);
- clib_memcpy (e, headp, q->elsize);
+ msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index);
+ ring = svm_msg_q_ring (mq, msg->ring_index);
+ clib_memcpy (e, svm_msg_q_msg_data (mq, msg), ring->elsize);
switch (e->event_type)
{
@@ -805,7 +798,7 @@ dump_thread_0_event_queue (void)
index++;
- if (index == q->maxsize)
+ if (index == mq->q->maxsize)
index = 0;
}
}
@@ -844,10 +837,11 @@ u8
session_node_lookup_fifo_event (svm_fifo_t * f, session_fifo_event_t * e)
{
session_manager_main_t *smm = vnet_get_session_manager_main ();
- svm_queue_t *q;
+ svm_msg_q_t *mq;
session_fifo_event_t *pending_event_vector, *evt;
int i, index, found = 0;
- i8 *headp;
+ svm_msg_q_msg_t *msg;
+ svm_msg_q_ring_t *ring;
u8 thread_index;
ASSERT (e);
@@ -855,16 +849,17 @@ session_node_lookup_fifo_event (svm_fifo_t * f, session_fifo_event_t * e)
/*
* Search evt queue
*/
- q = smm->vpp_event_queues[thread_index];
- index = q->head;
- for (i = 0; i < q->cursize; i++)
+ mq = smm->vpp_event_queues[thread_index];
+ index = mq->q->head;
+ for (i = 0; i < mq->q->cursize; i++)
{
- headp = (i8 *) (&q->data[0] + q->elsize * index);
- clib_memcpy (e, headp, q->elsize);
+ msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index);
+ ring = svm_msg_q_ring (mq, msg->ring_index);
+ clib_memcpy (e, svm_msg_q_msg_data (mq, msg), ring->elsize);
found = session_node_cmp_event (e, f);
if (found)
return 1;
- if (++index == q->maxsize)
+ if (++index == mq->q->maxsize)
index = 0;
}
/*