aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorFlorin Coras <fcoras@cisco.com>2020-12-11 13:58:12 -0800
committerFlorin Coras <fcoras@cisco.com>2020-12-29 12:11:07 -0800
commitb462418890240b2e38dbf522f9dd0196b79e0fa8 (patch)
tree3ae26a22edb23da8d40f2c2b54eb96b056bca565 /src
parent04943b4c42db300d0d895644f32da79a6d411c51 (diff)
svm: allow mq attachments at random offsets
Type: feature Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: Ic373cd2c11272da539eb4b0db27227f36f2f9688
Diffstat (limited to 'src')
-rw-r--r--src/plugins/hs_apps/sapi/vpp_echo.c16
-rw-r--r--src/plugins/hs_apps/sapi/vpp_echo_bapi.c50
-rw-r--r--src/plugins/hs_apps/sapi/vpp_echo_common.c12
-rw-r--r--src/plugins/hs_apps/sapi/vpp_echo_common.h7
-rw-r--r--src/plugins/hs_apps/sapi/vpp_echo_proto_udp.c3
-rw-r--r--src/plugins/unittest/session_test.c25
-rw-r--r--src/svm/fifo_segment.c69
-rw-r--r--src/svm/fifo_segment.h33
-rw-r--r--src/svm/fifo_types.h1
-rw-r--r--src/svm/message_queue.c136
-rw-r--r--src/svm/message_queue.h28
-rw-r--r--src/svm/svm_fifo.c1
-rw-r--r--src/vcl/vcl_bapi.c16
-rw-r--r--src/vcl/vcl_private.c44
-rw-r--r--src/vcl/vcl_private.h5
-rw-r--r--src/vcl/vcl_sapi.c15
-rw-r--r--src/vcl/vppcom.c25
-rw-r--r--src/vnet/session/application_interface.h1
-rw-r--r--src/vnet/session/segment_manager.c4
-rw-r--r--src/vnet/session/session.c7
-rw-r--r--src/vnet/session/session_api.c82
-rw-r--r--src/vnet/session/session_debug.c3
-rw-r--r--src/vnet/session/session_node.c4
23 files changed, 408 insertions, 179 deletions
diff --git a/src/plugins/hs_apps/sapi/vpp_echo.c b/src/plugins/hs_apps/sapi/vpp_echo.c
index a47a4d455d8..19b5808c550 100644
--- a/src/plugins/hs_apps/sapi/vpp_echo.c
+++ b/src/plugins/hs_apps/sapi/vpp_echo.c
@@ -556,16 +556,14 @@ session_accepted_handler (session_accepted_msg_t * mp)
session = echo_session_new (em);
if (echo_attach_session (mp->segment_handle, mp->server_rx_fifo,
- mp->server_tx_fifo, session))
+ mp->server_tx_fifo, mp->vpp_event_queue_address,
+ session))
{
ECHO_FAIL (ECHO_FAIL_ACCEPTED_WAIT_FOR_SEG_ALLOC,
"accepted wait_for_segment_allocation errored");
return;
}
- session->vpp_evt_q =
- uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
-
session->vpp_session_handle = mp->handle;
/* session->transport needed by app_send_dgram */
@@ -617,14 +615,14 @@ session_connected_handler (session_connected_msg_t * mp)
session = echo_session_new (em);
if (echo_attach_session (mp->segment_handle, mp->server_rx_fifo,
- mp->server_tx_fifo, session))
+ mp->server_tx_fifo, mp->vpp_event_queue_address,
+ session))
{
ECHO_FAIL (ECHO_FAIL_CONNECTED_WAIT_FOR_SEG_ALLOC,
"connected wait_for_segment_allocation errored");
return;
}
- session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
- svm_msg_q_t *);
+
session->vpp_session_handle = mp->handle;
session->start = clib_time_now (&em->clib_time);
session->listener_index = listener_index;
@@ -806,7 +804,7 @@ echo_process_rpcs (echo_main_t * em)
{
echo_rpc_msg_t *rpc;
svm_msg_q_msg_t msg;
- svm_msg_q_t *mq = em->rpc_msq_queue;
+ svm_msg_q_t *mq = &em->rpc_msq_queue;
while (em->state < STATE_DATA_DONE && !em->time_to_stop)
{
@@ -1321,7 +1319,7 @@ main (int argc, char **argv)
cfg->n_rings = 1;
cfg->q_nitems = rpc_queue_size;
cfg->ring_cfgs = rc;
- em->rpc_msq_queue = svm_msg_q_alloc (cfg);
+ svm_msg_q_attach (&em->rpc_msq_queue, svm_msg_q_alloc (cfg));
signal (SIGINT, stop_signal);
signal (SIGQUIT, stop_signal);
diff --git a/src/plugins/hs_apps/sapi/vpp_echo_bapi.c b/src/plugins/hs_apps/sapi/vpp_echo_bapi.c
index c643cec2ce3..6ad825d7872 100644
--- a/src/plugins/hs_apps/sapi/vpp_echo_bapi.c
+++ b/src/plugins/hs_apps/sapi/vpp_echo_bapi.c
@@ -264,12 +264,12 @@ echo_segment_detach (u64 segment_handle)
int
echo_attach_session (uword segment_handle, uword rxf_offset, uword txf_offset,
- echo_session_t *s)
+ uword mq_offset, echo_session_t *s)
{
svm_fifo_shared_t *rx_fifo, *tx_fifo;
echo_main_t *em = &echo_main;
+ u32 fs_index, eqs_index;
fifo_segment_t *fs;
- u32 fs_index;
fs_index = echo_segment_lookup (segment_handle);
if (fs_index == (u32) ~0)
@@ -279,6 +279,12 @@ echo_attach_session (uword segment_handle, uword rxf_offset, uword txf_offset,
return -1;
}
+ if (mq_offset != (uword) ~0)
+ {
+ eqs_index = echo_segment_lookup (ECHO_MQ_SEG_HANDLE);
+ ASSERT (eqs_index != (u32) ~0);
+ }
+
rx_fifo = uword_to_pointer (rxf_offset, svm_fifo_shared_t *);
tx_fifo = uword_to_pointer (txf_offset, svm_fifo_shared_t *);
rx_fifo->client_session_index = s->session_index;
@@ -290,6 +296,39 @@ echo_attach_session (uword segment_handle, uword rxf_offset, uword txf_offset,
s->rx_fifo = fifo_segment_alloc_fifo_w_shared (fs, rx_fifo);
s->tx_fifo = fifo_segment_alloc_fifo_w_shared (fs, tx_fifo);
+ if (mq_offset != (uword) ~0)
+ {
+ fs = fifo_segment_get_segment (&em->segment_main, eqs_index);
+ s->vpp_evt_q =
+ fifo_segment_msg_q_attach (fs, mq_offset, rx_fifo->slice_index);
+ }
+
+ clib_spinlock_unlock (&em->segment_handles_lock);
+
+ return 0;
+}
+
+int
+echo_segment_attach_mq (uword segment_handle, uword mq_offset, u32 mq_index,
+ svm_msg_q_t **mq)
+{
+ echo_main_t *em = &echo_main;
+ fifo_segment_t *fs;
+ u32 fs_index;
+
+ fs_index = echo_segment_lookup (segment_handle);
+ if (fs_index == (u32) ~0)
+ {
+ ECHO_LOG (0, "ERROR: mq segment %lx for is not attached!",
+ segment_handle);
+ return -1;
+ }
+
+ clib_spinlock_lock (&em->segment_handles_lock);
+
+ fs = fifo_segment_get_segment (&em->segment_main, fs_index);
+ *mq = fifo_segment_msg_q_attach (fs, mq_offset, mq_index);
+
clib_spinlock_unlock (&em->segment_handles_lock);
return 0;
@@ -338,8 +377,6 @@ static void
em->state = STATE_CLEANED_CERT_KEY;
}
-#define ECHO_MQ_SEG_HANDLE ((u64) ~0 - 1)
-
static void
vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp)
{
@@ -364,8 +401,6 @@ vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp)
ECHO_FAIL (ECHO_FAIL_VL_API_NULL_APP_MQ, "NULL app_mq");
return;
}
- em->app_mq = uword_to_pointer (mp->app_mq, svm_msg_q_t *);
- em->ctrl_mq = uword_to_pointer (mp->vpp_ctrl_mq, svm_msg_q_t *);
if (mp->n_fds)
{
@@ -385,6 +420,8 @@ vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp)
"svm_fifo_segment_attach failed on SSVM_SEGMENT_MEMFD");
goto failed;
}
+ echo_segment_attach_mq (ECHO_MQ_SEG_HANDLE, mp->vpp_ctrl_mq,
+ mp->vpp_ctrl_mq_thread, &em->ctrl_mq);
if (mp->fd_flags & SESSION_FD_F_MEMFD_SEGMENT)
{
@@ -401,6 +438,7 @@ vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp)
}
vec_free (segment_name);
}
+ echo_segment_attach_mq (segment_handle, mp->app_mq, 0, &em->app_mq);
if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
svm_msg_q_set_consumer_eventfd (em->app_mq, fds[n_fds++]);
diff --git a/src/plugins/hs_apps/sapi/vpp_echo_common.c b/src/plugins/hs_apps/sapi/vpp_echo_common.c
index 497f56c3e1e..e24629b783e 100644
--- a/src/plugins/hs_apps/sapi/vpp_echo_common.c
+++ b/src/plugins/hs_apps/sapi/vpp_echo_common.c
@@ -543,23 +543,23 @@ echo_send_rpc (echo_main_t * em, void *fp, echo_rpc_args_t * args)
{
svm_msg_q_msg_t msg;
echo_rpc_msg_t *evt;
- if (PREDICT_FALSE (svm_msg_q_lock (em->rpc_msq_queue)))
+ if (PREDICT_FALSE (svm_msg_q_lock (&em->rpc_msq_queue)))
{
ECHO_FAIL (ECHO_FAIL_RPC_SIZE, "RPC lock failed");
return -1;
}
- if (PREDICT_FALSE (svm_msg_q_ring_is_full (em->rpc_msq_queue, 0)))
+ if (PREDICT_FALSE (svm_msg_q_ring_is_full (&em->rpc_msq_queue, 0)))
{
- svm_msg_q_unlock (em->rpc_msq_queue);
+ svm_msg_q_unlock (&em->rpc_msq_queue);
ECHO_FAIL (ECHO_FAIL_RPC_SIZE, "RPC ring is full");
return -2;
}
- msg = svm_msg_q_alloc_msg_w_ring (em->rpc_msq_queue, 0);
- evt = (echo_rpc_msg_t *) svm_msg_q_msg_data (em->rpc_msq_queue, &msg);
+ msg = svm_msg_q_alloc_msg_w_ring (&em->rpc_msq_queue, 0);
+ evt = (echo_rpc_msg_t *) svm_msg_q_msg_data (&em->rpc_msq_queue, &msg);
evt->fp = fp;
clib_memcpy (&evt->args, args, sizeof (evt->args));
- svm_msg_q_add_and_unlock (em->rpc_msq_queue, &msg);
+ svm_msg_q_add_and_unlock (&em->rpc_msq_queue, &msg);
return 0;
}
diff --git a/src/plugins/hs_apps/sapi/vpp_echo_common.h b/src/plugins/hs_apps/sapi/vpp_echo_common.h
index cd2bbb6038c..dc5f7dfb9b5 100644
--- a/src/plugins/hs_apps/sapi/vpp_echo_common.h
+++ b/src/plugins/hs_apps/sapi/vpp_echo_common.h
@@ -38,6 +38,7 @@
#define TIMEOUT 10.0
#define LOGGING_BATCH (100)
#define LOG_EVERY_N_IDLE_CYCLES (1e8)
+#define ECHO_MQ_SEG_HANDLE ((u64) ~0 - 1)
#define foreach_echo_fail_code \
_(ECHO_FAIL_NONE, "ECHO_FAIL_NONE") \
@@ -300,7 +301,7 @@ typedef struct
uword *shared_segment_handles; /* Hash table : segment_names -> 1 */
clib_spinlock_t segment_handles_lock; /* Hash table lock */
echo_proto_cb_vft_t *proto_cb_vft;
- svm_msg_q_t *rpc_msq_queue; /* MQ between quic_echo threads */
+ svm_msg_q_t rpc_msq_queue; /* MQ between quic_echo threads */
fifo_segment_main_t segment_main;
/* State of the connection, shared between msg RX thread and main thread */
@@ -444,7 +445,9 @@ int echo_segment_attach (u64 segment_handle, char *name,
u32 echo_segment_lookup (u64 segment_handle);
void echo_segment_detach (u64 segment_handle);
int echo_attach_session (uword segment_handle, uword rxf_offset,
- uword txf_offset, echo_session_t *s);
+ uword mq_offset, uword txf_offset, echo_session_t *s);
+int echo_segment_attach_mq (uword segment_handle, uword mq_offset,
+ u32 mq_index, svm_msg_q_t **mq);
/* Binary API */
diff --git a/src/plugins/hs_apps/sapi/vpp_echo_proto_udp.c b/src/plugins/hs_apps/sapi/vpp_echo_proto_udp.c
index 9689a83d76b..10dfcf02067 100644
--- a/src/plugins/hs_apps/sapi/vpp_echo_proto_udp.c
+++ b/src/plugins/hs_apps/sapi/vpp_echo_proto_udp.c
@@ -132,14 +132,13 @@ udp_echo_bound_uri_cb (session_bound_msg_t * mp, echo_session_t * session)
return;
if (echo_attach_session (mp->segment_handle, mp->rx_fifo, mp->tx_fifo,
- session))
+ mp->vpp_evt_q, session))
{
ECHO_FAIL (ECHO_FAIL_ACCEPTED_WAIT_FOR_SEG_ALLOC,
"accepted wait_for_segment_allocation errored");
return;
}
- session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
session->transport.is_ip4 = mp->lcl_is_ip4;
clib_memcpy_fast (&session->transport.lcl_ip, mp->lcl_ip,
sizeof (ip46_address_t));
diff --git a/src/plugins/unittest/session_test.c b/src/plugins/unittest/session_test.c
index f54ed9ff1ac..68605b2cbd7 100644
--- a/src/plugins/unittest/session_test.c
+++ b/src/plugins/unittest/session_test.c
@@ -1909,8 +1909,9 @@ session_test_mq_basic (vlib_main_t * vm, unformat_input_t * input)
svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
svm_msg_q_msg_t msg1, msg2, msg[12];
int __clib_unused verbose, i, rv;
- svm_msg_q_t *mq;
+ svm_msg_q_shared_t *smq;
svm_msg_q_ring_t *ring;
+ svm_msg_q_t _mq = { 0 }, *mq = &_mq;
u8 *rings_ptr;
while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
@@ -1933,28 +1934,30 @@ session_test_mq_basic (vlib_main_t * vm, unformat_input_t * input)
cfg->q_nitems = 16;
cfg->ring_cfgs = rc;
- mq = svm_msg_q_alloc (cfg);
+ smq = svm_msg_q_alloc (cfg);
+ svm_msg_q_attach (mq, smq);
SESSION_TEST (mq != 0, "svm_msg_q_alloc");
SESSION_TEST (vec_len (mq->rings) == 2, "ring allocation");
- rings_ptr = (u8 *) mq->rings + vec_bytes (mq->rings);
+ rings_ptr = (u8 *) mq->rings[0].shr->data;
vec_foreach (ring, mq->rings)
{
- SESSION_TEST (ring->data == rings_ptr, "ring data");
+ SESSION_TEST (ring->shr->data == rings_ptr, "ring data");
rings_ptr += (uword) ring->nitems * ring->elsize;
+ rings_ptr += sizeof (svm_msg_q_ring_shared_t);
}
msg1 = svm_msg_q_alloc_msg (mq, 8);
- rv = (mq->rings[0].cursize != 1
- || msg1.ring_index != 0 || msg1.elt_index != 0);
+ rv = (mq->rings[0].shr->cursize != 1 || msg1.ring_index != 0 ||
+ msg1.elt_index != 0);
SESSION_TEST (rv == 0, "msg alloc1");
msg2 = svm_msg_q_alloc_msg (mq, 15);
- rv = (mq->rings[1].cursize != 1
- || msg2.ring_index != 1 || msg2.elt_index != 0);
+ rv = (mq->rings[1].shr->cursize != 1 || msg2.ring_index != 1 ||
+ msg2.elt_index != 0);
SESSION_TEST (rv == 0, "msg alloc2");
svm_msg_q_free_msg (mq, &msg1);
- SESSION_TEST (mq->rings[0].cursize == 0, "free msg");
+ SESSION_TEST (mq->rings[0].shr->cursize == 0, "free msg");
for (i = 0; i < 12; i++)
{
@@ -1962,7 +1965,7 @@ session_test_mq_basic (vlib_main_t * vm, unformat_input_t * input)
*(u32 *) svm_msg_q_msg_data (mq, &msg[i]) = i;
}
- rv = (mq->rings[0].cursize != 8 || mq->rings[1].cursize != 5);
+ rv = (mq->rings[0].shr->cursize != 8 || mq->rings[1].shr->cursize != 5);
SESSION_TEST (rv == 0, "msg alloc3");
*(u32 *) svm_msg_q_msg_data (mq, &msg2) = 123;
@@ -1998,7 +2001,7 @@ session_test_mq_basic (vlib_main_t * vm, unformat_input_t * input)
SESSION_TEST (0, "dequeue2 wrong data");
svm_msg_q_free_msg (mq, &msg[i]);
}
- rv = (mq->rings[0].cursize == 0 && mq->rings[1].cursize == 0);
+ rv = (mq->rings[0].shr->cursize == 0 && mq->rings[1].shr->cursize == 0);
SESSION_TEST (rv, "post dequeue");
return 0;
diff --git a/src/svm/fifo_segment.c b/src/svm/fifo_segment.c
index 636f223cde1..0c3a79a613b 100644
--- a/src/svm/fifo_segment.c
+++ b/src/svm/fifo_segment.c
@@ -764,9 +764,15 @@ void
fifo_segment_cleanup (fifo_segment_t *fs)
{
int slice_index;
+ svm_msg_q_t *mq = 0;
for (slice_index = 0; slice_index < fs->n_slices; slice_index++)
clib_mem_bulk_destroy (fs->slices[slice_index].fifos);
+
+ vec_foreach (fs->mqs, mq)
+ vec_free (mq->rings);
+
+ vec_free (fs->mqs);
}
/**
@@ -944,6 +950,69 @@ fifo_segment_attach_fifo (fifo_segment_t * fs, svm_fifo_t * f,
}
}
+svm_msg_q_t *
+fifo_segment_msg_q_alloc (fifo_segment_t *fs, u32 mq_index,
+ svm_msg_q_cfg_t *cfg)
+{
+ fifo_segment_header_t *fsh = fs->h;
+ svm_msg_q_shared_t *smq;
+ svm_msg_q_t *mq;
+ void *base;
+ u32 size;
+
+ if (!fs->mqs)
+ {
+ u32 n_mqs = clib_max (fs->h->n_mqs, 1);
+ vec_validate (fs->mqs, n_mqs - 1);
+ }
+
+ size = svm_msg_q_size_to_alloc (cfg);
+ base = fsh_alloc_aligned (fsh, size, 8);
+ fsh->n_reserved_bytes += size;
+
+ smq = svm_msg_q_init (base, cfg);
+ mq = vec_elt_at_index (fs->mqs, mq_index);
+ svm_msg_q_attach (mq, smq);
+
+ return mq;
+}
+
+svm_msg_q_t *
+fifo_segment_msg_q_attach (fifo_segment_t *fs, uword offset, u32 mq_index)
+{
+ svm_msg_q_t *mq;
+
+ if (!fs->mqs)
+ {
+ u32 n_mqs = clib_max (fs->h->n_mqs, 1);
+ vec_validate (fs->mqs, n_mqs - 1);
+ }
+
+ mq = vec_elt_at_index (fs->mqs, mq_index);
+
+ if (!mq->q)
+ {
+ svm_msg_q_shared_t *smq;
+ smq = (svm_msg_q_shared_t *) ((u8 *) fs->h + offset);
+ svm_msg_q_attach (mq, smq);
+ }
+
+ ASSERT (fifo_segment_msg_q_offset (fs, mq_index) == offset);
+
+ return mq;
+}
+
+uword
+fifo_segment_msg_q_offset (fifo_segment_t *fs, u32 mq_index)
+{
+ svm_msg_q_t *mq = vec_elt_at_index (fs->mqs, mq_index);
+
+ if (mq->q == 0)
+ return ~0ULL;
+
+ return (uword) ((u8 *) mq->q - (u8 *) fs->h) - sizeof (svm_msg_q_shared_t);
+}
+
int
fifo_segment_prealloc_fifo_hdrs (fifo_segment_t * fs, u32 slice_index,
u32 batch_size)
diff --git a/src/svm/fifo_segment.h b/src/svm/fifo_segment.h
index 006ffc40f63..195869a25dd 100644
--- a/src/svm/fifo_segment.h
+++ b/src/svm/fifo_segment.h
@@ -17,6 +17,7 @@
#include <svm/ssvm.h>
#include <svm/fifo_types.h>
+#include <svm/message_queue.h>
#include <svm/svm_fifo.h>
typedef enum
@@ -70,6 +71,7 @@ typedef struct
uword max_byte_index;
u8 n_slices; /**< number of fifo segment slices */
fifo_slice_private_t *slices; /**< private slice information */
+ svm_msg_q_t *mqs; /**< private vec of attached mqs */
} fifo_segment_t;
typedef struct
@@ -130,6 +132,37 @@ void fifo_segment_attach_fifo (fifo_segment_t * fs, svm_fifo_t * f,
u32 slice_index);
/**
+ * Allocate message queue on segment
+ *
+ * @param fs fifo segment for mq
+ * @param mq_index index in private mqs vector to use to attach
+ * @param cfg configuration for mq
+ * @return attached message queue
+ */
+svm_msg_q_t *fifo_segment_msg_q_alloc (fifo_segment_t *fs, u32 mq_index,
+ svm_msg_q_cfg_t *cfg);
+
+/**
+ * Attach message queue at fifo segment offset
+ *
+ * @param fs fifo segment for mq
+ * @param offset offset for shared mq on the segment
+ * @param mq_index index in private mqs vector to use to attach
+ * @return attached message queue
+ */
+svm_msg_q_t *fifo_segment_msg_q_attach (fifo_segment_t *fs, uword offset,
+ u32 mq_index);
+
+/**
+ * Message queue offset on segment
+ *
+ * @param fs fifo segment for mq
+ * @param mq_index index of mq in private mqs vector
+ * @return offset of the shared mq the private mq is attached to
+ */
+uword fifo_segment_msg_q_offset (fifo_segment_t *fs, u32 mq_index);
+
+/**
* Try to preallocate fifo headers
*
* Tries to preallocate fifo headers and adds them to freelist.
diff --git a/src/svm/fifo_types.h b/src/svm/fifo_types.h
index 85e67bb039c..bfd1a41a17b 100644
--- a/src/svm/fifo_types.h
+++ b/src/svm/fifo_types.h
@@ -146,6 +146,7 @@ struct fifo_segment_header_
u8 high_watermark; /**< Memory pressure watermark high */
u8 low_watermark; /**< Memory pressure watermark low */
u8 pct_first_alloc; /**< Pct of fifo size to alloc */
+ u8 n_mqs; /**< Num mqs for mqs segment */
CLIB_CACHE_LINE_ALIGN_MARK (allocator);
uword byte_index;
uword max_byte_index;
diff --git a/src/svm/message_queue.c b/src/svm/message_queue.c
index e586841cdb1..0ebce702430 100644
--- a/src/svm/message_queue.c
+++ b/src/svm/message_queue.c
@@ -34,45 +34,34 @@ static inline void *
svm_msg_q_ring_data (svm_msg_q_ring_t * ring, u32 elt_index)
{
ASSERT (elt_index < ring->nitems);
- return (ring->data + elt_index * ring->elsize);
+ return (ring->shr->data + elt_index * ring->elsize);
}
-svm_msg_q_t *
+svm_msg_q_shared_t *
svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg)
{
- svm_msg_q_ring_t *ring;
- vec_header_t *vh;
- svm_msg_q_t *mq;
- u8 *rings_ptr;
- u32 q_sz;
+ svm_msg_q_ring_shared_t *ring;
+ svm_msg_q_shared_t *smq;
+ u32 q_sz, offset;
int i;
q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
- mq = (svm_msg_q_t *) base;
- mq->q = svm_queue_init (base + sizeof (svm_msg_q_t), cfg->q_nitems,
- sizeof (svm_msg_q_msg_t));
- mq->q->consumer_pid = cfg->consumer_pid;
- vh = (vec_header_t *) ((u8 *) mq->q + q_sz);
- vh->len = cfg->n_rings;
- mq->rings = (svm_msg_q_ring_t *) (vh + 1);
- rings_ptr = (u8 *) mq->rings + sizeof (svm_msg_q_ring_t) * cfg->n_rings;
+ smq = (svm_msg_q_shared_t *) base;
+ svm_queue_init (&smq->q, cfg->q_nitems, sizeof (svm_msg_q_msg_t));
+ smq->q->consumer_pid = cfg->consumer_pid;
+ smq->n_rings = cfg->n_rings;
+ ring = (void *) ((u8 *) smq->q + q_sz);
for (i = 0; i < cfg->n_rings; i++)
{
- ring = &mq->rings[i];
ring->elsize = cfg->ring_cfgs[i].elsize;
ring->nitems = cfg->ring_cfgs[i].nitems;
ring->cursize = ring->head = ring->tail = 0;
- if (cfg->ring_cfgs[i].data)
- ring->data = cfg->ring_cfgs[i].data;
- else
- {
- ring->data = rings_ptr;
- rings_ptr += (uword) ring->nitems * ring->elsize;
- }
+ offset = sizeof (*ring) + ring->nitems * ring->elsize;
+ ring = (void *) ((u8 *) ring + offset);
}
- return mq;
+ return smq;
}
uword
@@ -80,12 +69,12 @@ svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg)
{
svm_msg_q_ring_cfg_t *ring_cfg;
uword rings_sz = 0, mq_sz;
- u32 vec_sz, q_sz;
+ u32 q_sz;
int i;
ASSERT (cfg);
- vec_sz = vec_header_bytes (0) + sizeof (svm_msg_q_ring_t) * cfg->n_rings;
+ rings_sz = sizeof (svm_msg_q_ring_shared_t) * cfg->n_rings;
for (i = 0; i < cfg->n_rings; i++)
{
if (cfg->ring_cfgs[i].data)
@@ -95,33 +84,18 @@ svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg)
}
q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
- mq_sz = sizeof (svm_msg_q_t) + vec_sz + rings_sz + q_sz;
+ mq_sz = sizeof (svm_msg_q_shared_t) + q_sz + rings_sz;
return mq_sz;
}
-svm_msg_q_t *
+svm_msg_q_shared_t *
svm_msg_q_alloc (svm_msg_q_cfg_t *cfg)
{
- svm_msg_q_ring_cfg_t *ring_cfg;
- uword rings_sz = 0, mq_sz;
- u32 vec_sz, q_sz;
+ uword mq_sz;
u8 *base;
- int i;
-
- ASSERT (cfg);
- vec_sz = vec_header_bytes (0) + sizeof (svm_msg_q_ring_t) * cfg->n_rings;
- for (i = 0; i < cfg->n_rings; i++)
- {
- if (cfg->ring_cfgs[i].data)
- continue;
- ring_cfg = &cfg->ring_cfgs[i];
- rings_sz += (uword) ring_cfg->nitems * ring_cfg->elsize;
- }
-
- q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
- mq_sz = sizeof (svm_msg_q_t) + vec_sz + rings_sz + q_sz;
+ mq_sz = svm_msg_q_size_to_alloc (cfg);
base = clib_mem_alloc_aligned (mq_sz, CLIB_CACHE_LINE_BYTES);
if (!base)
return 0;
@@ -130,6 +104,29 @@ svm_msg_q_alloc (svm_msg_q_cfg_t *cfg)
}
void
+svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base)
+{
+ svm_msg_q_ring_shared_t *ring;
+ svm_msg_q_shared_t *smq;
+ u32 i, n_rings, q_sz, offset;
+
+ smq = (svm_msg_q_shared_t *) smq_base;
+ mq->q = smq->q;
+ n_rings = smq->n_rings;
+ vec_validate (mq->rings, n_rings - 1);
+ q_sz = sizeof (svm_queue_t) + mq->q->maxsize * sizeof (svm_msg_q_msg_t);
+ ring = (void *) ((u8 *) smq->q + q_sz);
+ for (i = 0; i < n_rings; i++)
+ {
+ mq->rings[i].nitems = ring->nitems;
+ mq->rings[i].elsize = ring->elsize;
+ mq->rings[i].shr = ring;
+ offset = sizeof (*ring) + ring->nitems * ring->elsize;
+ ring = (void *) ((u8 *) ring + offset);
+ }
+}
+
+void
svm_msg_q_free (svm_msg_q_t * mq)
{
svm_queue_free (mq->q);
@@ -139,14 +136,18 @@ svm_msg_q_free (svm_msg_q_t * mq)
svm_msg_q_msg_t
svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index)
{
+ svm_msg_q_ring_shared_t *sr;
+ svm_msg_q_ring_t *ring;
svm_msg_q_msg_t msg;
- svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, ring_index);
- ASSERT (ring->cursize < ring->nitems);
+ ring = svm_msg_q_ring_inline (mq, ring_index);
+ sr = ring->shr;
+
+ ASSERT (sr->cursize < ring->nitems);
msg.ring_index = ring - mq->rings;
- msg.elt_index = ring->tail;
- ring->tail = (ring->tail + 1) % ring->nitems;
- clib_atomic_fetch_add (&ring->cursize, 1);
+ msg.elt_index = sr->tail;
+ sr->tail = (sr->tail + 1) % ring->nitems;
+ clib_atomic_fetch_add (&sr->cursize, 1);
return msg;
}
@@ -181,16 +182,18 @@ svm_msg_q_msg_t
svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes)
{
svm_msg_q_msg_t msg = {.as_u64 = ~0 };
+ svm_msg_q_ring_shared_t *sr;
svm_msg_q_ring_t *ring;
vec_foreach (ring, mq->rings)
{
- if (ring->elsize < nbytes || ring->cursize == ring->nitems)
+ sr = ring->shr;
+ if (ring->elsize < nbytes || sr->cursize == ring->nitems)
continue;
msg.ring_index = ring - mq->rings;
- msg.elt_index = ring->tail;
- ring->tail = (ring->tail + 1) % ring->nitems;
- clib_atomic_fetch_add (&ring->cursize, 1);
+ msg.elt_index = sr->tail;
+ sr->tail = (sr->tail + 1) % ring->nitems;
+ clib_atomic_fetch_add (&sr->cursize, 1);
break;
}
return msg;
@@ -206,14 +209,16 @@ svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
void
svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
{
+ svm_msg_q_ring_shared_t *sr;
svm_msg_q_ring_t *ring;
int need_signal;
ASSERT (vec_len (mq->rings) > msg->ring_index);
- ring = &mq->rings[msg->ring_index];
- if (msg->elt_index == ring->head)
+ ring = svm_msg_q_ring_inline (mq, msg->ring_index);
+ sr = ring->shr;
+ if (msg->elt_index == sr->head)
{
- ring->head = (ring->head + 1) % ring->nitems;
+ sr->head = (sr->head + 1) % ring->nitems;
}
else
{
@@ -222,8 +227,8 @@ svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
ASSERT (0);
}
- need_signal = ring->cursize == ring->nitems;
- clib_atomic_fetch_sub (&ring->cursize, 1);
+ need_signal = sr->cursize == ring->nitems;
+ clib_atomic_fetch_sub (&sr->cursize, 1);
if (PREDICT_FALSE (need_signal))
svm_queue_send_signal (mq->q, 0);
@@ -233,17 +238,20 @@ static int
svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
{
u32 dist1, dist2, tail, head;
+ svm_msg_q_ring_shared_t *sr;
svm_msg_q_ring_t *ring;
if (vec_len (mq->rings) <= msg->ring_index)
return 0;
- ring = &mq->rings[msg->ring_index];
- tail = ring->tail;
- head = ring->head;
+
+ ring = svm_msg_q_ring_inline (mq, msg->ring_index);
+ sr = ring->shr;
+ tail = sr->tail;
+ head = sr->head;
dist1 = ((ring->nitems + msg->elt_index) - head) % ring->nitems;
if (tail == head)
- dist2 = (ring->cursize == 0) ? 0 : ring->nitems;
+ dist2 = (sr->cursize == 0) ? 0 : ring->nitems;
else
dist2 = ((ring->nitems + tail) - head) % ring->nitems;
return (dist1 < dist2);
@@ -316,7 +324,7 @@ format_svm_msg_q (u8 * s, va_list * args)
s = format (s, " [Q:%d/%d]", mq->q->cursize, mq->q->maxsize);
for (u32 i = 0; i < vec_len (mq->rings); i++)
{
- s = format (s, " [R%d:%d/%d]", i, mq->rings[i].cursize,
+ s = format (s, " [R%d:%d/%d]", i, mq->rings[i].shr->cursize,
mq->rings[i].nitems);
}
return s;
diff --git a/src/svm/message_queue.h b/src/svm/message_queue.h
index 50f79fb0c19..4b314b837e9 100644
--- a/src/svm/message_queue.h
+++ b/src/svm/message_queue.h
@@ -24,16 +24,30 @@
#include <vppinfra/error.h>
#include <svm/queue.h>
-typedef struct svm_msg_q_ring_
+typedef struct svm_msg_q_ring_shared_
{
volatile u32 cursize; /**< current size of the ring */
u32 nitems; /**< max size of the ring */
volatile u32 head; /**< current head (for dequeue) */
volatile u32 tail; /**< current tail (for enqueue) */
u32 elsize; /**< size of an element */
- u8 *data; /**< chunk of memory for msg data */
+ u8 data[0]; /**< chunk of memory for msg data */
+} svm_msg_q_ring_shared_t;
+
+typedef struct svm_msg_q_ring_
+{
+ u32 nitems; /**< max size of the ring */
+ u32 elsize; /**< size of an element */
+ svm_msg_q_ring_shared_t *shr; /**< ring in shared memory */
} __clib_packed svm_msg_q_ring_t;
+typedef struct svm_msg_q_shared_
+{
+ u32 n_rings; /**< number of rings after q */
+ u32 pad; /**< 8 byte alignment for q */
+ svm_queue_t q[0]; /**< queue for exchanging messages */
+} __clib_packed svm_msg_q_shared_t;
+
typedef struct svm_msg_q_
{
svm_queue_t *q; /**< queue for exchanging messages */
@@ -77,10 +91,12 @@ typedef union
* ring configs
* @return message queue
*/
-svm_msg_q_t *svm_msg_q_alloc (svm_msg_q_cfg_t * cfg);
-svm_msg_q_t *svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg);
+svm_msg_q_shared_t *svm_msg_q_alloc (svm_msg_q_cfg_t *cfg);
+svm_msg_q_shared_t *svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg);
uword svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg);
+void svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base);
+
/**
* Free message queue
*
@@ -267,8 +283,8 @@ svm_msg_q_is_full (svm_msg_q_t * mq)
static inline u8
svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
{
- ASSERT (ring_index < vec_len (mq->rings));
- return (mq->rings[ring_index].cursize == mq->rings[ring_index].nitems);
+ svm_msg_q_ring_t *ring = vec_elt_at_index (mq->rings, ring_index);
+ return (ring->shr->cursize >= ring->nitems);
}
/**
diff --git a/src/svm/svm_fifo.c b/src/svm/svm_fifo.c
index f79f37a4687..14729710aac 100644
--- a/src/svm/svm_fifo.c
+++ b/src/svm/svm_fifo.c
@@ -398,6 +398,7 @@ svm_fifo_init (svm_fifo_t * f, u32 size)
{
c->start_byte = prev->start_byte + prev->length;
c->enq_rb_index = c->deq_rb_index = RBTREE_TNIL_INDEX;
+ ASSERT (c->length >= 1 << FS_MIN_LOG2_CHUNK_SZ);
prev = c;
c = f_cptr (f, c->next);
}
diff --git a/src/vcl/vcl_bapi.c b/src/vcl/vcl_bapi.c
index e245c4dc865..a13948d754d 100644
--- a/src/vcl/vcl_bapi.c
+++ b/src/vcl/vcl_bapi.c
@@ -65,7 +65,6 @@ static void
vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp)
{
vcl_worker_t *wrk = vcl_worker_get (0);
- svm_msg_q_t *ctrl_mq;
u64 segment_handle;
int *fds = 0, i, rv;
u32 n_fds = 0;
@@ -77,9 +76,6 @@ vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp)
goto failed;
}
- wrk->app_event_queue = uword_to_pointer (mp->app_mq, svm_msg_q_t *);
- ctrl_mq = uword_to_pointer (mp->vpp_ctrl_mq, svm_msg_q_t *);
- vcm->ctrl_mq = wrk->ctrl_mq = ctrl_mq;
segment_handle = clib_net_to_host_u64 (mp->segment_handle);
if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
{
@@ -100,6 +96,11 @@ vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp)
fds[n_fds++]))
goto failed;
+ vcl_segment_attach_mq (vcl_vpp_worker_segment_handle (0),
+ mp->vpp_ctrl_mq, mp->vpp_ctrl_mq_thread,
+ &wrk->ctrl_mq);
+ vcm->ctrl_mq = wrk->ctrl_mq;
+
if (mp->fd_flags & SESSION_FD_F_MEMFD_SEGMENT)
{
segment_name = vl_api_from_api_to_new_c_string (&mp->segment_name);
@@ -111,6 +112,8 @@ vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp)
goto failed;
}
+ vcl_segment_attach_mq (segment_handle, mp->app_mq, 0,
+ &wrk->app_event_queue);
if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
{
@@ -169,8 +172,6 @@ vl_api_app_worker_add_del_reply_t_handler (vl_api_app_worker_add_del_reply_t *
return;
wrk->vpp_wrk_index = clib_net_to_host_u32 (mp->wrk_index);
- wrk->app_event_queue = uword_to_pointer (mp->app_event_queue_address,
- svm_msg_q_t *);
wrk->ctrl_mq = vcm->ctrl_mq;
segment_handle = clib_net_to_host_u64 (mp->segment_handle);
@@ -204,6 +205,9 @@ vl_api_app_worker_add_del_reply_t_handler (vl_api_app_worker_add_del_reply_t *
goto failed;
}
+ vcl_segment_attach_mq (segment_handle, mp->app_event_queue_address, 0,
+ &wrk->app_event_queue);
+
if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
{
svm_msg_q_set_consumer_eventfd (wrk->app_event_queue, fds[n_fds]);
diff --git a/src/vcl/vcl_private.c b/src/vcl/vcl_private.c
index ea938113615..5b412351c50 100644
--- a/src/vcl/vcl_private.c
+++ b/src/vcl/vcl_private.c
@@ -376,12 +376,14 @@ vcl_segment_detach (u64 segment_handle)
int
vcl_segment_attach_session (uword segment_handle, uword rxf_offset,
- uword txf_offset, u8 is_ct, vcl_session_t *s)
+ uword txf_offset, uword mq_offset, u8 is_ct,
+ vcl_session_t *s)
{
svm_fifo_shared_t *rxsf, *txsf;
+ u32 fs_index, eqs_index;
svm_fifo_t *rxf, *txf;
fifo_segment_t *fs;
- u32 fs_index;
+ u64 eqs_handle;
fs_index = vcl_segment_table_lookup (segment_handle);
if (fs_index == VCL_INVALID_SEGMENT_INDEX)
@@ -391,6 +393,13 @@ vcl_segment_attach_session (uword segment_handle, uword rxf_offset,
return -1;
}
+ if (mq_offset != (uword) ~0)
+ {
+ eqs_handle = vcl_vpp_worker_segment_handle (0);
+ eqs_index = vcl_segment_table_lookup (eqs_handle);
+ ASSERT (eqs_index != VCL_INVALID_SEGMENT_INDEX);
+ }
+
rxsf = uword_to_pointer (rxf_offset, svm_fifo_shared_t *);
txsf = uword_to_pointer (txf_offset, svm_fifo_shared_t *);
@@ -400,6 +409,13 @@ vcl_segment_attach_session (uword segment_handle, uword rxf_offset,
rxf = fifo_segment_alloc_fifo_w_shared (fs, rxsf);
txf = fifo_segment_alloc_fifo_w_shared (fs, txsf);
+ if (!is_ct && mq_offset != (uword) ~0)
+ {
+ fs = fifo_segment_get_segment (&vcm->segment_main, eqs_index);
+ s->vpp_evt_q =
+ fifo_segment_msg_q_attach (fs, mq_offset, rxf->shr->slice_index);
+ }
+
clib_rwlock_reader_unlock (&vcm->segment_table_lock);
if (!is_ct)
@@ -420,6 +436,30 @@ vcl_segment_attach_session (uword segment_handle, uword rxf_offset,
return 0;
}
+int
+vcl_segment_attach_mq (uword segment_handle, uword mq_offset, u32 mq_index,
+ svm_msg_q_t **mq)
+{
+ fifo_segment_t *fs;
+ u32 fs_index;
+
+ fs_index = vcl_segment_table_lookup (segment_handle);
+ if (fs_index == VCL_INVALID_SEGMENT_INDEX)
+ {
+ VDBG (0, "ERROR: mq segment %lx for is not attached!", segment_handle);
+ return -1;
+ }
+
+ clib_rwlock_reader_lock (&vcm->segment_table_lock);
+
+ fs = fifo_segment_get_segment (&vcm->segment_main, fs_index);
+ *mq = fifo_segment_msg_q_attach (fs, mq_offset, mq_index);
+
+ clib_rwlock_reader_unlock (&vcm->segment_table_lock);
+
+ return 0;
+}
+
/*
* fd.io coding-style-patch-verification: ON
*
diff --git a/src/vcl/vcl_private.h b/src/vcl/vcl_private.h
index 637581b1b4b..c864375dd31 100644
--- a/src/vcl/vcl_private.h
+++ b/src/vcl/vcl_private.h
@@ -686,7 +686,10 @@ void vcl_segment_detach (u64 segment_handle);
void vcl_send_session_unlisten (vcl_worker_t * wrk, vcl_session_t * s);
int vcl_segment_attach_session (uword segment_handle, uword rxf_offset,
- uword txf_offset, u8 is_ct, vcl_session_t *s);
+ uword txf_offset, uword mq_offset, u8 is_ct,
+ vcl_session_t *s);
+int vcl_segment_attach_mq (uword segment_handle, uword mq_offset, u32 mq_index,
+ svm_msg_q_t **mq);
/*
* VCL Binary API
diff --git a/src/vcl/vcl_sapi.c b/src/vcl/vcl_sapi.c
index 7651b3531fb..bc44272a5c9 100644
--- a/src/vcl/vcl_sapi.c
+++ b/src/vcl/vcl_sapi.c
@@ -46,7 +46,6 @@ vcl_api_attach_reply_handler (app_sapi_attach_reply_msg_t * mp, int *fds)
{
vcl_worker_t *wrk = vcl_worker_get_current ();
int i, rv, n_fds_used = 0;
- svm_msg_q_t *ctrl_mq;
u64 segment_handle;
u8 *segment_name;
@@ -57,9 +56,6 @@ vcl_api_attach_reply_handler (app_sapi_attach_reply_msg_t * mp, int *fds)
}
wrk->api_client_handle = mp->api_client_handle;
- wrk->app_event_queue = uword_to_pointer (mp->app_mq, svm_msg_q_t *);
- ctrl_mq = uword_to_pointer (mp->vpp_ctrl_mq, svm_msg_q_t *);
- vcm->ctrl_mq = wrk->ctrl_mq = ctrl_mq;
segment_handle = mp->segment_handle;
if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
{
@@ -75,6 +71,10 @@ vcl_api_attach_reply_handler (app_sapi_attach_reply_msg_t * mp, int *fds)
SSVM_SEGMENT_MEMFD, fds[n_fds_used++]))
goto failed;
+ vcl_segment_attach_mq (vcl_vpp_worker_segment_handle (0), mp->vpp_ctrl_mq,
+ mp->vpp_ctrl_mq_thread, &wrk->ctrl_mq);
+ vcm->ctrl_mq = wrk->ctrl_mq;
+
if (mp->fd_flags & SESSION_FD_F_MEMFD_SEGMENT)
{
segment_name = format (0, "memfd-%ld%c", segment_handle, 0);
@@ -85,6 +85,8 @@ vcl_api_attach_reply_handler (app_sapi_attach_reply_msg_t * mp, int *fds)
goto failed;
}
+ vcl_segment_attach_mq (segment_handle, mp->app_mq, 0, &wrk->app_event_queue);
+
if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
{
svm_msg_q_set_consumer_eventfd (wrk->app_event_queue,
@@ -201,8 +203,6 @@ vcl_api_add_del_worker_reply_handler (app_sapi_worker_add_del_reply_msg_t *
wrk = vcl_worker_get_current ();
wrk->api_client_handle = mp->api_client_handle;
wrk->vpp_wrk_index = mp->wrk_index;
- wrk->app_event_queue = uword_to_pointer (mp->app_event_queue_address,
- svm_msg_q_t *);
wrk->ctrl_mq = vcm->ctrl_mq;
segment_handle = mp->segment_handle;
@@ -231,6 +231,9 @@ vcl_api_add_del_worker_reply_handler (app_sapi_worker_add_del_reply_msg_t *
goto failed;
}
+ vcl_segment_attach_mq (segment_handle, mp->app_event_queue_address, 0,
+ &wrk->app_event_queue);
+
if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
{
svm_msg_q_set_consumer_eventfd (wrk->app_event_queue, fds[n_fds]);
diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c
index 734d062669e..dbb2cd59a3b 100644
--- a/src/vcl/vppcom.c
+++ b/src/vcl/vppcom.c
@@ -374,11 +374,9 @@ vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp,
goto error;
}
- session->vpp_evt_q =
- uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
-
if (vcl_segment_attach_session (mp->segment_handle, mp->server_rx_fifo,
- mp->server_tx_fifo, 0, session))
+ mp->server_tx_fifo,
+ mp->vpp_event_queue_address, 0, session))
{
VDBG (0, "failed to attach fifos for %u", session->session_index);
goto error;
@@ -412,7 +410,8 @@ vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp,
return session->session_index;
error:
- evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
+ vcl_segment_attach_mq (vcl_vpp_worker_segment_handle (0),
+ mp->vpp_event_queue_address, mp->mq_index, &evt_q);
vcl_send_session_accepted_reply (evt_q, mp->context, mp->handle,
VNET_API_ERROR_INVALID_ARGUMENT);
vcl_session_free (wrk, session);
@@ -444,11 +443,10 @@ vcl_session_connected_handler (vcl_worker_t * wrk,
}
session->vpp_handle = mp->handle;
- session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
- svm_msg_q_t *);
if (vcl_segment_attach_session (mp->segment_handle, mp->server_rx_fifo,
- mp->server_tx_fifo, 0, session))
+ mp->server_tx_fifo,
+ mp->vpp_event_queue_address, 0, session))
{
VDBG (0, "failed to attach fifos for %u", session->session_index);
session->session_state = VCL_STATE_DETACHED;
@@ -459,7 +457,7 @@ vcl_session_connected_handler (vcl_worker_t * wrk,
if (mp->ct_rx_fifo)
{
if (vcl_segment_attach_session (mp->ct_segment_handle, mp->ct_rx_fifo,
- mp->ct_tx_fifo, 1, session))
+ mp->ct_tx_fifo, (uword) ~0, 1, session))
{
VDBG (0, "failed to attach ct fifos for %u", session->session_index);
session->session_state = VCL_STATE_DETACHED;
@@ -571,12 +569,11 @@ vcl_session_bound_handler (vcl_worker_t * wrk, session_bound_msg_t * mp)
session->transport.lcl_port = mp->lcl_port;
vcl_session_table_add_listener (wrk, mp->handle, sid);
session->session_state = VCL_STATE_LISTEN;
- session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
if (vcl_session_is_cl (session))
{
if (vcl_segment_attach_session (mp->segment_handle, mp->rx_fifo,
- mp->tx_fifo, 0, session))
+ mp->tx_fifo, mp->vpp_evt_q, 0, session))
{
VDBG (0, "failed to attach fifos for %u", session->session_index);
session->session_state = VCL_STATE_DETACHED;
@@ -645,7 +642,9 @@ vcl_session_migrated_handler (vcl_worker_t * wrk, void *data)
}
s->vpp_handle = mp->new_handle;
- s->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
+
+ vcl_segment_attach_mq (vcl_vpp_worker_segment_handle (0), mp->vpp_evt_q,
+ mp->vpp_thread_index, &s->vpp_evt_q);
vcl_session_table_del_vpp_handle (wrk, mp->handle);
vcl_session_table_add_vpp_handle (wrk, mp->new_handle, s->session_index);
@@ -856,7 +855,7 @@ vcl_session_worker_update_reply_handler (vcl_worker_t * wrk, void *data)
if (s->rx_fifo)
{
if (vcl_segment_attach_session (msg->segment_handle, msg->rx_fifo,
- msg->tx_fifo, 0, s))
+ msg->tx_fifo, (uword) ~0, 0, s))
{
VDBG (0, "failed to attach fifos for %u", s->session_index);
return;
diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h
index 096af1efa15..96142573b98 100644
--- a/src/vnet/session/application_interface.h
+++ b/src/vnet/session/application_interface.h
@@ -370,6 +370,7 @@ typedef struct session_accepted_msg_
uword server_tx_fifo;
u64 segment_handle;
uword vpp_event_queue_address;
+ u32 mq_index;
transport_endpoint_t rmt;
u8 flags;
} __clib_packed session_accepted_msg_t;
diff --git a/src/vnet/session/segment_manager.c b/src/vnet/session/segment_manager.c
index 7683760fde9..eca199f9d0e 100644
--- a/src/vnet/session/segment_manager.c
+++ b/src/vnet/session/segment_manager.c
@@ -875,7 +875,6 @@ segment_manager_alloc_queue (fifo_segment_t * segment,
u32 fifo_evt_size, session_evt_size = 256, notif_q_size;
svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
svm_msg_q_t *q;
- void *base;
fifo_evt_size = sizeof (session_event_t);
notif_q_size = clib_max (16, props->evt_q_size >> 4);
@@ -890,8 +889,7 @@ segment_manager_alloc_queue (fifo_segment_t * segment,
cfg->q_nitems = props->evt_q_size;
cfg->ring_cfgs = rc;
- base = fifo_segment_alloc (segment, svm_msg_q_size_to_alloc (cfg));
- q = svm_msg_q_init (base, cfg);
+ q = fifo_segment_msg_q_alloc (segment, 0, cfg);
if (props->use_mq_eventfd)
{
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 259e212e1a3..9a4d29bdf29 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -1508,7 +1508,6 @@ session_vpp_event_queues_allocate (session_main_t * smm)
fifo_segment_t *eqs = &smm->evt_qs_segment;
uword eqs_size = 64 << 20;
pid_t vpp_pid = getpid ();
- void *base;
int i;
if (smm->configured_event_queue_length)
@@ -1531,6 +1530,9 @@ session_vpp_event_queues_allocate (session_main_t * smm)
fifo_segment_init (eqs);
+ /* Special fifo segment that's filled only with mqs */
+ eqs->h->n_mqs = vec_len (smm->wrk);
+
for (i = 0; i < vec_len (smm->wrk); i++)
{
svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
@@ -1544,8 +1546,7 @@ session_vpp_event_queues_allocate (session_main_t * smm)
cfg->q_nitems = evt_q_length;
cfg->ring_cfgs = rc;
- base = fifo_segment_alloc (eqs, svm_msg_q_size_to_alloc (cfg));
- smm->wrk[i].vpp_event_queue = svm_msg_q_init (base, cfg);
+ smm->wrk[i].vpp_event_queue = fifo_segment_msg_q_alloc (eqs, i, cfg);
if (svm_msg_q_alloc_consumer_eventfd (smm->wrk[i].vpp_event_queue))
clib_warning ("eventfd returned");
diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c
index 8fe1acf6a6e..4602a783e60 100644
--- a/src/vnet/session/session_api.c
+++ b/src/vnet/session/session_api.c
@@ -130,7 +130,8 @@ mq_send_session_accepted_cb (session_t * s)
{
app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
svm_msg_q_msg_t _msg, *msg = &_msg;
- svm_msg_q_t *vpp_queue, *app_mq;
+ svm_msg_q_t *app_mq;
+ fifo_segment_t *eq_seg;
session_t *listener;
session_accepted_msg_t *mp;
session_event_t *evt;
@@ -152,6 +153,8 @@ mq_send_session_accepted_cb (session_t * s)
mp->segment_handle = session_segment_handle (s);
mp->flags = s->flags;
+ eq_seg = session_main_get_evt_q_segment ();
+
if (session_has_transport (s))
{
listener = listen_session_get_from_handle (s->listener_handle);
@@ -164,8 +167,9 @@ mq_send_session_accepted_cb (session_t * s)
if (listener)
mp->listener_handle = listen_session_get_handle (listener);
}
- vpp_queue = session_main_get_vpp_event_queue (s->thread_index);
- mp->vpp_event_queue_address = pointer_to_uword (vpp_queue);
+ mp->vpp_event_queue_address =
+ fifo_segment_msg_q_offset (eq_seg, s->thread_index);
+ mp->mq_index = s->thread_index;
mp->handle = session_handle (s);
session_get_endpoint (s, &mp->rmt, 0 /* is_lcl */ );
@@ -180,8 +184,9 @@ mq_send_session_accepted_cb (session_t * s)
mp->rmt.is_ip4 = session_type_is_ip4 (listener->session_type);
mp->rmt.port = ct->c_rmt_port;
mp->handle = session_handle (s);
- vpp_queue = session_main_get_vpp_event_queue (s->thread_index);
- mp->vpp_event_queue_address = pointer_to_uword (vpp_queue);
+ mp->vpp_event_queue_address =
+ fifo_segment_msg_q_offset (eq_seg, s->thread_index);
+ mp->mq_index = s->thread_index;
}
svm_msg_q_add_and_unlock (app_mq, msg);
@@ -262,8 +267,9 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
{
svm_msg_q_msg_t _msg, *msg = &_msg;
session_connected_msg_t *mp;
- svm_msg_q_t *vpp_mq, *app_mq;
+ svm_msg_q_t *app_mq;
transport_connection_t *tc;
+ fifo_segment_t *eq_seg;
app_worker_t *app_wrk;
session_event_t *evt;
@@ -289,6 +295,8 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
if (err)
goto done;
+ eq_seg = session_main_get_evt_q_segment ();
+
if (session_has_transport (s))
{
tc = session_get_transport (s);
@@ -299,9 +307,9 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
goto done;
}
- vpp_mq = session_main_get_vpp_event_queue (s->thread_index);
mp->handle = session_handle (s);
- mp->vpp_event_queue_address = pointer_to_uword (vpp_mq);
+ mp->vpp_event_queue_address =
+ fifo_segment_msg_q_offset (eq_seg, s->thread_index);
session_get_endpoint (s, &mp->lcl, 1 /* is_lcl */ );
@@ -318,8 +326,8 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
mp->handle = session_handle (s);
mp->lcl.port = cct->c_lcl_port;
mp->lcl.is_ip4 = cct->c_is_ip4;
- vpp_mq = session_main_get_vpp_event_queue (s->thread_index);
- mp->vpp_event_queue_address = pointer_to_uword (vpp_mq);
+ mp->vpp_event_queue_address =
+ fifo_segment_msg_q_offset (eq_seg, s->thread_index);
mp->server_rx_fifo = pointer_to_uword (s->rx_fifo->shr);
mp->server_tx_fifo = pointer_to_uword (s->tx_fifo->shr);
mp->segment_handle = session_segment_handle (s);
@@ -341,9 +349,10 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
session_handle_t handle, int rv)
{
svm_msg_q_msg_t _msg, *msg = &_msg;
- svm_msg_q_t *app_mq, *vpp_evt_q;
+ svm_msg_q_t *app_mq;
transport_endpoint_t tep;
session_bound_msg_t *mp;
+ fifo_segment_t *eq_seg;
app_worker_t *app_wrk;
session_event_t *evt;
app_listener_t *al;
@@ -381,8 +390,8 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
mp->lcl_is_ip4 = tep.is_ip4;
clib_memcpy_fast (mp->lcl_ip, &tep.ip, sizeof (tep.ip));
- vpp_evt_q = session_main_get_vpp_event_queue (0);
- mp->vpp_evt_q = pointer_to_uword (vpp_evt_q);
+ eq_seg = session_main_get_evt_q_segment ();
+ mp->vpp_evt_q = fifo_segment_msg_q_offset (eq_seg, ls->thread_index);
if (session_transport_service_type (ls) == TRANSPORT_SERVICE_CL)
{
@@ -425,10 +434,14 @@ mq_send_session_migrate_cb (session_t * s, session_handle_t new_sh)
{
svm_msg_q_msg_t _msg, *msg = &_msg;
session_migrated_msg_t *mp;
- svm_msg_q_t *vpp_evt_q;
+ fifo_segment_t *eq_seg;
app_worker_t *app_wrk;
session_event_t *evt;
svm_msg_q_t *app_mq;
+ u32 thread_index;
+
+ thread_index = session_thread_from_handle (new_sh);
+ eq_seg = session_main_get_evt_q_segment ();
app_wrk = app_worker_get (s->app_wrk_index);
app_mq = app_wrk->event_queue;
@@ -441,9 +454,8 @@ mq_send_session_migrate_cb (session_t * s, session_handle_t new_sh)
mp = (session_migrated_msg_t *) evt->data;
mp->handle = session_handle (s);
mp->new_handle = new_sh;
- mp->vpp_thread_index = session_thread_from_handle (new_sh);
- vpp_evt_q = session_main_get_vpp_event_queue (mp->vpp_thread_index);
- mp->vpp_evt_q = pointer_to_uword (vpp_evt_q);
+ mp->vpp_thread_index = thread_index;
+ mp->vpp_evt_q = fifo_segment_msg_q_offset (eq_seg, thread_index);
mp->segment_handle = session_segment_handle (s);
svm_msg_q_add_and_unlock (app_mq, msg);
}
@@ -601,12 +613,10 @@ vl_api_app_attach_t_handler (vl_api_app_attach_t * mp)
{
int rv = 0, fds[SESSION_N_FD_TYPE], n_fds = 0;
vl_api_app_attach_reply_t *rmp;
- ssvm_private_t *segp;
+ fifo_segment_t *segp, *evt_q_segment = 0;
vnet_app_attach_args_t _a, *a = &_a;
- fifo_segment_t *evt_q_segment;
u8 fd_flags = 0, ctrl_thread;
vl_api_registration_t *reg;
- svm_msg_q_t *ctrl_mq;
reg = vl_api_client_index_to_registration (mp->client_index);
if (!reg)
@@ -669,19 +679,19 @@ done:
if (!rv)
{
ctrl_thread = vlib_num_workers () ? 1 : 0;
- ctrl_mq = session_main_get_vpp_event_queue (ctrl_thread);
- segp = a->segment;
+ segp = (fifo_segment_t *) a->segment;
rmp->app_index = clib_host_to_net_u32 (a->app_index);
- rmp->app_mq = pointer_to_uword (a->app_evt_q);
- rmp->vpp_ctrl_mq = pointer_to_uword (ctrl_mq);
+ rmp->app_mq = fifo_segment_msg_q_offset (segp, 0);
+ rmp->vpp_ctrl_mq =
+ fifo_segment_msg_q_offset (evt_q_segment, ctrl_thread);
rmp->vpp_ctrl_mq_thread = ctrl_thread;
rmp->n_fds = n_fds;
rmp->fd_flags = fd_flags;
- if (vec_len (segp->name))
+ if (vec_len (segp->ssvm.name))
{
- vl_api_vec_to_api_string (segp->name, &rmp->segment_name);
+ vl_api_vec_to_api_string (segp->ssvm.name, &rmp->segment_name);
}
- rmp->segment_size = segp->ssvm_size;
+ rmp->segment_size = segp->ssvm.ssvm_size;
rmp->segment_handle = clib_host_to_net_u64 (a->segment_handle);
}
}));
@@ -755,13 +765,14 @@ done:
rmp->segment_handle = clib_host_to_net_u64 (args.segment_handle);
if (!rv && mp->is_add)
{
+ rmp->app_event_queue_address =
+ fifo_segment_msg_q_offset ((fifo_segment_t *) args.segment, 0);
+ rmp->n_fds = n_fds;
+ rmp->fd_flags = fd_flags;
if (vec_len (args.segment->name))
{
vl_api_vec_to_api_string (args.segment->name, &rmp->segment_name);
}
- rmp->app_event_queue_address = pointer_to_uword (args.evt_q);
- rmp->n_fds = n_fds;
- rmp->fd_flags = fd_flags;
}
}));
/* *INDENT-ON* */
@@ -1341,7 +1352,6 @@ session_api_attach_handler (app_namespace_t * app_ns, clib_socket_t * cs,
app_ns_api_handle_t *handle;
app_sapi_msg_t msg = { 0 };
app_worker_t *app_wrk;
- svm_msg_q_t *ctrl_mq;
application_t *app;
/* Make sure name is null terminated */
@@ -1390,10 +1400,11 @@ done:
if (!rv)
{
ctrl_thread = vlib_num_workers ()? 1 : 0;
- ctrl_mq = session_main_get_vpp_event_queue (ctrl_thread);
rmp->app_index = a->app_index;
- rmp->app_mq = pointer_to_uword (a->app_evt_q);
- rmp->vpp_ctrl_mq = pointer_to_uword (ctrl_mq);
+ rmp->app_mq =
+ fifo_segment_msg_q_offset ((fifo_segment_t *) a->segment, 0);
+ rmp->vpp_ctrl_mq =
+ fifo_segment_msg_q_offset (evt_q_segment, ctrl_thread);
rmp->vpp_ctrl_mq_thread = ctrl_thread;
rmp->n_fds = n_fds;
rmp->fd_flags = fd_flags;
@@ -1502,7 +1513,8 @@ done:
if (!rv && mp->is_add)
{
/* No segment name and size. This supports only memfds */
- rmp->app_event_queue_address = pointer_to_uword (args.evt_q);
+ rmp->app_event_queue_address =
+ fifo_segment_msg_q_offset ((fifo_segment_t *) args.segment, 0);
rmp->n_fds = n_fds;
rmp->fd_flags = fd_flags;
diff --git a/src/vnet/session/session_debug.c b/src/vnet/session/session_debug.c
index c2718f3bfc2..cd4198c5da8 100644
--- a/src/vnet/session/session_debug.c
+++ b/src/vnet/session/session_debug.c
@@ -120,7 +120,6 @@ session_debug_init (void)
void
dump_thread_0_event_queue (void)
{
- session_main_t *smm = vnet_get_session_main ();
vlib_main_t *vm = &vlib_global_main;
u32 my_thread_index = vm->thread_index;
session_event_t _e, *e = &_e;
@@ -130,7 +129,7 @@ dump_thread_0_event_queue (void)
svm_msg_q_t *mq;
int i, index;
- mq = smm->wrk[my_thread_index].vpp_event_queue;
+ mq = session_main_get_vpp_event_queue (my_thread_index);
index = mq->q->head;
for (i = 0; i < mq->q->cursize; i++)
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index 2fde85b03ec..49e4e5a6b10 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -472,8 +472,8 @@ session_mq_worker_update_handler (void *data)
evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
rmp = (session_worker_update_reply_msg_t *) evt->data;
rmp->handle = mp->handle;
- rmp->rx_fifo = pointer_to_uword (s->rx_fifo);
- rmp->tx_fifo = pointer_to_uword (s->tx_fifo);
+ rmp->rx_fifo = pointer_to_uword (s->rx_fifo->shr);
+ rmp->tx_fifo = pointer_to_uword (s->tx_fifo->shr);
rmp->segment_handle = session_segment_handle (s);
svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);