summaryrefslogtreecommitdiffstats
path: root/src/vcl/vppcom.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/vcl/vppcom.c')
-rw-r--r--src/vcl/vppcom.c479
1 files changed, 319 insertions, 160 deletions
diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c
index a74e55af634..8572b69aa13 100644
--- a/src/vcl/vppcom.c
+++ b/src/vcl/vppcom.c
@@ -248,47 +248,6 @@ vppcom_wait_for_app_state_change (app_state_t app_state)
return VPPCOM_ETIMEDOUT;
}
-static u32
-vcl_ct_registration_add (svm_msg_q_t * mq, u32 sid)
-{
- vcl_cut_through_registration_t *cr;
- pool_get (vcm->cut_through_registrations, cr);
- cr->mq = mq;
- cr->sid = sid;
- return (cr - vcm->cut_through_registrations);
-}
-
-static void
-vcl_ct_registration_del (u32 ct_index)
-{
- pool_put_index (vcm->cut_through_registrations, ct_index);
-}
-
-static vcl_session_t *
-vcl_ct_session_get_from_fifo (svm_fifo_t * f, u8 type)
-{
- vcl_session_t *s;
- s = vcl_session_get (f->client_session_index);
- if (s)
- {
- /* rx fifo */
- if (type == 0 && s->rx_fifo == f)
- return s;
- /* tx fifo */
- if (type == 1 && s->tx_fifo == f)
- return s;
- }
- s = vcl_session_get (f->master_session_index);
- if (s)
- {
- if (type == 0 && s->rx_fifo == f)
- return s;
- if (type == 1 && s->tx_fifo == f)
- return s;
- }
- return 0;
-}
-
static void
vcl_send_session_accepted_reply (svm_msg_q_t * mq, u32 context,
session_handle_t handle, int retval)
@@ -303,15 +262,34 @@ vcl_send_session_accepted_reply (svm_msg_q_t * mq, u32 context,
app_send_ctrl_evt_to_vpp (mq, app_evt);
}
+static void
+vcl_send_session_disconnected_reply (svm_msg_q_t * mq, u32 context,
+ session_handle_t handle, int retval)
+{
+ app_session_evt_t _app_evt, *app_evt = &_app_evt;
+ session_disconnected_reply_msg_t *rmp;
+ app_alloc_ctrl_evt_to_vpp (mq, app_evt,
+ SESSION_CTRL_EVT_DISCONNECTED_REPLY);
+ rmp = (session_disconnected_reply_msg_t *) app_evt->evt->data;
+ rmp->handle = handle;
+ rmp->context = context;
+ rmp->retval = retval;
+ app_send_ctrl_evt_to_vpp (mq, app_evt);
+}
+
static u32
vcl_session_accepted_handler (session_accepted_msg_t * mp)
{
vcl_session_t *session, *listen_session;
svm_fifo_t *rx_fifo, *tx_fifo;
- u32 session_index;
+ u32 session_index, vpp_wrk_index;
+ svm_msg_q_t *evt_q;
VCL_SESSION_LOCK ();
+ session = vcl_session_alloc ();
+ session_index = vcl_session_index (session);
+
listen_session = vppcom_session_table_lookup_listener (mp->listener_handle);
if (!listen_session)
{
@@ -322,13 +300,11 @@ vcl_session_accepted_handler (session_accepted_msg_t * mp)
getpid (), mp->listener_handle);
vcl_send_session_accepted_reply (evt_q, mp->context, mp->handle,
VNET_API_ERROR_INVALID_ARGUMENT);
+ vcl_session_free (session);
+ VCL_SESSION_UNLOCK ();
return VCL_INVALID_SESSION_INDEX;
}
- pool_get (vcm->sessions, session);
- memset (session, 0, sizeof (*session));
- session_index = (u32) (session - vcm->sessions);
-
rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
@@ -339,10 +315,11 @@ vcl_session_accepted_handler (session_accepted_msg_t * mp)
session->our_evt_q = uword_to_pointer (mp->server_event_queue_address,
svm_msg_q_t *);
vcl_wait_for_memory (session->vpp_evt_q);
- session->ct_registration = vcl_ct_registration_add (session->our_evt_q,
- session_index);
rx_fifo->master_session_index = session_index;
tx_fifo->master_session_index = session_index;
+ vec_validate (vcm->vpp_event_queues, 0);
+ evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
+ vcm->vpp_event_queues[0] = evt_q;
}
else
{
@@ -350,6 +327,10 @@ vcl_session_accepted_handler (session_accepted_msg_t * mp)
svm_msg_q_t *);
rx_fifo->client_session_index = session_index;
tx_fifo->client_session_index = session_index;
+
+ vpp_wrk_index = tx_fifo->master_thread_index;
+ vec_validate (vcm->vpp_event_queues, vpp_wrk_index);
+ vcm->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
}
session->vpp_handle = mp->handle;
@@ -382,9 +363,10 @@ vcl_session_accepted_handler (session_accepted_msg_t * mp)
static u32
vcl_session_connected_handler (session_connected_msg_t * mp)
{
- vcl_session_t *session = 0;
- u32 session_index;
+ u32 session_index, vpp_wrk_index;
svm_fifo_t *rx_fifo, *tx_fifo;
+ vcl_session_t *session = 0;
+ svm_msg_q_t *evt_q;
int rv = VPPCOM_OK;
session_index = mp->context;
@@ -425,12 +407,19 @@ done:
svm_msg_q_t *);
session->our_evt_q = uword_to_pointer (mp->client_event_queue_address,
svm_msg_q_t *);
- session->ct_registration = vcl_ct_registration_add (session->our_evt_q,
- session_index);
+
+ vec_validate (vcm->vpp_event_queues, 0);
+ evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
+ vcm->vpp_event_queues[0] = evt_q;
}
else
- session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
- svm_msg_q_t *);
+ {
+ session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
+ svm_msg_q_t *);
+ vpp_wrk_index = tx_fifo->master_thread_index;
+ vec_validate (vcm->vpp_event_queues, vpp_wrk_index);
+ vcm->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
+ }
session->rx_fifo = rx_fifo;
session->tx_fifo = tx_fifo;
@@ -609,13 +598,23 @@ done:
return rv;
}
+static svm_msg_q_t *
+vcl_session_vpp_evt_q (vcl_session_t * s)
+{
+ if (vcl_session_is_ct (s))
+ return vcm->vpp_event_queues[0];
+ else
+ return vcm->vpp_event_queues[s->tx_fifo->master_thread_index];
+}
+
static int
vppcom_session_disconnect (u32 session_index)
{
- int rv;
+ svm_msg_q_t *vpp_evt_q;
vcl_session_t *session;
- u64 vpp_handle;
session_state_t state;
+ u64 vpp_handle;
+ int rv;
VCL_SESSION_LOCK_AND_GET (session_index, &session);
@@ -637,8 +636,9 @@ vppcom_session_disconnect (u32 session_index)
if (state & STATE_CLOSE_ON_EMPTY)
{
- vppcom_send_disconnect_session_reply (vpp_handle, session_index,
- 0 /* rv */ );
+ vpp_evt_q = vcl_session_vpp_evt_q (session);
+ vcl_send_session_disconnected_reply (vpp_evt_q, vcm->my_client_index,
+ vpp_handle, 0);
VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect "
"REPLY...", getpid (), vpp_handle, session_index);
}
@@ -666,6 +666,11 @@ vppcom_app_create (char *app_name)
{
vcm->init = 1;
vppcom_cfg (&vcm->cfg);
+ vcl_cfg = &vcm->cfg;
+
+ vcm->mqs_epfd = -1;
+ if (vcl_cfg->use_mq_eventfd)
+ vcm->mqs_epfd = epoll_create (1);
clib_spinlock_init (&vcm->session_fifo_lockp);
clib_fifo_validate (vcm->client_session_index_fifo,
@@ -676,11 +681,16 @@ vppcom_app_create (char *app_name)
vcm->main_cpu = os_get_thread_index ();
vcm->session_index_by_vpp_handles = hash_create (0, sizeof (uword));
+ vcm->ct_registration_by_mq = hash_create (0, sizeof (uword));
+ clib_spinlock_init (&vcm->ct_registration_lock);
clib_time_init (&vcm->clib_time);
vppcom_init_error_string_table ();
svm_fifo_segment_main_init (vcl_cfg->segment_baseva,
20 /* timeout in secs */ );
+ vec_validate (vcm->mq_events, 64);
+ vec_validate (vcm->mq_msg_vector, 128);
+ vec_reset_length (vcm->mq_msg_vector);
}
if (vcm->my_client_index == ~0)
@@ -868,8 +878,21 @@ vppcom_session_close (uint32_t session_index)
}
VCL_SESSION_LOCK_AND_GET (session_index, &session);
- if (session->our_evt_q)
- vcl_ct_registration_del (session->ct_registration);
+ if (vcl_session_is_ct (session))
+ {
+ vcl_cut_through_registration_t *ctr;
+ uword mq_addr;
+
+ mq_addr = pointer_to_uword (session->our_evt_q);
+ ctr = vcl_ct_registration_lock_and_lookup (mq_addr);
+ ASSERT (ctr);
+ if (ctr->epoll_evt_conn_index != ~0)
+ vcl_mq_epoll_del_evfd (ctr->epoll_evt_conn_index);
+ VDBG (0, "Removing ct registration %u",
+ vcl_ct_registration_index (ctr));
+ vcl_ct_registration_del (ctr);
+ vcl_ct_registration_unlock ();
+ }
vpp_handle = session->vpp_handle;
if (vpp_handle != ~0)
@@ -1084,6 +1107,7 @@ vppcom_session_accept (uint32_t listen_session_index, vppcom_endpt_t * ep,
handle:
client_session_index = vcl_session_accepted_handler (&accepted_msg);
+ listen_session = vcl_session_get (listen_session_index);
VCL_SESSION_LOCK_AND_GET (client_session_index, &client_session);
rv = client_session_index;
@@ -1115,6 +1139,7 @@ handle:
svm_msg_q_t *);
else
vpp_evt_q = client_session->vpp_evt_q;
+
vcl_send_session_accepted_reply (vpp_evt_q, client_session->client_context,
client_session->vpp_handle, 0);
@@ -1287,23 +1312,25 @@ vppcom_session_read_internal (uint32_t session_index, void *buf, int n,
VCL_SESSION_UNLOCK ();
mq = vcl_session_is_ct (s) ? s->our_evt_q : vcm->app_event_queue;
+ svm_fifo_unset_event (rx_fifo);
is_full = svm_fifo_is_full (rx_fifo);
if (svm_fifo_is_empty (rx_fifo))
{
- svm_fifo_unset_event (rx_fifo);
if (is_nonblocking)
{
rv = VPPCOM_OK;
goto done;
}
- svm_msg_q_lock (mq);
while (1)
{
+ svm_msg_q_lock (mq);
if (svm_msg_q_is_empty (mq))
svm_msg_q_wait (mq);
+
svm_msg_q_sub_w_lock (mq, &msg);
e = svm_msg_q_msg_data (mq, &msg);
+ svm_msg_q_unlock (mq);
if (!vcl_is_rx_evt_for_session (e, session_index,
s->our_evt_q != 0))
{
@@ -1311,25 +1338,29 @@ vppcom_session_read_internal (uint32_t session_index, void *buf, int n,
svm_msg_q_free_msg (mq, &msg);
continue;
}
+ svm_fifo_unset_event (rx_fifo);
if (svm_fifo_is_empty (rx_fifo))
{
svm_msg_q_free_msg (mq, &msg);
continue;
}
svm_msg_q_free_msg (mq, &msg);
- svm_msg_q_unlock (mq);
break;
}
}
if (s->is_dgram)
- n_read = app_recv_dgram_raw (rx_fifo, buf, n, &s->transport, 1, peek);
+ n_read = app_recv_dgram_raw (rx_fifo, buf, n, &s->transport, 0, peek);
else
- n_read = app_recv_stream_raw (rx_fifo, buf, n, 1, peek);
+ n_read = app_recv_stream_raw (rx_fifo, buf, n, 0, peek);
if (vcl_session_is_ct (s) && is_full)
- app_send_io_evt_to_vpp (s->vpp_evt_q, rx_fifo, SESSION_IO_EVT_CT_RX,
- SVM_Q_WAIT);
+ {
+ /* If the peer is not polling send notification */
+ if (!svm_fifo_has_event (s->rx_fifo))
+ app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo,
+ SESSION_IO_EVT_CT_RX, SVM_Q_WAIT);
+ }
if (VPPCOM_DEBUG > 2)
{
@@ -1452,18 +1483,20 @@ vppcom_session_write (uint32_t session_index, void *buf, size_t n)
rv = VPPCOM_EWOULDBLOCK;
goto done;
}
- svm_msg_q_lock (mq);
while (1)
{
+ svm_msg_q_lock (mq);
if (!svm_fifo_is_full (tx_fifo))
{
svm_msg_q_unlock (mq);
break;
}
- if (svm_msg_q_is_empty (mq) && svm_msg_q_timedwait (mq, 10e-6))
- continue;
+ while (svm_msg_q_is_empty (mq) && svm_msg_q_timedwait (mq, 10e-6))
+ ;
svm_msg_q_sub_w_lock (mq, &msg);
e = svm_msg_q_msg_data (mq, &msg);
+ svm_msg_q_unlock (mq);
+
if (!vcl_is_tx_evt_for_session (e, session_index,
s->our_evt_q != 0))
{
@@ -1477,7 +1510,6 @@ vppcom_session_write (uint32_t session_index, void *buf, size_t n)
continue;
}
svm_msg_q_free_msg (mq, &msg);
- svm_msg_q_unlock (mq);
break;
}
}
@@ -1510,6 +1542,31 @@ done:
return rv;
}
+static vcl_session_t *
+vcl_ct_session_get_from_fifo (svm_fifo_t * f, u8 type)
+{
+ vcl_session_t *s;
+ s = vcl_session_get (f->client_session_index);
+ if (s)
+ {
+ /* rx fifo */
+ if (type == 0 && s->rx_fifo == f)
+ return s;
+ /* tx fifo */
+ if (type == 1 && s->tx_fifo == f)
+ return s;
+ }
+ s = vcl_session_get (f->master_session_index);
+ if (s)
+ {
+ if (type == 0 && s->rx_fifo == f)
+ return s;
+ if (type == 1 && s->tx_fifo == f)
+ return s;
+ }
+ return 0;
+}
+
static inline int
vppcom_session_write_ready (vcl_session_t * session, u32 session_index)
{
@@ -1552,6 +1609,22 @@ vppcom_session_write_ready (vcl_session_t * session, u32 session_index)
return svm_fifo_max_enqueue (session->tx_fifo);
}
+static inline int
+vcl_mq_dequeue_batch (svm_msg_q_t * mq)
+{
+ svm_msg_q_msg_t *msg;
+ u32 n_msgs;
+ int i;
+
+ n_msgs = svm_msg_q_size (mq);
+ for (i = 0; i < n_msgs; i++)
+ {
+ vec_add2 (vcm->mq_msg_vector, msg, 1);
+ svm_msg_q_sub_w_lock (mq, msg);
+ }
+ return n_msgs;
+}
+
static int
vcl_select_handle_mq (svm_msg_q_t * mq, unsigned long n_bits,
unsigned long *read_map, unsigned long *write_map,
@@ -1559,12 +1632,13 @@ vcl_select_handle_mq (svm_msg_q_t * mq, unsigned long n_bits,
u32 * bits_set)
{
session_disconnected_msg_t *disconnected_msg;
+ session_connected_msg_t *connected_msg;
session_accepted_msg_t *accepted_msg;
vcl_session_msg_t *vcl_msg;
vcl_session_t *session;
- svm_msg_q_msg_t msg;
+ svm_msg_q_msg_t *msg;
session_event_t *e;
- u32 n_msgs, i, sid;
+ u32 i, sid;
u64 handle;
svm_msg_q_lock (mq);
@@ -1594,23 +1668,20 @@ vcl_select_handle_mq (svm_msg_q_t * mq, unsigned long n_bits,
}
}
}
+ vcl_mq_dequeue_batch (mq);
svm_msg_q_unlock (mq);
- n_msgs = svm_msg_q_size (mq);
- for (i = 0; i < n_msgs; i++)
+ for (i = 0; i < vec_len (vcm->mq_msg_vector); i++)
{
- if (svm_msg_q_sub (mq, &msg, SVM_Q_WAIT, 0))
- {
- clib_warning ("message queue returned");
- continue;
- }
- e = svm_msg_q_msg_data (mq, &msg);
+ msg = vec_elt_at_index (vcm->mq_msg_vector, i);
+ e = svm_msg_q_msg_data (mq, msg);
switch (e->event_type)
{
case FIFO_EVENT_APP_RX:
sid = e->fifo->client_session_index;
session = vcl_session_get (sid);
- if (!session || svm_fifo_is_empty (session->rx_fifo))
+ svm_fifo_unset_event (session->rx_fifo);
+ if (svm_fifo_is_empty (session->rx_fifo))
break;
if (sid < n_bits && read_map)
{
@@ -1632,7 +1703,8 @@ vcl_select_handle_mq (svm_msg_q_t * mq, unsigned long n_bits,
case SESSION_IO_EVT_CT_TX:
session = vcl_ct_session_get_from_fifo (e->fifo, 0);
sid = vcl_session_index (session);
- if (!session || svm_fifo_is_empty (session->rx_fifo))
+ svm_fifo_unset_event (session->rx_fifo);
+ if (svm_fifo_is_empty (session->rx_fifo))
break;
if (sid < n_bits && read_map)
{
@@ -1672,6 +1744,10 @@ vcl_select_handle_mq (svm_msg_q_t * mq, unsigned long n_bits,
*bits_set += 1;
}
break;
+ case SESSION_CTRL_EVT_CONNECTED:
+ connected_msg = (session_connected_msg_t *) e->data;
+ vcl_session_connected_handler (connected_msg);
+ break;
case SESSION_CTRL_EVT_DISCONNECTED:
disconnected_msg = (session_disconnected_msg_t *) e->data;
sid = vcl_session_get_index_from_handle (disconnected_msg->handle);
@@ -1685,20 +1761,73 @@ vcl_select_handle_mq (svm_msg_q_t * mq, unsigned long n_bits,
clib_warning ("unhandled: %u", e->event_type);
break;
}
- svm_msg_q_free_msg (mq, &msg);
+ svm_msg_q_free_msg (mq, msg);
}
+ vec_reset_length (vcm->mq_msg_vector);
return *bits_set;
}
+static int
+vppcom_select_condvar (unsigned long n_bits, unsigned long *read_map,
+ unsigned long *write_map, unsigned long *except_map,
+ double time_to_wait, u32 * bits_set)
+{
+ double total_wait = 0, wait_slice;
+ vcl_cut_through_registration_t *cr;
+
+ time_to_wait = (time_to_wait == -1) ? 10e9 : time_to_wait;
+ wait_slice = vcm->cut_through_registrations ? 10e-6 : time_to_wait;
+ do
+ {
+ /* *INDENT-OFF* */
+ pool_foreach (cr, vcm->cut_through_registrations, ({
+ vcl_select_handle_mq (cr->mq, n_bits, read_map, write_map, except_map,
+ 0, bits_set);
+ }));
+ /* *INDENT-ON* */
+
+ vcl_select_handle_mq (vcm->app_event_queue, n_bits, read_map, write_map,
+ except_map, time_to_wait, bits_set);
+ total_wait += wait_slice;
+ if (*bits_set)
+ return *bits_set;
+ }
+ while (total_wait < time_to_wait);
+
+ return 0;
+}
+
+static int
+vppcom_select_eventfd (unsigned long n_bits, unsigned long *read_map,
+ unsigned long *write_map, unsigned long *except_map,
+ double time_to_wait, u32 * bits_set)
+{
+ vcl_mq_evt_conn_t *mqc;
+ int __clib_unused n_read;
+ int n_mq_evts, i;
+ u64 buf;
+
+ vec_validate (vcm->mq_events, pool_elts (vcm->mq_evt_conns));
+ n_mq_evts = epoll_wait (vcm->mqs_epfd, vcm->mq_events,
+ vec_len (vcm->mq_events), time_to_wait);
+ for (i = 0; i < n_mq_evts; i++)
+ {
+ mqc = vcl_mq_evt_conn_get (vcm->mq_events[i].data.u32);
+ n_read = read (mqc->mq_fd, &buf, sizeof (buf));
+ vcl_select_handle_mq (mqc->mq, n_bits, read_map, write_map,
+ except_map, 0, bits_set);
+ }
+
+ return (n_mq_evts > 0 ? (int) *bits_set : 0);
+}
+
int
vppcom_select (unsigned long n_bits, unsigned long *read_map,
unsigned long *write_map, unsigned long *except_map,
double time_to_wait)
{
u32 sid, minbits = clib_max (n_bits, BITS (uword)), bits_set = 0;
- vcl_cut_through_registration_t *cr;
- double total_wait = 0, wait_slice;
vcl_session_t *session = 0;
int rv;
@@ -1736,17 +1865,14 @@ vppcom_select (unsigned long n_bits, unsigned long *read_map,
/* *INDENT-OFF* */
clib_bitmap_foreach (sid, vcm->wr_bitmap, ({
- VCL_SESSION_LOCK();
if (!(session = vcl_session_get (sid)))
{
- VCL_SESSION_UNLOCK();
VDBG (0, "VCL<%d>: session %d specified in write_map is closed.",
getpid (), sid);
return VPPCOM_EBADFD;
}
rv = svm_fifo_is_full (session->tx_fifo);
- VCL_SESSION_UNLOCK();
if (!rv)
{
clib_bitmap_set_no_check (write_map, sid, 1);
@@ -1757,18 +1883,16 @@ vppcom_select (unsigned long n_bits, unsigned long *read_map,
check_rd:
if (!read_map)
goto check_mq;
+
clib_bitmap_foreach (sid, vcm->rd_bitmap, ({
- VCL_SESSION_LOCK();
if (!(session = vcl_session_get (sid)))
{
- VCL_SESSION_UNLOCK();
VDBG (0, "VCL<%d>: session %d specified in write_map is closed.",
getpid (), sid);
return VPPCOM_EBADFD;
}
rv = vppcom_session_read_ready (session);
- VCL_SESSION_UNLOCK();
if (rv)
{
clib_bitmap_set_no_check (read_map, sid, 1);
@@ -1778,23 +1902,13 @@ check_rd:
/* *INDENT-ON* */
check_mq:
- wait_slice = vcm->cut_through_registrations ? 10e-6 : time_to_wait;
- do
- {
- /* *INDENT-OFF* */
- pool_foreach (cr, vcm->cut_through_registrations, ({
- vcl_select_handle_mq (cr->mq, n_bits, read_map, write_map, except_map,
- 0, &bits_set);
- }));
- /* *INDENT-ON* */
- vcl_select_handle_mq (vcm->app_event_queue, n_bits, read_map, write_map,
- except_map, time_to_wait, &bits_set);
- total_wait += wait_slice;
- if (bits_set)
- return bits_set;
- }
- while (total_wait < time_to_wait);
+ if (vcm->cfg.use_mq_eventfd)
+ vppcom_select_eventfd (n_bits, read_map, write_map, except_map,
+ time_to_wait, &bits_set);
+ else
+ vppcom_select_condvar (n_bits, read_map, write_map, except_map,
+ time_to_wait, &bits_set);
return (bits_set);
}
@@ -2130,11 +2244,11 @@ vcl_epoll_wait_handle_mq (svm_msg_q_t * mq, struct epoll_event *events,
session_disconnected_msg_t *disconnected_msg;
session_connected_msg_t *connected_msg;
session_accepted_msg_t *accepted_msg;
- u32 sid = ~0, session_events, n_msgs;
u64 session_evt_data = ~0, handle;
+ u32 sid = ~0, session_events;
vcl_session_msg_t *vcl_msg;
vcl_session_t *session;
- svm_msg_q_msg_t msg;
+ svm_msg_q_msg_t *msg;
session_event_t *e;
u8 add_event;
int i;
@@ -2153,61 +2267,58 @@ vcl_epoll_wait_handle_mq (svm_msg_q_t * mq, struct epoll_event *events,
}
else
{
- if (svm_msg_q_timedwait (mq, wait_for_time / 1e3))
+ if (svm_msg_q_timedwait (mq, wait_for_time / 1e3) < 0)
{
svm_msg_q_unlock (mq);
return 0;
}
}
}
+ vcl_mq_dequeue_batch (mq);
svm_msg_q_unlock (mq);
- n_msgs = svm_msg_q_size (mq);
- for (i = 0; i < n_msgs; i++)
+ for (i = 0; i < vec_len (vcm->mq_msg_vector); i++)
{
- if (svm_msg_q_sub (mq, &msg, SVM_Q_WAIT, 0))
- {
- clib_warning ("message queue returned");
- continue;
- }
- e = svm_msg_q_msg_data (mq, &msg);
+ msg = vec_elt_at_index (vcm->mq_msg_vector, i);
+ e = svm_msg_q_msg_data (mq, msg);
add_event = 0;
switch (e->event_type)
{
case FIFO_EVENT_APP_RX:
sid = e->fifo->client_session_index;
- clib_spinlock_lock (&vcm->sessions_lockp);
session = vcl_session_get (sid);
session_events = session->vep.ev.events;
- if ((EPOLLIN & session->vep.ev.events)
- && !svm_fifo_is_empty (session->rx_fifo))
+ if (!(EPOLLIN & session->vep.ev.events))
+ break;
+ svm_fifo_unset_event (session->rx_fifo);
+ if (!svm_fifo_is_empty (session->rx_fifo))
{
add_event = 1;
events[*num_ev].events |= EPOLLIN;
session_evt_data = session->vep.ev.data.u64;
}
- clib_spinlock_unlock (&vcm->sessions_lockp);
break;
case FIFO_EVENT_APP_TX:
sid = e->fifo->client_session_index;
- clib_spinlock_lock (&vcm->sessions_lockp);
session = vcl_session_get (sid);
session_events = session->vep.ev.events;
- if ((EPOLLOUT & session_events)
- && !svm_fifo_is_full (session->tx_fifo))
+ if (!(EPOLLOUT & session_events))
+ break;
+ if (!svm_fifo_is_full (session->tx_fifo))
{
add_event = 1;
events[*num_ev].events |= EPOLLOUT;
session_evt_data = session->vep.ev.data.u64;
}
- clib_spinlock_unlock (&vcm->sessions_lockp);
break;
case SESSION_IO_EVT_CT_TX:
session = vcl_ct_session_get_from_fifo (e->fifo, 0);
sid = vcl_session_index (session);
session_events = session->vep.ev.events;
- if ((EPOLLIN & session->vep.ev.events)
- && !svm_fifo_is_empty (session->rx_fifo))
+ if (!(EPOLLIN & session->vep.ev.events))
+ break;
+ svm_fifo_unset_event (session->rx_fifo);
+ if (!svm_fifo_is_empty (session->rx_fifo))
{
add_event = 1;
events[*num_ev].events |= EPOLLIN;
@@ -2218,8 +2329,9 @@ vcl_epoll_wait_handle_mq (svm_msg_q_t * mq, struct epoll_event *events,
session = vcl_ct_session_get_from_fifo (e->fifo, 1);
sid = vcl_session_index (session);
session_events = session->vep.ev.events;
- if ((EPOLLOUT & session_events)
- && !svm_fifo_is_full (session->tx_fifo))
+ if (!(EPOLLOUT & session_events))
+ break;
+ if (!svm_fifo_is_full (session->tx_fifo))
{
add_event = 1;
events[*num_ev].events |= EPOLLOUT;
@@ -2276,11 +2388,10 @@ vcl_epoll_wait_handle_mq (svm_msg_q_t * mq, struct epoll_event *events,
break;
default:
clib_warning ("unhandled: %u", e->event_type);
- svm_msg_q_free_msg (mq, &msg);
+ svm_msg_q_free_msg (mq, msg);
continue;
}
-
- svm_msg_q_free_msg (mq, &msg);
+ svm_msg_q_free_msg (mq, msg);
if (add_event)
{
@@ -2297,17 +2408,70 @@ vcl_epoll_wait_handle_mq (svm_msg_q_t * mq, struct epoll_event *events,
break;
}
}
+
+ vec_reset_length (vcm->mq_msg_vector);
return *num_ev;
}
+static int
+vppcom_epoll_wait_condvar (struct epoll_event *events, int maxevents,
+ double wait_for_time)
+{
+ vcl_cut_through_registration_t *cr;
+ double total_wait = 0, wait_slice;
+ u32 num_ev = 0;
+ int rv;
+
+ wait_for_time = (wait_for_time == -1) ? (double) 10e9 : wait_for_time;
+ wait_slice = vcm->cut_through_registrations ? 10e-6 : wait_for_time;
+
+ do
+ {
+ /* *INDENT-OFF* */
+ pool_foreach (cr, vcm->cut_through_registrations, ({
+ vcl_epoll_wait_handle_mq (cr->mq, events, maxevents, 0, &num_ev);
+ }));
+ /* *INDENT-ON* */
+
+ rv = vcl_epoll_wait_handle_mq (vcm->app_event_queue, events, maxevents,
+ num_ev ? 0 : wait_slice, &num_ev);
+ if (rv)
+ total_wait += wait_slice;
+ if (num_ev)
+ return num_ev;
+ }
+ while (total_wait < wait_for_time);
+ return (int) num_ev;
+}
+
+static int
+vppcom_epoll_wait_eventfd (struct epoll_event *events, int maxevents,
+ double wait_for_time)
+{
+ vcl_mq_evt_conn_t *mqc;
+ int __clib_unused n_read;
+ int n_mq_evts, i;
+ u32 n_evts = 0;
+ u64 buf;
+
+ vec_validate (vcm->mq_events, pool_elts (vcm->mq_evt_conns));
+ n_mq_evts = epoll_wait (vcm->mqs_epfd, vcm->mq_events,
+ vec_len (vcm->mq_events), wait_for_time);
+ for (i = 0; i < n_mq_evts; i++)
+ {
+ mqc = vcl_mq_evt_conn_get (vcm->mq_events[i].data.u32);
+ n_read = read (mqc->mq_fd, &buf, sizeof (buf));
+ vcl_epoll_wait_handle_mq (mqc->mq, events, maxevents, 0, &n_evts);
+ }
+
+ return (int) n_evts;
+}
+
int
vppcom_epoll_wait (uint32_t vep_idx, struct epoll_event *events,
int maxevents, double wait_for_time)
{
- vcl_cut_through_registration_t *cr;
vcl_session_t *vep_session;
- double total_wait = 0, wait_slice;
- u32 num_ev = 0;
if (PREDICT_FALSE (maxevents <= 0))
{
@@ -2328,25 +2492,11 @@ vppcom_epoll_wait (uint32_t vep_idx, struct epoll_event *events,
clib_spinlock_unlock (&vcm->sessions_lockp);
memset (events, 0, sizeof (*events) * maxevents);
- wait_slice = vcm->cut_through_registrations ? 10e-6 : wait_for_time;
-
- do
- {
- /* *INDENT-OFF* */
- pool_foreach (cr, vcm->cut_through_registrations, ({
- vcl_epoll_wait_handle_mq (cr->mq, events, maxevents, 0, &num_ev);
- }));
- /* *INDENT-ON* */
- vcl_epoll_wait_handle_mq (vcm->app_event_queue, events, maxevents,
- num_ev ? 0 : wait_slice, &num_ev);
- total_wait += wait_slice;
- if (num_ev)
- return num_ev;
- }
- while (total_wait < wait_for_time);
+ if (vcm->cfg.use_mq_eventfd)
+ return vppcom_epoll_wait_eventfd (events, maxevents, wait_for_time);
- return num_ev;
+ return vppcom_epoll_wait_condvar (events, maxevents, wait_for_time);
}
int
@@ -2917,12 +3067,15 @@ vppcom_session_recvfrom (uint32_t session_index, void *buffer,
return VPPCOM_EAFNOSUPPORT;
}
- if (session->transport.is_ip4)
- clib_memcpy (ep->ip, &session->transport.rmt_ip.ip4,
- sizeof (ip4_address_t));
- else
- clib_memcpy (ep->ip, &session->transport.rmt_ip.ip6,
- sizeof (ip6_address_t));
+ if (ep)
+ {
+ if (session->transport.is_ip4)
+ clib_memcpy (ep->ip, &session->transport.rmt_ip.ip4,
+ sizeof (ip4_address_t));
+ else
+ clib_memcpy (ep->ip, &session->transport.rmt_ip.ip6,
+ sizeof (ip6_address_t));
+ }
return rv;
}
@@ -3054,6 +3207,12 @@ vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
return num_ev;
}
+int
+vppcom_mq_epoll_fd (void)
+{
+ return vcm->mqs_epfd;
+}
+
/*
* fd.io coding-style-patch-verification: ON
*