aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/svm/message_queue.c102
-rw-r--r--src/svm/message_queue.h157
-rw-r--r--src/svm/queue.c18
-rw-r--r--src/svm/queue.h8
-rw-r--r--src/svm/svm_fifo.h6
-rw-r--r--src/svm/test_svm_message_queue.c4
-rw-r--r--src/tests/vnet/session/tcp_echo.c42
-rw-r--r--src/tests/vnet/session/udp_echo.c34
-rw-r--r--src/vcl/vcl_bapi.c6
-rw-r--r--src/vcl/vcl_private.h2
-rw-r--r--src/vcl/vppcom.c31
-rw-r--r--src/vnet/session-apps/echo_client.c31
-rw-r--r--src/vnet/session-apps/echo_client.h2
-rw-r--r--src/vnet/session-apps/echo_server.c17
-rw-r--r--src/vnet/session-apps/http_server.c23
-rw-r--r--src/vnet/session-apps/proxy.c31
-rw-r--r--src/vnet/session-apps/proxy.h4
-rw-r--r--src/vnet/session/application.c141
-rw-r--r--src/vnet/session/application.h7
-rw-r--r--src/vnet/session/application_interface.h83
-rw-r--r--src/vnet/session/segment_manager.c43
-rw-r--r--src/vnet/session/segment_manager.h9
-rw-r--r--src/vnet/session/session.c192
-rw-r--r--src/vnet/session/session.h47
-rwxr-xr-xsrc/vnet/session/session_api.c8
-rw-r--r--src/vnet/session/session_node.c73
-rw-r--r--src/vnet/tls/tls.c52
27 files changed, 773 insertions, 400 deletions
diff --git a/src/svm/message_queue.c b/src/svm/message_queue.c
index 4f3e7642740..89411143c12 100644
--- a/src/svm/message_queue.c
+++ b/src/svm/message_queue.c
@@ -16,6 +16,25 @@
#include <svm/message_queue.h>
#include <vppinfra/mem.h>
+static inline svm_msg_q_ring_t *
+svm_msg_q_ring_inline (svm_msg_q_t * mq, u32 ring_index)
+{
+ return vec_elt_at_index (mq->rings, ring_index);
+}
+
+svm_msg_q_ring_t *
+svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index)
+{
+ return svm_msg_q_ring_inline (mq, ring_index);
+}
+
+static inline void *
+svm_msg_q_ring_data (svm_msg_q_ring_t * ring, u32 elt_index)
+{
+ ASSERT (elt_index < ring->nitems);
+ return (ring->data + elt_index * ring->elsize);
+}
+
svm_msg_q_t *
svm_msg_q_alloc (svm_msg_q_cfg_t * cfg)
{
@@ -63,6 +82,53 @@ svm_msg_q_free (svm_msg_q_t * mq)
}
svm_msg_q_msg_t
+svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index)
+{
+ svm_msg_q_msg_t msg = {.as_u64 = ~0 };
+ svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, ring_index);
+
+ ASSERT (ring->cursize != ring->nitems);
+ msg.ring_index = ring - mq->rings;
+ msg.elt_index = ring->tail;
+ ring->tail = (ring->tail + 1) % ring->nitems;
+ __sync_fetch_and_add (&ring->cursize, 1);
+ return msg;
+}
+
+int
+svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
+ u8 noblock, svm_msg_q_msg_t * msg)
+{
+ if (noblock)
+ {
+ if (svm_msg_q_try_lock (mq))
+ return -1;
+ if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, ring_index)))
+ {
+ svm_msg_q_unlock (mq);
+ return -2;
+ }
+ *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
+ if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (msg)))
+ {
+ svm_msg_q_unlock (mq);
+ return -2;
+ }
+ }
+ else
+ {
+ svm_msg_q_lock (mq);
+ *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
+ while (svm_msg_q_msg_is_invalid (msg))
+ {
+ svm_msg_q_wait (mq);
+ *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
+ }
+ }
+ return 0;
+}
+
+svm_msg_q_msg_t
svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes)
{
svm_msg_q_msg_t msg = {.as_u64 = ~0 };
@@ -81,23 +147,10 @@ svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes)
return msg;
}
-static inline svm_msg_q_ring_t *
-svm_msg_q_get_ring (svm_msg_q_t * mq, u32 ring_index)
-{
- return vec_elt_at_index (mq->rings, ring_index);
-}
-
-static inline void *
-svm_msg_q_ring_data (svm_msg_q_ring_t * ring, u32 elt_index)
-{
- ASSERT (elt_index < ring->nitems);
- return (ring->data + elt_index * ring->elsize);
-}
-
void *
svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
{
- svm_msg_q_ring_t *ring = svm_msg_q_get_ring (mq, msg->ring_index);
+ svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, msg->ring_index);
return svm_msg_q_ring_data (ring, msg->elt_index);
}
@@ -131,7 +184,7 @@ svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
return 0;
ring = &mq->rings[msg->ring_index];
- dist1 = ((ring->nitems + msg->ring_index) - ring->head) % ring->nitems;
+ dist1 = ((ring->nitems + msg->elt_index) - ring->head) % ring->nitems;
if (ring->tail == ring->head)
dist2 = (ring->cursize == 0) ? 0 : ring->nitems;
else
@@ -140,10 +193,17 @@ svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
}
int
-svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t msg, int nowait)
+svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait)
+{
+ ASSERT (svm_msq_q_msg_is_valid (mq, msg));
+ return svm_queue_add (mq->q, (u8 *) msg, nowait);
+}
+
+void
+svm_msg_q_add_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
{
- ASSERT (svm_msq_q_msg_is_valid (mq, &msg));
- return svm_queue_add (mq->q, (u8 *) & msg, nowait);
+ ASSERT (svm_msq_q_msg_is_valid (mq, msg));
+ svm_queue_add_raw (mq->q, (u8 *) msg);
}
int
@@ -153,6 +213,12 @@ svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
return svm_queue_sub (mq->q, (u8 *) msg, cond, time);
}
+void
+svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
+{
+ svm_queue_sub_raw (mq->q, (u8 *) msg);
+}
+
/*
* fd.io coding-style-patch-verification: ON
*
diff --git a/src/svm/message_queue.h b/src/svm/message_queue.h
index 5ec8547016e..708a03d716e 100644
--- a/src/svm/message_queue.h
+++ b/src/svm/message_queue.h
@@ -21,14 +21,15 @@
#define SRC_SVM_MESSAGE_QUEUE_H_
#include <vppinfra/clib.h>
+#include <vppinfra/error.h>
#include <svm/queue.h>
typedef struct svm_msg_q_ring_
{
volatile u32 cursize; /**< current size of the ring */
u32 nitems; /**< max size of the ring */
- u32 head; /**< current head (for dequeue) */
- u32 tail; /**< current tail (for enqueue) */
+ volatile u32 head; /**< current head (for dequeue) */
+ volatile u32 tail; /**< current tail (for enqueue) */
u32 elsize; /**< size of an element */
u8 *data; /**< chunk of memory for msg data */
} svm_msg_q_ring_t;
@@ -64,6 +65,7 @@ typedef union
u64 as_u64;
} svm_msg_q_msg_t;
+#define SVM_MQ_INVALID_MSG { .as_u64 = ~0 }
/**
* Allocate message queue
*
@@ -98,6 +100,36 @@ void svm_msg_q_free (svm_msg_q_t * mq);
svm_msg_q_msg_t svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes);
/**
+ * Allocate message buffer on ring
+ *
+ * Message is allocated, on requested ring. The caller MUST check that
+ * the ring is not full.
+ *
+ * @param mq message queue
+ * @param ring_index ring on which the allocation should occur
+ * @return message structure pointing to the ring and position
+ * allocated
+ */
+svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index);
+
+/**
+ * Lock message queue and allocate message buffer on ring
+ *
+ * This should be used when multiple writers/readers are expected to
+ * compete for the rings/queue. Message should be enqueued by calling
+ * @ref svm_msg_q_add_w_lock and the caller MUST unlock the queue once
+ * the message in enqueued.
+ *
+ * @param mq message queue
+ * @param ring_index ring on which the allocation should occur
+ * @param noblock flag that indicates if request should block
+ * @param msg pointer to message to be filled in
+ * @return 0 on success, negative number otherwise
+ */
+int svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
+ u8 noblock, svm_msg_q_msg_t * msg);
+
+/**
* Free message buffer
*
* Marks message buffer on ring as free.
@@ -106,6 +138,7 @@ svm_msg_q_msg_t svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes);
* @param msg message to be freed
*/
void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
+
/**
* Producer enqueue one message to queue
*
@@ -117,7 +150,20 @@ void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
* @param nowait flag to indicate if request is blocking or not
* @return success status
*/
-int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t msg, int nowait);
+int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait);
+
+/**
+ * Producer enqueue one message to queue with mutex held
+ *
+ * Prior to calling this, the producer should've obtained a message buffer
+ * from one of the rings by calling @ref svm_msg_q_alloc_msg. It assumes
+ * the queue mutex is held.
+ *
+ * @param mq message queue
+ * @param msg message (pointer to ring position) to be enqueued
+ * @return success status
+ */
+void svm_msg_q_add_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
/**
* Consumer dequeue one message from queue
@@ -129,13 +175,28 @@ int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t msg, int nowait);
* @param mq message queue
* @param msg pointer to structure where message is to be received
* @param cond flag that indicates if request should block or not
+ * @param time time to wait if condition it SVM_Q_TIMEDWAIT
* @return success status
*/
int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
svm_q_conditional_wait_t cond, u32 time);
/**
- * Get data for message in queu
+ * Consumer dequeue one message from queue with mutex held
+ *
+ * Returns the message pointing to the data in the message rings under the
+ * assumption that the message queue lock is already held. The consumer is
+ * expected to call @ref svm_msg_q_free_msg once it finishes
+ * processing/copies the message data.
+ *
+ * @param mq message queue
+ * @param msg pointer to structure where message is to be received
+ * @return success status
+ */
+void svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
+
+/**
+ * Get data for message in queue
*
* @param mq message queue
* @param msg message for which the data is requested
@@ -143,6 +204,94 @@ int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
*/
void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
+/**
+ * Get message queue ring
+ *
+ * @param mq message queue
+ * @param ring_index index of ring
+ * @return pointer to ring
+ */
+svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
+
+/**
+ * Check if message queue is full
+ */
+static inline u8
+svm_msg_q_is_full (svm_msg_q_t * mq)
+{
+ return (mq->q->cursize == mq->q->maxsize);
+}
+
+static inline u8
+svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
+{
+ ASSERT (ring_index < vec_len (mq->rings));
+ return (mq->rings[ring_index].cursize == mq->rings[ring_index].nitems);
+}
+
+/**
+ * Check if message queue is empty
+ */
+static inline u8
+svm_msg_q_is_empty (svm_msg_q_t * mq)
+{
+ return (mq->q->cursize == 0);
+}
+
+/**
+ * Check length of message queue
+ */
+static inline u32
+svm_msg_q_size (svm_msg_q_t * mq)
+{
+ return mq->q->cursize;
+}
+
+/**
+ * Check if message is invalid
+ */
+static inline u8
+svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg)
+{
+ return (msg->as_u64 == (u64) ~ 0);
+}
+
+/**
+ * Try locking message queue
+ */
+static inline int
+svm_msg_q_try_lock (svm_msg_q_t * mq)
+{
+ return pthread_mutex_trylock (&mq->q->mutex);
+}
+
+/**
+ * Lock, or block trying, the message queue
+ */
+static inline int
+svm_msg_q_lock (svm_msg_q_t * mq)
+{
+ return pthread_mutex_lock (&mq->q->mutex);
+}
+
+static inline void
+svm_msg_q_wait (svm_msg_q_t * mq)
+{
+ pthread_cond_wait (&mq->q->condvar, &mq->q->mutex);
+}
+
+/**
+ * Unlock message queue
+ */
+static inline void
+svm_msg_q_unlock (svm_msg_q_t * mq)
+{
+ /* The other side of the connection is not polling */
+ if (mq->q->cursize < (mq->q->maxsize / 8))
+ (void) pthread_cond_broadcast (&mq->q->condvar);
+ pthread_mutex_unlock (&mq->q->mutex);
+}
+
#endif /* SRC_SVM_MESSAGE_QUEUE_H_ */
/*
diff --git a/src/svm/queue.c b/src/svm/queue.c
index 96e40fc2aec..8e18f5832e3 100644
--- a/src/svm/queue.c
+++ b/src/svm/queue.c
@@ -154,26 +154,16 @@ svm_queue_add_nolock (svm_queue_t * q, u8 * elem)
return 0;
}
-int
+void
svm_queue_add_raw (svm_queue_t * q, u8 * elem)
{
i8 *tailp;
- if (PREDICT_FALSE (q->cursize == q->maxsize))
- {
- while (q->cursize == q->maxsize)
- ;
- }
-
tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
clib_memcpy (tailp, elem, q->elsize);
- q->tail++;
+ q->tail = (q->tail + 1) % q->maxsize;
q->cursize++;
-
- if (q->tail == q->maxsize)
- q->tail = 0;
- return 0;
}
@@ -414,11 +404,9 @@ svm_queue_sub_raw (svm_queue_t * q, u8 * elem)
headp = (i8 *) (&q->data[0] + q->elsize * q->head);
clib_memcpy (elem, headp, q->elsize);
- q->head++;
+ q->head = (q->head + 1) % q->maxsize;
q->cursize--;
- if (q->head == q->maxsize)
- q->head = 0;
return 0;
}
diff --git a/src/svm/queue.h b/src/svm/queue.h
index 856c17237d1..68a63d769b6 100644
--- a/src/svm/queue.h
+++ b/src/svm/queue.h
@@ -69,7 +69,13 @@ void svm_queue_unlock (svm_queue_t * q);
int svm_queue_is_full (svm_queue_t * q);
int svm_queue_add_nolock (svm_queue_t * q, u8 * elem);
int svm_queue_sub_raw (svm_queue_t * q, u8 * elem);
-int svm_queue_add_raw (svm_queue_t * q, u8 * elem);
+
+/**
+ * Add element to queue with mutex held
+ * @param q queue
+ * @param elem pointer element data to add
+ */
+void svm_queue_add_raw (svm_queue_t * q, u8 * elem);
/*
* DEPRECATED please use svm_queue_t instead
diff --git a/src/svm/svm_fifo.h b/src/svm/svm_fifo.h
index 0d5a08b86ae..40182901db5 100644
--- a/src/svm/svm_fifo.h
+++ b/src/svm/svm_fifo.h
@@ -107,6 +107,12 @@ svm_fifo_max_dequeue (svm_fifo_t * f)
return f->cursize;
}
+static inline int
+svm_fifo_is_full (svm_fifo_t * f)
+{
+ return (f->cursize == f->nitems);
+}
+
static inline u32
svm_fifo_max_enqueue (svm_fifo_t * f)
{
diff --git a/src/svm/test_svm_message_queue.c b/src/svm/test_svm_message_queue.c
index 69ffd131ac2..758163ffeab 100644
--- a/src/svm/test_svm_message_queue.c
+++ b/src/svm/test_svm_message_queue.c
@@ -88,9 +88,9 @@ test1 (int verbose)
test1_error ("failed: msg alloc3");
*(u32 *)svm_msg_q_msg_data (mq, &msg2) = 123;
- svm_msg_q_add (mq, msg2, SVM_Q_NOWAIT);
+ svm_msg_q_add (mq, &msg2, SVM_Q_NOWAIT);
for (i = 0; i < 12; i++)
- svm_msg_q_add (mq, msg[i], SVM_Q_NOWAIT);
+ svm_msg_q_add (mq, &msg[i], SVM_Q_NOWAIT);
if (svm_msg_q_sub (mq, &msg2, SVM_Q_NOWAIT, 0))
test1_error ("failed: dequeue1");
diff --git a/src/tests/vnet/session/tcp_echo.c b/src/tests/vnet/session/tcp_echo.c
index 59314f943f0..f8b75d95ac7 100644
--- a/src/tests/vnet/session/tcp_echo.c
+++ b/src/tests/vnet/session/tcp_echo.c
@@ -48,7 +48,7 @@ typedef struct
svm_fifo_t *server_rx_fifo;
svm_fifo_t *server_tx_fifo;
- svm_queue_t *vpp_evt_q;
+ svm_msg_q_t *vpp_evt_q;
u64 vpp_session_handle;
u64 bytes_sent;
@@ -99,7 +99,7 @@ typedef struct
int no_return;
/* Our event queue */
- svm_queue_t *our_event_queue;
+ svm_msg_q_t *our_event_queue;
/* $$$ single thread only for the moment */
svm_queue_t *vpp_event_queue;
@@ -426,7 +426,7 @@ vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t *
ASSERT (mp->app_event_queue_address);
em->our_event_queue = uword_to_pointer (mp->app_event_queue_address,
- svm_queue_t *);
+ svm_msg_q_t *);
em->state = STATE_ATTACHED;
}
@@ -677,7 +677,6 @@ send_test_chunk (echo_main_t * em, session_t * s)
svm_fifo_t *tx_fifo = s->server_tx_fifo;
u8 *test_data = em->connect_test_data;
u32 enq_space, min_chunk = 16 << 10;
- session_fifo_event_t evt;
int written;
test_buf_len = vec_len (test_data);
@@ -698,12 +697,8 @@ send_test_chunk (echo_main_t * em, session_t * s)
s->bytes_sent += written;
if (svm_fifo_set_event (tx_fifo))
- {
- /* Fabricate TX event, send to vpp */
- evt.fifo = tx_fifo;
- evt.event_type = FIFO_EVENT_APP_TX;
- svm_queue_add (s->vpp_evt_q, (u8 *) & evt, 0 /* wait for mutex */ );
- }
+ app_send_io_evt_to_vpp (s->vpp_evt_q, tx_fifo, FIFO_EVENT_APP_TX,
+ 0 /* do wait for mutex */ );
}
}
@@ -751,6 +746,7 @@ client_rx_thread_fn (void *arg)
session_fifo_event_t _e, *e = &_e;
echo_main_t *em = &echo_main;
static u8 *rx_buf = 0;
+ svm_msg_q_msg_t msg;
vec_validate (rx_buf, 1 << 20);
@@ -759,7 +755,8 @@ client_rx_thread_fn (void *arg)
while (!em->time_to_stop)
{
- svm_queue_sub (em->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0);
+ svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_WAIT, 0);
+ e = svm_msg_q_msg_data (em->our_event_queue, &msg);
switch (e->event_type)
{
case FIFO_EVENT_APP_RX:
@@ -769,6 +766,7 @@ client_rx_thread_fn (void *arg)
clib_warning ("unknown event type %d", e->event_type);
break;
}
+ svm_msg_q_free_msg (em->our_event_queue, &msg);
}
pthread_exit (0);
}
@@ -808,7 +806,7 @@ vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp)
session->vpp_session_handle = mp->handle;
session->start = clib_time_now (&em->clib_time);
session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
- svm_queue_t *);
+ svm_msg_q_t *);
hash_set (em->session_index_by_vpp_handles, mp->handle, session_index);
@@ -869,8 +867,8 @@ client_disconnect (echo_main_t * em, session_t * s)
static void
clients_run (echo_main_t * em)
{
- session_fifo_event_t _e, *e = &_e;
f64 start_time, deltat, timeout = 100.0;
+ svm_msg_q_msg_t msg;
session_t *s;
int i;
@@ -918,9 +916,15 @@ clients_run (echo_main_t * em)
start_time = clib_time_now (&em->clib_time);
em->state = STATE_READY;
while (em->n_active_clients)
- if (em->our_event_queue->cursize)
- svm_queue_sub (em->our_event_queue, (u8 *) e, SVM_Q_NOWAIT, 0);
-
+ if (!svm_msg_q_is_empty (em->our_event_queue))
+ {
+ if (svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_WAIT, 0))
+ {
+ clib_warning ("svm msg q returned");
+ }
+ else
+ svm_msg_q_free_msg (em->our_event_queue, &msg);
+ }
for (i = 0; i < em->n_clients; i++)
{
@@ -1180,10 +1184,12 @@ void
server_handle_event_queue (echo_main_t * em)
{
session_fifo_event_t _e, *e = &_e;
+ svm_msg_q_msg_t msg;
while (1)
{
- svm_queue_sub (em->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0);
+ svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_WAIT, 0);
+ e = svm_msg_q_msg_data (em->our_event_queue, &msg);
switch (e->event_type)
{
case FIFO_EVENT_APP_RX:
@@ -1191,6 +1197,7 @@ server_handle_event_queue (echo_main_t * em)
break;
case FIFO_EVENT_DISCONNECT:
+ svm_msg_q_free_msg (em->our_event_queue, &msg);
return;
default:
@@ -1204,6 +1211,7 @@ server_handle_event_queue (echo_main_t * em)
em->time_to_print_stats = 0;
fformat (stdout, "%d connections\n", pool_elts (em->sessions));
}
+ svm_msg_q_free_msg (em->our_event_queue, &msg);
}
}
diff --git a/src/tests/vnet/session/udp_echo.c b/src/tests/vnet/session/udp_echo.c
index 54e00b181bd..d796b6b8de0 100644
--- a/src/tests/vnet/session/udp_echo.c
+++ b/src/tests/vnet/session/udp_echo.c
@@ -87,10 +87,10 @@ typedef struct
u8 is_connected;
/* Our event queue */
- svm_queue_t *our_event_queue;
+ svm_msg_q_t *our_event_queue;
/* $$$ single thread only for the moment */
- svm_queue_t *vpp_event_queue;
+ svm_msg_q_t *vpp_event_queue;
/* $$$$ hack: cut-through session index */
volatile u32 cut_through_session_index;
@@ -369,7 +369,7 @@ vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t *
}
utm->our_event_queue =
- uword_to_pointer (mp->app_event_queue_address, svm_queue_t *);
+ uword_to_pointer (mp->app_event_queue_address, svm_msg_q_t *);
}
static void
@@ -736,7 +736,7 @@ vl_api_bind_uri_reply_t_handler (vl_api_bind_uri_reply_t * mp)
sizeof (ip46_address_t));
session->transport.is_ip4 = mp->lcl_is_ip4;
session->transport.lcl_port = mp->lcl_port;
- session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_queue_t *);
+ session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
utm->state = utm->is_connected ? STATE_BOUND : STATE_READY;
}
@@ -896,7 +896,7 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
start_time = clib_time_now (&utm->clib_time);
utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address,
- svm_queue_t *);
+ svm_msg_q_t *);
rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
@@ -909,7 +909,7 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
{
clib_warning ("cut-through session");
utm->our_event_queue = uword_to_pointer (mp->server_event_queue_address,
- svm_queue_t *);
+ svm_msg_q_t *);
rx_fifo->master_session_index = session_index;
tx_fifo->master_session_index = session_index;
utm->cut_through_session_index = session_index;
@@ -1012,23 +1012,23 @@ vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp)
session->rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
session->tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
- svm_queue_t *);
+ svm_msg_q_t *);
/* Cut-through case */
if (mp->client_event_queue_address)
{
clib_warning ("cut-through session");
utm->cut_through_session_index = session - utm->sessions;
utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address,
- svm_queue_t *);
+ svm_msg_q_t *);
utm->our_event_queue = uword_to_pointer (mp->client_event_queue_address,
- svm_queue_t *);
+ svm_msg_q_t *);
utm->do_echo = 1;
}
else
{
utm->connected_session = session - utm->sessions;
utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address,
- svm_queue_t *);
+ svm_msg_q_t *);
clib_memcpy (&session->transport.lcl_ip, mp->lcl_ip,
sizeof (ip46_address_t));
@@ -1134,14 +1134,20 @@ server_handle_fifo_event_rx (udp_echo_main_t * utm, session_fifo_event_t * e)
void
server_handle_event_queue (udp_echo_main_t * utm)
{
- session_fifo_event_t _e, *e = &_e;
+ session_fifo_event_t *e;
+ svm_msg_q_msg_t msg;
while (utm->state != STATE_READY)
sleep (5);
while (1)
{
- svm_queue_sub (utm->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0);
+ if (svm_msg_q_sub (utm->our_event_queue, &msg, SVM_Q_WAIT, 0))
+ {
+ clib_warning ("svm msg q returned");
+ continue;
+ }
+ e = svm_msg_q_msg_data (utm->our_event_queue, &msg);
switch (e->event_type)
{
case FIFO_EVENT_APP_RX:
@@ -1149,12 +1155,14 @@ server_handle_event_queue (udp_echo_main_t * utm)
break;
case FIFO_EVENT_DISCONNECT:
- return;
+ utm->time_to_stop = 1;
+ break;
default:
clib_warning ("unknown event type %d", e->event_type);
break;
}
+ svm_msg_q_free_msg (utm->our_event_queue, &msg);
if (PREDICT_FALSE (utm->time_to_stop == 1))
return;
if (PREDICT_FALSE (utm->time_to_print_stats == 1))
diff --git a/src/vcl/vcl_bapi.c b/src/vcl/vcl_bapi.c
index cc3179d860a..ca65782e9c6 100644
--- a/src/vcl/vcl_bapi.c
+++ b/src/vcl/vcl_bapi.c
@@ -99,7 +99,7 @@ vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t *
}
vcm->app_event_queue =
- uword_to_pointer (mp->app_event_queue_address, svm_queue_t *);
+ uword_to_pointer (mp->app_event_queue_address, svm_msg_q_t *);
vcm->app_state = STATE_APP_ATTACHED;
}
@@ -291,7 +291,7 @@ done:
VCL_IO_SESSIONS_UNLOCK ();
}
session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
- svm_queue_t *);
+ svm_msg_q_t *);
rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
rx_fifo->client_session_index = session_index;
@@ -431,7 +431,7 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
session->rx_fifo = rx_fifo;
session->tx_fifo = tx_fifo;
session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
- svm_queue_t *);
+ svm_msg_q_t *);
session->session_state = STATE_ACCEPT;
session->transport.rmt_port = mp->port;
session->transport.is_ip4 = mp->is_ip4;
diff --git a/src/vcl/vcl_private.h b/src/vcl/vcl_private.h
index 4283b6e1167..aba4839f129 100644
--- a/src/vcl/vcl_private.h
+++ b/src/vcl/vcl_private.h
@@ -194,7 +194,7 @@ typedef struct vppcom_main_t_
clib_bitmap_t *ex_bitmap;
/* Our event queue */
- svm_queue_t *app_event_queue;
+ svm_msg_q_t *app_event_queue;
/* unique segment name counter */
u32 unique_segment_index;
diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c
index 1fd138e6dd3..0d077caa797 100644
--- a/src/vcl/vppcom.c
+++ b/src/vcl/vppcom.c
@@ -1164,16 +1164,19 @@ vppcom_session_read_ready (vcl_session_t * session, u32 session_index)
}
rv = ready;
- if (vcm->app_event_queue->cursize &&
- !pthread_mutex_trylock (&vcm->app_event_queue->mutex))
+ if (!svm_msg_q_is_empty (vcm->app_event_queue) &&
+ !pthread_mutex_trylock (&vcm->app_event_queue->q->mutex))
{
- u32 i, n_to_dequeue = vcm->app_event_queue->cursize;
- session_fifo_event_t e;
+ u32 i, n_to_dequeue = vcm->app_event_queue->q->cursize;
+ svm_msg_q_msg_t msg;
for (i = 0; i < n_to_dequeue; i++)
- svm_queue_sub_raw (vcm->app_event_queue, (u8 *) & e);
+ {
+ svm_queue_sub_raw (vcm->app_event_queue->q, (u8 *) & msg);
+ svm_msg_q_free_msg (vcm->app_event_queue, &msg);
+ }
- pthread_mutex_unlock (&vcm->app_event_queue->mutex);
+ pthread_mutex_unlock (&vcm->app_event_queue->q->mutex);
}
done:
return rv;
@@ -1184,8 +1187,7 @@ vppcom_session_write (uint32_t session_index, void *buf, size_t n)
{
vcl_session_t *session = 0;
svm_fifo_t *tx_fifo = 0;
- svm_queue_t *q;
- session_fifo_event_t evt;
+ svm_msg_q_t *mq;
session_state_t state;
int rv, n_write, is_nonblocking;
u32 poll_et;
@@ -1241,18 +1243,15 @@ vppcom_session_write (uint32_t session_index, void *buf, size_t n)
if ((n_write > 0) && svm_fifo_set_event (tx_fifo))
{
- /* Fabricate TX event, send to vpp */
- evt.fifo = tx_fifo;
- evt.event_type = FIFO_EVENT_APP_TX;
-
+ /* Send TX event to vpp */
VCL_SESSION_LOCK_AND_GET (session_index, &session);
- q = session->vpp_evt_q;
- ASSERT (q);
- svm_queue_add (q, (u8 *) & evt, 0 /* do wait for mutex */ );
+ mq = session->vpp_evt_q;
+ ASSERT (mq);
+ app_send_io_evt_to_vpp (mq, tx_fifo, FIFO_EVENT_APP_TX, SVM_Q_WAIT);
VCL_SESSION_UNLOCK ();
VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: added FIFO_EVENT_APP_TX "
"to vpp_event_q %p, n_write %d", getpid (),
- vpp_handle, session_index, q, n_write);
+ vpp_handle, session_index, mq, n_write);
}
if (n_write <= 0)
diff --git a/src/vnet/session-apps/echo_client.c b/src/vnet/session-apps/echo_client.c
index 6ee91f9c54b..3d1af676186 100644
--- a/src/vnet/session-apps/echo_client.c
+++ b/src/vnet/session-apps/echo_client.c
@@ -62,13 +62,8 @@ send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s)
svm_fifo_t *f = s->data.tx_fifo;
rv = clib_min (svm_fifo_max_enqueue (f), bytes_this_chunk);
svm_fifo_enqueue_nocopy (f, rv);
- if (svm_fifo_set_event (f))
- {
- session_fifo_event_t evt;
- evt.fifo = f;
- evt.event_type = FIFO_EVENT_APP_TX;
- svm_queue_add (s->data.vpp_evt_q, (u8 *) & evt, 0);
- }
+ session_send_io_evt_to_thread_custom (f, s->thread_index,
+ FIFO_EVENT_APP_TX);
}
else
rv = app_send_stream (&s->data, test_data + test_buf_offset,
@@ -98,13 +93,8 @@ send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s)
hdr.lcl_port = at->lcl_port;
svm_fifo_enqueue_nowait (f, sizeof (hdr), (u8 *) & hdr);
svm_fifo_enqueue_nocopy (f, rv);
- if (svm_fifo_set_event (f))
- {
- session_fifo_event_t evt;
- evt.fifo = f;
- evt.event_type = FIFO_EVENT_APP_TX;
- svm_queue_add (s->data.vpp_evt_q, (u8 *) & evt, 0);
- }
+ session_send_io_evt_to_thread_custom (f, s->thread_index,
+ FIFO_EVENT_APP_TX);
}
else
rv = app_send_dgram (&s->data, test_data + test_buf_offset,
@@ -462,18 +452,9 @@ echo_clients_rx_callback (stream_session_t * s)
if (svm_fifo_max_dequeue (s->server_rx_fifo))
{
- session_fifo_event_t evt;
- svm_queue_t *q;
if (svm_fifo_set_event (s->server_rx_fifo))
- {
- evt.fifo = s->server_rx_fifo;
- evt.event_type = FIFO_EVENT_BUILTIN_RX;
- q = session_manager_get_vpp_event_queue (s->thread_index);
- if (PREDICT_FALSE (q->cursize == q->maxsize))
- clib_warning ("out of event queue space");
- else if (svm_queue_add (q, (u8 *) & evt, 0))
- clib_warning ("failed to enqueue self-tap");
- }
+ session_send_io_evt_to_thread (s->server_rx_fifo,
+ FIFO_EVENT_BUILTIN_RX);
}
return 0;
}
diff --git a/src/vnet/session-apps/echo_client.h b/src/vnet/session-apps/echo_client.h
index f3fc8d2b59e..db5ba163628 100644
--- a/src/vnet/session-apps/echo_client.h
+++ b/src/vnet/session-apps/echo_client.h
@@ -46,7 +46,7 @@ typedef struct
* Application setup parameters
*/
svm_queue_t *vl_input_queue; /**< vpe input queue */
- svm_queue_t **vpp_event_queue;
+ svm_msg_q_t **vpp_event_queue;
u32 cli_node_index; /**< cli process node index */
u32 my_client_index; /**< loopback API client handle */
diff --git a/src/vnet/session-apps/echo_server.c b/src/vnet/session-apps/echo_server.c
index 7d1ae5a4d49..770f4ba7337 100644
--- a/src/vnet/session-apps/echo_server.c
+++ b/src/vnet/session-apps/echo_server.c
@@ -23,7 +23,7 @@ typedef struct
/*
* Server app parameters
*/
- svm_queue_t **vpp_queue;
+ svm_msg_q_t **vpp_queue;
svm_queue_t *vl_input_queue; /**< Sever's event queue */
u32 app_index; /**< Server app index */
@@ -145,10 +145,8 @@ echo_server_rx_callback (stream_session_t * s)
int actual_transfer;
svm_fifo_t *tx_fifo, *rx_fifo;
echo_server_main_t *esm = &echo_server_main;
- session_fifo_event_t evt;
u32 thread_index = vlib_get_thread_index ();
app_session_transport_t at;
- svm_queue_t *q;
ASSERT (s->thread_index == thread_index);
@@ -170,8 +168,9 @@ echo_server_rx_callback (stream_session_t * s)
max_dequeue = ph.data_length - ph.data_offset;
if (!esm->vpp_queue[s->thread_index])
{
- q = session_manager_get_vpp_event_queue (s->thread_index);
- esm->vpp_queue[s->thread_index] = q;
+ svm_msg_q_t *mq;
+ mq = session_manager_get_vpp_event_queue (s->thread_index);
+ esm->vpp_queue[s->thread_index] = mq;
}
max_enqueue -= sizeof (session_dgram_hdr_t);
}
@@ -191,13 +190,7 @@ echo_server_rx_callback (stream_session_t * s)
/* Program self-tap to retry */
if (svm_fifo_set_event (rx_fifo))
{
- evt.fifo = rx_fifo;
- evt.event_type = FIFO_EVENT_BUILTIN_RX;
-
- q = esm->vpp_queue[s->thread_index];
- if (PREDICT_FALSE (q->cursize == q->maxsize))
- clib_warning ("out of event queue space");
- else if (svm_queue_add (q, (u8 *) & evt, 0))
+ if (session_send_io_evt_to_thread (rx_fifo, FIFO_EVENT_BUILTIN_RX))
clib_warning ("failed to enqueue self-tap");
vec_validate (esm->rx_retries[s->thread_index], s->session_index);
diff --git a/src/vnet/session-apps/http_server.c b/src/vnet/session-apps/http_server.c
index 9ad1297b901..ef9f760c992 100644
--- a/src/vnet/session-apps/http_server.c
+++ b/src/vnet/session-apps/http_server.c
@@ -32,7 +32,7 @@ typedef struct
typedef struct
{
u8 **rx_buf;
- svm_queue_t **vpp_queue;
+ svm_msg_q_t **vpp_queue;
u64 byte_index;
uword *handler_by_get_request;
@@ -140,7 +140,6 @@ http_cli_output (uword arg, u8 * buffer, uword buffer_bytes)
void
send_data (stream_session_t * s, u8 * data)
{
- session_fifo_event_t evt;
u32 offset, bytes_to_send;
f64 delay = 10e-3;
http_server_main_t *hsm = &http_server_main;
@@ -178,14 +177,8 @@ send_data (stream_session_t * s, u8 * data)
bytes_to_send -= actual_transfer;
if (svm_fifo_set_event (s->server_tx_fifo))
- {
- /* Fabricate TX event, send to vpp */
- evt.fifo = s->server_tx_fifo;
- evt.event_type = FIFO_EVENT_APP_TX;
-
- svm_queue_add (hsm->vpp_queue[s->thread_index],
- (u8 *) & evt, 0 /* do wait for mutex */ );
- }
+ session_send_io_evt_to_thread (s->server_tx_fifo,
+ FIFO_EVENT_APP_TX);
delay = 10e-3;
}
}
@@ -379,15 +372,7 @@ http_server_rx_callback (stream_session_t * s)
/* Send an RPC request via the thread-0 input node */
if (vlib_get_thread_index () != 0)
- {
- session_fifo_event_t evt;
- evt.rpc_args.fp = alloc_http_process_callback;
- evt.rpc_args.arg = args;
- evt.event_type = FIFO_EVENT_RPC;
- svm_queue_add
- (session_manager_get_vpp_event_queue (0 /* main thread */ ),
- (u8 *) & evt, 0 /* do wait for mutex */ );
- }
+ session_send_rpc_evt_to_thread (0, alloc_http_process_callback, args);
else
alloc_http_process (args);
return 0;
diff --git a/src/vnet/session-apps/proxy.c b/src/vnet/session-apps/proxy.c
index 78aa0de2840..6260ad350f0 100644
--- a/src/vnet/session-apps/proxy.c
+++ b/src/vnet/session-apps/proxy.c
@@ -194,7 +194,6 @@ proxy_rx_callback (stream_session_t * s)
int proxy_index;
uword *p;
svm_fifo_t *active_open_tx_fifo;
- session_fifo_event_t evt;
ASSERT (s->thread_index == thread_index);
@@ -212,10 +211,9 @@ proxy_rx_callback (stream_session_t * s)
if (svm_fifo_set_event (active_open_tx_fifo))
{
u32 ao_thread_index = active_open_tx_fifo->master_thread_index;
- evt.fifo = active_open_tx_fifo;
- evt.event_type = FIFO_EVENT_APP_TX;
- if (svm_queue_add (pm->active_open_event_queue[ao_thread_index],
- (u8 *) & evt, 0 /* do wait for mutex */ ))
+ if (session_send_io_evt_to_thread_custom (active_open_tx_fifo,
+ ao_thread_index,
+ FIFO_EVENT_APP_TX))
clib_warning ("failed to enqueue tx evt");
}
}
@@ -278,7 +276,6 @@ active_open_connected_callback (u32 app_index, u32 opaque,
proxy_main_t *pm = &proxy_main;
proxy_session_t *ps;
u8 thread_index = vlib_get_thread_index ();
- session_fifo_event_t evt;
if (is_fail)
{
@@ -320,15 +317,9 @@ active_open_connected_callback (u32 app_index, u32 opaque,
/*
* Send event for active open tx fifo
*/
+ ASSERT (s->thread_index == thread_index);
if (svm_fifo_set_event (s->server_tx_fifo))
- {
- evt.fifo = s->server_tx_fifo;
- evt.event_type = FIFO_EVENT_APP_TX;
- if (svm_queue_add
- (pm->active_open_event_queue[thread_index], (u8 *) & evt,
- 0 /* do wait for mutex */ ))
- clib_warning ("failed to enqueue tx evt");
- }
+ session_send_io_evt_to_thread (s->server_tx_fifo, FIFO_EVENT_APP_TX);
return 0;
}
@@ -354,8 +345,6 @@ active_open_disconnect_callback (stream_session_t * s)
static int
active_open_rx_callback (stream_session_t * s)
{
- proxy_main_t *pm = &proxy_main;
- session_fifo_event_t evt;
svm_fifo_t *proxy_tx_fifo;
proxy_tx_fifo = s->server_rx_fifo;
@@ -365,12 +354,10 @@ active_open_rx_callback (stream_session_t * s)
*/
if (svm_fifo_set_event (proxy_tx_fifo))
{
- u32 p_thread_index = proxy_tx_fifo->master_thread_index;
- evt.fifo = proxy_tx_fifo;
- evt.event_type = FIFO_EVENT_APP_TX;
- if (svm_queue_add (pm->server_event_queue[p_thread_index], (u8 *) & evt,
- 0 /* do wait for mutex */ ))
- clib_warning ("failed to enqueue server rx evt");
+ u8 thread_index = proxy_tx_fifo->master_thread_index;
+ return session_send_io_evt_to_thread_custom (proxy_tx_fifo,
+ thread_index,
+ FIFO_EVENT_APP_TX);
}
return 0;
diff --git a/src/vnet/session-apps/proxy.h b/src/vnet/session-apps/proxy.h
index 4bca0a02cd8..c221a5e75f2 100644
--- a/src/vnet/session-apps/proxy.h
+++ b/src/vnet/session-apps/proxy.h
@@ -40,8 +40,8 @@ typedef struct
{
svm_queue_t *vl_input_queue; /**< vpe input queue */
/** per-thread vectors */
- svm_queue_t **server_event_queue;
- svm_queue_t **active_open_event_queue;
+ svm_msg_q_t **server_event_queue;
+ svm_msg_q_t **active_open_event_queue;
u8 **rx_buf; /**< intermediate rx buffers */
u32 cli_node_index; /**< cli process node index */
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index 60019dd7961..6041b49712d 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -807,6 +807,143 @@ application_get_segment_manager_properties (u32 app_index)
return &app->sm_properties;
}
+static inline int
+app_enqueue_evt (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, u8 lock)
+{
+ if (PREDICT_TRUE (!svm_msg_q_is_full (mq)))
+ {
+ if (lock)
+ {
+ svm_msg_q_add_w_lock (mq, msg);
+ svm_msg_q_unlock (mq);
+ }
+ else if (svm_msg_q_add (mq, msg, SVM_Q_WAIT))
+ {
+ clib_warning ("msg q add returned");
+ if (lock)
+ svm_msg_q_unlock (mq);
+ return -1;
+ }
+ }
+ else
+ {
+ clib_warning ("evt q full");
+ svm_msg_q_free_msg (mq, msg);
+ if (lock)
+ svm_msg_q_unlock (mq);
+ return -1;
+ }
+ return 0;
+}
+
+static inline int
+app_send_io_evt_rx (application_t * app, stream_session_t * s, u8 lock)
+{
+ session_fifo_event_t *evt;
+ svm_msg_q_msg_t msg;
+ svm_msg_q_t *mq;
+
+ if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
+ {
+ /* Session is closed so app will never clean up. Flush rx fifo */
+ svm_fifo_dequeue_drop_all (s->server_rx_fifo);
+ return 0;
+ }
+
+ /* Built-in app? Hand event to the callback... */
+ if (app->cb_fns.builtin_app_rx_callback)
+ return app->cb_fns.builtin_app_rx_callback (s);
+
+ /* If no need for event, return */
+ if (!svm_fifo_set_event (s->server_rx_fifo))
+ return 0;
+
+ mq = app->event_queue;
+ if (lock)
+ svm_msg_q_lock (mq);
+
+ if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
+ {
+ clib_warning ("evt q rings full");
+ if (lock)
+ svm_msg_q_unlock (mq);
+ return -1;
+ }
+
+ msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+ ASSERT (!svm_msg_q_msg_is_invalid (&msg));
+
+ evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+ evt->fifo = s->server_rx_fifo;
+ evt->event_type = FIFO_EVENT_APP_RX;
+
+ return app_enqueue_evt (mq, &msg, lock);
+}
+
+static inline int
+app_send_io_evt_tx (application_t * app, stream_session_t * s, u8 lock)
+{
+ svm_msg_q_t *mq;
+ session_fifo_event_t *evt;
+ svm_msg_q_msg_t msg;
+
+ if (application_is_builtin (app))
+ return 0;
+
+ mq = app->event_queue;
+ if (lock)
+ svm_msg_q_lock (mq);
+
+ if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
+ {
+ clib_warning ("evt q rings full");
+ if (lock)
+ svm_msg_q_unlock (mq);
+ return -1;
+ }
+
+ msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+ ASSERT (!svm_msg_q_msg_is_invalid (&msg));
+
+ evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+ evt->event_type = FIFO_EVENT_APP_TX;
+ evt->fifo = s->server_tx_fifo;
+
+ return app_enqueue_evt (mq, &msg, lock);
+}
+
+/* *INDENT-OFF* */
+typedef int (app_send_evt_handler_fn) (application_t *app,
+ stream_session_t *s,
+ u8 lock);
+static app_send_evt_handler_fn * const app_send_evt_handler_fns[2] = {
+ app_send_io_evt_rx,
+ app_send_io_evt_tx,
+};
+/* *INDENT-ON* */
+
+/**
+ * Send event to application
+ *
+ * Logic from queue perspective is non-blocking. That is, if there's
+ * not enough space to enqueue a message, we return. However, if the lock
+ * flag is set, we do wait for queue mutex.
+ */
+int
+application_send_event (application_t * app, stream_session_t * s,
+ u8 evt_type)
+{
+ ASSERT (app && evt_type <= FIFO_EVENT_APP_TX);
+ return app_send_evt_handler_fns[evt_type] (app, s, 0 /* lock */ );
+}
+
+int
+application_lock_and_send_event (application_t * app, stream_session_t * s,
+ u8 evt_type)
+{
+ return app_send_evt_handler_fns[evt_type] (app, s, 1 /* lock */ );
+}
+
local_session_t *
application_alloc_local_session (application_t * app)
{
@@ -949,14 +1086,14 @@ application_local_session_connect (u32 table_index, application_t * client,
svm_fifo_segment_private_t *seg;
segment_manager_t *sm;
local_session_t *ls;
- svm_queue_t *sq, *cq;
+ svm_msg_q_t *sq, *cq;
ls = application_alloc_local_session (server);
props = application_segment_manager_properties (server);
cprops = application_segment_manager_properties (client);
evt_q_elts = props->evt_q_size + cprops->evt_q_size;
- evt_q_sz = evt_q_elts * sizeof (session_fifo_event_t);
+ evt_q_sz = segment_manager_evt_q_expected_size (evt_q_elts);
seg_size = props->rx_fifo_size + props->tx_fifo_size + evt_q_sz + margin;
has_transport = session_has_transport ((stream_session_t *) ll);
diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h
index aad7089ba6f..f6c81275826 100644
--- a/src/vnet/session/application.h
+++ b/src/vnet/session/application.h
@@ -72,7 +72,7 @@ typedef struct _application
u32 ns_index;
/** Application listens for events on this svm queue */
- svm_queue_t *event_queue;
+ svm_msg_q_t *event_queue;
/*
* Callbacks: shoulder-taps for the server/client
@@ -207,6 +207,11 @@ int application_local_session_disconnect_w_index (u32 app_index,
u32 ls_index);
void application_local_sessions_del (application_t * app);
+int application_send_event (application_t * app, stream_session_t * s,
+ u8 evt);
+int application_lock_and_send_event (application_t * app,
+ stream_session_t * s, u8 evt_type);
+
always_inline u32
local_session_id (local_session_t * ll)
{
diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h
index 2c171487e70..50c043493f2 100644
--- a/src/vnet/session/application_interface.h
+++ b/src/vnet/session/application_interface.h
@@ -136,7 +136,7 @@ typedef enum
_(IS_BUILTIN, "Application is builtin") \
_(IS_PROXY, "Application is proxying") \
_(USE_GLOBAL_SCOPE, "App can use global session scope") \
- _(USE_LOCAL_SCOPE, "App can use local session scope")
+ _(USE_LOCAL_SCOPE, "App can use local session scope") \
typedef enum _app_options
{
@@ -187,7 +187,7 @@ typedef struct app_session_transport_
_(volatile u8, session_state) /**< session state */ \
_(u32, session_index) /**< index in owning pool */ \
_(app_session_transport_t, transport) /**< transport info */ \
- _(svm_queue_t, *vpp_evt_q) /**< vpp event queue */ \
+ _(svm_msg_q_t, *vpp_evt_q) /**< vpp event queue */ \
_(u8, is_dgram) /**< flag for dgram mode */ \
typedef struct
@@ -197,13 +197,73 @@ typedef struct
#undef _
} app_session_t;
+/**
+ * Send fifo io event to vpp worker thread
+ *
+ * Because there may be multiple writers to one of vpp's queues, this
+ * protects message allocation and enqueueing.
+ *
+ * @param mq vpp message queue
+ * @param f fifo for which the event is sent
+ * @param evt_type type of event
+ * @param noblock flag to indicate is request is blocking or not
+ * @return 0 if success, negative integer otherwise
+ */
+static inline int
+app_send_io_evt_to_vpp (svm_msg_q_t * mq, svm_fifo_t * f, u8 evt_type,
+ u8 noblock)
+{
+ session_fifo_event_t *evt;
+ svm_msg_q_msg_t msg;
+
+ if (noblock)
+ {
+ if (svm_msg_q_try_lock (mq))
+ return -1;
+ if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
+ {
+ svm_msg_q_unlock (mq);
+ return -2;
+ }
+ msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+ if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (&msg)))
+ {
+ svm_msg_q_unlock (mq);
+ return -2;
+ }
+ evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+ evt->fifo = f;
+ evt->event_type = evt_type;
+ svm_msg_q_add_w_lock (mq, &msg);
+ svm_msg_q_unlock (mq);
+ return 0;
+ }
+ else
+ {
+ svm_msg_q_lock (mq);
+ msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+ while (svm_msg_q_msg_is_invalid (&msg))
+ {
+ svm_msg_q_wait (mq);
+ msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+ }
+ evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+ evt->fifo = f;
+ evt->event_type = evt_type;
+ if (svm_msg_q_is_full (mq))
+ svm_msg_q_wait (mq);
+ svm_msg_q_add_w_lock (mq, &msg);
+ svm_msg_q_unlock (mq);
+ return 0;
+ }
+}
+
always_inline int
app_send_dgram_raw (svm_fifo_t * f, app_session_transport_t * at,
- svm_queue_t * vpp_evt_q, u8 * data, u32 len, u8 noblock)
+ svm_msg_q_t * vpp_evt_q, u8 * data, u32 len, u8 noblock)
{
u32 max_enqueue, actual_write;
session_dgram_hdr_t hdr;
- session_fifo_event_t evt;
int rv;
max_enqueue = svm_fifo_max_enqueue (f);
@@ -225,11 +285,7 @@ app_send_dgram_raw (svm_fifo_t * f, app_session_transport_t * at,
if ((rv = svm_fifo_enqueue_nowait (f, actual_write, data)) > 0)
{
if (svm_fifo_set_event (f))
- {
- evt.fifo = f;
- evt.event_type = FIFO_EVENT_APP_TX;
- svm_queue_add (vpp_evt_q, (u8 *) & evt, noblock);
- }
+ app_send_io_evt_to_vpp (vpp_evt_q, f, FIFO_EVENT_APP_TX, noblock);
}
ASSERT (rv);
return rv;
@@ -243,20 +299,15 @@ app_send_dgram (app_session_t * s, u8 * data, u32 len, u8 noblock)
}
always_inline int
-app_send_stream_raw (svm_fifo_t * f, svm_queue_t * vpp_evt_q, u8 * data,
+app_send_stream_raw (svm_fifo_t * f, svm_msg_q_t * vpp_evt_q, u8 * data,
u32 len, u8 noblock)
{
- session_fifo_event_t evt;
int rv;
if ((rv = svm_fifo_enqueue_nowait (f, len, data)) > 0)
{
if (svm_fifo_set_event (f))
- {
- evt.fifo = f;
- evt.event_type = FIFO_EVENT_APP_TX;
- svm_queue_add (vpp_evt_q, (u8 *) & evt, noblock);
- }
+ app_send_io_evt_to_vpp (vpp_evt_q, f, FIFO_EVENT_APP_TX, noblock);
}
return rv;
}
diff --git a/src/vnet/session/segment_manager.c b/src/vnet/session/segment_manager.c
index 31bb0c3aab5..b00bcd5bbc8 100644
--- a/src/vnet/session/segment_manager.c
+++ b/src/vnet/session/segment_manager.c
@@ -599,25 +599,52 @@ segment_manager_dealloc_fifos (u32 segment_index, svm_fifo_t * rx_fifo,
segment_manager_segment_reader_unlock (sm);
}
+u32
+segment_manager_evt_q_expected_size (u32 q_len)
+{
+ u32 fifo_evt_size, notif_q_size, q_hdrs;
+ u32 msg_q_sz, fifo_evt_ring_sz, session_ntf_ring_sz;
+
+ fifo_evt_size = 1 << max_log2 (sizeof (session_fifo_event_t));
+ notif_q_size = clib_max (16, q_len >> 4);
+
+ msg_q_sz = q_len * sizeof (svm_msg_q_msg_t);
+ fifo_evt_ring_sz = q_len * fifo_evt_size;
+ session_ntf_ring_sz = notif_q_size * 256;
+ q_hdrs = sizeof (svm_queue_t) + sizeof (svm_msg_q_t);
+
+ return (msg_q_sz + fifo_evt_ring_sz + session_ntf_ring_sz + q_hdrs);
+}
+
/**
* Allocates shm queue in the first segment
*
* Must be called with lock held
*/
-svm_queue_t *
+svm_msg_q_t *
segment_manager_alloc_queue (svm_fifo_segment_private_t * segment,
u32 queue_size)
{
- ssvm_shared_header_t *sh;
- svm_queue_t *q;
+ u32 fifo_evt_size, session_evt_size = 256, notif_q_size;
+ svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
+ svm_msg_q_t *q;
void *oldheap;
- sh = segment->ssvm.sh;
+ fifo_evt_size = sizeof (session_fifo_event_t);
+ notif_q_size = clib_max (16, queue_size >> 4);
+ /* *INDENT-OFF* */
+ svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
+ {queue_size, fifo_evt_size, 0},
+ {notif_q_size, session_evt_size, 0}
+ };
+ /* *INDENT-ON* */
+ cfg->consumer_pid = 0;
+ cfg->n_rings = 2;
+ cfg->q_nitems = queue_size;
+ cfg->ring_cfgs = rc;
- oldheap = ssvm_push_heap (sh);
- q = svm_queue_init (queue_size, sizeof (session_fifo_event_t),
- 0 /* consumer pid */ ,
- 0 /* signal when queue non-empty */ );
+ oldheap = ssvm_push_heap (segment->ssvm.sh);
+ q = svm_msg_q_alloc (cfg);
ssvm_pop_heap (oldheap);
return q;
}
diff --git a/src/vnet/session/segment_manager.h b/src/vnet/session/segment_manager.h
index 62e5e97e703..73cb4827a8b 100644
--- a/src/vnet/session/segment_manager.h
+++ b/src/vnet/session/segment_manager.h
@@ -17,7 +17,7 @@
#include <vnet/vnet.h>
#include <svm/svm_fifo_segment.h>
-#include <svm/queue.h>
+#include <svm/message_queue.h>
#include <vlibmemory/api.h>
#include <vppinfra/lock.h>
#include <vppinfra/valloc.h>
@@ -61,7 +61,7 @@ typedef struct _segment_manager
/**
* App event queue allocated in first segment
*/
- svm_queue_t *event_queue;
+ svm_msg_q_t *event_queue;
} segment_manager_t;
#define segment_manager_foreach_segment_w_lock(VAR, SM, BODY) \
@@ -114,7 +114,7 @@ segment_manager_index (segment_manager_t * sm)
return sm - segment_manager_main.segment_managers;
}
-always_inline svm_queue_t *
+always_inline svm_msg_q_t *
segment_manager_event_queue (segment_manager_t * sm)
{
return sm->event_queue;
@@ -152,7 +152,8 @@ int segment_manager_try_alloc_fifos (svm_fifo_segment_private_t * fs,
svm_fifo_t ** tx_fifo);
void segment_manager_dealloc_fifos (u32 segment_index, svm_fifo_t * rx_fifo,
svm_fifo_t * tx_fifo);
-svm_queue_t *segment_manager_alloc_queue (svm_fifo_segment_private_t * fs,
+u32 segment_manager_evt_q_expected_size (u32 q_size);
+svm_msg_q_t *segment_manager_alloc_queue (svm_fifo_segment_private_t * fs,
u32 queue_size);
void segment_manager_dealloc_queue (segment_manager_t * sm, svm_queue_t * q);
void segment_manager_app_detach (segment_manager_t * sm);
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 26bc70e6fd7..38a0521af94 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -27,49 +27,90 @@
session_manager_main_t session_manager_main;
extern transport_proto_vft_t *tp_vfts;
-static void
-session_send_evt_to_thread (u64 session_handle, fifo_event_type_t evt_type,
- u32 thread_index, void *fp, void *rpc_args)
+static inline int
+session_send_evt_to_thread (void *data, void *args, u32 thread_index,
+ session_evt_type_t evt_type)
{
- session_fifo_event_t evt = { {0}, };
- svm_queue_t *q;
+ session_fifo_event_t *evt;
+ svm_msg_q_msg_t msg;
+ svm_msg_q_t *mq;
u32 tries = 0, max_tries;
- evt.event_type = evt_type;
- if (evt_type == FIFO_EVENT_RPC)
- {
- evt.rpc_args.fp = fp;
- evt.rpc_args.arg = rpc_args;
- }
- else
- evt.session_handle = session_handle;
-
- q = session_manager_get_vpp_event_queue (thread_index);
- while (svm_queue_add (q, (u8 *) & evt, 1))
+ mq = session_manager_get_vpp_event_queue (thread_index);
+ while (svm_msg_q_try_lock (mq))
{
max_tries = vlib_get_current_process (vlib_get_main ())? 1e6 : 3;
if (tries++ == max_tries)
{
SESSION_DBG ("failed to enqueue evt");
- break;
+ return -1;
}
}
+ if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
+ {
+ svm_msg_q_unlock (mq);
+ return -2;
+ }
+ msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+ if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (&msg)))
+ {
+ svm_msg_q_unlock (mq);
+ return -2;
+ }
+ evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+ evt->event_type = evt_type;
+ switch (evt_type)
+ {
+ case FIFO_EVENT_RPC:
+ evt->rpc_args.fp = data;
+ evt->rpc_args.arg = args;
+ break;
+ case FIFO_EVENT_APP_TX:
+ case FIFO_EVENT_BUILTIN_RX:
+ evt->fifo = data;
+ break;
+ case FIFO_EVENT_DISCONNECT:
+ evt->session_handle = session_handle ((stream_session_t *) data);
+ break;
+ default:
+ clib_warning ("evt unhandled!");
+ svm_msg_q_unlock (mq);
+ return -1;
+ }
+
+ svm_msg_q_add_w_lock (mq, &msg);
+ svm_msg_q_unlock (mq);
+ return 0;
}
-void
-session_send_session_evt_to_thread (u64 session_handle,
- fifo_event_type_t evt_type,
- u32 thread_index)
+int
+session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
+{
+ return session_send_evt_to_thread (f, 0, f->master_thread_index, evt_type);
+}
+
+int
+session_send_io_evt_to_thread_custom (svm_fifo_t * f, u32 thread_index,
+ session_evt_type_t evt_type)
{
- session_send_evt_to_thread (session_handle, evt_type, thread_index, 0, 0);
+ return session_send_evt_to_thread (f, 0, thread_index, evt_type);
+}
+
+int
+session_send_ctrl_evt_to_thread (stream_session_t * s,
+ session_evt_type_t evt_type)
+{
+ /* only event supported for now is disconnect */
+ ASSERT (evt_type == FIFO_EVENT_DISCONNECT);
+ return session_send_evt_to_thread (s, 0, s->thread_index,
+ FIFO_EVENT_DISCONNECT);
}
void
session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
{
if (thread_index != vlib_get_thread_index ())
- session_send_evt_to_thread (0, FIFO_EVENT_RPC, thread_index, fp,
- rpc_args);
+ session_send_evt_to_thread (fp, rpc_args, thread_index, FIFO_EVENT_RPC);
else
{
void (*fnp) (void *) = fp;
@@ -440,24 +481,15 @@ stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
/**
* Notify session peer that new data has been enqueued.
*
- * @param s Stream session for which the event is to be generated.
- * @param block Flag to indicate if call should block if event queue is full.
+ * @param s Stream session for which the event is to be generated.
+ * @param lock Flag to indicate if call should lock message queue.
*
- * @return 0 on succes or negative number if failed to send notification.
+ * @return 0 on success or negative number if failed to send notification.
*/
-static int
-session_enqueue_notify (stream_session_t * s, u8 block)
+static inline int
+session_enqueue_notify (stream_session_t * s, u8 lock)
{
application_t *app;
- session_fifo_event_t evt;
- svm_queue_t *q;
-
- if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
- {
- /* Session is closed so app will never clean up. Flush rx fifo */
- svm_fifo_dequeue_drop_all (s->server_rx_fifo);
- return 0;
- }
app = application_get_if_valid (s->app_index);
if (PREDICT_FALSE (app == 0))
@@ -466,68 +498,32 @@ session_enqueue_notify (stream_session_t * s, u8 block)
return 0;
}
- /* Built-in app? Hand event to the callback... */
- if (app->cb_fns.builtin_app_rx_callback)
- return app->cb_fns.builtin_app_rx_callback (s);
-
- /* If no event, send one */
- if (svm_fifo_set_event (s->server_rx_fifo))
- {
- /* Fabricate event */
- evt.fifo = s->server_rx_fifo;
- evt.event_type = FIFO_EVENT_APP_RX;
-
- /* Add event to server's event queue */
- q = app->event_queue;
-
- /* Based on request block (or not) for lack of space */
- if (block || PREDICT_TRUE (q->cursize < q->maxsize))
- svm_queue_add (app->event_queue, (u8 *) & evt,
- 0 /* do wait for mutex */ );
- else
- {
- clib_warning ("fifo full");
- return -1;
- }
- }
-
/* *INDENT-OFF* */
SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
- ed->data[0] = evt.event_type;
+ ed->data[0] = FIFO_EVENT_APP_RX;
ed->data[1] = svm_fifo_max_dequeue (s->server_rx_fifo);
}));
/* *INDENT-ON* */
- return 0;
+ if (lock)
+ return application_lock_and_send_event (app, s, FIFO_EVENT_APP_RX);
+
+ return application_send_event (app, s, FIFO_EVENT_APP_RX);
}
int
session_dequeue_notify (stream_session_t * s)
{
application_t *app;
- svm_queue_t *q;
app = application_get_if_valid (s->app_index);
if (PREDICT_FALSE (!app))
return -1;
- if (application_is_builtin (app))
- return 0;
+ if (session_transport_service_type (s) == TRANSPORT_SERVICE_CL)
+ return application_lock_and_send_event (app, s, FIFO_EVENT_APP_RX);
- q = app->event_queue;
- if (PREDICT_TRUE (q->cursize < q->maxsize))
- {
- session_fifo_event_t evt = {
- .event_type = FIFO_EVENT_APP_TX,
- .fifo = s->server_tx_fifo
- };
- svm_queue_add (app->event_queue, (u8 *) & evt, SVM_Q_WAIT);
- }
- else
- {
- return -1;
- }
- return 0;
+ return application_send_event (app, s, FIFO_EVENT_APP_TX);
}
/**
@@ -542,16 +538,24 @@ int
session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index)
{
session_manager_main_t *smm = &session_manager_main;
- u32 *indices;
+ transport_service_type_t tp_service;
+ int i, errors = 0, lock;
stream_session_t *s;
- int i, errors = 0;
+ u32 *indices;
indices = smm->session_to_enqueue[transport_proto][thread_index];
+ tp_service = transport_protocol_service_type (transport_proto);
+ lock = tp_service == TRANSPORT_SERVICE_CL;
for (i = 0; i < vec_len (indices); i++)
{
s = session_get_if_valid (indices[i], thread_index);
- if (s == 0 || session_enqueue_notify (s, 0 /* don't block */ ))
+ if (PREDICT_FALSE (!s))
+ {
+ errors++;
+ continue;
+ }
+ if (PREDICT_FALSE (session_enqueue_notify (s, lock)))
errors++;
}
@@ -1118,9 +1122,7 @@ stream_session_disconnect (stream_session_t * s)
evt->event_type = FIFO_EVENT_DISCONNECT;
}
else
- session_send_session_evt_to_thread (session_handle (s),
- FIFO_EVENT_DISCONNECT,
- s->thread_index);
+ session_send_ctrl_evt_to_thread (s, FIFO_EVENT_DISCONNECT);
}
/**
@@ -1231,8 +1233,18 @@ session_vpp_event_queues_allocate (session_manager_main_t * smm)
for (i = 0; i < vec_len (smm->vpp_event_queues); i++)
{
- smm->vpp_event_queues[i] = svm_queue_init (evt_q_length, evt_size,
- vpp_pid, 0);
+ svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
+ u32 notif_q_size = clib_max (16, evt_q_length >> 4);
+ svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
+ {evt_q_length, evt_size, 0}
+ ,
+ {notif_q_size, 256, 0}
+ };
+ cfg->consumer_pid = 0;
+ cfg->n_rings = 2;
+ cfg->q_nitems = evt_q_length;
+ cfg->ring_cfgs = rc;
+ smm->vpp_event_queues[i] = svm_msg_q_alloc (cfg);
}
if (smm->evt_qs_use_memfd_seg)
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index fe3477b63e3..879b3823e5d 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -38,10 +38,10 @@ typedef enum
FIFO_EVENT_DISCONNECT,
FIFO_EVENT_BUILTIN_RX,
FIFO_EVENT_RPC,
-} fifo_event_type_t;
+} session_evt_type_t;
static inline const char *
-fifo_event_type_str (fifo_event_type_t et)
+fifo_event_type_str (session_evt_type_t et)
{
switch (et)
{
@@ -62,6 +62,13 @@ fifo_event_type_str (fifo_event_type_t et)
}
}
+typedef enum
+{
+ SESSION_MQ_IO_EVT_RING,
+ SESSION_MQ_CTRL_EVT_RING,
+ SESSION_MQ_N_RINGS
+} session_mq_rings_e;
+
#define foreach_session_input_error \
_(NO_SESSION, "No session drops") \
_(NO_LISTENER, "No listener for dst port drops") \
@@ -86,23 +93,30 @@ typedef struct
{
void *fp;
void *arg;
-} rpc_args_t;
+} session_rpc_args_t;
typedef u64 session_handle_t;
/* *INDENT-OFF* */
-typedef CLIB_PACKED (struct {
+typedef struct
+{
+ u8 event_type;
+ u8 postponed;
union
+ {
+ svm_fifo_t *fifo;
+ session_handle_t session_handle;
+ session_rpc_args_t rpc_args;
+ struct
{
- svm_fifo_t * fifo;
- session_handle_t session_handle;
- rpc_args_t rpc_args;
+ u8 data[0];
};
- u8 event_type;
- u8 postponed;
-}) session_fifo_event_t;
+ };
+} __clib_packed session_fifo_event_t;
/* *INDENT-ON* */
+#define SESSION_MSG_NULL { }
+
typedef struct session_dgram_pre_hdr_
{
u32 data_length;
@@ -193,7 +207,7 @@ struct _session_manager_main
session_tx_context_t *ctx;
/** vpp fifo event queue */
- svm_queue_t **vpp_event_queues;
+ svm_msg_q_t **vpp_event_queues;
/** Event queues memfd segment initialized only if so configured */
ssvm_private_t evt_qs_segment;
@@ -533,9 +547,12 @@ int stream_session_stop_listen (stream_session_t * s);
void stream_session_disconnect (stream_session_t * s);
void stream_session_disconnect_transport (stream_session_t * s);
void stream_session_cleanup (stream_session_t * s);
-void session_send_session_evt_to_thread (u64 session_handle,
- fifo_event_type_t evt_type,
- u32 thread_index);
+int session_send_io_evt_to_thread (svm_fifo_t * f,
+ session_evt_type_t evt_type);
+int session_send_io_evt_to_thread_custom (svm_fifo_t * f, u32 thread_index,
+ session_evt_type_t evt_type);
+void session_send_rpc_evt_to_thread (u32 thread_index, void *fp,
+ void *rpc_args);
ssvm_private_t *session_manager_get_evt_q_segment (void);
u8 *format_stream_session (u8 * s, va_list * args);
@@ -549,7 +566,7 @@ void session_register_transport (transport_proto_t transport_proto,
clib_error_t *vnet_session_enable_disable (vlib_main_t * vm, u8 is_en);
-always_inline svm_queue_t *
+always_inline svm_msg_q_t *
session_manager_get_vpp_event_queue (u32 thread_index)
{
return session_manager_main.vpp_event_queues[thread_index];
diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c
index 1a41dbd9e08..f9fddea6148 100755
--- a/src/vnet/session/session_api.c
+++ b/src/vnet/session/session_api.c
@@ -155,7 +155,7 @@ send_session_accept_callback (stream_session_t * s)
vl_api_registration_t *reg;
transport_connection_t *tc;
stream_session_t *listener;
- svm_queue_t *vpp_queue;
+ svm_msg_q_t *vpp_queue;
reg = vl_mem_api_client_index_to_registration (server->api_client_index);
if (!reg)
@@ -300,7 +300,7 @@ send_session_connected_callback (u32 app_index, u32 api_context,
vl_api_connect_session_reply_t *mp;
transport_connection_t *tc;
vl_api_registration_t *reg;
- svm_queue_t *vpp_queue;
+ svm_msg_q_t *vpp_queue;
application_t *app;
app = application_get (app_index);
@@ -485,7 +485,7 @@ vl_api_bind_uri_t_handler (vl_api_bind_uri_t * mp)
vl_api_bind_uri_reply_t *rmp;
stream_session_t *s;
application_t *app = 0;
- svm_queue_t *vpp_evt_q;
+ svm_msg_q_t *vpp_evt_q;
int rv;
if (session_manager_is_enabled () == 0)
@@ -759,7 +759,7 @@ vl_api_bind_sock_t_handler (vl_api_bind_sock_t * mp)
stream_session_t *s;
transport_connection_t *tc = 0;
ip46_address_t *ip46;
- svm_queue_t *vpp_evt_q;
+ svm_msg_q_t *vpp_evt_q;
if (session_manager_is_enabled () == 0)
{
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index 85fd28db7a9..350282bd902 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -575,15 +575,14 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_frame_t * frame)
{
session_manager_main_t *smm = vnet_get_session_manager_main ();
+ u32 thread_index = vm->thread_index, n_to_dequeue, n_events;
session_fifo_event_t *pending_events, *e;
session_fifo_event_t *fifo_events;
- u32 n_to_dequeue, n_events;
- svm_queue_t *q;
- application_t *app;
- int n_tx_packets = 0;
- u32 thread_index = vm->thread_index;
- int i, rv;
+ svm_msg_q_msg_t _msg, *msg = &_msg;
f64 now = vlib_time_now (vm);
+ int n_tx_packets = 0, i, rv;
+ application_t *app;
+ svm_msg_q_t *mq;
void (*fp) (void *);
SESSION_EVT_DBG (SESSION_EVT_POLL_GAP_TRACK, smm, thread_index);
@@ -594,16 +593,11 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
transport_update_time (now, thread_index);
/*
- * Get vpp queue events
+ * Get vpp queue events that we can dequeue without blocking
*/
- q = smm->vpp_event_queues[thread_index];
- if (PREDICT_FALSE (q == 0))
- return 0;
-
+ mq = smm->vpp_event_queues[thread_index];
fifo_events = smm->free_event_vector[thread_index];
-
- /* min number of events we can dequeue without blocking */
- n_to_dequeue = q->cursize;
+ n_to_dequeue = svm_msg_q_size (mq);
pending_events = smm->pending_event_vector[thread_index];
if (!n_to_dequeue && !vec_len (pending_events)
@@ -624,21 +618,19 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
}
/* See you in the next life, don't be late
- * XXX: we may need priorities here
- */
- if (pthread_mutex_trylock (&q->mutex))
+ * XXX: we may need priorities here */
+ if (svm_msg_q_try_lock (mq))
return 0;
for (i = 0; i < n_to_dequeue; i++)
{
vec_add2 (fifo_events, e, 1);
- svm_queue_sub_raw (q, (u8 *) e);
+ svm_msg_q_sub_w_lock (mq, msg);
+ clib_memcpy (e, svm_msg_q_msg_data (mq, msg), sizeof (*e));
+ svm_msg_q_free_msg (mq, msg);
}
- /* The other side of the connection is not polling */
- if (q->cursize < (q->maxsize / 8))
- (void) pthread_cond_broadcast (&q->condvar);
- pthread_mutex_unlock (&q->mutex);
+ svm_msg_q_unlock (mq);
vec_append (fifo_events, pending_events);
vec_append (fifo_events, smm->pending_disconnects[thread_index]);
@@ -760,19 +752,20 @@ dump_thread_0_event_queue (void)
vlib_main_t *vm = &vlib_global_main;
u32 my_thread_index = vm->thread_index;
session_fifo_event_t _e, *e = &_e;
+ svm_msg_q_ring_t *ring;
stream_session_t *s0;
+ svm_msg_q_msg_t *msg;
+ svm_msg_q_t *mq;
int i, index;
- i8 *headp;
-
- svm_queue_t *q;
- q = smm->vpp_event_queues[my_thread_index];
- index = q->head;
+ mq = smm->vpp_event_queues[my_thread_index];
+ index = mq->q->head;
- for (i = 0; i < q->cursize; i++)
+ for (i = 0; i < mq->q->cursize; i++)
{
- headp = (i8 *) (&q->data[0] + q->elsize * index);
- clib_memcpy (e, headp, q->elsize);
+ msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index);
+ ring = svm_msg_q_ring (mq, msg->ring_index);
+ clib_memcpy (e, svm_msg_q_msg_data (mq, msg), ring->elsize);
switch (e->event_type)
{
@@ -805,7 +798,7 @@ dump_thread_0_event_queue (void)
index++;
- if (index == q->maxsize)
+ if (index == mq->q->maxsize)
index = 0;
}
}
@@ -844,10 +837,11 @@ u8
session_node_lookup_fifo_event (svm_fifo_t * f, session_fifo_event_t * e)
{
session_manager_main_t *smm = vnet_get_session_manager_main ();
- svm_queue_t *q;
+ svm_msg_q_t *mq;
session_fifo_event_t *pending_event_vector, *evt;
int i, index, found = 0;
- i8 *headp;
+ svm_msg_q_msg_t *msg;
+ svm_msg_q_ring_t *ring;
u8 thread_index;
ASSERT (e);
@@ -855,16 +849,17 @@ session_node_lookup_fifo_event (svm_fifo_t * f, session_fifo_event_t * e)
/*
* Search evt queue
*/
- q = smm->vpp_event_queues[thread_index];
- index = q->head;
- for (i = 0; i < q->cursize; i++)
+ mq = smm->vpp_event_queues[thread_index];
+ index = mq->q->head;
+ for (i = 0; i < mq->q->cursize; i++)
{
- headp = (i8 *) (&q->data[0] + q->elsize * index);
- clib_memcpy (e, headp, q->elsize);
+ msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index);
+ ring = svm_msg_q_ring (mq, msg->ring_index);
+ clib_memcpy (e, svm_msg_q_msg_data (mq, msg), ring->elsize);
found = session_node_cmp_event (e, f);
if (found)
return 1;
- if (++index == q->maxsize)
+ if (++index == mq->q->maxsize)
index = 0;
}
/*
diff --git a/src/vnet/tls/tls.c b/src/vnet/tls/tls.c
index 492b3cc7a2a..cea449163f8 100644
--- a/src/vnet/tls/tls.c
+++ b/src/vnet/tls/tls.c
@@ -41,63 +41,15 @@ tls_get_available_engine (void)
int
tls_add_vpp_q_evt (svm_fifo_t * f, u8 evt_type)
{
- session_fifo_event_t evt;
- svm_queue_t *q;
-
if (svm_fifo_set_event (f))
- {
- evt.fifo = f;
- evt.event_type = evt_type;
-
- q = session_manager_get_vpp_event_queue (f->master_thread_index);
- if (PREDICT_TRUE (q->cursize < q->maxsize))
- {
- svm_queue_add (q, (u8 *) & evt, 0 /* do wait for mutex */ );
- }
- else
- {
- clib_warning ("vpp's evt q full");
- return -1;
- }
- }
+ session_send_io_evt_to_thread (f, evt_type);
return 0;
}
static inline int
tls_add_app_q_evt (application_t * app, stream_session_t * app_session)
{
- session_fifo_event_t evt;
- svm_queue_t *q;
-
- if (PREDICT_FALSE (app_session->session_state == SESSION_STATE_CLOSED))
- {
- /* Session is closed so app will never clean up. Flush rx fifo */
- u32 to_dequeue = svm_fifo_max_dequeue (app_session->server_rx_fifo);
- if (to_dequeue)
- svm_fifo_dequeue_drop (app_session->server_rx_fifo, to_dequeue);
- return 0;
- }
-
- if (app->cb_fns.builtin_app_rx_callback)
- return app->cb_fns.builtin_app_rx_callback (app_session);
-
- if (svm_fifo_set_event (app_session->server_rx_fifo))
- {
- evt.fifo = app_session->server_rx_fifo;
- evt.event_type = FIFO_EVENT_APP_RX;
- q = app->event_queue;
-
- if (PREDICT_TRUE (q->cursize < q->maxsize))
- {
- svm_queue_add (q, (u8 *) & evt, 0 /* do wait for mutex */ );
- }
- else
- {
- clib_warning ("app evt q full");
- return -1;
- }
- }
- return 0;
+ return application_send_event (app, app_session, FIFO_EVENT_APP_RX);
}
u32