summaryrefslogtreecommitdiffstats
path: root/src/vcl/vppcom.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/vcl/vppcom.c')
-rw-r--r--src/vcl/vppcom.c432
1 files changed, 230 insertions, 202 deletions
diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c
index c7cd1e992b4..bb248699fd9 100644
--- a/src/vcl/vppcom.c
+++ b/src/vcl/vppcom.c
@@ -21,6 +21,7 @@
#include <vpp/api/vpe_msg_enum.h>
#include <vnet/session/application_interface.h>
#include <vcl/vppcom.h>
+#include <vcl/vcl_event.h>
#include <vlib/unix/unix.h>
#include <vppinfra/vec_bootstrap.h>
#include <vppinfra/elog.h>
@@ -188,6 +189,20 @@ typedef struct vppcom_cfg_t_
u8 *vpp_api_filename;
} vppcom_cfg_t;
+/* VPPCOM Event typedefs */
+typedef enum vcl_event_id_
+{
+ VCL_EVENT_CONNECT_REQ_ACCEPTED,
+ VCL_EVENT_N_EVENTS
+} vcl_event_id_t;
+
+typedef struct vce_event_connect_request_
+{
+ u8 size;
+ u8 handled;
+ u32 accepted_session_index;
+} vce_event_connect_request_t;
+
typedef struct vppcom_main_t_
{
u8 init;
@@ -227,7 +242,10 @@ typedef struct vppcom_main_t_
vppcom_cfg_t cfg;
- /* Event logging */
+ /* Event thread */
+ vce_event_thread_t event_thread;
+
+ /* VPP Event-logger */
elog_main_t elog_main;
elog_track_t elog_track;
@@ -333,6 +351,7 @@ vppcom_session_state_str (session_state_t state)
return st;
}
+
/*
* VPPCOM Utility Functions
*/
@@ -415,6 +434,68 @@ write_elog (void)
}
+/*
+ * VPPCOM Event Functions
+ */
+
+/**
+ * * @brief vce_connect_request_handler_fn
+ * - used for listener sessions
+ * - when a vl_api_accept_session_t_handler() generates an event
+ * this callback is alerted and sets fields that consumers such as
+ * vppcom_session_accept() expect to see, ie. accepted_client_index
+ *
+ * @param arg - void* to be cast to vce_event_handler_reg_t*
+ */
+void
+vce_connect_request_handler_fn (void *arg)
+{
+ vce_event_handler_reg_t *reg = (vce_event_handler_reg_t *) arg;
+
+ vce_event_connect_request_t *ecr;
+ vce_event_t *ev;
+
+ ev = vce_get_event_from_index (&vcm->event_thread, reg->ev_idx);
+
+ ecr = (vce_event_connect_request_t *) ev->data;
+
+ pthread_mutex_lock (&reg->handler_lock);
+ ecr->handled = 1;
+ pthread_cond_signal (&reg->handler_cond);
+ pthread_mutex_unlock (&reg->handler_lock);
+}
+
+/**
+ * @brief vce_epoll_wait_connect_request_handler_fn
+ * - used by vppcom_epoll_xxxx() for listener sessions
+ * - when a vl_api_accept_session_t_handler() generates an event
+ * this callback is alerted and sets the fields that vppcom_epoll_wait()
+ * expects to see.
+ *
+ * @param arg - void* to be cast to vce_event_handler_reg_t*
+ */
+void
+vce_epoll_wait_connect_request_handler_fn (void *arg)
+{
+ vce_event_handler_reg_t *reg = (vce_event_handler_reg_t *) arg;
+ vce_event_t *ev;
+ /* Retrieve the VCL_EVENT_CONNECT_REQ_ACCEPTED event */
+ ev = vce_get_event_from_index (&vcm->event_thread, reg->ev_idx);
+ vce_event_connect_request_t *ecr = (vce_event_connect_request_t *) ev->data;
+
+ /* Add the accepted_session_index to the FIFO */
+ clib_spinlock_lock (&vcm->sessions_lockp);
+ clib_fifo_add1 (vcm->client_session_index_fifo,
+ ecr->accepted_session_index);
+ clib_spinlock_unlock (&vcm->sessions_lockp);
+
+ /* Recycling the event. */
+ clib_spinlock_lock (&(vcm->event_thread.events_lockp));
+ vcm->event_thread.recycle_event = 1;
+ clib_fifo_add1 (vcm->event_thread.event_index_fifo, reg->ev_idx);
+ clib_spinlock_unlock (&(vcm->event_thread.events_lockp));
+}
+
static int
vppcom_connect_to_vpp (char *app_name)
{
@@ -438,7 +519,7 @@ vppcom_connect_to_vpp (char *app_name)
else
{
vcm->vl_input_queue = am->shmem_hdr->vl_input_queue;
- vcm->my_client_index = am->my_client_index;
+ vcm->my_client_index = (u32) am->my_client_index;
vcm->app_state = STATE_APP_CONN_VPP;
if (VPPCOM_DEBUG > 0)
@@ -471,7 +552,7 @@ vppcom_connect_to_vpp (char *app_name)
u32 data;
} *ed;
ed = ELOG_TRACK_DATA (&vcm->elog_main, e, vcm->elog_track);
- ed->data = rv;
+ ed->data = (u32) rv;
/* *INDENT-ON* */
}
return rv;
@@ -599,47 +680,6 @@ vppcom_wait_for_session_state_change (u32 session_index,
return VPPCOM_ETIMEDOUT;
}
-static inline int
-vppcom_wait_for_client_session_index (f64 wait_for_time)
-{
- f64 timeout = clib_time_now (&vcm->clib_time) + wait_for_time;
-
- do
- {
- if (clib_fifo_elts (vcm->client_session_index_fifo))
- return VPPCOM_OK;
- }
- while (clib_time_now (&vcm->clib_time) < timeout);
-
- if (wait_for_time == 0)
- return VPPCOM_EAGAIN;
-
- if (VPPCOM_DEBUG > 0)
- clib_warning ("VCL<%d>: timeout waiting for client_session_index",
- getpid ());
-
- if (VPPCOM_DEBUG > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "ERR: timeout waiting for session index :%d",
- .format_args = "i4",
- };
- struct
- {
- u32 data;
- } *ed;
-
- ed = ELOG_TRACK_DATA (&vcm->elog_main, e, vcm->elog_track);
-
- ed->data = getpid();
- /* *INDENT-ON* */
- }
-
- return VPPCOM_ETIMEDOUT;
-}
-
/*
* VPP-API message functions
*/
@@ -744,6 +784,7 @@ vppcom_app_attach (void)
getpid (), rv, vppcom_retval_str (rv));
return rv;
}
+
return VPPCOM_OK;
}
@@ -1235,6 +1276,11 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
svm_fifo_t *rx_fifo, *tx_fifo;
session_t *session, *listen_session;
u32 session_index;
+ vce_event_connect_request_t *ecr;
+ vce_event_t *ev;
+ int rv;
+ u32 ev_idx;
+
clib_spinlock_lock (&vcm->sessions_lockp);
if (!clib_fifo_free_elts (vcm->client_session_index_fifo))
@@ -1252,10 +1298,15 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
clib_warning ("VCL<%d>: ERROR: couldn't find listen session: "
"unknown vpp listener handle %llx",
getpid (), mp->listener_handle);
+ vppcom_send_accept_session_reply (mp->handle, mp->context,
+ VNET_API_ERROR_INVALID_ARGUMENT);
clib_spinlock_unlock (&vcm->sessions_lockp);
return;
}
+ /* TODO check listener depth and update */
+ /* TODO on "child" fd close, update listener depth */
+
/* Allocate local session and set it up */
pool_get (vcm->sessions, session);
memset (session, 0, sizeof (*session));
@@ -1283,8 +1334,26 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
session->lcl_port = listen_session->lcl_port;
session->lcl_addr = listen_session->lcl_addr;
- /* TBD: move client_session_index_fifo into listener session */
- clib_fifo_add1 (vcm->client_session_index_fifo, session_index);
+ /* Create an event for handlers */
+
+ clib_spinlock_lock (&vcm->event_thread.events_lockp);
+
+ pool_get (vcm->event_thread.vce_events, ev);
+ ev->data = clib_mem_alloc (sizeof (vce_event_connect_request_t));
+ ev->refcnt = 0;
+ ev_idx = (u32) (ev - vcm->event_thread.vce_events);
+ ecr = ev->data;
+ ev->evk.eid = VCL_EVENT_CONNECT_REQ_ACCEPTED;
+ listen_session = vppcom_session_table_lookup_listener (mp->listener_handle);
+ ev->evk.session_index = (u32) (listen_session - vcm->sessions);
+ ecr->handled = 0;
+ ecr->accepted_session_index = session_index;
+
+ clib_spinlock_unlock (&vcm->event_thread.events_lockp);
+
+ rv = vce_generate_event (&vcm->event_thread, ev_idx);
+
+ ASSERT (rv == 0);
if (VPPCOM_DEBUG > 1)
clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: client accept "
@@ -1336,124 +1405,6 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
}
static void
-vppcom_send_connect_session_reply (session_t * session, u32 session_index,
- u64 vpp_handle, u32 context, int retval)
-{
- vl_api_connect_session_reply_t *rmp;
- u32 len;
- svm_queue_t *client_q;
-
- rmp = vl_msg_api_alloc (sizeof (*rmp));
- memset (rmp, 0, sizeof (*rmp));
- rmp->_vl_msg_id = ntohs (VL_API_CONNECT_SESSION_REPLY);
-
- if (!session)
- {
- rmp->context = context;
- rmp->handle = vpp_handle;
- rmp->retval = htonl (retval);
- vl_msg_api_send_shmem (vcm->vl_input_queue, (u8 *) & rmp);
- return;
- }
-
- rmp->context = session->client_context;
- rmp->retval = htonl (retval);
- rmp->handle = session->vpp_handle;
- rmp->server_rx_fifo = pointer_to_uword (session->rx_fifo);
- rmp->server_tx_fifo = pointer_to_uword (session->tx_fifo);
- rmp->vpp_event_queue_address = pointer_to_uword (session->vpp_event_queue);
- rmp->segment_size = vcm->cfg.segment_size;
- len = vec_len (session->segment_name);
- rmp->segment_name_length = clib_min (len, sizeof (rmp->segment_name));
- clib_memcpy (rmp->segment_name, session->segment_name,
- rmp->segment_name_length - 1);
- clib_memcpy (rmp->lcl_ip, session->peer_addr.ip46.as_u8,
- sizeof (rmp->lcl_ip));
- rmp->is_ip4 = session->peer_addr.is_ip4;
- rmp->lcl_port = session->peer_port;
- client_q = uword_to_pointer (session->client_queue_address, svm_queue_t *);
- ASSERT (client_q);
- vl_msg_api_send_shmem (client_q, (u8 *) & rmp);
-}
-
-/*
- * Acting as server for redirected connect requests
- */
-static void
-vl_api_connect_sock_t_handler (vl_api_connect_sock_t * mp)
-{
- u32 session_index;
- session_t *session = 0;
-
- clib_spinlock_lock (&vcm->sessions_lockp);
- if (!clib_fifo_free_elts (vcm->client_session_index_fifo))
- {
- clib_spinlock_unlock (&vcm->sessions_lockp);
-
- if (VPPCOM_DEBUG > 1)
- clib_warning ("VCL<%d>: client session queue is full!", getpid ());
-
- /* TBD: Fix api to include vpp handle */
- vppcom_send_connect_session_reply (0 /* session */ , 0 /* sid */ ,
- 0 /* handle */ , mp->context,
- VNET_API_ERROR_QUEUE_FULL);
- return;
- }
-
- pool_get (vcm->sessions, session);
- memset (session, 0, sizeof (*session));
- session_index = session - vcm->sessions;
-
- session->client_context = mp->context;
- session->vpp_handle = session_index;
- session->client_queue_address = mp->client_queue_address;
- session->lcl_port = mp->port;
- session->lcl_addr.is_ip4 = mp->is_ip4;
- clib_memcpy (&session->lcl_addr.ip46, mp->ip,
- sizeof (session->lcl_addr.ip46));
-
- /* TBD: missing peer info in api msg.
- */
- session->peer_addr.is_ip4 = mp->is_ip4;
- ASSERT (session->lcl_addr.is_ip4 == session->peer_addr.is_ip4);
-
- session->state = STATE_ACCEPT;
- clib_fifo_add1 (vcm->client_session_index_fifo, session_index);
- if (VPPCOM_DEBUG > 1)
- clib_warning ("VCL<%d>: sid %u: Got a cut-thru connect request! "
- "clib_fifo_elts %u!\n", getpid (), session_index,
- clib_fifo_elts (vcm->client_session_index_fifo));
-
- if (VPPCOM_DEBUG > 0)
- {
- session->elog_track.name =
- (char *) format (0, "C:%d:S:%d%c", vcm->my_client_index,
- session_index, 0);
- elog_track_register (&vcm->elog_main, &session->elog_track);
-
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "cut-thru-connect:S:%d clib_fifo_elts:%d",
- .format_args = "i4i4",
- };
-
- struct
- {
- u32 data[2];
- } *ed;
-
- ed = ELOG_TRACK_DATA (&vcm->elog_main, e, session->elog_track);
-
- ed->data[0] = session_index;
- ed->data[1] = clib_fifo_elts (vcm->client_session_index_fifo);
- /* *INDENT-ON* */
- }
-
- clib_spinlock_unlock (&vcm->sessions_lockp);
-}
-
-static void
vppcom_send_bind_sock (session_t * session, u32 session_index)
{
vl_api_bind_sock_t *bmp;
@@ -1603,7 +1554,6 @@ _(SESSION_ENABLE_DISABLE_REPLY, session_enable_disable_reply) \
_(BIND_SOCK_REPLY, bind_sock_reply) \
_(UNBIND_SOCK_REPLY, unbind_sock_reply) \
_(ACCEPT_SESSION, accept_session) \
-_(CONNECT_SOCK, connect_sock) \
_(CONNECT_SESSION_REPLY, connect_session_reply) \
_(DISCONNECT_SESSION, disconnect_session) \
_(DISCONNECT_SESSION_REPLY, disconnect_session_reply) \
@@ -2312,6 +2262,8 @@ vppcom_app_create (char *app_name)
if (vcm->my_client_index == ~0)
{
+
+ /* API hookup and connect to VPP */
vppcom_api_hookup ();
vcm->app_state = STATE_APP_START;
rv = vppcom_connect_to_vpp (app_name);
@@ -2322,6 +2274,11 @@ vppcom_app_create (char *app_name)
return rv;
}
+ /* State event handling thread */
+
+ rv = vce_start_event_thread (&(vcm->event_thread), 20);
+
+
if (VPPCOM_DEBUG > 0)
clib_warning ("VCL<%d>: sending session enable", getpid ());
@@ -2645,7 +2602,7 @@ vppcom_session_bind (uint32_t session_index, vppcom_endpt_t * ep)
{
if (session->lcl_addr.is_ip4)
{
- /* *INDENT-OFF* */
+ /* *INDENT-OFF* */
ELOG_TYPE_DECLARE (e) =
{
.format = "bind local:%s:%d.%d.%d.%d:%d ",
@@ -2658,7 +2615,7 @@ vppcom_session_bind (uint32_t session_index, vppcom_endpt_t * ep)
u8 proto;
u8 addr[4];
u16 port;
- }) * ed;
+ }) *ed;
ed = ELOG_TRACK_DATA (&vcm->elog_main, e, session->elog_track);
ed->proto = session->proto;
@@ -2740,64 +2697,108 @@ done:
}
int
-vppcom_session_accept (uint32_t listen_session_index, vppcom_endpt_t * ep,
- uint32_t flags)
+validate_args_session_accept_ (session_t * listen_session)
{
- session_t *listen_session = 0;
- session_t *client_session = 0;
- u32 client_session_index = ~0;
- int rv;
- f64 wait_for;
- u64 listen_vpp_handle;
-
- VCL_LOCK_AND_GET_SESSION (listen_session_index, &listen_session);
+ u32 listen_session_index = listen_session - vcm->sessions;
+ /* Input validation - expects spinlock on sessions_lockp */
if (listen_session->is_vep)
{
- clib_spinlock_unlock (&vcm->sessions_lockp);
clib_warning ("VCL<%d>: ERROR: sid %u: cannot accept on an "
"epoll session!", getpid (), listen_session_index);
- rv = VPPCOM_EBADFD;
- goto done;
+ return VPPCOM_EBADFD;
}
- listen_vpp_handle = listen_session->vpp_handle;
if (listen_session->state != STATE_LISTEN)
{
clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
"not in listen state! state 0x%x (%s)", getpid (),
- listen_vpp_handle, listen_session_index,
+ listen_session->vpp_handle, listen_session_index,
listen_session->state,
vppcom_session_state_str (listen_session->state));
+ return VPPCOM_EBADFD;
+ }
+ return VPPCOM_OK;
+}
+
+int
+vppcom_session_accept (uint32_t listen_session_index, vppcom_endpt_t * ep,
+ uint32_t flags)
+{
+ session_t *listen_session = 0;
+ session_t *client_session = 0;
+ u32 client_session_index = ~0;
+ int rv;
+ u64 listen_vpp_handle;
+ vce_event_handler_reg_t *reg;
+ vce_event_t *ev;
+ vce_event_connect_request_t *result;
+ struct timespec ts;
+ struct timeval tv;
+ int millisecond_timeout = 1;
+ int hours_timeout = 20 * 60 * 60;
+
+ VCL_LOCK_AND_GET_SESSION (listen_session_index, &listen_session);
+ listen_vpp_handle = listen_session->vpp_handle; // For debugging
+
+ rv = validate_args_session_accept_ (listen_session);
+ if (rv)
+ {
clib_spinlock_unlock (&vcm->sessions_lockp);
- rv = VPPCOM_EBADFD;
goto done;
}
- wait_for = (VCL_SESS_ATTR_TEST (listen_session->attr,
- VCL_SESS_ATTR_NONBLOCK))
- ? 0 : vcm->cfg.accept_timeout;
+
+ /* Using an aggressive timer of 1ms and a generous timer of
+ * 20 hours, we can implement a blocking and non-blocking listener
+ * as both event and time driven */
+ gettimeofday (&tv, NULL);
+ ts.tv_nsec = (tv.tv_usec * 1000) + (1000 * millisecond_timeout);
+ ts.tv_sec = tv.tv_sec;
+
+ /* Predict that the Listener is blocking more often than not */
+ if (PREDICT_TRUE (!VCL_SESS_ATTR_TEST (listen_session->attr,
+ VCL_SESS_ATTR_NONBLOCK)))
+ ts.tv_sec += hours_timeout;
clib_spinlock_unlock (&vcm->sessions_lockp);
- while (1)
+ /* Register handler for connect_request event on listen_session_index */
+ vce_event_key_t evk;
+ evk.session_index = listen_session_index;
+ evk.eid = VCL_EVENT_CONNECT_REQ_ACCEPTED;
+ reg = vce_register_handler (&vcm->event_thread, &evk,
+ vce_connect_request_handler_fn);
+ ev = vce_get_event_from_index (&vcm->event_thread, reg->ev_idx);
+
+ result = (vce_event_connect_request_t *) ev->data;
+ pthread_mutex_lock (&reg->handler_lock);
+ while (!result->handled)
{
- rv = vppcom_wait_for_client_session_index (wait_for);
- if (rv)
+ rv =
+ pthread_cond_timedwait (&reg->handler_cond, &reg->handler_lock, &ts);
+ if (rv == ETIMEDOUT)
{
- if ((VPPCOM_DEBUG > 0))
- clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: "
- "accept failed! returning %d (%s)", getpid (),
- listen_vpp_handle, listen_session_index,
- rv, vppcom_retval_str (rv));
- if (wait_for == 0)
- goto done;
+ rv = VPPCOM_EAGAIN;
+ goto cleanup;
}
- else
- break;
}
+ client_session_index = result->accepted_session_index;
+
+
+ /* Remove from the FIFO used to service epoll */
clib_spinlock_lock (&vcm->sessions_lockp);
- clib_fifo_sub1 (vcm->client_session_index_fifo, client_session_index);
+ if (clib_fifo_elts (vcm->client_session_index_fifo))
+ {
+ u32 tmp_client_session_index;
+ clib_fifo_sub1 (vcm->client_session_index_fifo,
+ tmp_client_session_index);
+ if (tmp_client_session_index != client_session_index)
+ clib_fifo_add1 (vcm->client_session_index_fifo,
+ tmp_client_session_index);
+ }
+ clib_spinlock_unlock (&vcm->sessions_lockp);
+
rv = vppcom_session_at_index (client_session_index, &client_session);
if (PREDICT_FALSE (rv))
{
@@ -2887,7 +2888,7 @@ vppcom_session_accept (uint32_t listen_session_index, vppcom_endpt_t * ep,
u32 session;
u8 addr[4];
u16 port;
- }) * ed2;
+ }) *ed2;
ed2 =
ELOG_TRACK_DATA (&vcm->elog_main, e2, client_session->elog_track);
@@ -2903,6 +2904,11 @@ vppcom_session_accept (uint32_t listen_session_index, vppcom_endpt_t * ep,
clib_spinlock_unlock (&vcm->sessions_lockp);
rv = (int) client_session_index;
+
+ vce_clear_event (&vcm->event_thread, ev);
+cleanup:
+ vce_unregister_handler (&vcm->event_thread, ev);
+ pthread_mutex_unlock (&reg->handler_lock);
done:
return rv;
}
@@ -3129,7 +3135,9 @@ vppcom_session_read_ready (session_t * session, u32 session_index)
}
if (session->state & STATE_LISTEN)
- ready = clib_fifo_elts (vcm->client_session_index_fifo);
+ {
+ ready = clib_fifo_elts (vcm->client_session_index_fifo);
+ }
else
{
if (!(state & (SERVER_STATE_OPEN | CLIENT_STATE_OPEN | STATE_LISTEN)))
@@ -3686,6 +3694,8 @@ vppcom_epoll_ctl (uint32_t vep_idx, int op, uint32_t session_index,
{
session_t *vep_session;
session_t *session;
+ vce_event_handler_reg_t *reg = 0;
+ vce_event_t *ev = 0;
int rv;
if (vep_idx == session_index)
@@ -3761,6 +3771,17 @@ vppcom_epoll_ctl (uint32_t vep_idx, int op, uint32_t session_index,
session->is_vep = 0;
session->is_vep_session = 1;
vep_session->vep.next_sid = session_index;
+
+ /* VCL Event Register handler */
+ if (session->state & STATE_LISTEN)
+ {
+ /* Register handler for connect_request event on listen_session_index */
+ vce_event_key_t evk;
+ evk.session_index = session_index;
+ evk.eid = VCL_EVENT_CONNECT_REQ_ACCEPTED;
+ reg = vce_register_handler (&vcm->event_thread, &evk,
+ vce_epoll_wait_connect_request_handler_fn);
+ }
if (VPPCOM_DEBUG > 1)
clib_warning ("VCL<%d>: EPOLL_CTL_ADD: vep_idx %u, "
"sid %u, events 0x%x, data 0x%llx!",
@@ -3839,6 +3860,13 @@ vppcom_epoll_ctl (uint32_t vep_idx, int op, uint32_t session_index,
goto done;
}
+ /* VCL Event Un-register handler */
+ if ((session->state & STATE_LISTEN) && reg)
+ {
+ ev = vce_get_event_from_index (&vcm->event_thread, reg->ev_idx);
+ vce_unregister_handler (&vcm->event_thread, ev);
+ }
+
vep_session->wait_cont_idx =
(vep_session->wait_cont_idx == session_index) ?
session->vep.next_sid : vep_session->wait_cont_idx;
@@ -4460,7 +4488,7 @@ vppcom_session_attr (uint32_t session_index, uint32_t op,
};
CLIB_PACKED (struct {
i32 data;
- }) * ed;
+ }) *ed;
ed = ELOG_TRACK_DATA (&vcm->elog_main, e, session->elog_track);
ed->data = session->libc_epfd;