aboutsummaryrefslogtreecommitdiffstats
path: root/src/vcl/vppcom.c
diff options
context:
space:
mode:
authorFlorin Coras <fcoras@cisco.com>2018-07-17 10:46:29 -0700
committerFlorin Coras <florin.coras@gmail.com>2018-07-27 17:40:29 +0000
commit54693d23307ce8944a4d97379efd3bd4dcf0485c (patch)
tree14ee8cded17a87405de9c0cc9ba3fe7370aabc7f /src/vcl/vppcom.c
parent5df580eec93c0c6fc07dd38f8713f671565b9c38 (diff)
vcl: use events for epoll/select/read/write
Have vcl poll and wait on the event message queues as opposed to constantly polling the session fifos. This also adds event signaling to cut through sessions. On the downside, because we can't wait on multiple condvars, i.e., when we have multiple message queues because of cut-through registrations, we do timed waits. Change-Id: I29ade95dba449659fe46008bb1af502276a7c5fd Signed-off-by: Florin Coras <fcoras@cisco.com>
Diffstat (limited to 'src/vcl/vppcom.c')
-rw-r--r--src/vcl/vppcom.c1530
1 files changed, 910 insertions, 620 deletions
diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c
index 60c649d325f..d1c4413b2a8 100644
--- a/src/vcl/vppcom.c
+++ b/src/vcl/vppcom.c
@@ -21,6 +21,42 @@
#include <vcl/vcl_debug.h>
#include <vcl/vcl_private.h>
+static u8 not_ready;
+
+void
+sigsegv_signal (int signum)
+{
+ not_ready = 1;
+}
+
+static void
+vcl_wait_for_memory (void *mem)
+{
+ u8 __clib_unused test;
+ if (1 || vcm->debug)
+ {
+ sleep (1);
+ return;
+ }
+ if (signal (SIGSEGV, sigsegv_signal))
+ {
+ perror ("signal()");
+ return;
+ }
+ not_ready = 0;
+
+again:
+ test = *(u8 *) mem;
+ if (not_ready)
+ {
+ not_ready = 0;
+ usleep (1);
+ goto again;
+ }
+
+ signal (SIGSEGV, SIG_DFL);
+}
+
static const char *
vppcom_app_state_str (app_state_t state)
{
@@ -206,6 +242,256 @@ vppcom_wait_for_app_state_change (app_state_t app_state)
return VPPCOM_ETIMEDOUT;
}
+static u32
+vcl_ct_registration_add (svm_msg_q_t * mq, u32 sid)
+{
+ vcl_cut_through_registration_t *cr;
+ pool_get (vcm->cut_through_registrations, cr);
+ cr->mq = mq;
+ cr->sid = sid;
+ return (cr - vcm->cut_through_registrations);
+}
+
+static void
+vcl_ct_registration_del (u32 ct_index)
+{
+ pool_put_index (vcm->cut_through_registrations, ct_index);
+}
+
+static vcl_session_t *
+vcl_ct_session_get_from_fifo (svm_fifo_t * f, u8 type)
+{
+ vcl_session_t *s;
+ s = vcl_session_get (f->client_session_index);
+ if (s)
+ {
+ /* rx fifo */
+ if (type == 0 && s->rx_fifo == f)
+ return s;
+ /* tx fifo */
+ if (type == 1 && s->tx_fifo == f)
+ return s;
+ }
+ s = vcl_session_get (f->master_session_index);
+ if (s)
+ {
+ if (type == 0 && s->rx_fifo == f)
+ return s;
+ if (type == 1 && s->tx_fifo == f)
+ return s;
+ }
+ return 0;
+}
+
+static void
+vcl_send_session_accepted_reply (svm_msg_q_t * mq, u32 context,
+ session_handle_t handle, int retval)
+{
+ app_session_evt_t _app_evt, *app_evt = &_app_evt;
+ session_accepted_reply_msg_t *rmp;
+ app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_ACCEPTED_REPLY);
+ rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
+ rmp->handle = handle;
+ rmp->context = context;
+ rmp->retval = retval;
+ app_send_ctrl_evt_to_vpp (mq, app_evt);
+}
+
+static u32
+vcl_session_accepted_handler (session_accepted_msg_t * mp)
+{
+ vcl_session_t *session, *listen_session;
+ svm_fifo_t *rx_fifo, *tx_fifo;
+ u32 session_index;
+
+ VCL_SESSION_LOCK ();
+
+ listen_session = vppcom_session_table_lookup_listener (mp->listener_handle);
+ if (!listen_session)
+ {
+ svm_msg_q_t *evt_q;
+ evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
+ clib_warning ("VCL<%d>: ERROR: couldn't find listen session: "
+ "unknown vpp listener handle %llx",
+ getpid (), mp->listener_handle);
+ vcl_send_session_accepted_reply (evt_q, mp->context, mp->handle,
+ VNET_API_ERROR_INVALID_ARGUMENT);
+ return VCL_INVALID_SESSION_INDEX;
+ }
+
+ pool_get (vcm->sessions, session);
+ memset (session, 0, sizeof (*session));
+ session_index = (u32) (session - vcm->sessions);
+
+ rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
+ tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
+
+ if (mp->server_event_queue_address)
+ {
+ session->vpp_evt_q = uword_to_pointer (mp->client_event_queue_address,
+ svm_msg_q_t *);
+ session->our_evt_q = uword_to_pointer (mp->server_event_queue_address,
+ svm_msg_q_t *);
+ vcl_wait_for_memory (session->vpp_evt_q);
+ session->ct_registration = vcl_ct_registration_add (session->our_evt_q,
+ session_index);
+ rx_fifo->master_session_index = session_index;
+ tx_fifo->master_session_index = session_index;
+ }
+ else
+ {
+ session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
+ svm_msg_q_t *);
+ rx_fifo->client_session_index = session_index;
+ tx_fifo->client_session_index = session_index;
+ }
+
+ session->vpp_handle = mp->handle;
+ session->client_context = mp->context;
+ session->rx_fifo = rx_fifo;
+ session->tx_fifo = tx_fifo;
+
+ session->session_state = STATE_ACCEPT;
+ session->transport.rmt_port = mp->port;
+ session->transport.is_ip4 = mp->is_ip4;
+ clib_memcpy (&session->transport.rmt_ip, mp->ip, sizeof (ip46_address_t));
+
+ hash_set (vcm->session_index_by_vpp_handles, mp->handle, session_index);
+ session->transport.lcl_port = listen_session->transport.lcl_port;
+ session->transport.lcl_ip = listen_session->transport.lcl_ip;
+
+ VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: client accept request from %s"
+ " address %U port %d queue %p!", getpid (), mp->handle, session_index,
+ mp->is_ip4 ? "IPv4" : "IPv6", format_ip46_address, &mp->ip,
+ mp->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
+ clib_net_to_host_u16 (mp->port), session->vpp_evt_q);
+ vcl_evt (VCL_EVT_ACCEPT, session, listen_session, session_index);
+
+ VCL_SESSION_UNLOCK ();
+ return session_index;
+}
+
+static u32
+vcl_session_connected_handler (session_connected_msg_t * mp)
+{
+ vcl_session_t *session = 0;
+ u32 session_index;
+ svm_fifo_t *rx_fifo, *tx_fifo;
+ int rv = VPPCOM_OK;
+
+ session_index = mp->context;
+ VCL_SESSION_LOCK_AND_GET (session_index, &session);
+done:
+ if (mp->retval)
+ {
+ clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
+ "connect failed! %U",
+ getpid (), mp->handle, session_index,
+ format_api_error, ntohl (mp->retval));
+ if (session)
+ {
+ session->session_state = STATE_FAILED;
+ session->vpp_handle = mp->handle;
+ }
+ else
+ {
+ clib_warning ("[%s] ERROR: vpp handle 0x%llx, sid %u: "
+ "Invalid session index (%u)!",
+ getpid (), mp->handle, session_index);
+ }
+ goto done_unlock;
+ }
+
+ if (rv)
+ goto done_unlock;
+
+ if (mp->client_event_queue_address)
+ {
+ session->vpp_evt_q = uword_to_pointer (mp->server_event_queue_address,
+ svm_msg_q_t *);
+ session->our_evt_q = uword_to_pointer (mp->client_event_queue_address,
+ svm_msg_q_t *);
+ vcl_wait_for_memory (session->vpp_evt_q);
+ session->ct_registration = vcl_ct_registration_add (session->our_evt_q,
+ session_index);
+ }
+ else
+ session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
+ svm_msg_q_t *);
+
+ rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
+ rx_fifo->client_session_index = session_index;
+ tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
+ tx_fifo->client_session_index = session_index;
+
+ session->rx_fifo = rx_fifo;
+ session->tx_fifo = tx_fifo;
+ session->vpp_handle = mp->handle;
+ session->transport.is_ip4 = mp->is_ip4;
+ clib_memcpy (&session->transport.lcl_ip, mp->lcl_ip,
+ sizeof (session->transport.lcl_ip));
+ session->transport.lcl_port = mp->lcl_port;
+ session->session_state = STATE_CONNECT;
+
+ /* Add it to lookup table */
+ hash_set (vcm->session_index_by_vpp_handles, mp->handle, session_index);
+
+ VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: connect succeeded! "
+ "session_rx_fifo %p, refcnt %d, session_tx_fifo %p, refcnt %d",
+ getpid (), mp->handle, session_index, session->rx_fifo,
+ session->rx_fifo->refcnt, session->tx_fifo, session->tx_fifo->refcnt);
+done_unlock:
+ VCL_SESSION_UNLOCK ();
+ return session_index;
+}
+
+int
+vcl_handle_mq_ctrl_event (session_event_t * e)
+{
+ session_accepted_msg_t *accepted_msg;
+ session_disconnected_msg_t *disconnected_msg;
+ vcl_session_msg_t *vcl_msg;
+ vcl_session_t *session;
+ u64 handle;
+ u32 sid;
+
+ switch (e->event_type)
+ {
+ case FIFO_EVENT_APP_RX:
+ clib_warning ("unhandled rx: sid %u (0x%x)",
+ e->fifo->client_session_index,
+ e->fifo->client_session_index);
+ break;
+ case SESSION_CTRL_EVT_ACCEPTED:
+ accepted_msg = (session_accepted_msg_t *) e->data;
+ handle = accepted_msg->listener_handle;
+ session = vppcom_session_table_lookup_listener (handle);
+ if (!session)
+ {
+ clib_warning ("VCL<%d>: ERROR: couldn't find listen session:"
+ "listener handle %llx", getpid (), handle);
+ break;
+ }
+
+ clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
+ vcl_msg->accepted_msg = *accepted_msg;
+ break;
+ case SESSION_CTRL_EVT_CONNECTED:
+ vcl_session_connected_handler ((session_connected_msg_t *) e->data);
+ break;
+ case SESSION_CTRL_EVT_DISCONNECTED:
+ disconnected_msg = (session_disconnected_msg_t *) e->data;
+ sid = vcl_session_get_index_from_handle (disconnected_msg->handle);
+ session = vcl_session_get (sid);
+ session->session_state = STATE_DISCONNECT;
+ VDBG (0, "disconnected %u", sid);
+ break;
+ default:
+ clib_warning ("unhandled %u", e->event_type);
+ }
+ return VPPCOM_OK;
+}
+
static inline int
vppcom_wait_for_session_state_change (u32 session_index,
session_state_t state,
@@ -213,6 +499,8 @@ vppcom_wait_for_session_state_change (u32 session_index,
{
f64 timeout = clib_time_now (&vcm->clib_time) + wait_for_time;
vcl_session_t *volatile session;
+ svm_msg_q_msg_t msg;
+ session_event_t *e;
int rv;
do
@@ -234,8 +522,13 @@ vppcom_wait_for_session_state_change (u32 session_index,
VCL_SESSION_UNLOCK ();
return VPPCOM_ECONNREFUSED;
}
-
VCL_SESSION_UNLOCK ();
+
+ if (svm_msg_q_sub (vcm->app_event_queue, &msg, SVM_Q_NOWAIT, 0))
+ continue;
+ e = svm_msg_q_msg_data (vcm->app_event_queue, &msg);
+ vcl_handle_mq_ctrl_event (e);
+ svm_msg_q_free_msg (vcm->app_event_queue, &msg);
}
while (clib_time_now (&vcm->clib_time) < timeout);
@@ -334,25 +627,17 @@ vppcom_session_disconnect (u32 session_index)
goto done;
}
- /* The peer has already initiated the close,
- * so send the disconnect session reply.
- */
if (state & STATE_CLOSE_ON_EMPTY)
{
- //XXX alagalah - Check and drain here?
- vppcom_send_disconnect_session_reply (vpp_handle,
- session_index, 0 /* rv */ );
+ vppcom_send_disconnect_session_reply (vpp_handle, session_index,
+ 0 /* rv */ );
VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect "
"REPLY...", getpid (), vpp_handle, session_index);
}
-
- /* Otherwise, send a disconnect session msg...
- */
else
{
VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect...",
getpid (), vpp_handle, session_index);
-
vppcom_send_disconnect_session (vpp_handle, session_index);
}
@@ -564,7 +849,6 @@ vppcom_session_close (uint32_t session_index)
getpid (), vpp_handle, session_index,
rv, vppcom_retval_str (rv));
}
-
else if (state & (CLIENT_STATE_OPEN | SERVER_STATE_OPEN))
{
rv = vppcom_session_disconnect (session_index);
@@ -577,6 +861,9 @@ vppcom_session_close (uint32_t session_index)
}
VCL_SESSION_LOCK_AND_GET (session_index, &session);
+ if (session->our_evt_q)
+ vcl_ct_registration_del (session->ct_registration);
+
vpp_handle = session->vpp_handle;
if (vpp_handle != ~0)
{
@@ -625,8 +912,12 @@ vppcom_session_bind (uint32_t session_index, vppcom_endpt_t * ep)
}
session->transport.is_ip4 = ep->is_ip4;
- session->transport.lcl_ip = to_ip46 (ep->is_ip4 ? IP46_TYPE_IP4 :
- IP46_TYPE_IP6, ep->ip);
+ if (ep->is_ip4)
+ clib_memcpy (&session->transport.lcl_ip.ip4, ep->ip,
+ sizeof (ip4_address_t));
+ else
+ clib_memcpy (&session->transport.lcl_ip.ip6, ep->ip,
+ sizeof (ip6_address_t));
session->transport.lcl_port = ep->port;
VDBG (0, "VCL<%d>: sid %u: binding to local %s address %U port %u, "
@@ -678,9 +969,9 @@ vppcom_session_listen (uint32_t listen_session_index, uint32_t q_len)
vppcom_send_bind_sock (listen_session, listen_session_index);
VCL_SESSION_UNLOCK ();
- retval =
- vppcom_wait_for_session_state_change (listen_session_index, STATE_LISTEN,
- vcm->cfg.session_timeout);
+ retval = vppcom_wait_for_session_state_change (listen_session_index,
+ STATE_LISTEN,
+ vcm->cfg.session_timeout);
VCL_SESSION_LOCK_AND_GET (listen_session_index, &listen_session);
if (PREDICT_FALSE (retval))
@@ -693,10 +984,6 @@ vppcom_session_listen (uint32_t listen_session_index, uint32_t q_len)
goto done;
}
- VCL_ACCEPT_FIFO_LOCK ();
- clib_fifo_validate (vcm->client_session_index_fifo, q_len);
- VCL_ACCEPT_FIFO_UNLOCK ();
-
VCL_SESSION_UNLOCK ();
done:
@@ -732,101 +1019,67 @@ int
vppcom_session_accept (uint32_t listen_session_index, vppcom_endpt_t * ep,
uint32_t flags)
{
+ session_accepted_msg_t accepted_msg;
vcl_session_t *listen_session = 0;
vcl_session_t *client_session = 0;
u32 client_session_index = ~0;
- int rv;
+ svm_msg_q_t *vpp_evt_q;
+ vcl_session_msg_t *evt;
u64 listen_vpp_handle;
- vce_event_handler_reg_t *reg;
- vce_event_t *ev;
- vce_event_connect_request_t *result;
- struct timespec ts;
- struct timeval tv;
- int millisecond_timeout = 1;
- int hours_timeout = 20 * 60 * 60;
+ svm_msg_q_msg_t msg;
+ session_event_t *e;
+ u8 is_nonblocking;
+ int rv;
VCL_SESSION_LOCK_AND_GET (listen_session_index, &listen_session);
- listen_vpp_handle = listen_session->vpp_handle; // For debugging
- rv = validate_args_session_accept_ (listen_session);
- if (rv)
+ if (validate_args_session_accept_ (listen_session))
{
VCL_SESSION_UNLOCK ();
goto done;
}
- /* Using an aggressive timer of 1ms and a generous timer of
- * 20 hours, we can implement a blocking and non-blocking listener
- * as both event and time driven */
- gettimeofday (&tv, NULL);
- ts.tv_nsec = (tv.tv_usec * 1000) + (1000 * millisecond_timeout);
- ts.tv_sec = tv.tv_sec;
-
- /* Predict that the Listener is blocking more often than not */
- if (PREDICT_TRUE (!VCL_SESS_ATTR_TEST (listen_session->attr,
- VCL_SESS_ATTR_NONBLOCK)))
- ts.tv_sec += hours_timeout;
-
VCL_SESSION_UNLOCK ();
- /* Register handler for connect_request event on listen_session_index */
- vce_event_key_t evk;
- evk.session_index = listen_session_index;
- evk.eid = VCL_EVENT_CONNECT_REQ_ACCEPTED;
- reg = vce_register_handler (&vcm->event_thread, &evk,
- vce_connect_request_handler_fn, 0);
- VCL_EVENTS_LOCK ();
- ev = vce_get_event_from_index (&vcm->event_thread, reg->ev_idx);
- pthread_mutex_lock (&reg->handler_lock);
- while (!ev)
- {
- VCL_EVENTS_UNLOCK ();
- rv = pthread_cond_timedwait (&reg->handler_cond,
- &reg->handler_lock, &ts);
- if (rv == ETIMEDOUT)
- {
- rv = VPPCOM_EAGAIN;
- goto cleanup;
- }
- VCL_EVENTS_LOCK ();
- ev = vce_get_event_from_index (&vcm->event_thread, reg->ev_idx);
- }
- result = vce_get_event_data (ev, sizeof (*result));
- client_session_index = result->accepted_session_index;
- VCL_EVENTS_UNLOCK ();
-
- /* Remove from the FIFO used to service epoll */
- VCL_ACCEPT_FIFO_LOCK ();
- if (clib_fifo_elts (vcm->client_session_index_fifo))
- {
- u32 tmp_client_session_index;
- clib_fifo_sub1 (vcm->client_session_index_fifo,
- tmp_client_session_index);
- /* It wasn't ours... put it back ... */
- if (tmp_client_session_index != client_session_index)
- clib_fifo_add1 (vcm->client_session_index_fifo,
- tmp_client_session_index);
- }
- VCL_ACCEPT_FIFO_UNLOCK ();
+ if (clib_fifo_elts (listen_session->accept_evts_fifo))
+ {
+ clib_fifo_sub2 (listen_session->accept_evts_fifo, evt);
+ accepted_msg = evt->accepted_msg;
+ goto handle;
+ }
- VCL_SESSION_LOCK ();
+ is_nonblocking = VCL_SESS_ATTR_TEST (listen_session->attr,
+ VCL_SESS_ATTR_NONBLOCK);
+ if (svm_msg_q_is_empty (vcm->app_event_queue) && is_nonblocking)
+ return VPPCOM_EAGAIN;
- rv = vppcom_session_at_index (client_session_index, &client_session);
- if (PREDICT_FALSE (rv))
+ while (1)
{
- rv = VPPCOM_ECONNABORTED;
- clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: client sid %u "
- "lookup failed! returning %d (%s)", getpid (),
- listen_vpp_handle, listen_session_index,
- client_session_index, rv, vppcom_retval_str (rv));
- goto cleanup;
+ if (svm_msg_q_sub (vcm->app_event_queue, &msg, SVM_Q_WAIT, 0))
+ return VPPCOM_EAGAIN;
+
+ e = svm_msg_q_msg_data (vcm->app_event_queue, &msg);
+ if (e->event_type != SESSION_CTRL_EVT_ACCEPTED)
+ {
+ clib_warning ("discarded event: %u", e->event_type);
+ svm_msg_q_free_msg (vcm->app_event_queue, &msg);
+ continue;
+ }
+ clib_memcpy (&accepted_msg, e->data, sizeof (accepted_msg));
+ svm_msg_q_free_msg (vcm->app_event_queue, &msg);
+ break;
}
+handle:
+
+ client_session_index = vcl_session_accepted_handler (&accepted_msg);
+ VCL_SESSION_LOCK_AND_GET (client_session_index, &client_session);
+ rv = client_session_index;
+
if (flags & O_NONBLOCK)
VCL_SESS_ATTR_SET (client_session->attr, VCL_SESS_ATTR_NONBLOCK);
- else
- VCL_SESS_ATTR_CLR (client_session->attr, VCL_SESS_ATTR_NONBLOCK);
+ listen_vpp_handle = listen_session->vpp_handle;
VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: Got a client request! "
"vpp handle 0x%llx, sid %u, flags %d, is_nonblocking %u",
getpid (), listen_vpp_handle, listen_session_index,
@@ -846,46 +1099,31 @@ vppcom_session_accept (uint32_t listen_session_index, vppcom_endpt_t * ep,
sizeof (ip6_address_t));
}
- vppcom_send_accept_session_reply (client_session->vpp_handle,
- client_session->client_context,
- 0 /* retval OK */ );
+ if (accepted_msg.server_event_queue_address)
+ vpp_evt_q = uword_to_pointer (accepted_msg.vpp_event_queue_address,
+ svm_msg_q_t *);
+ else
+ vpp_evt_q = client_session->vpp_evt_q;
+ vcl_send_session_accepted_reply (vpp_evt_q, client_session->client_context,
+ client_session->vpp_handle, 0);
- VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: accepted vpp handle 0x%llx,"
- " sid %u connection from peer %s address %U port %u to local %s address"
- " %U port %u",
- getpid (), listen_vpp_handle,
+ VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: accepted vpp handle 0x%llx, "
+ "sid %u connection from peer %s address %U port %u to local %s "
+ "address %U port %u", getpid (), listen_vpp_handle,
listen_session_index, client_session->vpp_handle,
client_session_index,
client_session->transport.is_ip4 ? "IPv4" : "IPv6",
format_ip46_address, &client_session->transport.rmt_ip,
- client_session->transport.is_ip4 ?
- IP46_TYPE_IP4 : IP46_TYPE_IP6,
+ client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
clib_net_to_host_u16 (client_session->transport.rmt_port),
client_session->transport.is_ip4 ? "IPv4" : "IPv6",
format_ip46_address, &client_session->transport.lcl_ip,
- client_session->transport.is_ip4 ?
- IP46_TYPE_IP4 : IP46_TYPE_IP6,
+ client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
clib_net_to_host_u16 (client_session->transport.lcl_port));
vcl_evt (VCL_EVT_ACCEPT, client_session, listen_session,
client_session_index);
VCL_SESSION_UNLOCK ();
- rv = (int) client_session_index;
- vce_clear_event (&vcm->event_thread, reg->ev_idx);
- if (vcm->session_io_thread.io_sessions_lockp)
- {
- /* Throw this new accepted session index into the rx poll thread pool */
- VCL_IO_SESSIONS_LOCK ();
- u32 *active_session_index;
- pool_get (vcm->session_io_thread.active_session_indexes,
- active_session_index);
- *active_session_index = client_session_index;
- VCL_IO_SESSIONS_UNLOCK ();
- }
-cleanup:
- vce_unregister_handler (&vcm->event_thread, reg);
- pthread_mutex_unlock (&reg->handler_lock);
-
done:
return rv;
}
@@ -947,9 +1185,8 @@ vppcom_session_connect (uint32_t session_index, vppcom_endpt_t * server_ep)
vppcom_send_connect_sock (session, session_index);
VCL_SESSION_UNLOCK ();
- retval =
- vppcom_wait_for_session_state_change (session_index, STATE_CONNECT,
- vcm->cfg.session_timeout);
+ retval = vppcom_wait_for_session_state_change (session_index, STATE_CONNECT,
+ vcm->cfg.session_timeout);
VCL_SESSION_LOCK_AND_GET (session_index, &session);
vpp_handle = session->vpp_handle;
@@ -978,29 +1215,32 @@ done:
return rv;
}
+static u8
+vcl_is_rx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
+{
+ if (!is_ct)
+ return (e->event_type == FIFO_EVENT_APP_RX
+ && e->fifo->client_session_index == sid);
+ else
+ return (e->event_type == SESSION_IO_EVT_CT_TX);
+}
+
static inline int
vppcom_session_read_internal (uint32_t session_index, void *buf, int n,
u8 peek)
{
+ int n_read = 0, rv, is_nonblocking;
vcl_session_t *session = 0;
svm_fifo_t *rx_fifo;
- int n_read = 0;
- int rv;
- int is_nonblocking;
-
- u64 vpp_handle;
- u32 poll_et;
- session_state_t state;
+ svm_msg_q_msg_t msg;
+ session_event_t *e;
+ svm_msg_q_t *mq;
+ u8 is_full;
ASSERT (buf);
VCL_SESSION_LOCK_AND_GET (session_index, &session);
- is_nonblocking = VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK);
- rx_fifo = session->rx_fifo;
- state = session->session_state;
- vpp_handle = session->vpp_handle;
-
if (PREDICT_FALSE (session->is_vep))
{
VCL_SESSION_UNLOCK ();
@@ -1010,72 +1250,84 @@ vppcom_session_read_internal (uint32_t session_index, void *buf, int n,
goto done;
}
- if (PREDICT_FALSE (!(state & (SERVER_STATE_OPEN | CLIENT_STATE_OPEN))))
+ is_nonblocking = VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK);
+ rx_fifo = session->rx_fifo;
+
+ if (PREDICT_FALSE (!(session->session_state & STATE_OPEN)))
{
+ session_state_t state = session->session_state;
VCL_SESSION_UNLOCK ();
rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: %s session is not open! "
"state 0x%x (%s), returning %d (%s)",
- getpid (), vpp_handle, session_index, state,
+ getpid (), session->vpp_handle, session_index, state,
vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
goto done;
}
VCL_SESSION_UNLOCK ();
+ mq = session->our_evt_q ? session->our_evt_q : vcm->app_event_queue;
+ is_full = svm_fifo_is_full (rx_fifo);
- do
+ if (svm_fifo_is_empty (rx_fifo))
{
- if (peek)
- n_read = svm_fifo_peek (rx_fifo, 0, n, buf);
- else
- n_read = svm_fifo_dequeue_nowait (rx_fifo, n, buf);
- }
- while (!is_nonblocking && (n_read <= 0));
-
- if (n_read <= 0)
- {
- VCL_SESSION_LOCK_AND_GET (session_index, &session);
-
- poll_et = (((EPOLLET | EPOLLIN) & session->vep.ev.events) ==
- (EPOLLET | EPOLLIN));
- if (poll_et)
- session->vep.et_mask |= EPOLLIN;
-
- if (state & STATE_CLOSE_ON_EMPTY)
+ svm_fifo_unset_event (rx_fifo);
+ if (is_nonblocking)
{
- rv = VPPCOM_ECONNRESET;
-
- VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: Empty fifo with "
- "session state 0x%x (%s)! Setting state to 0x%x (%s), "
- "returning %d (%s)",
- getpid (), session->vpp_handle, session_index,
- state, vppcom_session_state_str (state),
- STATE_DISCONNECT,
- vppcom_session_state_str (STATE_DISCONNECT), rv,
- vppcom_retval_str (rv));
-
- session->session_state = STATE_DISCONNECT;
+ rv = VPPCOM_OK;
+ goto done;
+ }
+ svm_msg_q_lock (mq);
+ while (1)
+ {
+ if (svm_msg_q_is_empty (mq))
+ svm_msg_q_wait (mq);
+ svm_msg_q_sub_w_lock (mq, &msg);
+ e = svm_msg_q_msg_data (mq, &msg);
+ if (!vcl_is_rx_evt_for_session (e, session_index,
+ session->our_evt_q != 0))
+ {
+ vcl_handle_mq_ctrl_event (e);
+ svm_msg_q_free_msg (mq, &msg);
+ continue;
+ }
+ if (svm_fifo_is_empty (rx_fifo))
+ {
+ svm_msg_q_free_msg (mq, &msg);
+ continue;
+ }
+ svm_msg_q_free_msg (mq, &msg);
+ svm_msg_q_unlock (mq);
+ break;
}
- else
- rv = VPPCOM_EAGAIN;
-
- VCL_SESSION_UNLOCK ();
}
+
+ if (peek)
+ n_read = svm_fifo_peek (rx_fifo, 0, n, buf);
else
- rv = n_read;
+ n_read = svm_fifo_dequeue_nowait (rx_fifo, n, buf);
+ ASSERT (n_read > 0);
+ svm_fifo_unset_event (rx_fifo);
+
+ if (session->our_evt_q && is_full)
+ app_send_io_evt_to_vpp (session->vpp_evt_q, rx_fifo, SESSION_IO_EVT_CT_RX,
+ SVM_Q_WAIT);
+
if (VPPCOM_DEBUG > 2)
{
- if (rv > 0)
+ if (n_read > 0)
clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: read %d bytes "
- "from (%p)", getpid (), vpp_handle,
+ "from (%p)", getpid (), session->vpp_handle,
session_index, n_read, rx_fifo);
else
clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: nothing read! "
- "returning %d (%s)", getpid (), vpp_handle,
+ "returning %d (%s)", getpid (), session->vpp_handle,
session_index, rv, vppcom_retval_str (rv));
}
+ return n_read;
+
done:
return rv;
}
@@ -1093,100 +1345,55 @@ vppcom_session_peek (uint32_t session_index, void *buf, int n)
}
static inline int
-vppcom_session_read_ready (vcl_session_t * session, u32 session_index)
+vppcom_session_read_ready (vcl_session_t * session)
{
- int ready = 0;
- u32 poll_et;
- int rv;
- session_state_t state = session->session_state;
- u64 vpp_handle = session->vpp_handle;
-
/* Assumes caller has acquired spinlock: vcm->sessions_lockp */
if (PREDICT_FALSE (session->is_vep))
{
clib_warning ("VCL<%d>: ERROR: sid %u: cannot read from an "
- "epoll session!", getpid (), session_index);
- rv = VPPCOM_EBADFD;
- goto done;
- }
-
- if (session->session_state & STATE_LISTEN)
- {
- VCL_ACCEPT_FIFO_LOCK ();
- ready = clib_fifo_elts (vcm->client_session_index_fifo);
- VCL_ACCEPT_FIFO_UNLOCK ();
- }
- else
- {
- if (!(state & (SERVER_STATE_OPEN | CLIENT_STATE_OPEN | STATE_LISTEN)))
- {
- rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET :
- VPPCOM_ENOTCONN);
-
- VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open!"
- " state 0x%x (%s), returning %d (%s)",
- getpid (), vpp_handle, session_index,
- state, vppcom_session_state_str (state),
- rv, vppcom_retval_str (rv));
- goto done;
- }
-
- ready = svm_fifo_max_dequeue (session->rx_fifo);
+ "epoll session!", getpid (), vcl_session_index (session));
+ return VPPCOM_EBADFD;
}
- if (ready == 0)
+ if (PREDICT_FALSE (!(session->session_state & (STATE_OPEN | STATE_LISTEN))))
{
- poll_et =
- ((EPOLLET | EPOLLIN) & session->vep.ev.events) == (EPOLLET | EPOLLIN);
- if (poll_et)
- session->vep.et_mask |= EPOLLIN;
+ session_state_t state = session->session_state;
+ int rv;
- if (state & STATE_CLOSE_ON_EMPTY)
- {
- rv = VPPCOM_ECONNRESET;
+ rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
- VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: Empty fifo with "
- "session state 0x%x (%s)! Setting state to 0x%x (%s), "
- "returning %d (%s)",
- getpid (), session_index, vpp_handle,
- state, vppcom_session_state_str (state),
- STATE_DISCONNECT,
- vppcom_session_state_str (STATE_DISCONNECT), rv,
- vppcom_retval_str (rv));
- session->session_state = STATE_DISCONNECT;
- goto done;
- }
+ VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open!"
+ " state 0x%x (%s), returning %d (%s)", getpid (),
+ session->vpp_handle, vcl_session_index (session), state,
+ vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
+ return rv;
}
- rv = ready;
- if (!svm_msg_q_is_empty (vcm->app_event_queue) &&
- !pthread_mutex_trylock (&vcm->app_event_queue->q->mutex))
- {
- u32 i, n_to_dequeue = vcm->app_event_queue->q->cursize;
- svm_msg_q_msg_t msg;
+ if (session->session_state & STATE_LISTEN)
+ return clib_fifo_elts (session->accept_evts_fifo);
- for (i = 0; i < n_to_dequeue; i++)
- {
- svm_queue_sub_raw (vcm->app_event_queue->q, (u8 *) & msg);
- svm_msg_q_free_msg (vcm->app_event_queue, &msg);
- }
+ return svm_fifo_max_dequeue (session->rx_fifo);
+}
- pthread_mutex_unlock (&vcm->app_event_queue->q->mutex);
- }
-done:
- return rv;
+static u8
+vcl_is_tx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
+{
+ if (!is_ct)
+ return (e->event_type == FIFO_EVENT_APP_TX
+ && e->fifo->client_session_index == sid);
+ else
+ return (e->event_type == SESSION_IO_EVT_CT_RX);
}
int
vppcom_session_write (uint32_t session_index, void *buf, size_t n)
{
+ int rv, n_write, is_nonblocking;
vcl_session_t *session = 0;
svm_fifo_t *tx_fifo = 0;
+ svm_msg_q_msg_t msg;
+ session_event_t *e;
svm_msg_q_t *mq;
- session_state_t state;
- int rv, n_write, is_nonblocking;
- u32 poll_et;
- u64 vpp_handle;
ASSERT (buf);
@@ -1194,105 +1401,98 @@ vppcom_session_write (uint32_t session_index, void *buf, size_t n)
tx_fifo = session->tx_fifo;
is_nonblocking = VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK);
- vpp_handle = session->vpp_handle;
- state = session->session_state;
if (PREDICT_FALSE (session->is_vep))
{
VCL_SESSION_UNLOCK ();
clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
"cannot write to an epoll session!",
- getpid (), vpp_handle, session_index);
+ getpid (), session->vpp_handle, session_index);
rv = VPPCOM_EBADFD;
goto done;
}
- if (!(session->session_state & (SERVER_STATE_OPEN | CLIENT_STATE_OPEN)))
+ if (!(session->session_state & STATE_OPEN))
{
- rv =
- ((session->session_state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET :
- VPPCOM_ENOTCONN);
-
+ session_state_t state = session->session_state;
+ rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
VCL_SESSION_UNLOCK ();
VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open! "
"state 0x%x (%s)",
- getpid (), vpp_handle, session_index,
+ getpid (), session->vpp_handle, session_index,
state, vppcom_session_state_str (state));
goto done;
}
VCL_SESSION_UNLOCK ();
- do
+ mq = session->our_evt_q ? session->our_evt_q : vcm->app_event_queue;
+ if (svm_fifo_is_full (tx_fifo))
{
- n_write = svm_fifo_enqueue_nowait (tx_fifo, n, (void *) buf);
+ if (is_nonblocking)
+ {
+ rv = VPPCOM_EWOULDBLOCK;
+ goto done;
+ }
+ svm_msg_q_lock (mq);
+ while (1)
+ {
+ if (!svm_fifo_is_full (tx_fifo))
+ {
+ svm_msg_q_unlock (mq);
+ break;
+ }
+ if (svm_msg_q_is_empty (mq) && svm_msg_q_timedwait (mq, 10e-6))
+ continue;
+ svm_msg_q_sub_w_lock (mq, &msg);
+ e = svm_msg_q_msg_data (mq, &msg);
+ if (!vcl_is_tx_evt_for_session (e, session_index,
+ session->our_evt_q != 0))
+ {
+ vcl_handle_mq_ctrl_event (e);
+ svm_msg_q_free_msg (mq, &msg);
+ continue;
+ }
+ if (svm_fifo_is_full (tx_fifo))
+ {
+ svm_msg_q_free_msg (mq, &msg);
+ continue;
+ }
+ svm_msg_q_free_msg (mq, &msg);
+ svm_msg_q_unlock (mq);
+ break;
+ }
}
- while (!is_nonblocking && (n_write <= 0));
- /* If event wasn't set, add one
- *
- * To reduce context switching, can check if an
- * event is already there for this event_key, but for now
- * this will suffice. */
+ n_write = svm_fifo_enqueue_nowait (tx_fifo, n, (void *) buf);
+ ASSERT (n_write > 0);
- if ((n_write > 0) && svm_fifo_set_event (tx_fifo))
+ if (svm_fifo_set_event (tx_fifo))
{
- /* Send TX event to vpp */
+ session_evt_type_t et;
VCL_SESSION_LOCK_AND_GET (session_index, &session);
- mq = session->vpp_evt_q;
- ASSERT (mq);
- app_send_io_evt_to_vpp (mq, tx_fifo, FIFO_EVENT_APP_TX, SVM_Q_WAIT);
+ et = session->our_evt_q ? SESSION_IO_EVT_CT_TX : FIFO_EVENT_APP_TX;
+ app_send_io_evt_to_vpp (session->vpp_evt_q, tx_fifo, et, SVM_Q_WAIT);
VCL_SESSION_UNLOCK ();
VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: added FIFO_EVENT_APP_TX "
"to vpp_event_q %p, n_write %d", getpid (),
- vpp_handle, session_index, mq, n_write);
- }
-
- if (n_write <= 0)
- {
- VCL_SESSION_LOCK_AND_GET (session_index, &session);
-
- poll_et = (((EPOLLET | EPOLLOUT) & session->vep.ev.events) ==
- (EPOLLET | EPOLLOUT));
- if (poll_et)
- session->vep.et_mask |= EPOLLOUT;
-
- if (session->session_state & STATE_CLOSE_ON_EMPTY)
- {
- rv = VPPCOM_ECONNRESET;
-
- VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: Empty fifo with "
- "session state 0x%x (%s)! Setting state to 0x%x (%s), "
- "returning %d (%s)",
- getpid (), session->vpp_handle, session_index,
- session->session_state,
- vppcom_session_state_str (session->session_state),
- STATE_DISCONNECT,
- vppcom_session_state_str (STATE_DISCONNECT), rv,
- vppcom_retval_str (rv));
-
- session->session_state = STATE_DISCONNECT;
- }
- else
- rv = VPPCOM_EAGAIN;
-
- VCL_SESSION_UNLOCK ();
+ session->vpp_handle, session_index, session->vpp_evt_q, n_write);
}
- else
- rv = n_write;
if (VPPCOM_DEBUG > 2)
{
if (n_write <= 0)
clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: "
- "FIFO-FULL (%p)", getpid (), vpp_handle,
+ "FIFO-FULL (%p)", getpid (), session->vpp_handle,
session_index, tx_fifo);
else
clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: "
"wrote %d bytes tx-fifo: (%p)", getpid (),
- vpp_handle, session_index, n_write, tx_fifo);
+ session->vpp_handle, session_index, n_write, tx_fifo);
}
+ return n_write;
+
done:
return rv;
}
@@ -1300,20 +1500,13 @@ done:
static inline int
vppcom_session_write_ready (vcl_session_t * session, u32 session_index)
{
- int ready;
- u32 poll_et;
- int rv;
-
- ASSERT (session);
-
/* Assumes caller has acquired spinlock: vcm->sessions_lockp */
if (PREDICT_FALSE (session->is_vep))
{
clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
"cannot write to an epoll session!",
getpid (), session->vpp_handle, session_index);
- rv = VPPCOM_EBADFD;
- goto done;
+ return VPPCOM_EBADFD;
}
if (PREDICT_FALSE (session->session_state & STATE_LISTEN))
@@ -1321,58 +1514,168 @@ vppcom_session_write_ready (vcl_session_t * session, u32 session_index)
clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
"cannot write to a listen session!",
getpid (), session->vpp_handle, session_index);
- rv = VPPCOM_EBADFD;
- goto done;
+ return VPPCOM_EBADFD;
}
- if (!(session->session_state & (SERVER_STATE_OPEN | CLIENT_STATE_OPEN)))
+ if (PREDICT_FALSE (!(session->session_state & STATE_OPEN)))
{
session_state_t state = session->session_state;
+ int rv;
rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
-
clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
"session is not open! state 0x%x (%s), "
"returning %d (%s)", getpid (), session->vpp_handle,
session_index,
state, vppcom_session_state_str (state),
rv, vppcom_retval_str (rv));
- goto done;
+ return rv;
}
- ready = svm_fifo_max_enqueue (session->tx_fifo);
-
VDBG (3, "VCL<%d>: vpp handle 0x%llx, sid %u: peek %s (%p), ready = %d",
getpid (), session->vpp_handle, session_index, session->tx_fifo,
- ready);
+ svm_fifo_max_enqueue (session->tx_fifo));
- if (ready == 0)
+ return svm_fifo_max_enqueue (session->tx_fifo);
+}
+
+static int
+vcl_select_handle_mq (svm_msg_q_t * mq, unsigned long n_bits,
+ unsigned long *read_map, unsigned long *write_map,
+ unsigned long *except_map, double time_to_wait,
+ u32 * bits_set)
+{
+ session_disconnected_msg_t *disconnected_msg;
+ session_accepted_msg_t *accepted_msg;
+ vcl_session_msg_t *vcl_msg;
+ vcl_session_t *session;
+ svm_msg_q_msg_t msg;
+ session_event_t *e;
+ u32 n_msgs, i, sid;
+ u64 handle;
+
+ svm_msg_q_lock (mq);
+ if (svm_msg_q_is_empty (mq))
{
- poll_et = (((EPOLLET | EPOLLOUT) & session->vep.ev.events) ==
- (EPOLLET | EPOLLOUT));
- if (poll_et)
- session->vep.et_mask |= EPOLLOUT;
+ if (*bits_set)
+ {
+ svm_msg_q_unlock (mq);
+ return 0;
+ }
- if (session->session_state & STATE_CLOSE_ON_EMPTY)
+ if (!time_to_wait)
+ {
+ svm_msg_q_unlock (mq);
+ return 0;
+ }
+ else if (time_to_wait < 0)
{
- rv = VPPCOM_ECONNRESET;
+ svm_msg_q_wait (mq);
+ }
+ else
+ {
+ if (svm_msg_q_timedwait (mq, time_to_wait))
+ {
+ svm_msg_q_unlock (mq);
+ return 0;
+ }
+ }
+ }
+ svm_msg_q_unlock (mq);
- VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: Empty fifo with "
- "session state 0x%x (%s)! Setting state to 0x%x (%s), "
- "returning %d (%s)", getpid (),
- session->vpp_handle, session_index,
- session->session_state,
- vppcom_session_state_str (session->session_state),
- STATE_DISCONNECT,
- vppcom_session_state_str (STATE_DISCONNECT), rv,
- vppcom_retval_str (rv));
- session->session_state = STATE_DISCONNECT;
- goto done;
+ n_msgs = svm_msg_q_size (mq);
+ for (i = 0; i < n_msgs; i++)
+ {
+ if (svm_msg_q_sub (mq, &msg, SVM_Q_WAIT, 0))
+ {
+ clib_warning ("message queue returned");
+ continue;
+ }
+ e = svm_msg_q_msg_data (mq, &msg);
+ switch (e->event_type)
+ {
+ case FIFO_EVENT_APP_RX:
+ sid = e->fifo->client_session_index;
+ session = vcl_session_get (sid);
+ if (!session || svm_fifo_is_empty (session->rx_fifo))
+ break;
+ if (sid < n_bits && read_map)
+ {
+ clib_bitmap_set_no_check (read_map, sid, 1);
+ *bits_set += 1;
+ }
+ break;
+ case FIFO_EVENT_APP_TX:
+ sid = e->fifo->client_session_index;
+ session = vcl_session_get (sid);
+ if (!session || svm_fifo_is_full (session->tx_fifo))
+ break;
+ if (sid < n_bits && write_map)
+ {
+ clib_bitmap_set_no_check (write_map, sid, 1);
+ *bits_set += 1;
+ }
+ break;
+ case SESSION_IO_EVT_CT_TX:
+ session = vcl_ct_session_get_from_fifo (e->fifo, 0);
+ sid = vcl_session_index (session);
+ if (!session || svm_fifo_is_empty (session->rx_fifo))
+ break;
+ if (sid < n_bits && read_map)
+ {
+ clib_bitmap_set_no_check (read_map, sid, 1);
+ *bits_set += 1;
+ }
+ break;
+ break;
+ case SESSION_IO_EVT_CT_RX:
+ session = vcl_ct_session_get_from_fifo (e->fifo, 1);
+ sid = vcl_session_index (session);
+ if (!session || svm_fifo_is_full (session->tx_fifo))
+ break;
+ if (sid < n_bits && write_map)
+ {
+ clib_bitmap_set_no_check (write_map, sid, 1);
+ *bits_set += 1;
+ }
+ break;
+ case SESSION_CTRL_EVT_ACCEPTED:
+ accepted_msg = (session_accepted_msg_t *) e->data;
+ handle = accepted_msg->listener_handle;
+ session = vppcom_session_table_lookup_listener (handle);
+ if (!session)
+ {
+ clib_warning ("VCL<%d>: ERROR: couldn't find listen session:"
+ "listener handle %llx", getpid (), handle);
+ break;
+ }
+
+ clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
+ vcl_msg->accepted_msg = *accepted_msg;
+ sid = session - vcm->sessions;
+ if (sid < n_bits && read_map)
+ {
+ clib_bitmap_set_no_check (read_map, sid, 1);
+ *bits_set += 1;
+ }
+ break;
+ case SESSION_CTRL_EVT_DISCONNECTED:
+ disconnected_msg = (session_disconnected_msg_t *) e->data;
+ sid = vcl_session_get_index_from_handle (disconnected_msg->handle);
+ if (sid < n_bits && except_map)
+ {
+ clib_bitmap_set_no_check (except_map, sid, 1);
+ *bits_set += 1;
+ }
+ break;
+ default:
+ clib_warning ("unhandled: %u", e->event_type);
+ break;
}
+ svm_msg_q_free_msg (mq, &msg);
}
- rv = ready;
-done:
- return rv;
+
+ return *bits_set;
}
int
@@ -1380,11 +1683,11 @@ vppcom_select (unsigned long n_bits, unsigned long *read_map,
unsigned long *write_map, unsigned long *except_map,
double time_to_wait)
{
- u32 session_index;
+ u32 sid, minbits = clib_max (n_bits, BITS (uword)), bits_set = 0;
+ vcl_cut_through_registration_t *cr;
+ double total_wait = 0, wait_slice;
vcl_session_t *session = 0;
- int rv, bits_set = 0;
- f64 timeout = clib_time_now (&vcm->clib_time) + time_to_wait;
- u32 minbits = clib_max (n_bits, BITS (uword));
+ int rv;
ASSERT (sizeof (clib_bitmap_t) == sizeof (long int));
@@ -1412,124 +1715,74 @@ vppcom_select (unsigned long n_bits, unsigned long *read_map,
vec_len (vcm->ex_bitmap) * sizeof (clib_bitmap_t));
}
+ if (!n_bits)
+ return 0;
+
+ if (!write_map)
+ goto check_rd;
+
+ /* *INDENT-OFF* */
+ clib_bitmap_foreach (sid, vcm->wr_bitmap, ({
+ VCL_SESSION_LOCK();
+ if (!(session = vcl_session_get (sid)))
+ {
+ VCL_SESSION_UNLOCK();
+ VDBG (0, "VCL<%d>: session %d specified in write_map is closed.",
+ getpid (), sid);
+ return VPPCOM_EBADFD;
+ }
+
+ rv = svm_fifo_is_full (session->tx_fifo);
+ VCL_SESSION_UNLOCK();
+ if (!rv)
+ {
+ clib_bitmap_set_no_check (write_map, sid, 1);
+ bits_set++;
+ }
+ }));
+
+check_rd:
+ if (!read_map)
+ goto check_mq;
+ clib_bitmap_foreach (sid, vcm->rd_bitmap, ({
+ VCL_SESSION_LOCK();
+ if (!(session = vcl_session_get (sid)))
+ {
+ VCL_SESSION_UNLOCK();
+ VDBG (0, "VCL<%d>: session %d specified in write_map is closed.",
+ getpid (), sid);
+ return VPPCOM_EBADFD;
+ }
+
+ rv = vppcom_session_read_ready (session);
+ VCL_SESSION_UNLOCK();
+ if (rv)
+ {
+ clib_bitmap_set_no_check (read_map, sid, 1);
+ bits_set++;
+ }
+ }));
+ /* *INDENT-ON* */
+
+check_mq:
+ wait_slice = vcm->cut_through_registrations ? 10e-6 : time_to_wait;
do
{
/* *INDENT-OFF* */
- if (n_bits)
- {
- if (read_map)
- {
- clib_bitmap_foreach (session_index, vcm->rd_bitmap,
- ({
- VCL_SESSION_LOCK();
- rv = vppcom_session_at_index (session_index, &session);
- if (rv < 0)
- {
- VCL_SESSION_UNLOCK();
- VDBG (1, "VCL<%d>: session %d specified in read_map is"
- " closed.", getpid (),
- session_index);
- bits_set = VPPCOM_EBADFD;
- goto select_done;
- }
- if (session->session_state & STATE_LISTEN)
- {
- vce_event_handler_reg_t *reg = 0;
- vce_event_key_t evk;
-
- /* Check if handler already registered for this
- * event.
- * If not, register handler for connect_request event
- * on listen_session_index
- */
- evk.session_index = session_index;
- evk.eid = VCL_EVENT_CONNECT_REQ_ACCEPTED;
- reg = vce_get_event_handler (&vcm->event_thread, &evk);
- if (!reg)
- reg = vce_register_handler (&vcm->event_thread, &evk,
- vce_poll_wait_connect_request_handler_fn,
- 0 /* No callback args */);
- rv = vppcom_session_read_ready (session, session_index);
- if (rv > 0)
- {
- vce_unregister_handler (&vcm->event_thread, reg);
- }
- }
- else
- rv = vppcom_session_read_ready (session, session_index);
- VCL_SESSION_UNLOCK();
- if (except_map && vcm->ex_bitmap &&
- clib_bitmap_get (vcm->ex_bitmap, session_index) &&
- (rv < 0))
- {
- clib_bitmap_set_no_check (except_map, session_index, 1);
- bits_set++;
- }
- else if (rv > 0)
- {
- clib_bitmap_set_no_check (read_map, session_index, 1);
- bits_set++;
- }
- }));
- }
-
- if (write_map)
- {
- clib_bitmap_foreach (session_index, vcm->wr_bitmap,
- ({
- VCL_SESSION_LOCK();
- rv = vppcom_session_at_index (session_index, &session);
- if (rv < 0)
- {
- VCL_SESSION_UNLOCK();
- VDBG (0, "VCL<%d>: session %d specified in "
- "write_map is closed.", getpid (),
- session_index);
- bits_set = VPPCOM_EBADFD;
- goto select_done;
- }
-
- rv = vppcom_session_write_ready (session, session_index);
- VCL_SESSION_UNLOCK();
- if (write_map && (rv > 0))
- {
- clib_bitmap_set_no_check (write_map, session_index, 1);
- bits_set++;
- }
- }));
- }
-
- if (except_map)
- {
- clib_bitmap_foreach (session_index, vcm->ex_bitmap,
- ({
- VCL_SESSION_LOCK();
- rv = vppcom_session_at_index (session_index, &session);
- if (rv < 0)
- {
- VCL_SESSION_UNLOCK();
- VDBG (1, "VCL<%d>: session %d specified in except_map "
- "is closed.", getpid (),
- session_index);
- bits_set = VPPCOM_EBADFD;
- goto select_done;
- }
-
- rv = vppcom_session_read_ready (session, session_index);
- VCL_SESSION_UNLOCK();
- if (rv < 0)
- {
- clib_bitmap_set_no_check (except_map, session_index, 1);
- bits_set++;
- }
- }));
- }
- }
+ pool_foreach (cr, vcm->cut_through_registrations, ({
+ vcl_select_handle_mq (cr->mq, n_bits, read_map, write_map, except_map,
+ 0, &bits_set);
+ }));
/* *INDENT-ON* */
+
+ vcl_select_handle_mq (vcm->app_event_queue, n_bits, read_map, write_map,
+ except_map, time_to_wait, &bits_set);
+ total_wait += wait_slice;
+ if (bits_set)
+ return bits_set;
}
- while ((time_to_wait == -1) || (clib_time_now (&vcm->clib_time) < timeout));
+ while (total_wait < time_to_wait);
-select_done:
return (bits_set);
}
@@ -1857,193 +2110,230 @@ done:
return rv;
}
-int
-vppcom_epoll_wait (uint32_t vep_idx, struct epoll_event *events,
- int maxevents, double wait_for_time)
+static int
+vcl_epoll_wait_handle_mq (svm_msg_q_t * mq, struct epoll_event *events,
+ u32 maxevents, double wait_for_time, u32 * num_ev)
{
- vcl_session_t *vep_session;
- int rv;
- f64 timeout = clib_time_now (&vcm->clib_time) + wait_for_time;
- u32 keep_trying = 1;
- int num_ev = 0;
- u32 vep_next_sid, wait_cont_idx;
- u8 is_vep;
-
- if (PREDICT_FALSE (maxevents <= 0))
- {
- clib_warning ("VCL<%d>: ERROR: Invalid maxevents (%d)!",
- getpid (), maxevents);
- return VPPCOM_EINVAL;
- }
- memset (events, 0, sizeof (*events) * maxevents);
-
- VCL_SESSION_LOCK_AND_GET (vep_idx, &vep_session);
- vep_next_sid = vep_session->vep.next_sid;
- is_vep = vep_session->is_vep;
- wait_cont_idx = vep_session->wait_cont_idx;
- VCL_SESSION_UNLOCK ();
+ session_disconnected_msg_t *disconnected_msg;
+ session_connected_msg_t *connected_msg;
+ session_accepted_msg_t *accepted_msg;
+ u32 sid = ~0, session_events, n_msgs;
+ u64 session_evt_data = ~0, handle;
+ vcl_session_msg_t *vcl_msg;
+ vcl_session_t *session;
+ svm_msg_q_msg_t msg;
+ session_event_t *e;
+ u8 add_event;
+ int i;
- if (PREDICT_FALSE (!is_vep))
- {
- clib_warning ("VCL<%d>: ERROR: vep_idx (%u) is not a vep!",
- getpid (), vep_idx);
- rv = VPPCOM_EINVAL;
- goto done;
- }
- if (PREDICT_FALSE (vep_next_sid == ~0))
+ svm_msg_q_lock (mq);
+ if (svm_msg_q_is_empty (mq))
{
- VDBG (1, "VCL<%d>: WARNING: vep_idx (%u) is empty!",
- getpid (), vep_idx);
- goto done;
+ if (!wait_for_time)
+ {
+ svm_msg_q_unlock (mq);
+ return 0;
+ }
+ else if (wait_for_time < 0)
+ {
+ svm_msg_q_wait (mq);
+ }
+ else
+ {
+ if (svm_msg_q_timedwait (mq, wait_for_time / 1e3))
+ {
+ svm_msg_q_unlock (mq);
+ return 0;
+ }
+ }
}
+ svm_msg_q_unlock (mq);
- do
+ n_msgs = svm_msg_q_size (mq);
+ for (i = 0; i < n_msgs; i++)
{
- u32 sid;
- u32 next_sid = ~0;
- vcl_session_t *session;
-
- for (sid = (wait_cont_idx == ~0) ? vep_next_sid : wait_cont_idx;
- sid != ~0; sid = next_sid)
+ if (svm_msg_q_sub (mq, &msg, SVM_Q_WAIT, 0))
{
- u32 session_events, et_mask, clear_et_mask, session_vep_idx;
- u8 add_event, is_vep_session;
- int ready;
- u64 session_ev_data;
-
- VCL_SESSION_LOCK_AND_GET (sid, &session);
- next_sid = session->vep.next_sid;
+ clib_warning ("message queue returned");
+ continue;
+ }
+ e = svm_msg_q_msg_data (mq, &msg);
+ add_event = 0;
+ switch (e->event_type)
+ {
+ case FIFO_EVENT_APP_RX:
+ sid = e->fifo->client_session_index;
+ clib_spinlock_lock (&vcm->sessions_lockp);
+ session = vcl_session_get (sid);
session_events = session->vep.ev.events;
- et_mask = session->vep.et_mask;
- is_vep = session->is_vep;
- is_vep_session = session->is_vep_session;
- session_vep_idx = session->vep.vep_idx;
- session_ev_data = session->vep.ev.data.u64;
-
- VCL_SESSION_UNLOCK ();
-
- if (PREDICT_FALSE (is_vep))
+ if ((EPOLLIN & session->vep.ev.events)
+ && !svm_fifo_is_empty (session->rx_fifo))
{
- VDBG (0, "VCL<%d>: ERROR: sid (%u) is a vep!",
- getpid (), vep_idx);
- rv = VPPCOM_EINVAL;
- goto done;
+ add_event = 1;
+ events[*num_ev].events |= EPOLLIN;
+ session_evt_data = session->vep.ev.data.u64;
}
- if (PREDICT_FALSE (!is_vep_session))
+ clib_spinlock_unlock (&vcm->sessions_lockp);
+ break;
+ case FIFO_EVENT_APP_TX:
+ sid = e->fifo->client_session_index;
+ clib_spinlock_lock (&vcm->sessions_lockp);
+ session = vcl_session_get (sid);
+ session_events = session->vep.ev.events;
+ if ((EPOLLOUT & session_events)
+ && !svm_fifo_is_full (session->tx_fifo))
{
- VDBG (0, "VCL<%d>: ERROR: session (%u) is not "
- "a vep session!", getpid (), sid);
- rv = VPPCOM_EINVAL;
- goto done;
+ add_event = 1;
+ events[*num_ev].events |= EPOLLOUT;
+ session_evt_data = session->vep.ev.data.u64;
}
- if (PREDICT_FALSE (session_vep_idx != vep_idx))
+ clib_spinlock_unlock (&vcm->sessions_lockp);
+ break;
+ case SESSION_IO_EVT_CT_TX:
+ session = vcl_ct_session_get_from_fifo (e->fifo, 0);
+ sid = vcl_session_index (session);
+ session_events = session->vep.ev.events;
+ if ((EPOLLIN & session->vep.ev.events)
+ && !svm_fifo_is_empty (session->rx_fifo))
{
- clib_warning ("VCL<%d>: ERROR: session (%u) "
- "vep_idx (%u) != vep_idx (%u)!",
- getpid (), sid, session_vep_idx, vep_idx);
- rv = VPPCOM_EINVAL;
- goto done;
+ add_event = 1;
+ events[*num_ev].events |= EPOLLIN;
+ session_evt_data = session->vep.ev.data.u64;
}
-
- add_event = clear_et_mask = 0;
-
- if (EPOLLIN & session_events)
+ break;
+ case SESSION_IO_EVT_CT_RX:
+ session = vcl_ct_session_get_from_fifo (e->fifo, 1);
+ sid = vcl_session_index (session);
+ session_events = session->vep.ev.events;
+ if ((EPOLLOUT & session_events)
+ && !svm_fifo_is_full (session->tx_fifo))
{
- VCL_SESSION_LOCK_AND_GET (sid, &session);
- ready = vppcom_session_read_ready (session, sid);
- VCL_SESSION_UNLOCK ();
- if ((ready > 0) && (EPOLLIN & et_mask))
- {
- add_event = 1;
- events[num_ev].events |= EPOLLIN;
- if (((EPOLLET | EPOLLIN) & session_events) ==
- (EPOLLET | EPOLLIN))
- clear_et_mask |= EPOLLIN;
- }
- else if (ready < 0)
- {
- add_event = 1;
- switch (ready)
- {
- case VPPCOM_ECONNRESET:
- events[num_ev].events |= EPOLLHUP | EPOLLRDHUP;
- break;
-
- default:
- events[num_ev].events |= EPOLLERR;
- break;
- }
- }
+ add_event = 1;
+ events[*num_ev].events |= EPOLLOUT;
+ session_evt_data = session->vep.ev.data.u64;
}
-
- if (EPOLLOUT & session_events)
+ break;
+ case SESSION_CTRL_EVT_ACCEPTED:
+ accepted_msg = (session_accepted_msg_t *) e->data;
+ handle = accepted_msg->listener_handle;
+ session = vppcom_session_table_lookup_listener (handle);
+ if (!session)
{
- VCL_SESSION_LOCK_AND_GET (sid, &session);
- ready = vppcom_session_write_ready (session, sid);
- VCL_SESSION_UNLOCK ();
- if ((ready > 0) && (EPOLLOUT & et_mask))
- {
- add_event = 1;
- events[num_ev].events |= EPOLLOUT;
- if (((EPOLLET | EPOLLOUT) & session_events) ==
- (EPOLLET | EPOLLOUT))
- clear_et_mask |= EPOLLOUT;
- }
- else if (ready < 0)
- {
- add_event = 1;
- switch (ready)
- {
- case VPPCOM_ECONNRESET:
- events[num_ev].events |= EPOLLHUP;
- break;
-
- default:
- events[num_ev].events |= EPOLLERR;
- break;
- }
- }
+ clib_warning ("VCL<%d>: ERROR: couldn't find listen session:"
+ "listener handle %llx", getpid (), handle);
+ break;
}
- if (add_event)
+ clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
+ vcl_msg->accepted_msg = *accepted_msg;
+ session_events = session->vep.ev.events;
+ if (!(EPOLLIN & session_events))
+ break;
+
+ add_event = 1;
+ events[*num_ev].events |= EPOLLIN;
+ session_evt_data = session->vep.ev.data.u64;
+ break;
+ case SESSION_CTRL_EVT_CONNECTED:
+ connected_msg = (session_connected_msg_t *) e->data;
+ vcl_session_connected_handler (connected_msg);
+ /* Generate EPOLLOUT because there's no connected event */
+ sid = vcl_session_get_index_from_handle (connected_msg->handle);
+ clib_spinlock_lock (&vcm->sessions_lockp);
+ session = vcl_session_get (sid);
+ session_events = session->vep.ev.events;
+ if (EPOLLOUT & session_events)
{
- events[num_ev].data.u64 = session_ev_data;
- if (EPOLLONESHOT & session_events)
- {
- VCL_SESSION_LOCK_AND_GET (sid, &session);
- session->vep.ev.events = 0;
- VCL_SESSION_UNLOCK ();
- }
- num_ev++;
- if (num_ev == maxevents)
- {
- VCL_SESSION_LOCK_AND_GET (vep_idx, &vep_session);
- vep_session->wait_cont_idx = next_sid;
- VCL_SESSION_UNLOCK ();
- goto done;
- }
+ add_event = 1;
+ events[*num_ev].events |= EPOLLOUT;
+ session_evt_data = session->vep.ev.data.u64;
}
- if (wait_cont_idx != ~0)
+ clib_spinlock_unlock (&vcm->sessions_lockp);
+ break;
+ case SESSION_CTRL_EVT_DISCONNECTED:
+ disconnected_msg = (session_disconnected_msg_t *) e->data;
+ sid = vcl_session_get_index_from_handle (disconnected_msg->handle);
+ clib_spinlock_lock (&vcm->sessions_lockp);
+ session = vcl_session_get (sid);
+ add_event = 1;
+ events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
+ session_evt_data = session->vep.ev.data.u64;
+ session_events = session->vep.ev.events;
+ clib_spinlock_unlock (&vcm->sessions_lockp);
+ break;
+ default:
+ clib_warning ("unhandled: %u", e->event_type);
+ svm_msg_q_free_msg (mq, &msg);
+ continue;
+ }
+
+ svm_msg_q_free_msg (mq, &msg);
+
+ if (add_event)
+ {
+ events[*num_ev].data.u64 = session_evt_data;
+ if (EPOLLONESHOT & session_events)
{
- if (next_sid == ~0)
- next_sid = vep_next_sid;
- else if (next_sid == wait_cont_idx)
- next_sid = ~0;
+ clib_spinlock_lock (&vcm->sessions_lockp);
+ session = vcl_session_get (sid);
+ session->vep.ev.events = 0;
+ clib_spinlock_unlock (&vcm->sessions_lockp);
}
+ *num_ev += 1;
+ if (*num_ev == maxevents)
+ break;
}
- if (wait_for_time != -1)
- keep_trying = (clib_time_now (&vcm->clib_time) <= timeout) ? 1 : 0;
}
- while ((num_ev == 0) && keep_trying);
+ return *num_ev;
+}
- if (wait_cont_idx != ~0)
+int
+vppcom_epoll_wait (uint32_t vep_idx, struct epoll_event *events,
+ int maxevents, double wait_for_time)
+{
+ vcl_cut_through_registration_t *cr;
+ vcl_session_t *vep_session;
+ double total_wait = 0, wait_slice;
+ u32 num_ev = 0;
+
+ if (PREDICT_FALSE (maxevents <= 0))
{
- VCL_SESSION_LOCK_AND_GET (vep_idx, &vep_session);
- vep_session->wait_cont_idx = ~0;
- VCL_SESSION_UNLOCK ();
+ clib_warning ("VCL<%d>: ERROR: Invalid maxevents (%d)!",
+ getpid (), maxevents);
+ return VPPCOM_EINVAL;
}
-done:
- return (rv != VPPCOM_OK) ? rv : num_ev;
+
+ clib_spinlock_lock (&vcm->sessions_lockp);
+ vep_session = vcl_session_get (vep_idx);
+ if (PREDICT_FALSE (!vep_session->is_vep))
+ {
+ clib_warning ("VCL<%d>: ERROR: vep_idx (%u) is not a vep!",
+ getpid (), vep_idx);
+ clib_spinlock_unlock (&vcm->sessions_lockp);
+ return VPPCOM_EINVAL;
+ }
+ clib_spinlock_unlock (&vcm->sessions_lockp);
+
+ memset (events, 0, sizeof (*events) * maxevents);
+ wait_slice = vcm->cut_through_registrations ? 10e-6 : wait_for_time;
+
+ do
+ {
+ /* *INDENT-OFF* */
+ pool_foreach (cr, vcm->cut_through_registrations, ({
+ vcl_epoll_wait_handle_mq (cr->mq, events, maxevents, 0, &num_ev);
+ }));
+ /* *INDENT-ON* */
+
+ vcl_epoll_wait_handle_mq (vcm->app_event_queue, events, maxevents,
+ num_ev ? 0 : wait_slice, &num_ev);
+ total_wait += wait_slice;
+ if (num_ev)
+ return num_ev;
+ }
+ while (total_wait < wait_for_time);
+
+ return num_ev;
}
int
@@ -2062,7 +2352,7 @@ vppcom_session_attr (uint32_t session_index, uint32_t op,
switch (op)
{
case VPPCOM_ATTR_GET_NREAD:
- rv = vppcom_session_read_ready (session, session_index);
+ rv = vppcom_session_read_ready (session);
VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_NREAD: sid %u, nread = %d",
getpid (), rv);
break;
@@ -2678,7 +2968,7 @@ vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
if (POLLIN & vp[i].events)
{
VCL_SESSION_LOCK_AND_GET (vp[i].sid, &session);
- rv = vppcom_session_read_ready (session, vp[i].sid);
+ rv = vppcom_session_read_ready (session);
VCL_SESSION_UNLOCK ();
if (rv > 0)
{