aboutsummaryrefslogtreecommitdiffstats
path: root/src/vcl/vppcom.c
diff options
context:
space:
mode:
authorFlorin Coras <fcoras@cisco.com>2018-09-12 16:08:01 -0700
committerMarco Varlese <marco.varlese@suse.de>2018-09-14 07:59:41 +0000
commit86f04500ae027dc66e91519a006388e56df4ceff (patch)
tree7dfb1c7b6546637119596e845eee6a33d7fc3782 /src/vcl/vppcom.c
parent958192dc2abf427118a227d142f5b03807098cb2 (diff)
vcl: keep track of unexpected events
If sessions are marked as blocking, events for other sessions received while waiting for the blocking sessions, are added to a pending list and processed later. Change-Id: Ia6c71006b1c2bcb78af708390da0cd436af397cc Signed-off-by: Florin Coras <fcoras@cisco.com>
Diffstat (limited to 'src/vcl/vppcom.c')
-rw-r--r--src/vcl/vppcom.c544
1 files changed, 293 insertions, 251 deletions
diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c
index df8f4cae964..cf3a770f045 100644
--- a/src/vcl/vppcom.c
+++ b/src/vcl/vppcom.c
@@ -478,8 +478,8 @@ vcl_session_bound_handler (vcl_worker_t * wrk, session_bound_msg_t * mp)
return sid;
}
-int
-vcl_handle_mq_ctrl_event (vcl_worker_t * wrk, session_event_t * e)
+static int
+vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
{
session_accepted_msg_t *accepted_msg;
session_disconnected_msg_t *disconnected_msg;
@@ -491,9 +491,10 @@ vcl_handle_mq_ctrl_event (vcl_worker_t * wrk, session_event_t * e)
switch (e->event_type)
{
case FIFO_EVENT_APP_RX:
- clib_warning ("unhandled rx: sid %u (0x%x)",
- e->fifo->client_session_index,
- e->fifo->client_session_index);
+ case FIFO_EVENT_APP_TX:
+ case SESSION_IO_EVT_CT_RX:
+ case SESSION_IO_EVT_CT_TX:
+ vec_add1 (wrk->unhandled_evts_vector, *e);
break;
case SESSION_CTRL_EVT_ACCEPTED:
accepted_msg = (session_accepted_msg_t *) e->data;
@@ -524,7 +525,7 @@ vcl_handle_mq_ctrl_event (vcl_worker_t * wrk, session_event_t * e)
break;
}
session->session_state = STATE_DISCONNECT;
- VDBG (0, "disconnected handle 0xllx, sid %u", disconnected_msg->handle,
+ VDBG (0, "disconnected handle 0x%llx, sid %u", disconnected_msg->handle,
sid);
break;
case SESSION_CTRL_EVT_RESET:
@@ -569,7 +570,7 @@ vppcom_wait_for_session_state_change (u32 session_index,
if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0))
continue;
e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
- vcl_handle_mq_ctrl_event (wrk, e);
+ vcl_handle_mq_event (wrk, e);
svm_msg_q_free_msg (wrk->app_event_queue, &msg);
}
while (clib_time_now (&wrk->clib_time) < timeout);
@@ -1313,7 +1314,7 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
svm_msg_q_unlock (mq);
if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
{
- vcl_handle_mq_ctrl_event (wrk, e);
+ vcl_handle_mq_event (wrk, e);
svm_msg_q_free_msg (mq, &msg);
continue;
}
@@ -1406,7 +1407,7 @@ vppcom_session_read_segments (uint32_t session_handle,
svm_msg_q_unlock (mq);
if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
{
- vcl_handle_mq_ctrl_event (wrk, e);
+ vcl_handle_mq_event (wrk, e);
svm_msg_q_free_msg (mq, &msg);
continue;
}
@@ -1559,7 +1560,7 @@ vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
if (!vcl_is_tx_evt_for_session (e, s->session_index,
s->our_evt_q != 0))
- vcl_handle_mq_ctrl_event (wrk, e);
+ vcl_handle_mq_event (wrk, e);
svm_msg_q_free_msg (mq, &msg);
}
}
@@ -1680,21 +1681,124 @@ if (PREDICT_FALSE (svm_fifo_is_empty (_fifo))) \
break; \
} \
-static int
-vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
- unsigned long n_bits, unsigned long *read_map,
- unsigned long *write_map, unsigned long *except_map,
- double time_to_wait, u32 * bits_set)
+static void
+vcl_select_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
+ unsigned long n_bits, unsigned long *read_map,
+ unsigned long *write_map,
+ unsigned long *except_map, u32 * bits_set)
{
session_disconnected_msg_t *disconnected_msg;
session_connected_msg_t *connected_msg;
session_accepted_msg_t *accepted_msg;
vcl_session_msg_t *vcl_msg;
vcl_session_t *session;
+ u64 handle;
+ u32 sid;
+
+ switch (e->event_type)
+ {
+ case FIFO_EVENT_APP_RX:
+ vcl_fifo_rx_evt_valid_or_break (e->fifo);
+ sid = e->fifo->client_session_index;
+ session = vcl_session_get (wrk, sid);
+ if (!session)
+ break;
+ if (sid < n_bits && read_map)
+ {
+ clib_bitmap_set_no_check (read_map, sid, 1);
+ *bits_set += 1;
+ }
+ break;
+ case FIFO_EVENT_APP_TX:
+ sid = e->fifo->client_session_index;
+ session = vcl_session_get (wrk, sid);
+ if (!session)
+ break;
+ if (sid < n_bits && write_map)
+ {
+ clib_bitmap_set_no_check (write_map, sid, 1);
+ *bits_set += 1;
+ }
+ break;
+ case SESSION_IO_EVT_CT_TX:
+ vcl_fifo_rx_evt_valid_or_break (e->fifo);
+ session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
+ if (!session)
+ break;
+ sid = session->session_index;
+ if (sid < n_bits && read_map)
+ {
+ clib_bitmap_set_no_check (read_map, sid, 1);
+ *bits_set += 1;
+ }
+ break;
+ case SESSION_IO_EVT_CT_RX:
+ session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
+ if (!session)
+ break;
+ sid = session->session_index;
+ if (sid < n_bits && write_map)
+ {
+ clib_bitmap_set_no_check (write_map, sid, 1);
+ *bits_set += 1;
+ }
+ break;
+ case SESSION_CTRL_EVT_ACCEPTED:
+ accepted_msg = (session_accepted_msg_t *) e->data;
+ handle = accepted_msg->listener_handle;
+ session = vcl_session_table_lookup_listener (wrk, handle);
+ if (!session)
+ {
+ clib_warning ("VCL<%d>: ERROR: couldn't find listen session:"
+ "listener handle %llx", getpid (), handle);
+ break;
+ }
+
+ clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
+ vcl_msg->accepted_msg = *accepted_msg;
+ sid = session->session_index;
+ if (sid < n_bits && read_map)
+ {
+ clib_bitmap_set_no_check (read_map, sid, 1);
+ *bits_set += 1;
+ }
+ break;
+ case SESSION_CTRL_EVT_CONNECTED:
+ connected_msg = (session_connected_msg_t *) e->data;
+ vcl_session_connected_handler (wrk, connected_msg);
+ break;
+ case SESSION_CTRL_EVT_DISCONNECTED:
+ disconnected_msg = (session_disconnected_msg_t *) e->data;
+ sid = vcl_session_index_from_vpp_handle (wrk, disconnected_msg->handle);
+ if (sid < n_bits && except_map)
+ {
+ clib_bitmap_set_no_check (except_map, sid, 1);
+ *bits_set += 1;
+ }
+ break;
+ case SESSION_CTRL_EVT_RESET:
+ sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
+ if (sid < n_bits && except_map)
+ {
+ clib_bitmap_set_no_check (except_map, sid, 1);
+ *bits_set += 1;
+ }
+ break;
+ default:
+ clib_warning ("unhandled: %u", e->event_type);
+ break;
+ }
+}
+
+static int
+vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
+ unsigned long n_bits, unsigned long *read_map,
+ unsigned long *write_map, unsigned long *except_map,
+ double time_to_wait, u32 * bits_set)
+{
svm_msg_q_msg_t *msg;
session_event_t *e;
- u32 i, sid;
- u64 handle;
+ u32 i;
svm_msg_q_lock (mq);
if (svm_msg_q_is_empty (mq))
@@ -1730,104 +1834,10 @@ vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
{
msg = vec_elt_at_index (wrk->mq_msg_vector, i);
e = svm_msg_q_msg_data (mq, msg);
- switch (e->event_type)
- {
- case FIFO_EVENT_APP_RX:
- vcl_fifo_rx_evt_valid_or_break (e->fifo);
- sid = e->fifo->client_session_index;
- session = vcl_session_get (wrk, sid);
- if (!session)
- break;
- if (sid < n_bits && read_map)
- {
- clib_bitmap_set_no_check (read_map, sid, 1);
- *bits_set += 1;
- }
- break;
- case FIFO_EVENT_APP_TX:
- sid = e->fifo->client_session_index;
- session = vcl_session_get (wrk, sid);
- if (!session)
- break;
- if (sid < n_bits && write_map)
- {
- clib_bitmap_set_no_check (write_map, sid, 1);
- *bits_set += 1;
- }
- break;
- case SESSION_IO_EVT_CT_TX:
- vcl_fifo_rx_evt_valid_or_break (e->fifo);
- session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
- if (!session)
- break;
- sid = session->session_index;
- if (sid < n_bits && read_map)
- {
- clib_bitmap_set_no_check (read_map, sid, 1);
- *bits_set += 1;
- }
- break;
- case SESSION_IO_EVT_CT_RX:
- session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
- if (!session)
- break;
- sid = session->session_index;
- if (sid < n_bits && write_map)
- {
- clib_bitmap_set_no_check (write_map, sid, 1);
- *bits_set += 1;
- }
- break;
- case SESSION_CTRL_EVT_ACCEPTED:
- accepted_msg = (session_accepted_msg_t *) e->data;
- handle = accepted_msg->listener_handle;
- session = vcl_session_table_lookup_listener (wrk, handle);
- if (!session)
- {
- clib_warning ("VCL<%d>: ERROR: couldn't find listen session:"
- "listener handle %llx", getpid (), handle);
- break;
- }
-
- clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
- vcl_msg->accepted_msg = *accepted_msg;
- sid = session->session_index;
- if (sid < n_bits && read_map)
- {
- clib_bitmap_set_no_check (read_map, sid, 1);
- *bits_set += 1;
- }
- break;
- case SESSION_CTRL_EVT_CONNECTED:
- connected_msg = (session_connected_msg_t *) e->data;
- vcl_session_connected_handler (wrk, connected_msg);
- break;
- case SESSION_CTRL_EVT_DISCONNECTED:
- disconnected_msg = (session_disconnected_msg_t *) e->data;
- sid = vcl_session_index_from_vpp_handle (wrk,
- disconnected_msg->handle);
- if (sid < n_bits && except_map)
- {
- clib_bitmap_set_no_check (except_map, sid, 1);
- *bits_set += 1;
- }
- break;
- case SESSION_CTRL_EVT_RESET:
- sid = vcl_session_reset_handler (wrk,
- (session_reset_msg_t *) e->data);
- if (sid < n_bits && except_map)
- {
- clib_bitmap_set_no_check (except_map, sid, 1);
- *bits_set += 1;
- }
- break;
- default:
- clib_warning ("unhandled: %u", e->event_type);
- break;
- }
+ vcl_select_handle_mq_event (wrk, e, n_bits, read_map, write_map,
+ except_map, bits_set);
svm_msg_q_free_msg (mq, msg);
}
-
vec_reset_length (wrk->mq_msg_vector);
return *bits_set;
}
@@ -1898,7 +1908,7 @@ vppcom_select (unsigned long n_bits, unsigned long *read_map,
u32 sid, minbits = clib_max (n_bits, BITS (uword)), bits_set = 0;
vcl_worker_t *wrk = vcl_worker_get_current ();
vcl_session_t *session = 0;
- int rv;
+ int rv, i;
ASSERT (sizeof (clib_bitmap_t) == sizeof (long int));
@@ -1972,6 +1982,13 @@ check_rd:
check_mq:
+ for (i = 0; i < vec_len (wrk->unhandled_evts_vector); i++)
+ {
+ vcl_select_handle_mq_event (wrk, &wrk->unhandled_evts_vector[i], n_bits,
+ read_map, write_map, except_map, &bits_set);
+ }
+ vec_reset_length (wrk->unhandled_evts_vector);
+
if (vcm->cfg.use_mq_eventfd)
vppcom_select_eventfd (wrk, n_bits, read_map, write_map, except_map,
time_to_wait, &bits_set);
@@ -2275,10 +2292,9 @@ done:
return rv;
}
-static int
-vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
- struct epoll_event *events, u32 maxevents,
- double wait_for_time, u32 * num_ev)
+static inline void
+vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
+ struct epoll_event *events, u32 * num_ev)
{
session_disconnected_msg_t *disconnected_msg;
session_connected_msg_t *connected_msg;
@@ -2287,9 +2303,131 @@ vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
u32 sid = ~0, session_events;
vcl_session_msg_t *vcl_msg;
vcl_session_t *session;
+ u8 add_event = 0;
+
+ switch (e->event_type)
+ {
+ case FIFO_EVENT_APP_RX:
+ ASSERT (e->fifo->client_thread_index == vcl_get_worker_index ());
+ vcl_fifo_rx_evt_valid_or_break (e->fifo);
+ sid = e->fifo->client_session_index;
+ session = vcl_session_get (wrk, sid);
+ session_events = session->vep.ev.events;
+ if (!(EPOLLIN & session->vep.ev.events))
+ break;
+ add_event = 1;
+ events[*num_ev].events |= EPOLLIN;
+ session_evt_data = session->vep.ev.data.u64;
+ break;
+ case FIFO_EVENT_APP_TX:
+ sid = e->fifo->client_session_index;
+ session = vcl_session_get (wrk, sid);
+ session_events = session->vep.ev.events;
+ if (!(EPOLLOUT & session_events))
+ break;
+ add_event = 1;
+ events[*num_ev].events |= EPOLLOUT;
+ session_evt_data = session->vep.ev.data.u64;
+ break;
+ case SESSION_IO_EVT_CT_TX:
+ vcl_fifo_rx_evt_valid_or_break (e->fifo);
+ session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
+ sid = session->session_index;
+ session_events = session->vep.ev.events;
+ if (!(EPOLLIN & session->vep.ev.events))
+ break;
+ add_event = 1;
+ events[*num_ev].events |= EPOLLIN;
+ session_evt_data = session->vep.ev.data.u64;
+ break;
+ case SESSION_IO_EVT_CT_RX:
+ session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
+ sid = session->session_index;
+ session_events = session->vep.ev.events;
+ if (!(EPOLLOUT & session_events))
+ break;
+ add_event = 1;
+ events[*num_ev].events |= EPOLLOUT;
+ session_evt_data = session->vep.ev.data.u64;
+ break;
+ case SESSION_CTRL_EVT_ACCEPTED:
+ accepted_msg = (session_accepted_msg_t *) e->data;
+ handle = accepted_msg->listener_handle;
+ session = vcl_session_table_lookup_listener (wrk, handle);
+ if (!session)
+ {
+ clib_warning ("VCL<%d>: ERROR: couldn't find listen session:"
+ "listener handle %llx", getpid (), handle);
+ break;
+ }
+
+ clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
+ vcl_msg->accepted_msg = *accepted_msg;
+ session_events = session->vep.ev.events;
+ if (!(EPOLLIN & session_events))
+ break;
+
+ add_event = 1;
+ events[*num_ev].events |= EPOLLIN;
+ session_evt_data = session->vep.ev.data.u64;
+ break;
+ case SESSION_CTRL_EVT_CONNECTED:
+ connected_msg = (session_connected_msg_t *) e->data;
+ vcl_session_connected_handler (wrk, connected_msg);
+ /* Generate EPOLLOUT because there's no connected event */
+ sid = vcl_session_index_from_vpp_handle (wrk, connected_msg->handle);
+ session = vcl_session_get (wrk, sid);
+ session_events = session->vep.ev.events;
+ if (EPOLLOUT & session_events)
+ {
+ add_event = 1;
+ events[*num_ev].events |= EPOLLOUT;
+ session_evt_data = session->vep.ev.data.u64;
+ }
+ break;
+ case SESSION_CTRL_EVT_DISCONNECTED:
+ disconnected_msg = (session_disconnected_msg_t *) e->data;
+ sid = vcl_session_index_from_vpp_handle (wrk, disconnected_msg->handle);
+ if (!(session = vcl_session_get (wrk, sid)))
+ break;
+ add_event = 1;
+ events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
+ session_evt_data = session->vep.ev.data.u64;
+ session_events = session->vep.ev.events;
+ break;
+ case SESSION_CTRL_EVT_RESET:
+ sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
+ if (!(session = vcl_session_get (wrk, sid)))
+ break;
+ add_event = 1;
+ events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
+ session_evt_data = session->vep.ev.data.u64;
+ session_events = session->vep.ev.events;
+ break;
+ default:
+ VDBG (0, "unhandled: %u", e->event_type);
+ break;
+ }
+
+ if (add_event)
+ {
+ events[*num_ev].data.u64 = session_evt_data;
+ if (EPOLLONESHOT & session_events)
+ {
+ session = vcl_session_get (wrk, sid);
+ session->vep.ev.events = 0;
+ }
+ *num_ev += 1;
+ }
+}
+
+static int
+vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
+ struct epoll_event *events, u32 maxevents,
+ double wait_for_time, u32 * num_ev)
+{
svm_msg_q_msg_t *msg;
session_event_t *e;
- u8 add_event;
int i;
svm_msg_q_lock (mq);
@@ -2320,141 +2458,26 @@ vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
{
msg = vec_elt_at_index (wrk->mq_msg_vector, i);
e = svm_msg_q_msg_data (mq, msg);
- add_event = 0;
- switch (e->event_type)
- {
- case FIFO_EVENT_APP_RX:
- ASSERT (e->fifo->client_thread_index == vcl_get_worker_index ());
- vcl_fifo_rx_evt_valid_or_break (e->fifo);
- sid = e->fifo->client_session_index;
- session = vcl_session_get (wrk, sid);
- session_events = session->vep.ev.events;
- if (!(EPOLLIN & session->vep.ev.events))
- break;
- add_event = 1;
- events[*num_ev].events |= EPOLLIN;
- session_evt_data = session->vep.ev.data.u64;
- break;
- case FIFO_EVENT_APP_TX:
- sid = e->fifo->client_session_index;
- session = vcl_session_get (wrk, sid);
- session_events = session->vep.ev.events;
- if (!(EPOLLOUT & session_events))
- break;
- add_event = 1;
- events[*num_ev].events |= EPOLLOUT;
- session_evt_data = session->vep.ev.data.u64;
- break;
- case SESSION_IO_EVT_CT_TX:
- vcl_fifo_rx_evt_valid_or_break (e->fifo);
- session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
- sid = session->session_index;
- session_events = session->vep.ev.events;
- if (!(EPOLLIN & session->vep.ev.events))
- break;
- add_event = 1;
- events[*num_ev].events |= EPOLLIN;
- session_evt_data = session->vep.ev.data.u64;
- break;
- case SESSION_IO_EVT_CT_RX:
- session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
- sid = session->session_index;
- session_events = session->vep.ev.events;
- if (!(EPOLLOUT & session_events))
- break;
- add_event = 1;
- events[*num_ev].events |= EPOLLOUT;
- session_evt_data = session->vep.ev.data.u64;
- break;
- case SESSION_CTRL_EVT_ACCEPTED:
- accepted_msg = (session_accepted_msg_t *) e->data;
- handle = accepted_msg->listener_handle;
- session = vcl_session_table_lookup_listener (wrk, handle);
- if (!session)
- {
- clib_warning ("VCL<%d>: ERROR: couldn't find listen session:"
- "listener handle %llx", getpid (), handle);
- break;
- }
-
- clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
- vcl_msg->accepted_msg = *accepted_msg;
- session_events = session->vep.ev.events;
- if (!(EPOLLIN & session_events))
- break;
-
- add_event = 1;
- events[*num_ev].events |= EPOLLIN;
- session_evt_data = session->vep.ev.data.u64;
- break;
- case SESSION_CTRL_EVT_CONNECTED:
- connected_msg = (session_connected_msg_t *) e->data;
- vcl_session_connected_handler (wrk, connected_msg);
- /* Generate EPOLLOUT because there's no connected event */
- sid = vcl_session_index_from_vpp_handle (wrk,
- connected_msg->handle);
- session = vcl_session_get (wrk, sid);
- session_events = session->vep.ev.events;
- if (EPOLLOUT & session_events)
- {
- add_event = 1;
- events[*num_ev].events |= EPOLLOUT;
- session_evt_data = session->vep.ev.data.u64;
- }
- break;
- case SESSION_CTRL_EVT_DISCONNECTED:
- disconnected_msg = (session_disconnected_msg_t *) e->data;
- sid = vcl_session_index_from_vpp_handle (wrk,
- disconnected_msg->handle);
- if (!(session = vcl_session_get (wrk, sid)))
- break;
- add_event = 1;
- events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
- session_evt_data = session->vep.ev.data.u64;
- session_events = session->vep.ev.events;
- break;
- case SESSION_CTRL_EVT_RESET:
- sid = vcl_session_reset_handler (wrk,
- (session_reset_msg_t *) e->data);
- if (!(session = vcl_session_get (wrk, sid)))
- break;
- add_event = 1;
- events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
- session_evt_data = session->vep.ev.data.u64;
- session_events = session->vep.ev.events;
- break;
- default:
- VDBG (0, "unhandled: %u", e->event_type);
- svm_msg_q_free_msg (mq, msg);
- continue;
- }
+ vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
svm_msg_q_free_msg (mq, msg);
-
- if (add_event)
+ if (*num_ev == maxevents)
{
- events[*num_ev].data.u64 = session_evt_data;
- if (EPOLLONESHOT & session_events)
- {
- session = vcl_session_get (wrk, sid);
- session->vep.ev.events = 0;
- }
- *num_ev += 1;
- if (*num_ev == maxevents)
- break;
+ i += 1;
+ break;
}
}
- vec_reset_length (wrk->mq_msg_vector);
+ vec_delete (wrk->mq_msg_vector, i, 0);
+
return *num_ev;
}
static int
vppcom_epoll_wait_condvar (vcl_worker_t * wrk, struct epoll_event *events,
- int maxevents, double wait_for_time)
+ int maxevents, u32 n_evts, double wait_for_time)
{
vcl_cut_through_registration_t *cr;
double total_wait = 0, wait_slice;
- u32 num_ev = 0;
int rv;
wait_for_time = (wait_for_time == -1) ? (double) 10e9 : wait_for_time;
@@ -2465,31 +2488,30 @@ vppcom_epoll_wait_condvar (vcl_worker_t * wrk, struct epoll_event *events,
vcl_ct_registration_lock (wrk);
/* *INDENT-OFF* */
pool_foreach (cr, wrk->cut_through_registrations, ({
- vcl_epoll_wait_handle_mq (wrk, cr->mq, events, maxevents, 0, &num_ev);
+ vcl_epoll_wait_handle_mq (wrk, cr->mq, events, maxevents, 0, &n_evts);
}));
/* *INDENT-ON* */
vcl_ct_registration_unlock (wrk);
rv = vcl_epoll_wait_handle_mq (wrk, wrk->app_event_queue, events,
- maxevents, num_ev ? 0 : wait_slice,
- &num_ev);
+ maxevents, n_evts ? 0 : wait_slice,
+ &n_evts);
if (rv)
total_wait += wait_slice;
- if (num_ev)
- return num_ev;
+ if (n_evts)
+ return n_evts;
}
while (total_wait < wait_for_time);
- return (int) num_ev;
+ return n_evts;
}
static int
vppcom_epoll_wait_eventfd (vcl_worker_t * wrk, struct epoll_event *events,
- int maxevents, double wait_for_time)
+ int maxevents, u32 n_evts, double wait_for_time)
{
vcl_mq_evt_conn_t *mqc;
int __clib_unused n_read;
int n_mq_evts, i;
- u32 n_evts = 0;
u64 buf;
vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
@@ -2511,6 +2533,8 @@ vppcom_epoll_wait (uint32_t vep_handle, struct epoll_event *events,
{
vcl_worker_t *wrk = vcl_worker_get_current ();
vcl_session_t *vep_session;
+ u32 n_evts = 0;
+ int i;
if (PREDICT_FALSE (maxevents <= 0))
{
@@ -2532,10 +2556,28 @@ vppcom_epoll_wait (uint32_t vep_handle, struct epoll_event *events,
memset (events, 0, sizeof (*events) * maxevents);
+ if (vec_len (wrk->unhandled_evts_vector))
+ {
+ for (i = 0; i < vec_len (wrk->unhandled_evts_vector); i++)
+ {
+ vcl_epoll_wait_handle_mq_event (wrk, &wrk->unhandled_evts_vector[i],
+ events, &n_evts);
+ if (n_evts == maxevents)
+ {
+ i += 1;
+ break;
+ }
+ }
+
+ vec_delete (wrk->unhandled_evts_vector, i, 0);
+ }
+
if (vcm->cfg.use_mq_eventfd)
- return vppcom_epoll_wait_eventfd (wrk, events, maxevents, wait_for_time);
+ return vppcom_epoll_wait_eventfd (wrk, events, maxevents, n_evts,
+ wait_for_time);
- return vppcom_epoll_wait_condvar (wrk, events, maxevents, wait_for_time);
+ return vppcom_epoll_wait_condvar (wrk, events, maxevents, n_evts,
+ wait_for_time);
}
int