summaryrefslogtreecommitdiffstats
path: root/src/vcl
diff options
context:
space:
mode:
Diffstat (limited to 'src/vcl')
-rw-r--r--src/vcl/vppcom.c136
1 files changed, 33 insertions, 103 deletions
diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c
index cf8bb72d451..21892438c37 100644
--- a/src/vcl/vppcom.c
+++ b/src/vcl/vppcom.c
@@ -25,15 +25,14 @@ __thread uword __vcl_worker_index = ~0;
static inline int
vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq, u32 n_max_msg)
{
- svm_msg_q_msg_t *msg;
- u32 n_msgs;
- int i;
+ u32 n_msgs = 0, sz, len;
- n_msgs = clib_min (svm_msg_q_size (mq), n_max_msg);
- for (i = 0; i < n_msgs; i++)
+ while ((sz = svm_msg_q_size (mq)))
{
- vec_add2 (wrk->mq_msg_vector, msg, 1);
- svm_msg_q_sub_w_lock (mq, msg);
+ len = vec_len (wrk->mq_msg_vector);
+ vec_validate (wrk->mq_msg_vector, len + sz - 1);
+ svm_msg_q_sub_raw_batch (mq, wrk->mq_msg_vector + len, sz);
+ n_msgs += sz;
}
return n_msgs;
}
@@ -1101,18 +1100,15 @@ vcl_handle_pending_wrk_updates (vcl_worker_t * wrk)
}
void
-vcl_flush_mq_events (void)
+vcl_worker_flush_mq_events (vcl_worker_t *wrk)
{
- vcl_worker_t *wrk = vcl_worker_get_current ();
svm_msg_q_msg_t *msg;
session_event_t *e;
svm_msg_q_t *mq;
int i;
mq = wrk->app_event_queue;
- svm_msg_q_lock (mq);
vcl_mq_dequeue_batch (wrk, mq, ~0);
- svm_msg_q_unlock (mq);
for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
{
@@ -1125,6 +1121,12 @@ vcl_flush_mq_events (void)
vcl_handle_pending_wrk_updates (wrk);
}
+void
+vcl_flush_mq_events (void)
+{
+ vcl_worker_flush_mq_events (vcl_worker_get_current ());
+}
+
static int
vppcom_session_unbind (u32 session_handle)
{
@@ -1539,11 +1541,11 @@ vppcom_session_accept (uint32_t listen_session_handle, vppcom_endpt_t * ep,
vcl_session_t *listen_session = 0;
vcl_session_t *client_session = 0;
vcl_session_msg_t *evt;
- svm_msg_q_msg_t msg;
- session_event_t *e;
u8 is_nonblocking;
int rv;
+again:
+
listen_session = vcl_session_get_w_handle (wrk, listen_session_handle);
if (!listen_session)
return VPPCOM_EBADFD;
@@ -1567,19 +1569,9 @@ vppcom_session_accept (uint32_t listen_session_handle, vppcom_endpt_t * ep,
if (svm_msg_q_is_empty (wrk->app_event_queue) && is_nonblocking)
return VPPCOM_EAGAIN;
- if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_WAIT, 0))
- return VPPCOM_EAGAIN;
-
- e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
- if (e->event_type != SESSION_CTRL_EVT_ACCEPTED)
- {
- vcl_handle_mq_event (wrk, e);
- svm_msg_q_free_msg (wrk->app_event_queue, &msg);
- continue;
- }
- clib_memcpy_fast (&accepted_msg, e->data, sizeof (accepted_msg));
- svm_msg_q_free_msg (wrk->app_event_queue, &msg);
- break;
+ svm_msg_q_wait (wrk->app_event_queue, SVM_MQ_WAIT_EMPTY);
+ vcl_worker_flush_mq_events (wrk);
+ goto again;
}
handle:
@@ -1785,12 +1777,6 @@ vppcom_session_stream_connect (uint32_t session_handle,
return rv;
}
-static u8
-vcl_is_rx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
-{
- return (e->event_type == SESSION_IO_EVT_RX && e->session_index == sid);
-}
-
static inline int
vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
u8 peek)
@@ -1799,7 +1785,6 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
int rv, n_read = 0, is_nonblocking;
vcl_session_t *s = 0;
svm_fifo_t *rx_fifo;
- svm_msg_q_msg_t msg;
session_event_t *e;
svm_msg_q_t *mq;
u8 is_ct;
@@ -1844,16 +1829,9 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
if (is_ct)
svm_fifo_unset_event (s->rx_fifo);
svm_fifo_unset_event (rx_fifo);
- svm_msg_q_lock (mq);
- if (svm_msg_q_is_empty (mq))
- svm_msg_q_wait (mq);
-
- svm_msg_q_sub_w_lock (mq, &msg);
- e = svm_msg_q_msg_data (mq, &msg);
- svm_msg_q_unlock (mq);
- if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
- vcl_handle_mq_event (wrk, e);
- svm_msg_q_free_msg (mq, &msg);
+
+ svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
+ vcl_worker_flush_mq_events (wrk);
}
}
@@ -1924,8 +1902,6 @@ vppcom_session_read_segments (uint32_t session_handle,
int n_read = 0, is_nonblocking;
vcl_session_t *s = 0;
svm_fifo_t *rx_fifo;
- svm_msg_q_msg_t msg;
- session_event_t *e;
svm_msg_q_t *mq;
u8 is_ct;
@@ -1959,16 +1935,9 @@ vppcom_session_read_segments (uint32_t session_handle,
if (is_ct)
svm_fifo_unset_event (s->rx_fifo);
svm_fifo_unset_event (rx_fifo);
- svm_msg_q_lock (mq);
- if (svm_msg_q_is_empty (mq))
- svm_msg_q_wait (mq);
-
- svm_msg_q_sub_w_lock (mq, &msg);
- e = svm_msg_q_msg_data (mq, &msg);
- svm_msg_q_unlock (mq);
- if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
- vcl_handle_mq_event (wrk, e);
- svm_msg_q_free_msg (mq, &msg);
+
+ svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
+ vcl_worker_flush_mq_events (wrk);
}
}
@@ -2015,12 +1984,6 @@ vppcom_session_free_segments (uint32_t session_handle, uint32_t n_bytes)
s->rx_bytes_pending -= n_bytes;
}
-static u8
-vcl_is_tx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
-{
- return (e->event_type == SESSION_IO_EVT_TX && e->session_index == sid);
-}
-
always_inline u8
vcl_fifo_is_writeable (svm_fifo_t * f, u32 len, u8 is_dgram)
{
@@ -2037,9 +2000,7 @@ vppcom_session_write_inline (vcl_worker_t * wrk, vcl_session_t * s, void *buf,
{
int n_write, is_nonblocking;
session_evt_type_t et;
- svm_msg_q_msg_t msg;
svm_fifo_t *tx_fifo;
- session_event_t *e;
svm_msg_q_t *mq;
u8 is_ct;
@@ -2077,17 +2038,9 @@ vppcom_session_write_inline (vcl_worker_t * wrk, vcl_session_t * s, void *buf,
svm_fifo_add_want_deq_ntf (tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
if (vcl_session_is_closing (s))
return vcl_session_closing_error (s);
- svm_msg_q_lock (mq);
- if (svm_msg_q_is_empty (mq))
- svm_msg_q_wait (mq);
- svm_msg_q_sub_w_lock (mq, &msg);
- e = svm_msg_q_msg_data (mq, &msg);
- svm_msg_q_unlock (mq);
-
- if (!vcl_is_tx_evt_for_session (e, s->session_index, is_ct))
- vcl_handle_mq_event (wrk, e);
- svm_msg_q_free_msg (mq, &msg);
+ svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
+ vcl_worker_flush_mq_events (wrk);
}
}
@@ -2290,35 +2243,22 @@ vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
session_event_t *e;
u32 i;
- svm_msg_q_lock (mq);
if (svm_msg_q_is_empty (mq))
{
if (*bits_set)
- {
- svm_msg_q_unlock (mq);
- return 0;
- }
+ return 0;
if (!time_to_wait)
- {
- svm_msg_q_unlock (mq);
- return 0;
- }
+ return 0;
else if (time_to_wait < 0)
- {
- svm_msg_q_wait (mq);
- }
+ svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
else
{
if (svm_msg_q_timedwait (mq, time_to_wait))
- {
- svm_msg_q_unlock (mq);
- return 0;
- }
+ return 0;
}
}
vcl_mq_dequeue_batch (wrk, mq, ~0);
- svm_msg_q_unlock (mq);
for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
{
@@ -2939,30 +2879,20 @@ vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
if (vec_len (wrk->mq_msg_vector) && svm_msg_q_is_empty (mq))
goto handle_dequeued;
- svm_msg_q_lock (mq);
if (svm_msg_q_is_empty (mq))
{
if (!wait_for_time)
- {
- svm_msg_q_unlock (mq);
- return 0;
- }
+ return 0;
else if (wait_for_time < 0)
- {
- svm_msg_q_wait (mq);
- }
+ svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
else
{
if (svm_msg_q_timedwait (mq, wait_for_time / 1e3))
- {
- svm_msg_q_unlock (mq);
- return 0;
- }
+ return 0;
}
}
ASSERT (maxevents > *num_ev);
vcl_mq_dequeue_batch (wrk, mq, ~0);
- svm_msg_q_unlock (mq);
handle_dequeued:
for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)