aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKeith Burns (alagalah) <alagalah@gmail.com>2018-02-23 10:17:01 -0800
committerDave Wallace <dwallacelf@gmail.com>2018-03-05 18:57:10 +0000
commit3cf2d6403dd90083fe2c678c03565f6f483de9e3 (patch)
tree9e17bf84e49338b8b3abd3b64e0e69358c72408b
parentc7fe4f39bca709a9ca094ffd4465490fa780a576 (diff)
VCL async event handler
- provides async handling of events such as accept/connect Change-Id: Id95947237ef16629371b3c99822059d423e2f918 Signed-off-by: Keith Burns (alagalah) <alagalah@gmail.com>
-rw-r--r--src/vcl.am2
-rw-r--r--src/vcl/vcl_event.c267
-rw-r--r--src/vcl/vcl_event.h154
-rw-r--r--src/vcl/vppcom.c432
4 files changed, 653 insertions, 202 deletions
diff --git a/src/vcl.am b/src/vcl.am
index 5f48f6df168..9f1325ecaa2 100644
--- a/src/vcl.am
+++ b/src/vcl.am
@@ -22,6 +22,7 @@ libvppcom_la_DEPENDENCIES = \
libvppcom_la_LIBADD = $(libvppcom_la_DEPENDENCIES) -lpthread -lrt -ldl
libvppcom_la_SOURCES += \
+ vcl/vcl_event.c \
vcl/vppcom.c \
$(libvppinfra_la_SOURCES) \
$(libvlib_la_SOURCES) \
@@ -29,6 +30,7 @@ libvppcom_la_SOURCES += \
$(libvlibmemoryclient_la_SOURCES)
nobase_include_HEADERS += \
+ vcl/vcl_event.h \
vcl/vppcom.h
libvcl_ldpreload_la_LIBADD = $(libvppcom_la_DEPENDENCIES) -lpthread -lrt -ldl
diff --git a/src/vcl/vcl_event.c b/src/vcl/vcl_event.c
new file mode 100644
index 00000000000..b706a93e56a
--- /dev/null
+++ b/src/vcl/vcl_event.c
@@ -0,0 +1,267 @@
+/*
+ * Copyright (c) 2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <vppinfra/fifo.h>
+#include <vppinfra/pool.h>
+#include <vppinfra/hash.h>
+#include <vnet/api_errno.h>
+
+#include <vcl/vcl_event.h>
+/**
+ * @file
+ * @brief VPP Communications Library (VCL) event handler.
+ *
+ * Definitions for generic event handling in VCL.
+ */
+
+int
+vce_generate_event (vce_event_thread_t *evt, u32 ev_idx)
+{
+ int elts, rv = 0;
+ vce_event_t *p;
+
+ pthread_mutex_lock (&(evt->generator_lock));
+
+ /* Check there is event data for this event */
+
+ clib_spinlock_lock (&(evt->events_lockp));
+ p = pool_elt_at_index (evt->vce_events, ev_idx);
+ ASSERT(p);
+
+ elts = (int) clib_fifo_free_elts (evt->event_index_fifo);
+ if (PREDICT_TRUE (elts))
+ {
+ /* Add event to queue */
+ clib_fifo_add1 (evt->event_index_fifo, ev_idx);
+ pthread_cond_signal (&(evt->generator_cond));
+ }
+ else
+ {
+ rv = VNET_API_ERROR_QUEUE_FULL;
+ }
+
+ clib_spinlock_unlock (&(evt->events_lockp));
+ pthread_mutex_unlock (&(evt->generator_lock));
+
+ return rv;
+}
+
+void
+vce_clear_event (vce_event_thread_t *evt, vce_event_t *ev)
+{
+ clib_spinlock_lock (&(evt->events_lockp));
+ pool_put (evt->vce_events, ev);
+ clib_spinlock_unlock (&(evt->events_lockp));
+}
+
+vce_event_t *
+vce_get_event_from_index(vce_event_thread_t *evt, u32 ev_idx)
+{
+ vce_event_t *ev;
+
+ clib_spinlock_lock (&(evt->events_lockp));
+ ev = pool_elt_at_index (evt->vce_events, ev_idx);
+ clib_spinlock_unlock (&(evt->events_lockp));
+
+ return ev;
+
+}
+
+vce_event_handler_reg_t *
+vce_register_handler (vce_event_thread_t *evt, vce_event_key_t *evk,
+ vce_event_callback_t cb)
+{
+ vce_event_handler_reg_t *handler;
+ vce_event_handler_reg_t *old_handler = 0;
+ uword *p;
+ u32 handler_index;
+ u64 adj_key;
+
+ /* TODO - multiple handler support. For now we can replace
+ * and re-instate, which is useful for event recycling */
+
+ adj_key = evk->as_u64 | (1LL << 63); //evk can be 0, which won't hash
+
+ clib_spinlock_lock (&evt->handlers_lockp);
+
+ p = hash_get (evt->handlers_index_by_event_key, adj_key);
+ if (p)
+ {
+ old_handler = pool_elt_at_index (evt->vce_event_handlers, p[0]);
+ /* If we are just re-registering, ignore and move on
+ * else store the old handler_fn for unregister to re-instate */
+ if (old_handler->handler_fn == cb)
+ {
+
+ clib_spinlock_unlock (&evt->handlers_lockp);
+
+ /* Signal event thread that a handler exists in case any
+ * recycled events requiring this handler are pending */
+ pthread_mutex_lock (&(evt->generator_lock));
+ pthread_cond_signal (&(evt->generator_cond));
+ pthread_mutex_unlock (&(evt->generator_lock));
+ return old_handler;
+ }
+ }
+
+ pool_get (evt->vce_event_handlers, handler);
+ handler_index = (u32) (handler - evt->vce_event_handlers);
+
+ handler->handler_fn = cb;
+ handler->replaced_handler_idx = (p) ? p[0] : ~0;
+
+ hash_set (evt->handlers_index_by_event_key, adj_key, handler_index);
+
+ pthread_cond_init (&(handler->handler_cond), NULL);
+ pthread_mutex_init (&(handler->handler_lock), NULL);
+
+ clib_spinlock_unlock (&evt->handlers_lockp);
+
+ /* Signal event thread that a new handler exists in case any
+ * recycled events requiring this handler are pending */
+ pthread_mutex_lock (&(evt->generator_lock));
+ pthread_cond_signal (&(evt->generator_cond));
+ pthread_mutex_unlock (&(evt->generator_lock));
+
+ return handler;
+}
+
+int
+vce_unregister_handler (vce_event_thread_t *evt, vce_event_t *ev)
+{
+ vce_event_handler_reg_t *handler;
+ uword *p;
+ u64 adj_key = ev->evk.as_u64 | (1LL << 63);
+ u8 generate_signal = 0;
+
+ clib_spinlock_lock (&evt->handlers_lockp);
+
+ p = hash_get (evt->handlers_index_by_event_key, adj_key);
+ if (!p)
+ {
+ clib_spinlock_unlock (&evt->handlers_lockp);
+
+ return VNET_API_ERROR_NO_SUCH_ENTRY;
+ }
+
+ handler = pool_elt_at_index (evt->vce_event_handlers, p[0]);
+
+ /* If this handler replaced another handler, re-instate it */
+ if (handler->replaced_handler_idx != ~0)
+ {
+ hash_set (evt->handlers_index_by_event_key, adj_key,
+ handler->replaced_handler_idx);
+ generate_signal = 1;
+ }
+ else
+ {
+ hash_unset (evt->handlers_index_by_event_key, adj_key);
+ }
+
+ pthread_mutex_destroy (&(handler->handler_lock));
+ pthread_cond_destroy (&(handler->handler_cond));
+ pool_put (evt->vce_event_handlers, handler);
+
+ clib_spinlock_unlock (&evt->handlers_lockp);
+
+ if (generate_signal)
+ {
+ /* Signal event thread that a new handler exists in case any
+ * recycled events requiring this handler are pending */
+ pthread_mutex_lock (&(evt->generator_lock));
+ pthread_cond_signal (&(evt->generator_cond));
+ pthread_mutex_unlock (&(evt->generator_lock));
+ }
+
+ return 0;
+}
+
+void *
+vce_event_thread_fn (void *arg)
+{
+ vce_event_thread_t *evt = (vce_event_thread_t *) arg;
+ vce_event_t *ev;
+ u32 ev_idx;
+ vce_event_handler_reg_t *handler;
+ uword *p;
+ u64 adj_key;
+
+ evt->recycle_event = 1; // Used for recycling events with no handlers
+
+
+ do
+ {
+ pthread_mutex_lock (&(evt->generator_lock));
+ while ( (clib_fifo_elts (evt->event_index_fifo) == 0) ||
+ evt->recycle_event)
+ {
+ evt->recycle_event = 0;
+ pthread_cond_wait (&(evt->generator_cond),
+ &(evt->generator_lock));
+ }
+
+ /* Remove event */
+ clib_spinlock_lock (&(evt->events_lockp));
+
+ clib_fifo_sub1 (evt->event_index_fifo, ev_idx);
+ ev = pool_elt_at_index (evt->vce_events, ev_idx);
+
+ clib_spinlock_unlock (&(evt->events_lockp));
+
+ ASSERT(ev);
+ adj_key = ev->evk.as_u64 | (1LL << 63);
+
+ clib_spinlock_lock (&evt->handlers_lockp);
+
+ p = hash_get (evt->handlers_index_by_event_key, adj_key);
+ if (!p)
+ {
+ /* If an event falls in the woods, and there is no handler to hear it,
+ * does it make any sound?
+ * I don't know either, so lets try recycling the event */
+ clib_fifo_add1 (evt->event_index_fifo, ev_idx);
+ evt->recycle_event = 1;
+ clib_spinlock_unlock (&evt->handlers_lockp);
+ goto unlock;
+ }
+ handler = pool_elt_at_index (evt->vce_event_handlers, p[0]);
+ handler->ev_idx = ev_idx;
+
+ clib_spinlock_unlock (&evt->handlers_lockp);
+
+ (handler->handler_fn)(handler);
+
+ unlock:
+ pthread_mutex_unlock (&(evt->generator_lock));
+ }
+ while (1);
+ return NULL;
+}
+
+int
+vce_start_event_thread (vce_event_thread_t *evt, u8 max_events)
+{
+ clib_fifo_validate (evt->event_index_fifo, max_events);
+ evt->handlers_index_by_event_key = hash_create (0, sizeof (uword));
+
+ pthread_cond_init (&(evt->generator_cond), NULL);
+ pthread_mutex_init (&(evt->generator_lock), NULL);
+
+ clib_spinlock_init (&(evt->events_lockp));
+ clib_spinlock_init (&(evt->handlers_lockp));
+
+ return pthread_create (&(evt->thread), NULL /* attr */ ,
+ vce_event_thread_fn, evt);
+} \ No newline at end of file
diff --git a/src/vcl/vcl_event.h b/src/vcl/vcl_event.h
new file mode 100644
index 00000000000..9380f73f77d
--- /dev/null
+++ b/src/vcl/vcl_event.h
@@ -0,0 +1,154 @@
+/*
+ * Copyright (c) 2018 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef VPP_VCL_EVENT_H
+#define VPP_VCL_EVENT_H
+
+/**
+ * @file
+ * @brief VPP Communications Library (VCL) event handler.
+ *
+ * Declarations for generic event handling in VCL.
+ */
+
+#include <vppinfra/types.h>
+#include <vppinfra/lock.h>
+#include <pthread.h>
+
+typedef union vce_event_key_
+{
+ struct {
+ u32 eid;
+ u32 session_index;
+ };
+ u64 as_u64;
+} vce_event_key_t;
+
+typedef struct vce_event_
+{
+ vce_event_key_t evk;
+ u32 refcnt;
+ void *data;
+} vce_event_t;
+
+typedef void (*vce_event_callback_t) (void *reg /*vce_event_handler_reg_t* */);
+
+typedef struct vce_event_handler_reg_
+{
+ vce_event_callback_t handler_fn;
+ pthread_mutex_t handler_lock;
+ pthread_cond_t handler_cond;
+ u32 ev_idx;
+ u32 replaced_handler_idx;
+} vce_event_handler_reg_t;
+
+typedef struct vce_event_thread_
+{
+ pthread_t thread;
+ pthread_mutex_t generator_lock;
+ pthread_cond_t generator_cond;
+ u32 *event_index_fifo;
+ u8 recycle_event;
+ clib_spinlock_t events_lockp;
+ vce_event_t *vce_events; //pool
+ clib_spinlock_t handlers_lockp;
+ vce_event_handler_reg_t *vce_event_handlers; //pool
+ uword *handlers_index_by_event_key; //hash
+} vce_event_thread_t;
+
+/**
+ * @brief vce_generate_event
+ * - used to trigger an event in the event thread so that registered
+ * handlers are notified
+ *
+ * @param evt - vce_event_thread_t - event system state
+ * @param ev_idx - index to vce_event_thread_t vce_event pool
+ *
+ * @return success/failure rv
+ */
+int vce_generate_event (vce_event_thread_t *evt, u32 ev_idx);
+
+/**
+ * @brief vce_clear_event()
+ * - removes event from event_pool
+ *
+ * @param evt - vce_event_thread_t - event system state
+ * @param ev - vce_event_t - event to remove
+ */
+void vce_clear_event (vce_event_thread_t *evt, vce_event_t *ev);
+
+/**
+ * @brief vce_get_event_from_index()
+ *
+ * @param evt - vce_event_thread_t - event system state
+ * @param ev_idx - index to vce_event_thread_t vce_event pool
+ *
+ * @return vce_event_t *
+ */
+vce_event_t * vce_get_event_from_index(vce_event_thread_t *evt, u32 ev_idx);
+
+/**
+ * @brief vce_register_handler
+ * - used by functions who need to be notified that an event has occurred
+ * on a vce_event_key_t (i.e. event type (enum) and sessionID)
+ * - if a handler already exists, the index to the old handler is stored
+ * inside the new handler for re-instatement on vce_unregister_handler()
+
+ * @param evt - vce_event_thread_t - event system state
+ * @param evk - vce_event_key_t current an eventID from enum in consumer and
+ * sessionID
+ * @param cb - vce_event_callback_t function to handle event
+ * @return vce_handler_reg_t - the function that needs event notification
+ * needs to block on a condvar mutex to reduce spin. That is in here.
+ */
+vce_event_handler_reg_t * vce_register_handler (vce_event_thread_t *evt,
+ vce_event_key_t *evk,
+ vce_event_callback_t cb);
+
+/**
+ * @brief vce_unregister_handler
+ * - used by functions to remove need to be notified that an event has occurred
+ * on a vce_event_key_t (i.e. event type (enum) and sessionID)
+ * - if this handler replaced an existing one, re-instate it.
+ *
+ * @param evt - vce_event_thread_t - event system state
+ * @param ev - vce_event_t - event to remove
+ * @return success/failure rv
+ */
+int vce_unregister_handler (vce_event_thread_t *evt, vce_event_t *ev);
+
+/**
+ * @brief vce_event_thread_fn
+ * - main event thread that waits on a generic condvar/mutex that a signal
+ * has been generated.
+ * - loops through all registered handlers for that vce_event_key_t
+ * (event enum + sessionID)
+ *
+ * @param arg - cast to type of event defined in consuming program.
+ * @return
+ */
+extern void * vce_event_thread_fn (void *arg);
+
+/**
+ * @brief vce_start_event_thread
+ * - as name suggests. What is important is that vce_event_thread_t is allocated
+ * on the same heap as "everything else". ie use clib_mem_alloc.
+ * @param evt - vce_event_thread_t - event system state
+ * @param max_events - depth of event FIFO for max number of outstanding events.
+ * @return succes/failure
+ */
+int vce_start_event_thread (vce_event_thread_t *evt, u8 max_events);
+
+#endif //VPP_VCL_EVENT_H
diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c
index c7cd1e992b4..bb248699fd9 100644
--- a/src/vcl/vppcom.c
+++ b/src/vcl/vppcom.c
@@ -21,6 +21,7 @@
#include <vpp/api/vpe_msg_enum.h>
#include <vnet/session/application_interface.h>
#include <vcl/vppcom.h>
+#include <vcl/vcl_event.h>
#include <vlib/unix/unix.h>
#include <vppinfra/vec_bootstrap.h>
#include <vppinfra/elog.h>
@@ -188,6 +189,20 @@ typedef struct vppcom_cfg_t_
u8 *vpp_api_filename;
} vppcom_cfg_t;
+/* VPPCOM Event typedefs */
+typedef enum vcl_event_id_
+{
+ VCL_EVENT_CONNECT_REQ_ACCEPTED,
+ VCL_EVENT_N_EVENTS
+} vcl_event_id_t;
+
+typedef struct vce_event_connect_request_
+{
+ u8 size;
+ u8 handled;
+ u32 accepted_session_index;
+} vce_event_connect_request_t;
+
typedef struct vppcom_main_t_
{
u8 init;
@@ -227,7 +242,10 @@ typedef struct vppcom_main_t_
vppcom_cfg_t cfg;
- /* Event logging */
+ /* Event thread */
+ vce_event_thread_t event_thread;
+
+ /* VPP Event-logger */
elog_main_t elog_main;
elog_track_t elog_track;
@@ -333,6 +351,7 @@ vppcom_session_state_str (session_state_t state)
return st;
}
+
/*
* VPPCOM Utility Functions
*/
@@ -415,6 +434,68 @@ write_elog (void)
}
+/*
+ * VPPCOM Event Functions
+ */
+
+/**
+ * * @brief vce_connect_request_handler_fn
+ * - used for listener sessions
+ * - when a vl_api_accept_session_t_handler() generates an event
+ * this callback is alerted and sets fields that consumers such as
+ * vppcom_session_accept() expect to see, ie. accepted_client_index
+ *
+ * @param arg - void* to be cast to vce_event_handler_reg_t*
+ */
+void
+vce_connect_request_handler_fn (void *arg)
+{
+ vce_event_handler_reg_t *reg = (vce_event_handler_reg_t *) arg;
+
+ vce_event_connect_request_t *ecr;
+ vce_event_t *ev;
+
+ ev = vce_get_event_from_index (&vcm->event_thread, reg->ev_idx);
+
+ ecr = (vce_event_connect_request_t *) ev->data;
+
+ pthread_mutex_lock (&reg->handler_lock);
+ ecr->handled = 1;
+ pthread_cond_signal (&reg->handler_cond);
+ pthread_mutex_unlock (&reg->handler_lock);
+}
+
+/**
+ * @brief vce_epoll_wait_connect_request_handler_fn
+ * - used by vppcom_epoll_xxxx() for listener sessions
+ * - when a vl_api_accept_session_t_handler() generates an event
+ * this callback is alerted and sets the fields that vppcom_epoll_wait()
+ * expects to see.
+ *
+ * @param arg - void* to be cast to vce_event_handler_reg_t*
+ */
+void
+vce_epoll_wait_connect_request_handler_fn (void *arg)
+{
+ vce_event_handler_reg_t *reg = (vce_event_handler_reg_t *) arg;
+ vce_event_t *ev;
+ /* Retrieve the VCL_EVENT_CONNECT_REQ_ACCEPTED event */
+ ev = vce_get_event_from_index (&vcm->event_thread, reg->ev_idx);
+ vce_event_connect_request_t *ecr = (vce_event_connect_request_t *) ev->data;
+
+ /* Add the accepted_session_index to the FIFO */
+ clib_spinlock_lock (&vcm->sessions_lockp);
+ clib_fifo_add1 (vcm->client_session_index_fifo,
+ ecr->accepted_session_index);
+ clib_spinlock_unlock (&vcm->sessions_lockp);
+
+ /* Recycling the event. */
+ clib_spinlock_lock (&(vcm->event_thread.events_lockp));
+ vcm->event_thread.recycle_event = 1;
+ clib_fifo_add1 (vcm->event_thread.event_index_fifo, reg->ev_idx);
+ clib_spinlock_unlock (&(vcm->event_thread.events_lockp));
+}
+
static int
vppcom_connect_to_vpp (char *app_name)
{
@@ -438,7 +519,7 @@ vppcom_connect_to_vpp (char *app_name)
else
{
vcm->vl_input_queue = am->shmem_hdr->vl_input_queue;
- vcm->my_client_index = am->my_client_index;
+ vcm->my_client_index = (u32) am->my_client_index;
vcm->app_state = STATE_APP_CONN_VPP;
if (VPPCOM_DEBUG > 0)
@@ -471,7 +552,7 @@ vppcom_connect_to_vpp (char *app_name)
u32 data;
} *ed;
ed = ELOG_TRACK_DATA (&vcm->elog_main, e, vcm->elog_track);
- ed->data = rv;
+ ed->data = (u32) rv;
/* *INDENT-ON* */
}
return rv;
@@ -599,47 +680,6 @@ vppcom_wait_for_session_state_change (u32 session_index,
return VPPCOM_ETIMEDOUT;
}
-static inline int
-vppcom_wait_for_client_session_index (f64 wait_for_time)
-{
- f64 timeout = clib_time_now (&vcm->clib_time) + wait_for_time;
-
- do
- {
- if (clib_fifo_elts (vcm->client_session_index_fifo))
- return VPPCOM_OK;
- }
- while (clib_time_now (&vcm->clib_time) < timeout);
-
- if (wait_for_time == 0)
- return VPPCOM_EAGAIN;
-
- if (VPPCOM_DEBUG > 0)
- clib_warning ("VCL<%d>: timeout waiting for client_session_index",
- getpid ());
-
- if (VPPCOM_DEBUG > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "ERR: timeout waiting for session index :%d",
- .format_args = "i4",
- };
- struct
- {
- u32 data;
- } *ed;
-
- ed = ELOG_TRACK_DATA (&vcm->elog_main, e, vcm->elog_track);
-
- ed->data = getpid();
- /* *INDENT-ON* */
- }
-
- return VPPCOM_ETIMEDOUT;
-}
-
/*
* VPP-API message functions
*/
@@ -744,6 +784,7 @@ vppcom_app_attach (void)
getpid (), rv, vppcom_retval_str (rv));
return rv;
}
+
return VPPCOM_OK;
}
@@ -1235,6 +1276,11 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
svm_fifo_t *rx_fifo, *tx_fifo;
session_t *session, *listen_session;
u32 session_index;
+ vce_event_connect_request_t *ecr;
+ vce_event_t *ev;
+ int rv;
+ u32 ev_idx;
+
clib_spinlock_lock (&vcm->sessions_lockp);
if (!clib_fifo_free_elts (vcm->client_session_index_fifo))
@@ -1252,10 +1298,15 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
clib_warning ("VCL<%d>: ERROR: couldn't find listen session: "
"unknown vpp listener handle %llx",
getpid (), mp->listener_handle);
+ vppcom_send_accept_session_reply (mp->handle, mp->context,
+ VNET_API_ERROR_INVALID_ARGUMENT);
clib_spinlock_unlock (&vcm->sessions_lockp);
return;
}
+ /* TODO check listener depth and update */
+ /* TODO on "child" fd close, update listener depth */
+
/* Allocate local session and set it up */
pool_get (vcm->sessions, session);
memset (session, 0, sizeof (*session));
@@ -1283,8 +1334,26 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
session->lcl_port = listen_session->lcl_port;
session->lcl_addr = listen_session->lcl_addr;
- /* TBD: move client_session_index_fifo into listener session */
- clib_fifo_add1 (vcm->client_session_index_fifo, session_index);
+ /* Create an event for handlers */
+
+ clib_spinlock_lock (&vcm->event_thread.events_lockp);
+
+ pool_get (vcm->event_thread.vce_events, ev);
+ ev->data = clib_mem_alloc (sizeof (vce_event_connect_request_t));
+ ev->refcnt = 0;
+ ev_idx = (u32) (ev - vcm->event_thread.vce_events);
+ ecr = ev->data;
+ ev->evk.eid = VCL_EVENT_CONNECT_REQ_ACCEPTED;
+ listen_session = vppcom_session_table_lookup_listener (mp->listener_handle);
+ ev->evk.session_index = (u32) (listen_session - vcm->sessions);
+ ecr->handled = 0;
+ ecr->accepted_session_index = session_index;
+
+ clib_spinlock_unlock (&vcm->event_thread.events_lockp);
+
+ rv = vce_generate_event (&vcm->event_thread, ev_idx);
+
+ ASSERT (rv == 0);
if (VPPCOM_DEBUG > 1)
clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: client accept "
@@ -1336,124 +1405,6 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
}
static void
-vppcom_send_connect_session_reply (session_t * session, u32 session_index,
- u64 vpp_handle, u32 context, int retval)
-{
- vl_api_connect_session_reply_t *rmp;
- u32 len;
- svm_queue_t *client_q;
-
- rmp = vl_msg_api_alloc (sizeof (*rmp));
- memset (rmp, 0, sizeof (*rmp));
- rmp->_vl_msg_id = ntohs (VL_API_CONNECT_SESSION_REPLY);
-
- if (!session)
- {
- rmp->context = context;
- rmp->handle = vpp_handle;
- rmp->retval = htonl (retval);
- vl_msg_api_send_shmem (vcm->vl_input_queue, (u8 *) & rmp);
- return;
- }
-
- rmp->context = session->client_context;
- rmp->retval = htonl (retval);
- rmp->handle = session->vpp_handle;
- rmp->server_rx_fifo = pointer_to_uword (session->rx_fifo);
- rmp->server_tx_fifo = pointer_to_uword (session->tx_fifo);
- rmp->vpp_event_queue_address = pointer_to_uword (session->vpp_event_queue);
- rmp->segment_size = vcm->cfg.segment_size;
- len = vec_len (session->segment_name);
- rmp->segment_name_length = clib_min (len, sizeof (rmp->segment_name));
- clib_memcpy (rmp->segment_name, session->segment_name,
- rmp->segment_name_length - 1);
- clib_memcpy (rmp->lcl_ip, session->peer_addr.ip46.as_u8,
- sizeof (rmp->lcl_ip));
- rmp->is_ip4 = session->peer_addr.is_ip4;
- rmp->lcl_port = session->peer_port;
- client_q = uword_to_pointer (session->client_queue_address, svm_queue_t *);
- ASSERT (client_q);
- vl_msg_api_send_shmem (client_q, (u8 *) & rmp);
-}
-
-/*
- * Acting as server for redirected connect requests
- */
-static void
-vl_api_connect_sock_t_handler (vl_api_connect_sock_t * mp)
-{
- u32 session_index;
- session_t *session = 0;
-
- clib_spinlock_lock (&vcm->sessions_lockp);
- if (!clib_fifo_free_elts (vcm->client_session_index_fifo))
- {
- clib_spinlock_unlock (&vcm->sessions_lockp);
-
- if (VPPCOM_DEBUG > 1)
- clib_warning ("VCL<%d>: client session queue is full!", getpid ());
-
- /* TBD: Fix api to include vpp handle */
- vppcom_send_connect_session_reply (0 /* session */ , 0 /* sid */ ,
- 0 /* handle */ , mp->context,
- VNET_API_ERROR_QUEUE_FULL);
- return;
- }
-
- pool_get (vcm->sessions, session);
- memset (session, 0, sizeof (*session));
- session_index = session - vcm->sessions;
-
- session->client_context = mp->context;
- session->vpp_handle = session_index;
- session->client_queue_address = mp->client_queue_address;
- session->lcl_port = mp->port;
- session->lcl_addr.is_ip4 = mp->is_ip4;
- clib_memcpy (&session->lcl_addr.ip46, mp->ip,
- sizeof (session->lcl_addr.ip46));
-
- /* TBD: missing peer info in api msg.
- */
- session->peer_addr.is_ip4 = mp->is_ip4;
- ASSERT (session->lcl_addr.is_ip4 == session->peer_addr.is_ip4);
-
- session->state = STATE_ACCEPT;
- clib_fifo_add1 (vcm->client_session_index_fifo, session_index);
- if (VPPCOM_DEBUG > 1)
- clib_warning ("VCL<%d>: sid %u: Got a cut-thru connect request! "
- "clib_fifo_elts %u!\n", getpid (), session_index,
- clib_fifo_elts (vcm->client_session_index_fifo));
-
- if (VPPCOM_DEBUG > 0)
- {
- session->elog_track.name =
- (char *) format (0, "C:%d:S:%d%c", vcm->my_client_index,
- session_index, 0);
- elog_track_register (&vcm->elog_main, &session->elog_track);
-
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "cut-thru-connect:S:%d clib_fifo_elts:%d",
- .format_args = "i4i4",
- };
-
- struct
- {
- u32 data[2];
- } *ed;
-
- ed = ELOG_TRACK_DATA (&vcm->elog_main, e, session->elog_track);
-
- ed->data[0] = session_index;
- ed->data[1] = clib_fifo_elts (vcm->client_session_index_fifo);
- /* *INDENT-ON* */
- }
-
- clib_spinlock_unlock (&vcm->sessions_lockp);
-}
-
-static void
vppcom_send_bind_sock (session_t * session, u32 session_index)
{
vl_api_bind_sock_t *bmp;
@@ -1603,7 +1554,6 @@ _(SESSION_ENABLE_DISABLE_REPLY, session_enable_disable_reply) \
_(BIND_SOCK_REPLY, bind_sock_reply) \
_(UNBIND_SOCK_REPLY, unbind_sock_reply) \
_(ACCEPT_SESSION, accept_session) \
-_(CONNECT_SOCK, connect_sock) \
_(CONNECT_SESSION_REPLY, connect_session_reply) \
_(DISCONNECT_SESSION, disconnect_session) \
_(DISCONNECT_SESSION_REPLY, disconnect_session_reply) \
@@ -2312,6 +2262,8 @@ vppcom_app_create (char *app_name)
if (vcm->my_client_index == ~0)
{
+
+ /* API hookup and connect to VPP */
vppcom_api_hookup ();
vcm->app_state = STATE_APP_START;
rv = vppcom_connect_to_vpp (app_name);
@@ -2322,6 +2274,11 @@ vppcom_app_create (char *app_name)
return rv;
}
+ /* State event handling thread */
+
+ rv = vce_start_event_thread (&(vcm->event_thread), 20);
+
+
if (VPPCOM_DEBUG > 0)
clib_warning ("VCL<%d>: sending session enable", getpid ());
@@ -2645,7 +2602,7 @@ vppcom_session_bind (uint32_t session_index, vppcom_endpt_t * ep)
{
if (session->lcl_addr.is_ip4)
{
- /* *INDENT-OFF* */
+ /* *INDENT-OFF* */
ELOG_TYPE_DECLARE (e) =
{
.format = "bind local:%s:%d.%d.%d.%d:%d ",
@@ -2658,7 +2615,7 @@ vppcom_session_bind (uint32_t session_index, vppcom_endpt_t * ep)
u8 proto;
u8 addr[4];
u16 port;
- }) * ed;
+ }) *ed;
ed = ELOG_TRACK_DATA (&vcm->elog_main, e, session->elog_track);
ed->proto = session->proto;
@@ -2740,64 +2697,108 @@ done:
}
int
-vppcom_session_accept (uint32_t listen_session_index, vppcom_endpt_t * ep,
- uint32_t flags)
+validate_args_session_accept_ (session_t * listen_session)
{
- session_t *listen_session = 0;
- session_t *client_session = 0;
- u32 client_session_index = ~0;
- int rv;
- f64 wait_for;
- u64 listen_vpp_handle;
-
- VCL_LOCK_AND_GET_SESSION (listen_session_index, &listen_session);
+ u32 listen_session_index = listen_session - vcm->sessions;
+ /* Input validation - expects spinlock on sessions_lockp */
if (listen_session->is_vep)
{
- clib_spinlock_unlock (&vcm->sessions_lockp);
clib_warning ("VCL<%d>: ERROR: sid %u: cannot accept on an "
"epoll session!", getpid (), listen_session_index);
- rv = VPPCOM_EBADFD;
- goto done;
+ return VPPCOM_EBADFD;
}
- listen_vpp_handle = listen_session->vpp_handle;
if (listen_session->state != STATE_LISTEN)
{
clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
"not in listen state! state 0x%x (%s)", getpid (),
- listen_vpp_handle, listen_session_index,
+ listen_session->vpp_handle, listen_session_index,
listen_session->state,
vppcom_session_state_str (listen_session->state));
+ return VPPCOM_EBADFD;
+ }
+ return VPPCOM_OK;
+}
+
+int
+vppcom_session_accept (uint32_t listen_session_index, vppcom_endpt_t * ep,
+ uint32_t flags)
+{
+ session_t *listen_session = 0;
+ session_t *client_session = 0;
+ u32 client_session_index = ~0;
+ int rv;
+ u64 listen_vpp_handle;
+ vce_event_handler_reg_t *reg;
+ vce_event_t *ev;
+ vce_event_connect_request_t *result;
+ struct timespec ts;
+ struct timeval tv;
+ int millisecond_timeout = 1;
+ int hours_timeout = 20 * 60 * 60;
+
+ VCL_LOCK_AND_GET_SESSION (listen_session_index, &listen_session);
+ listen_vpp_handle = listen_session->vpp_handle; // For debugging
+
+ rv = validate_args_session_accept_ (listen_session);
+ if (rv)
+ {
clib_spinlock_unlock (&vcm->sessions_lockp);
- rv = VPPCOM_EBADFD;
goto done;
}
- wait_for = (VCL_SESS_ATTR_TEST (listen_session->attr,
- VCL_SESS_ATTR_NONBLOCK))
- ? 0 : vcm->cfg.accept_timeout;
+
+ /* Using an aggressive timer of 1ms and a generous timer of
+ * 20 hours, we can implement a blocking and non-blocking listener
+ * as both event and time driven */
+ gettimeofday (&tv, NULL);
+ ts.tv_nsec = (tv.tv_usec * 1000) + (1000 * millisecond_timeout);
+ ts.tv_sec = tv.tv_sec;
+
+ /* Predict that the Listener is blocking more often than not */
+ if (PREDICT_TRUE (!VCL_SESS_ATTR_TEST (listen_session->attr,
+ VCL_SESS_ATTR_NONBLOCK)))
+ ts.tv_sec += hours_timeout;
clib_spinlock_unlock (&vcm->sessions_lockp);
- while (1)
+ /* Register handler for connect_request event on listen_session_index */
+ vce_event_key_t evk;
+ evk.session_index = listen_session_index;
+ evk.eid = VCL_EVENT_CONNECT_REQ_ACCEPTED;
+ reg = vce_register_handler (&vcm->event_thread, &evk,
+ vce_connect_request_handler_fn);
+ ev = vce_get_event_from_index (&vcm->event_thread, reg->ev_idx);
+
+ result = (vce_event_connect_request_t *) ev->data;
+ pthread_mutex_lock (&reg->handler_lock);
+ while (!result->handled)
{
- rv = vppcom_wait_for_client_session_index (wait_for);
- if (rv)
+ rv =
+ pthread_cond_timedwait (&reg->handler_cond, &reg->handler_lock, &ts);
+ if (rv == ETIMEDOUT)
{
- if ((VPPCOM_DEBUG > 0))
- clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: "
- "accept failed! returning %d (%s)", getpid (),
- listen_vpp_handle, listen_session_index,
- rv, vppcom_retval_str (rv));
- if (wait_for == 0)
- goto done;
+ rv = VPPCOM_EAGAIN;
+ goto cleanup;
}
- else
- break;
}
+ client_session_index = result->accepted_session_index;
+
+
+ /* Remove from the FIFO used to service epoll */
clib_spinlock_lock (&vcm->sessions_lockp);
- clib_fifo_sub1 (vcm->client_session_index_fifo, client_session_index);
+ if (clib_fifo_elts (vcm->client_session_index_fifo))
+ {
+ u32 tmp_client_session_index;
+ clib_fifo_sub1 (vcm->client_session_index_fifo,
+ tmp_client_session_index);
+ if (tmp_client_session_index != client_session_index)
+ clib_fifo_add1 (vcm->client_session_index_fifo,
+ tmp_client_session_index);
+ }
+ clib_spinlock_unlock (&vcm->sessions_lockp);
+
rv = vppcom_session_at_index (client_session_index, &client_session);
if (PREDICT_FALSE (rv))
{
@@ -2887,7 +2888,7 @@ vppcom_session_accept (uint32_t listen_session_index, vppcom_endpt_t * ep,
u32 session;
u8 addr[4];
u16 port;
- }) * ed2;
+ }) *ed2;
ed2 =
ELOG_TRACK_DATA (&vcm->elog_main, e2, client_session->elog_track);
@@ -2903,6 +2904,11 @@ vppcom_session_accept (uint32_t listen_session_index, vppcom_endpt_t * ep,
clib_spinlock_unlock (&vcm->sessions_lockp);
rv = (int) client_session_index;
+
+ vce_clear_event (&vcm->event_thread, ev);
+cleanup:
+ vce_unregister_handler (&vcm->event_thread, ev);
+ pthread_mutex_unlock (&reg->handler_lock);
done:
return rv;
}
@@ -3129,7 +3135,9 @@ vppcom_session_read_ready (session_t * session, u32 session_index)
}
if (session->state & STATE_LISTEN)
- ready = clib_fifo_elts (vcm->client_session_index_fifo);
+ {
+ ready = clib_fifo_elts (vcm->client_session_index_fifo);
+ }
else
{
if (!(state & (SERVER_STATE_OPEN | CLIENT_STATE_OPEN | STATE_LISTEN)))
@@ -3686,6 +3694,8 @@ vppcom_epoll_ctl (uint32_t vep_idx, int op, uint32_t session_index,
{
session_t *vep_session;
session_t *session;
+ vce_event_handler_reg_t *reg = 0;
+ vce_event_t *ev = 0;
int rv;
if (vep_idx == session_index)
@@ -3761,6 +3771,17 @@ vppcom_epoll_ctl (uint32_t vep_idx, int op, uint32_t session_index,
session->is_vep = 0;
session->is_vep_session = 1;
vep_session->vep.next_sid = session_index;
+
+ /* VCL Event Register handler */
+ if (session->state & STATE_LISTEN)
+ {
+ /* Register handler for connect_request event on listen_session_index */
+ vce_event_key_t evk;
+ evk.session_index = session_index;
+ evk.eid = VCL_EVENT_CONNECT_REQ_ACCEPTED;
+ reg = vce_register_handler (&vcm->event_thread, &evk,
+ vce_epoll_wait_connect_request_handler_fn);
+ }
if (VPPCOM_DEBUG > 1)
clib_warning ("VCL<%d>: EPOLL_CTL_ADD: vep_idx %u, "
"sid %u, events 0x%x, data 0x%llx!",
@@ -3839,6 +3860,13 @@ vppcom_epoll_ctl (uint32_t vep_idx, int op, uint32_t session_index,
goto done;
}
+ /* VCL Event Un-register handler */
+ if ((session->state & STATE_LISTEN) && reg)
+ {
+ ev = vce_get_event_from_index (&vcm->event_thread, reg->ev_idx);
+ vce_unregister_handler (&vcm->event_thread, ev);
+ }
+
vep_session->wait_cont_idx =
(vep_session->wait_cont_idx == session_index) ?
session->vep.next_sid : vep_session->wait_cont_idx;
@@ -4460,7 +4488,7 @@ vppcom_session_attr (uint32_t session_index, uint32_t op,
};
CLIB_PACKED (struct {
i32 data;
- }) * ed;
+ }) *ed;
ed = ELOG_TRACK_DATA (&vcm->elog_main, e, session->elog_track);
ed->data = session->libc_epfd;