author    Florin Coras <fcoras@cisco.com> 2019-08-21 16:20:44 -0700
committer Florin Coras <florin.coras@gmail.com> 2019-08-27 18:21:39 +0000
commit    458089bbad9cf5bef6cf8119f23fc44e66b36ad3 (patch)
tree      4ee7da1148172ca6f8a878746c8bd48c1e2dc95e
parent    7adaa226eaa2401d6bb0dfd38a0d943c9645d7dc (diff)
session: move ctrl messages from bapi to mq
Type: refactor

Moves connect, disconnect, bind, unbind and app detach from the binary api
to the message queue. This simplifies app/vcl interaction with the session
layer since all session control messages are now handled over the mq.
Add/del segment messages require internal C api changes which affect all
builtin applications. They'll be moved in a different patch and might not
be back portable to 19.08.

Change-Id: I93f6d18e551b024effa75d47f5ff25f23ba8aff5
Signed-off-by: Florin Coras <fcoras@cisco.com>
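For orientation, the sketch below condenses what "handled over the mq" means on the client side after this patch: instead of allocating a binary api message and sending it with vl_msg_api_send_shmem, the VCL worker allocates a slot on the vpp control message queue it received in the app_attach reply and enqueues a session_listen_msg_t. It is an illustrative condensation of the vcl_send_session_listen helper added in vppcom.c, not additional code in the patch; the function name and the two includes are assumptions made for the example.

#include <vcl/vcl_private.h>
#include <vnet/session/application_interface.h>

/* Illustrative only: ask vpp to listen by enqueueing a ctrl message on
 * the mq handed back in the app_attach reply, instead of a bapi call. */
static void
example_listen_over_mq (vcl_worker_t * wrk, vcl_session_t * s)
{
  app_session_evt_t _app_evt, *app_evt = &_app_evt;
  session_listen_msg_t *mp;
  svm_msg_q_t *mq;

  /* The ctrl mq points at vpp's main-thread event queue */
  mq = vcl_worker_ctrl_mq (wrk);
  /* Grab a message ring slot and stamp it with the ctrl event type */
  app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_LISTEN);
  mp = (session_listen_msg_t *) app_evt->evt->data;
  memset (mp, 0, sizeof (*mp));
  mp->client_index = wrk->my_client_index;
  mp->context = s->session_index;
  mp->wrk_index = wrk->vpp_wrk_index;
  mp->is_ip4 = s->transport.is_ip4;
  clib_memcpy_fast (&mp->ip, &s->transport.lcl_ip, sizeof (mp->ip));
  mp->port = s->transport.lcl_port;
  mp->proto = s->session_type;
  /* Enqueue; vpp's session queue node copies the payload into a ctrl
   * data buffer and dispatches the listen handler on the main thread */
  app_send_ctrl_evt_to_vpp (mq, app_evt);
}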
-rw-r--r--  src/vcl/vcl_bapi.c                        159
-rw-r--r--  src/vcl/vcl_locked.c                        2
-rw-r--r--  src/vcl/vcl_private.c                       6
-rw-r--r--  src/vcl/vcl_private.h                       9
-rw-r--r--  src/vcl/vppcom.c                          124
-rw-r--r--  src/vnet/session/application.c              6
-rw-r--r--  src/vnet/session/application.h              9
-rw-r--r--  src/vnet/session/application_interface.h   64
-rw-r--r--  src/vnet/session/session.api               94
-rw-r--r--  src/vnet/session/session.c                  8
-rw-r--r--  src/vnet/session/session.h                 32
-rwxr-xr-x  src/vnet/session/session_api.c            164
-rw-r--r--  src/vnet/session/session_node.c           372
-rw-r--r--  src/vnet/session/session_types.h           32
14 files changed, 808 insertions(+), 273 deletions(-)
diff --git a/src/vcl/vcl_bapi.c b/src/vcl/vcl_bapi.c
index de64809d52a..cdfc2869edc 100644
--- a/src/vcl/vcl_bapi.c
+++ b/src/vcl/vcl_bapi.c
@@ -108,10 +108,10 @@ vcl_vpp_worker_segment_handle (u32 wrk_index)
}
static void
-vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t *
- mp)
+vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp)
{
vcl_worker_t *wrk = vcl_worker_get (0);
+ svm_msg_q_t *ctrl_mq;
u64 segment_handle;
int *fds = 0, i;
u32 n_fds = 0;
@@ -122,8 +122,11 @@ vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t *
goto failed;
}
- wrk->app_event_queue = uword_to_pointer (mp->app_event_queue_address,
- svm_msg_q_t *);
+ wrk->app_event_queue = uword_to_pointer (mp->app_mq, svm_msg_q_t *);
+ ctrl_mq = uword_to_pointer (mp->vpp_ctrl_mq, svm_msg_q_t *);
+ vec_validate (wrk->vpp_event_queues, mp->vpp_ctrl_mq_thread);
+ wrk->vpp_event_queues[mp->vpp_ctrl_mq_thread] = ctrl_mq;
+ wrk->ctrl_mq = ctrl_mq;
segment_handle = clib_net_to_host_u64 (mp->segment_handle);
if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
{
@@ -254,17 +257,6 @@ failed:
}
static void
-vl_api_application_detach_reply_t_handler (vl_api_application_detach_reply_t *
- mp)
-{
- if (mp->retval)
- clib_warning ("VCL<%d>: detach failed: %U", getpid (), format_api_error,
- ntohl (mp->retval));
-
- vcm->app_state = STATE_APP_ENABLED;
-}
-
-static void
vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
{
ssvm_segment_type_t seg_type = SSVM_SEGMENT_SHM;
@@ -305,50 +297,12 @@ vl_api_unmap_segment_t_handler (vl_api_unmap_segment_t * mp)
}
static void
-vl_api_bind_sock_reply_t_handler (vl_api_bind_sock_reply_t * mp)
-{
- /* Expecting a similar message on mq. So ignore this */
- VDBG (0, "bapi bind retval: %u!", mp->retval);
-}
-
-static void
-vl_api_unbind_sock_reply_t_handler (vl_api_unbind_sock_reply_t * mp)
-{
- if (mp->retval)
- VDBG (0, "ERROR: sid %u: unbind failed: %U", mp->context,
- format_api_error, ntohl (mp->retval));
-
- VDBG (1, "sid %u: unbind succeeded!", mp->context);
-
-}
-
-static void
-vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t *
- mp)
-{
- if (mp->retval)
- VDBG (0, "ERROR: sid %u: disconnect failed: %U", mp->context,
- format_api_error, ntohl (mp->retval));
-}
-
-static void
-vl_api_connect_sock_reply_t_handler (vl_api_connect_sock_reply_t * mp)
-{
- if (mp->retval)
- VDBG (0, "ERROR: connect failed: %U", format_api_error,
- ntohl (mp->retval));
-}
-
-static void
vl_api_application_tls_cert_add_reply_t_handler
(vl_api_application_tls_cert_add_reply_t * mp)
{
if (mp->retval)
- {
- clib_warning ("VCL<%d>: add cert failed: %U", getpid (),
- format_api_error, ntohl (mp->retval));
- return;
- }
+ VDBG (0, "add cert failed: %U", format_api_error, ntohl (mp->retval));
+ vcm->app_state = STATE_APP_READY;
}
static void
@@ -356,22 +310,13 @@ static void
(vl_api_application_tls_key_add_reply_t * mp)
{
if (mp->retval)
- {
- clib_warning ("VCL<%d>: add key failed: %U", getpid (),
- format_api_error, ntohl (mp->retval));
- return;
- }
-
+ VDBG (0, "add key failed: %U", format_api_error, ntohl (mp->retval));
+ vcm->app_state = STATE_APP_READY;
}
#define foreach_sock_msg \
_(SESSION_ENABLE_DISABLE_REPLY, session_enable_disable_reply) \
-_(BIND_SOCK_REPLY, bind_sock_reply) \
-_(UNBIND_SOCK_REPLY, unbind_sock_reply) \
-_(CONNECT_SOCK_REPLY, connect_sock_reply) \
-_(DISCONNECT_SESSION_REPLY, disconnect_session_reply) \
-_(APPLICATION_ATTACH_REPLY, application_attach_reply) \
-_(APPLICATION_DETACH_REPLY, application_detach_reply) \
+_(APP_ATTACH_REPLY, app_attach_reply) \
_(APPLICATION_TLS_CERT_ADD_REPLY, application_tls_cert_add_reply) \
_(APPLICATION_TLS_KEY_ADD_REPLY, application_tls_key_add_reply) \
_(MAP_ANOTHER_SEGMENT, map_another_segment) \
@@ -414,7 +359,7 @@ void
vppcom_app_send_attach (void)
{
vcl_worker_t *wrk = vcl_worker_get_current ();
- vl_api_application_attach_t *bmp;
+ vl_api_app_attach_t *bmp;
u8 nsid_len = vec_len (vcm->cfg.namespace_id);
u8 app_is_proxy = (vcm->cfg.app_proxy_transport_tcp ||
vcm->cfg.app_proxy_transport_udp);
@@ -422,7 +367,7 @@ vppcom_app_send_attach (void)
bmp = vl_msg_api_alloc (sizeof (*bmp));
memset (bmp, 0, sizeof (*bmp));
- bmp->_vl_msg_id = ntohs (VL_API_APPLICATION_ATTACH);
+ bmp->_vl_msg_id = ntohs (VL_API_APP_ATTACH);
bmp->client_index = wrk->my_client_index;
bmp->context = htonl (0xfeedface);
bmp->options[APP_OPTIONS_FLAGS] =
@@ -505,80 +450,6 @@ vcl_send_child_worker_del (vcl_worker_t * child_wrk)
}
void
-vppcom_send_connect_sock (vcl_session_t * session)
-{
- vcl_worker_t *wrk = vcl_worker_get_current ();
- vl_api_connect_sock_t *cmp;
-
- cmp = vl_msg_api_alloc (sizeof (*cmp));
- memset (cmp, 0, sizeof (*cmp));
- cmp->_vl_msg_id = ntohs (VL_API_CONNECT_SOCK);
- cmp->client_index = wrk->my_client_index;
- cmp->context = session->session_index;
- cmp->wrk_index = wrk->vpp_wrk_index;
- cmp->is_ip4 = session->transport.is_ip4;
- cmp->parent_handle = session->parent_handle;
- clib_memcpy_fast (cmp->ip, &session->transport.rmt_ip, sizeof (cmp->ip));
- cmp->port = session->transport.rmt_port;
- cmp->proto = session->session_type;
- vl_msg_api_send_shmem (wrk->vl_input_queue, (u8 *) & cmp);
-}
-
-void
-vppcom_send_disconnect_session (u64 vpp_handle)
-{
- vcl_worker_t *wrk = vcl_worker_get_current ();
- vl_api_disconnect_session_t *dmp;
-
- dmp = vl_msg_api_alloc (sizeof (*dmp));
- memset (dmp, 0, sizeof (*dmp));
- dmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION);
- dmp->client_index = wrk->my_client_index;
- dmp->handle = vpp_handle;
- vl_msg_api_send_shmem (wrk->vl_input_queue, (u8 *) & dmp);
-}
-
-/* VPP combines bind and listen as one operation. VCL manages the separation
- * of bind and listen locally via vppcom_session_bind() and
- * vppcom_session_listen() */
-void
-vppcom_send_bind_sock (vcl_session_t * session)
-{
- vcl_worker_t *wrk = vcl_worker_get_current ();
- vl_api_bind_sock_t *bmp;
-
- /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
- bmp = vl_msg_api_alloc (sizeof (*bmp));
- memset (bmp, 0, sizeof (*bmp));
-
- bmp->_vl_msg_id = ntohs (VL_API_BIND_SOCK);
- bmp->client_index = wrk->my_client_index;
- bmp->context = session->session_index;
- bmp->wrk_index = wrk->vpp_wrk_index;
- bmp->is_ip4 = session->transport.is_ip4;
- clib_memcpy_fast (bmp->ip, &session->transport.lcl_ip, sizeof (bmp->ip));
- bmp->port = session->transport.lcl_port;
- bmp->proto = session->session_type;
- vl_msg_api_send_shmem (wrk->vl_input_queue, (u8 *) & bmp);
-}
-
-void
-vppcom_send_unbind_sock (vcl_worker_t * wrk, u64 vpp_handle)
-{
- vl_api_unbind_sock_t *ump;
-
- ump = vl_msg_api_alloc (sizeof (*ump));
- memset (ump, 0, sizeof (*ump));
-
- ump->_vl_msg_id = ntohs (VL_API_UNBIND_SOCK);
- ump->client_index = wrk->my_client_index;
- ump->wrk_index = wrk->vpp_wrk_index;
- ump->handle = vpp_handle;
- ump->context = wrk->wrk_index;
- vl_msg_api_send_shmem (wrk->vl_input_queue, (u8 *) & ump);
-}
-
-void
vppcom_send_application_tls_cert_add (vcl_session_t * session, char *cert,
u32 cert_len)
{
@@ -593,7 +464,6 @@ vppcom_send_application_tls_cert_add (vcl_session_t * session, char *cert,
cert_mp->cert_len = clib_host_to_net_u16 (cert_len);
clib_memcpy_fast (cert_mp->cert, cert, cert_len);
vl_msg_api_send_shmem (wrk->vl_input_queue, (u8 *) & cert_mp);
-
}
void
@@ -611,7 +481,6 @@ vppcom_send_application_tls_key_add (vcl_session_t * session, char *key,
key_mp->key_len = clib_host_to_net_u16 (key_len);
clib_memcpy_fast (key_mp->key, key, key_len);
vl_msg_api_send_shmem (wrk->vl_input_queue, (u8 *) & key_mp);
-
}
u32
diff --git a/src/vcl/vcl_locked.c b/src/vcl/vcl_locked.c
index 3b6817c6423..d69d391d626 100644
--- a/src/vcl/vcl_locked.c
+++ b/src/vcl/vcl_locked.c
@@ -311,7 +311,7 @@ vls_listener_wrk_stop_listen (vcl_locked_session_t * vls, u32 wrk_index)
s = vcl_session_get (wrk, vls->session_index);
if (s->session_state != STATE_LISTEN)
return;
- vppcom_send_unbind_sock (wrk, s->vpp_handle);
+ vcl_send_session_unlisten (wrk, s);
s->session_state = STATE_LISTEN_NO_MQ;
vls_listener_wrk_set (vls, wrk_index, 0 /* is_active */ );
}
diff --git a/src/vcl/vcl_private.c b/src/vcl/vcl_private.c
index 13794ea4f9c..f431396dd93 100644
--- a/src/vcl/vcl_private.c
+++ b/src/vcl/vcl_private.c
@@ -276,6 +276,12 @@ vcl_worker_set_bapi (void)
return -1;
}
+svm_msg_q_t *
+vcl_worker_ctrl_mq (vcl_worker_t * wrk)
+{
+ return wrk->ctrl_mq;
+}
+
void
vcl_segment_table_add (u64 segment_handle, u32 svm_segment_index)
{
diff --git a/src/vcl/vcl_private.h b/src/vcl/vcl_private.h
index 43c8ec304ef..cd2544c2559 100644
--- a/src/vcl/vcl_private.h
+++ b/src/vcl/vcl_private.h
@@ -57,6 +57,7 @@ typedef enum
STATE_APP_ENABLED,
STATE_APP_ATTACHED,
STATE_APP_ADDING_WORKER,
+ STATE_APP_ADDING_TLS_DATA,
STATE_APP_FAILED,
STATE_APP_READY
} app_state_t;
@@ -251,6 +252,9 @@ typedef struct vcl_worker_
/** VPP binary api input queue */
svm_queue_t *vl_input_queue;
+ /** VPP mq to be used for exchanging control messages */
+ svm_msg_q_t *ctrl_mq;
+
/** Message queues epoll fd. Initialized only if using mqs with eventfds */
int mqs_epfd;
@@ -547,6 +551,7 @@ vcl_worker_t *vcl_worker_alloc_and_init (void);
void vcl_worker_cleanup (vcl_worker_t * wrk, u8 notify_vpp);
int vcl_worker_register_with_vpp (void);
int vcl_worker_set_bapi (void);
+svm_msg_q_t *vcl_worker_ctrl_mq (vcl_worker_t * wrk);
void vcl_flush_mq_events (void);
void vcl_cleanup_bapi (void);
@@ -603,10 +608,8 @@ void vppcom_init_error_string_table (void);
void vppcom_send_session_enable_disable (u8 is_enable);
void vppcom_app_send_attach (void);
void vppcom_app_send_detach (void);
-void vppcom_send_connect_sock (vcl_session_t * session);
+void vcl_send_session_unlisten (vcl_worker_t * wrk, vcl_session_t * s);
void vppcom_send_disconnect_session (u64 vpp_handle);
-void vppcom_send_bind_sock (vcl_session_t * session);
-void vppcom_send_unbind_sock (vcl_worker_t * wrk, u64 vpp_handle);
void vppcom_api_hookup (void);
void vppcom_send_application_tls_cert_add (vcl_session_t * session,
char *cert, u32 cert_len);
diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c
index 0060922fea6..f56c02b6d9d 100644
--- a/src/vcl/vppcom.c
+++ b/src/vcl/vppcom.c
@@ -197,6 +197,98 @@ format_ip46_address (u8 * s, va_list * args)
* VPPCOM Utility Functions
*/
+static void
+vcl_send_session_listen (vcl_worker_t * wrk, vcl_session_t * s)
+{
+ app_session_evt_t _app_evt, *app_evt = &_app_evt;
+ session_listen_msg_t *mp;
+ svm_msg_q_t *mq;
+
+ mq = vcl_worker_ctrl_mq (wrk);
+ app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_LISTEN);
+ mp = (session_listen_msg_t *) app_evt->evt->data;
+ memset (mp, 0, sizeof (*mp));
+ mp->client_index = wrk->my_client_index;
+ mp->context = s->session_index;
+ mp->wrk_index = wrk->vpp_wrk_index;
+ mp->is_ip4 = s->transport.is_ip4;
+ clib_memcpy_fast (&mp->ip, &s->transport.lcl_ip, sizeof (mp->ip));
+ mp->port = s->transport.lcl_port;
+ mp->proto = s->session_type;
+ app_send_ctrl_evt_to_vpp (mq, app_evt);
+}
+
+static void
+vcl_send_session_connect (vcl_worker_t * wrk, vcl_session_t * s)
+{
+ app_session_evt_t _app_evt, *app_evt = &_app_evt;
+ session_connect_msg_t *mp;
+ svm_msg_q_t *mq;
+
+ mq = vcl_worker_ctrl_mq (wrk);
+ app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_CONNECT);
+ mp = (session_connect_msg_t *) app_evt->evt->data;
+ memset (mp, 0, sizeof (*mp));
+ mp->client_index = wrk->my_client_index;
+ mp->context = s->session_index;
+ mp->wrk_index = wrk->vpp_wrk_index;
+ mp->is_ip4 = s->transport.is_ip4;
+ mp->parent_handle = s->parent_handle;
+ clib_memcpy_fast (&mp->ip, &s->transport.rmt_ip, sizeof (mp->ip));
+ mp->port = s->transport.rmt_port;
+ mp->proto = s->session_type;
+ app_send_ctrl_evt_to_vpp (mq, app_evt);
+}
+
+void
+vcl_send_session_unlisten (vcl_worker_t * wrk, vcl_session_t * s)
+{
+ app_session_evt_t _app_evt, *app_evt = &_app_evt;
+ session_unlisten_msg_t *mp;
+ svm_msg_q_t *mq;
+
+ mq = vcl_worker_ctrl_mq (wrk);
+ app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_UNLISTEN);
+ mp = (session_unlisten_msg_t *) app_evt->evt->data;
+ memset (mp, 0, sizeof (*mp));
+ mp->client_index = wrk->my_client_index;
+ mp->wrk_index = wrk->vpp_wrk_index;
+ mp->handle = s->vpp_handle;
+ mp->context = wrk->wrk_index;
+ app_send_ctrl_evt_to_vpp (mq, app_evt);
+}
+
+static void
+vcl_send_session_disconnect (vcl_worker_t * wrk, vcl_session_t * s)
+{
+ app_session_evt_t _app_evt, *app_evt = &_app_evt;
+ session_disconnect_msg_t *mp;
+ svm_msg_q_t *mq;
+
+ /* Send to thread that owns the session */
+ mq = s->vpp_evt_q;
+ app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_DISCONNECT);
+ mp = (session_disconnect_msg_t *) app_evt->evt->data;
+ memset (mp, 0, sizeof (*mp));
+ mp->client_index = wrk->my_client_index;
+ mp->handle = s->vpp_handle;
+ app_send_ctrl_evt_to_vpp (mq, app_evt);
+}
+
+static void
+vcl_send_app_detach (vcl_worker_t * wrk)
+{
+ app_session_evt_t _app_evt, *app_evt = &_app_evt;
+ session_app_detach_msg_t *mp;
+ svm_msg_q_t *mq;
+
+ mq = vcl_worker_ctrl_mq (wrk);
+ app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_APP_DETACH);
+ mp = (session_app_detach_msg_t *) app_evt->evt->data;
+ memset (mp, 0, sizeof (*mp));
+ mp->client_index = wrk->my_client_index;
+ app_send_ctrl_evt_to_vpp (mq, app_evt);
+}
static void
vcl_send_session_accepted_reply (svm_msg_q_t * mq, u32 context,
@@ -827,7 +919,6 @@ vppcom_session_unbind (u32 session_handle)
session_accepted_msg_t *accepted_msg;
vcl_session_t *session = 0;
vcl_session_msg_t *evt;
- u64 vpp_handle;
session = vcl_session_get_w_handle (wrk, session_handle);
if (!session)
@@ -845,14 +936,14 @@ vppcom_session_unbind (u32 session_handle)
}
clib_fifo_free (session->accept_evts_fifo);
- vpp_handle = session->vpp_handle;
- session->vpp_handle = ~0;
- session->session_state = STATE_DISCONNECT;
+ vcl_send_session_unlisten (wrk, session);
VDBG (1, "session %u [0x%llx]: sending unbind!", session->session_index,
- vpp_handle);
+ session->vpp_handle);
vcl_evt (VCL_EVT_UNBIND, session);
- vppcom_send_unbind_sock (wrk, vpp_handle);
+
+ session->vpp_handle = ~0;
+ session->session_state = STATE_DISCONNECT;
return VPPCOM_OK;
}
@@ -894,7 +985,7 @@ vppcom_session_disconnect (u32 session_handle)
{
VDBG (1, "session %u [0x%llx]: sending disconnect...",
session->session_index, vpp_handle);
- vppcom_send_disconnect_session (vpp_handle);
+ vcl_send_session_disconnect (wrk, session);
}
if (session->listener_index != VCL_INVALID_SESSION_INDEX)
@@ -1005,7 +1096,7 @@ vppcom_app_destroy (void)
if (pool_elts (vcm->workers) == 1)
{
- vppcom_app_send_detach ();
+ vcl_send_app_detach (vcl_worker_get_current ());
orig_app_timeout = vcm->cfg.app_timeout;
vcm->cfg.app_timeout = 2.0;
rv = vcl_wait_for_app_state_change (STATE_APP_ENABLED);
@@ -1220,7 +1311,7 @@ vppcom_session_listen (uint32_t listen_sh, uint32_t q_len)
/*
* Send listen request to vpp and wait for reply
*/
- vppcom_send_bind_sock (listen_session);
+ vcl_send_session_listen (wrk, listen_session);
rv = vppcom_wait_for_session_state_change (listen_session->session_index,
STATE_LISTEN,
vcm->cfg.session_timeout);
@@ -1256,7 +1347,8 @@ vppcom_session_tls_add_cert (uint32_t session_handle, char *cert,
* Send listen request to vpp and wait for reply
*/
vppcom_send_application_tls_cert_add (session, cert, cert_len);
-
+ vcm->app_state = STATE_APP_ADDING_TLS_DATA;
+ vcl_wait_for_app_state_change (STATE_APP_READY);
return VPPCOM_OK;
}
@@ -1276,14 +1368,10 @@ vppcom_session_tls_add_key (uint32_t session_handle, char *key,
if (key_len == 0 || key_len == ~0)
return VPPCOM_EBADFD;
- /*
- * Send listen request to vpp and wait for reply
- */
vppcom_send_application_tls_key_add (session, key, key_len);
-
+ vcm->app_state = STATE_APP_ADDING_TLS_DATA;
+ vcl_wait_for_app_state_change (STATE_APP_READY);
return VPPCOM_OK;
-
-
}
static int
@@ -1505,7 +1593,7 @@ vppcom_session_connect (uint32_t session_handle, vppcom_endpt_t * server_ep)
/*
* Send connect request and wait for reply from vpp
*/
- vppcom_send_connect_sock (session);
+ vcl_send_session_connect (wrk, session);
rv = vppcom_wait_for_session_state_change (session_index, STATE_CONNECT,
vcm->cfg.session_timeout);
@@ -1564,7 +1652,7 @@ vppcom_session_stream_connect (uint32_t session_handle,
/*
* Send connect request and wait for reply from vpp
*/
- vppcom_send_connect_sock (session);
+ vcl_send_session_connect (wrk, session);
rv = vppcom_wait_for_session_state_change (session_index, STATE_CONNECT,
vcm->cfg.session_timeout);
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index ab678888a71..d4f3d61ab61 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -951,6 +951,8 @@ vnet_listen (vnet_listen_args_t * a)
application_t *app;
int rv;
+ ASSERT (vlib_thread_is_main_w_barrier ());
+
app = application_get_if_valid (a->app_index);
if (!app)
return VNET_API_ERROR_APPLICATION_NOT_ATTACHED;
@@ -1001,6 +1003,8 @@ vnet_connect (vnet_connect_args_t * a)
app_worker_t *client_wrk;
application_t *client;
+ ASSERT (vlib_thread_is_main_w_barrier ());
+
if (session_endpoint_is_zero (&a->sep))
return VNET_API_ERROR_INVALID_VALUE;
@@ -1038,6 +1042,8 @@ vnet_unlisten (vnet_unlisten_args_t * a)
app_listener_t *al;
application_t *app;
+ ASSERT (vlib_thread_is_main_w_barrier ());
+
if (!(app = application_get_if_valid (a->app_index)))
return VNET_API_ERROR_APPLICATION_NOT_ATTACHED;
diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h
index 24c6fc3eeaf..9ec1055bbbc 100644
--- a/src/vnet/session/application.h
+++ b/src/vnet/session/application.h
@@ -284,6 +284,15 @@ int vnet_app_worker_add_del (vnet_app_worker_add_del_args_t * a);
uword unformat_application_proto (unformat_input_t * input, va_list * args);
+
+/* Needed while we support both bapi and mq ctrl messages */
+int mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
+ session_handle_t handle, int rv);
+int mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
+ session_t * s, u8 is_fail);
+void mq_send_unlisten_reply (app_worker_t * app_wrk, session_handle_t sh,
+ u32 context, int rv);
+
#endif /* SRC_VNET_SESSION_APPLICATION_H_ */
/*
diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h
index 6cc1af4d78b..f5f684e37da 100644
--- a/src/vnet/session/application_interface.h
+++ b/src/vnet/session/application_interface.h
@@ -261,6 +261,25 @@ typedef struct
#undef _
} app_session_t;
+typedef struct session_listen_msg_
+{
+ u32 client_index;
+ u32 context; /* Not needed but keeping it for compatibility with bapi */
+ u32 wrk_index;
+ u32 vrf;
+ u16 port;
+ u8 proto;
+ u8 is_ip4;
+ ip46_address_t ip;
+} __clib_packed session_listen_msg_t;
+
+typedef struct session_listen_uri_msg_
+{
+ u32 client_index;
+ u32 context;
+ u8 uri[56];
+} __clib_packed session_listen_uri_msg_t;
+
typedef struct session_bound_msg_
{
u32 context;
@@ -277,6 +296,14 @@ typedef struct session_bound_msg_
u8 segment_name[128];
} __clib_packed session_bound_msg_t;
+typedef struct session_unlisten_msg_
+{
+ u32 client_index;
+ u32 context;
+ u32 wrk_index;
+ session_handle_t handle;
+} __clib_packed session_unlisten_msg_t;
+
typedef struct session_unlisten_reply_msg_
{
u32 context;
@@ -303,9 +330,27 @@ typedef struct session_accepted_reply_msg_
u64 handle;
} __clib_packed session_accepted_reply_msg_t;
-/* Make sure this is not too large, otherwise it won't fit when dequeued in
- * the session queue node */
-STATIC_ASSERT (sizeof (session_accepted_reply_msg_t) <= 16, "accept reply");
+typedef struct session_connect_msg_
+{
+ u32 client_index;
+ u32 context;
+ u32 wrk_index;
+ u32 vrf;
+ u16 port;
+ u8 proto;
+ u8 is_ip4;
+ ip46_address_t ip;
+ u8 hostname_len;
+ u8 hostname[16];
+ u64 parent_handle;
+} __clib_packed session_connect_msg_t;
+
+typedef struct session_connect_uri_msg_
+{
+ u32 client_index;
+ u32 context;
+ u8 uri[56];
+} __clib_packed session_connect_uri_msg_t;
typedef struct session_connected_msg_
{
@@ -325,6 +370,13 @@ typedef struct session_connected_msg_
transport_endpoint_t lcl;
} __clib_packed session_connected_msg_t;
+typedef struct session_disconnect_msg_
+{
+ u32 client_index;
+ u32 context;
+ session_handle_t handle;
+} __clib_packed session_disconnect_msg_t;
+
typedef struct session_disconnected_msg_
{
u32 client_index;
@@ -375,6 +427,12 @@ typedef struct session_worker_update_reply_msg_
u64 segment_handle;
} __clib_packed session_worker_update_reply_msg_t;
+typedef struct session_app_detach_msg_
+{
+ u32 client_index;
+ u32 context;
+} session_app_detach_msg_t;
+
typedef struct app_session_event_
{
svm_msg_q_msg_t msg;
diff --git a/src/vnet/session/session.api b/src/vnet/session/session.api
index 533f65e85a2..52e050d3978 100644
--- a/src/vnet/session/session.api
+++ b/src/vnet/session/session.api
@@ -13,12 +13,13 @@
* limitations under the License.
*/
-option version = "1.6.0";
+option version = "1.7.0";
/** \brief client->vpp, attach application to session layer
+ ### WILL BE DEPRECATED POST 20.01 ###
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
- @param initial_segment_size - size of the initial shm segment to be
+ @param initial_segment_size - size of the initial shm segment to be
allocated
@param options - segment size, fifo sizes, etc.
@param namespace_id_len - length of the namespace id c-string
@@ -32,17 +33,18 @@ option version = "1.6.0";
u8 namespace_id_len;
u8 namespace_id [64];
};
-
+
/** \brief Application attach reply
+ ### WILL BE DEPRECATED POST 20.01 ###
@param context - sender context, to match reply w/ request
@param retval - return code for the request
- @param app_event_queue_address - vpp event queue address or 0 if this
+ @param app_event_queue_address - vpp event queue address or 0 if this
connection shouldn't send events
@param n_fds - number of fds exchanged
@param fd_flags - set of flags that indicate which fds are to be expected
- over the socket (set only if socket transport available)
+ over the socket (set only if socket transport available)
@param segment_size - size of first shm segment
- @param segment_name_length - length of segment name
+ @param segment_name_length - length of segment name
@param segment_name - name of segment client needs to attach to
@param app_index - index of the newly created app
@param segment_handle - handle for segment
@@ -60,6 +62,52 @@ define application_attach_reply {
u64 segment_handle;
};
+/** \brief Application attach to session layer
+ @param client_index - opaque cookie to identify the sender
+ @param context - sender context, to match reply w/ request
+ @param options - segment size, fifo sizes, etc.
+ @param namespace_id_len - length of the namespace id c-string
+ @param namespace_id - 0 terminated c-string
+*/
+ define app_attach {
+ u32 client_index;
+ u32 context;
+ u64 options[16];
+ u8 namespace_id_len;
+ u8 namespace_id[64];
+ };
+
+ /** \brief Application attach reply
+ @param context - sender context, to match reply w/ request
+ @param retval - return code for the request
+ @param app_mq - app message queue
+ @param vpp_ctrl_mq - vpp message queue for control events that should
+ be handled in main thread, i.e., bind/connect
+ @param vpp_ctrl_mq_thread - thread index of the ctrl mq
+ @param app_index - index of the newly created app
+ @param n_fds - number of fds exchanged
+ @param fd_flags - set of flags that indicate which fds are to be expected
+ over the socket (set only if socket transport available)
+ @param segment_size - size of first shm segment
+ @param segment_name_length - length of segment name
+ @param segment_name - name of segment client needs to attach to
+ @param segment_handle - handle for segment
+*/
+define app_attach_reply {
+ u32 context;
+ i32 retval;
+ u64 app_mq;
+ u64 vpp_ctrl_mq;
+ u8 vpp_ctrl_mq_thread;
+ u32 app_index;
+ u8 n_fds;
+ u8 fd_flags;
+ u32 segment_size;
+ u8 segment_name_length;
+ u8 segment_name[128];
+ u64 segment_handle;
+};
+
/** \brief Application add TLS certificate
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
@@ -89,6 +137,7 @@ autoreply define application_tls_key_add {
};
/** \brief client->vpp, attach application to session layer
+ ### WILL BE DEPRECATED POST 20.01 ###
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
*/
@@ -96,12 +145,12 @@ autoreply define application_detach {
u32 client_index;
u32 context;
};
-
+
/** \brief vpp->client, please map an additional shared memory segment
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
- @param fd_flags - set of flags that indicate which, if any, fds are
- to be expected over the socket. This is set only if
+ @param fd_flags - set of flags that indicate which, if any, fds are
+ to be expected over the socket. This is set only if
socket transport available
@param segment_size - size of the segment to be mapped
@param segment_name - name of the segment to be mapped
@@ -120,7 +169,7 @@ autoreply define map_another_segment {
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
@param segment_name - segment name
- @param segment_handle - handle of the segment to be unmapped
+ @param segment_handle - handle of the segment to be unmapped
*/
autoreply define unmap_segment {
u32 client_index;
@@ -129,6 +178,7 @@ autoreply define unmap_segment {
};
/** \brief Bind to a given URI
+ ### WILL BE DEPRECATED POST 20.01 ###
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
@param accept_cookie - sender accept cookie, to identify this bind flavor
@@ -144,6 +194,7 @@ autoreply define bind_uri {
};
/** \brief Unbind a given URI
+ ### WILL BE DEPRECATED POST 20.01 ###
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
@param uri - a URI, e.g. "tcp://0.0.0.0/0/80" [ipv4]
@@ -157,12 +208,13 @@ autoreply define unbind_uri {
};
/** \brief Connect to a given URI
+ ### WILL BE DEPRECATED POST 20.01 ###
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
- @param client_queue_address - binary API client queue address. Used by
+ @param client_queue_address - binary API client queue address. Used by
local server when connect was redirected.
@param options - socket options, fifo sizes, etc. passed by vpp to the
- server when redirecting connects
+ server when redirecting connects
@param uri - a URI, e.g. "tcp4://0.0.0.0/0/80"
"tcp6://::/0/80" [ipv6], etc.
*/
@@ -175,6 +227,7 @@ autoreply define connect_uri {
};
/** \brief bidirectional disconnect API
+ ### WILL BE DEPRECATED POST 20.01 ###
@param client_index - opaque cookie to identify the sender
client to vpp direction only
@param context - sender context, to match reply w/ request
@@ -187,6 +240,7 @@ define disconnect_session {
};
/** \brief bidirectional disconnect reply API
+ ### WILL BE DEPRECATED POST 20.01 ###
@param client_index - opaque cookie to identify the sender
client to vpp direction only
@param context - sender context, to match reply w/ request
@@ -200,13 +254,14 @@ define disconnect_session_reply {
};
/** \brief Bind to an ip:port pair for a given transport protocol
+ ### WILL BE DEPRECATED POST 20.01 ###
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
@param wrk_index - index of worker requesting the bind
@param vrf - bind namespace
@param is_ip4 - flag that is 1 if ip address family is IPv4
@param ip - ip address
- @param port - port
+ @param port - port
@param proto - protocol 0 - TCP 1 - UDP
@param options - socket options, fifo sizes, etc.
*/
@@ -222,7 +277,8 @@ autoreply define bind_sock {
u64 options[16];
};
-/** \brief Unbind
+/** \brief Unbind
+ ### WILL BE DEPRECATED POST 20.01 ###
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
@param wrk_index - index of worker requesting the bind
@@ -236,16 +292,17 @@ autoreply define unbind_sock {
};
/** \brief Connect to a remote peer
+ ### WILL BE DEPRECATED POST 20.01 ###
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
@param wrk_index - worker that requests the connect
- @param client_queue_address - client's API queue address. Non-zero when
+ @param client_queue_address - client's API queue address. Non-zero when
used to perform redirects
@param options - socket options, fifo sizes, etc. when doing redirects
@param vrf - connection namespace
@param is_ip4 - flag that is 1 if ip address family is IPv4
@param ip - ip address
- @param port - port
+ @param port - port
@param proto - protocol 0 - TCP 1 - UDP
@param hostname-len - length of hostname
@param hostname - destination's hostname. If present, used by protocols
@@ -269,6 +326,7 @@ autoreply define connect_sock {
};
/** \brief ask app to add a new cut-through registration
+ ### WILL BE DEPRECATED POST 20.01 ###
@param client_index - opaque cookie to identify the sender
client to vpp direction only
@param context - sender context, to match reply w/ request
@@ -314,8 +372,8 @@ define app_worker_add_del
@param app_event_queue_address - vpp event queue address of new worker
@param n_fds - number of fds exchanged
@param fd_flags - set of flags that indicate which fds are to be expected
- over the socket (set only if socket transport available)
- @param segment_name_length - length of segment name
+ over the socket (set only if socket transport available)
+ @param segment_name_length - length of segment name
@param segment_name - name of segment client needs to attach to
@param segment_handle - handle for segment
*/
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 45292454e57..c6f9d0a75ae 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -92,10 +92,10 @@ session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
int
session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
{
- /* only event supported for now is disconnect */
- ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE);
- return session_send_evt_to_thread (s, 0, s->thread_index,
- SESSION_CTRL_EVT_CLOSE);
+ /* only events supported are disconnect and reset */
+ ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE
+ || evt_type == SESSION_CTRL_EVT_RESET);
+ return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
}
void
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index de44bed27c9..04fdebed791 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -62,12 +62,19 @@ typedef struct session_tx_context_
session_dgram_hdr_t hdr;
} session_tx_context_t;
+#define SESSION_CTRL_MSG_MAX_SIZE 64
+
typedef struct session_evt_elt
{
clib_llist_anchor_t evt_list;
session_event_t evt;
} session_evt_elt_t;
+typedef struct session_ctrl_evt_data_
+{
+ u8 data[SESSION_CTRL_MSG_MAX_SIZE];
+} session_evt_ctrl_data_t;
+
typedef struct session_worker_
{
CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
@@ -96,6 +103,9 @@ typedef struct session_worker_
/** Pool of session event list elements */
session_evt_elt_t *event_elts;
+ /** Pool of ctrl events data buffers */
+ session_evt_ctrl_data_t *ctrl_evts_data;
+
/** Head of control events list */
clib_llist_index_t ctrl_head;
@@ -207,6 +217,14 @@ session_evt_add_old (session_worker_t * wrk, session_evt_elt_t * elt)
pool_elt_at_index (wrk->event_elts, wrk->old_head));
}
+static inline u32
+session_evt_ctrl_data_alloc (session_worker_t * wrk)
+{
+ session_evt_ctrl_data_t *data;
+ pool_get (wrk->ctrl_evts_data, data);
+ return (data - wrk->ctrl_evts_data);
+}
+
static inline session_evt_elt_t *
session_evt_alloc_ctrl (session_worker_t * wrk)
{
@@ -217,6 +235,20 @@ session_evt_alloc_ctrl (session_worker_t * wrk)
return elt;
}
+static inline void *
+session_evt_ctrl_data (session_worker_t * wrk, session_evt_elt_t * elt)
+{
+ return (void *) (pool_elt_at_index (wrk->ctrl_evts_data,
+ elt->evt.ctrl_data_index));
+}
+
+static inline void
+session_evt_ctrl_data_free (session_worker_t * wrk, session_evt_elt_t * elt)
+{
+ ASSERT (elt->evt.event_type > SESSION_IO_EVT_BUILTIN_TX);
+ pool_put_index (wrk->ctrl_evts_data, elt->evt.ctrl_data_index);
+}
+
static inline session_evt_elt_t *
session_evt_alloc_new (session_worker_t * wrk)
{
diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c
index e3e3bb3c596..8f9ce3f5a1e 100755
--- a/src/vnet/session/session_api.c
+++ b/src/vnet/session/session_api.c
@@ -43,6 +43,7 @@
#define foreach_session_api_msg \
_(MAP_ANOTHER_SEGMENT_REPLY, map_another_segment_reply) \
_(APPLICATION_ATTACH, application_attach) \
+_(APP_ATTACH, app_attach) \
_(APPLICATION_DETACH, application_detach) \
_(BIND_URI, bind_uri) \
_(UNBIND_URI, unbind_uri) \
@@ -298,7 +299,7 @@ mq_send_session_reset_cb (session_t * s)
SESSION_CTRL_EVT_RESET);
}
-static int
+int
mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
session_t * s, u8 is_fail)
{
@@ -378,7 +379,7 @@ done:
return 0;
}
-static int
+int
mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
session_handle_t handle, int rv)
{
@@ -438,13 +439,35 @@ done:
return 0;
}
+void
+mq_send_unlisten_reply (app_worker_t * app_wrk, session_handle_t sh,
+ u32 context, int rv)
+{
+ svm_msg_q_msg_t _msg, *msg = &_msg;
+ session_unlisten_reply_msg_t *ump;
+ svm_msg_q_t *app_mq;
+ session_event_t *evt;
+
+ app_mq = app_wrk->event_queue;
+ if (mq_try_lock_and_alloc_msg (app_mq, msg))
+ return;
+
+ evt = svm_msg_q_msg_data (app_mq, msg);
+ clib_memset (evt, 0, sizeof (*evt));
+ evt->event_type = SESSION_CTRL_EVT_UNLISTEN_REPLY;
+ ump = (session_unlisten_reply_msg_t *) evt->data;
+ ump->context = context;
+ ump->handle = sh;
+ ump->retval = rv;
+ svm_msg_q_add_and_unlock (app_mq, msg);
+}
+
static void
mq_send_session_migrate_cb (session_t * s, session_handle_t new_sh)
{
clib_warning ("not supported");
}
-
static session_cb_vft_t session_mq_cb_vft = {
.session_accept_callback = mq_send_session_accepted_cb,
.session_disconnect_callback = mq_send_session_disconnected_cb,
@@ -466,6 +489,7 @@ vl_api_session_enable_disable_t_handler (vl_api_session_enable_disable_t * mp)
REPLY_MACRO (VL_API_SESSION_ENABLE_DISABLE_REPLY);
}
+/* ### WILL BE DEPRECATED POST 20.01 ### */
static void
vl_api_application_attach_t_handler (vl_api_application_attach_t * mp)
{
@@ -564,6 +588,108 @@ done:
}
static void
+vl_api_app_attach_t_handler (vl_api_app_attach_t * mp)
+{
+ int rv = 0, fds[SESSION_N_FD_TYPE], n_fds = 0;
+ vl_api_app_attach_reply_t *rmp;
+ ssvm_private_t *segp, *evt_q_segment;
+ vnet_app_attach_args_t _a, *a = &_a;
+ u8 fd_flags = 0, ctrl_thread;
+ vl_api_registration_t *reg;
+ svm_msg_q_t *ctrl_mq;
+
+ reg = vl_api_client_index_to_registration (mp->client_index);
+ if (!reg)
+ return;
+
+ if (session_main_is_enabled () == 0)
+ {
+ rv = VNET_API_ERROR_FEATURE_DISABLED;
+ goto done;
+ }
+
+ STATIC_ASSERT (sizeof (u64) * APP_OPTIONS_N_OPTIONS <=
+ sizeof (mp->options),
+ "Out of options, fix api message definition");
+
+ clib_memset (a, 0, sizeof (*a));
+ a->api_client_index = mp->client_index;
+ a->options = mp->options;
+ a->session_cb_vft = &session_mq_cb_vft;
+ if (mp->namespace_id_len > 64)
+ {
+ rv = VNET_API_ERROR_INVALID_VALUE;
+ goto done;
+ }
+
+ if (mp->namespace_id_len)
+ {
+ vec_validate (a->namespace_id, mp->namespace_id_len - 1);
+ clib_memcpy_fast (a->namespace_id, mp->namespace_id,
+ mp->namespace_id_len);
+ }
+
+ if ((rv = vnet_application_attach (a)))
+ {
+ clib_warning ("attach returned: %d", rv);
+ vec_free (a->namespace_id);
+ goto done;
+ }
+ vec_free (a->namespace_id);
+
+ /* Send event queues segment */
+ if ((evt_q_segment = session_main_get_evt_q_segment ()))
+ {
+ fd_flags |= SESSION_FD_F_VPP_MQ_SEGMENT;
+ fds[n_fds] = evt_q_segment->fd;
+ n_fds += 1;
+ }
+ /* Send fifo segment fd if needed */
+ if (ssvm_type (a->segment) == SSVM_SEGMENT_MEMFD)
+ {
+ fd_flags |= SESSION_FD_F_MEMFD_SEGMENT;
+ fds[n_fds] = a->segment->fd;
+ n_fds += 1;
+ }
+ if (a->options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD)
+ {
+ fd_flags |= SESSION_FD_F_MQ_EVENTFD;
+ fds[n_fds] = svm_msg_q_get_producer_eventfd (a->app_evt_q);
+ n_fds += 1;
+ }
+
+done:
+
+ ctrl_thread = vlib_num_workers ()? 1 : 0;
+ ctrl_mq = session_main_get_vpp_event_queue (ctrl_thread);
+ /* *INDENT-OFF* */
+ REPLY_MACRO2 (VL_API_APP_ATTACH_REPLY, ({
+ if (!rv)
+ {
+ segp = a->segment;
+ rmp->app_index = clib_host_to_net_u32 (a->app_index);
+ rmp->app_mq = pointer_to_uword (a->app_evt_q);
+ rmp->vpp_ctrl_mq = pointer_to_uword (ctrl_mq);
+ rmp->vpp_ctrl_mq_thread = ctrl_thread;
+ rmp->n_fds = n_fds;
+ rmp->fd_flags = fd_flags;
+ if (vec_len (segp->name))
+ {
+ memcpy (rmp->segment_name, segp->name, vec_len (segp->name));
+ rmp->segment_name_length = vec_len (segp->name);
+ }
+ rmp->segment_size = segp->ssvm_size;
+ rmp->segment_handle = clib_host_to_net_u64 (a->segment_handle);
+ }
+ }));
+ /* *INDENT-ON* */
+
+ if (n_fds)
+ session_send_fds (reg, fds, n_fds);
+}
+
+/* ### WILL BE DEPRECATED POST 20.01 ### */
+static void
vl_api_application_detach_t_handler (vl_api_application_detach_t * mp)
{
vl_api_application_detach_reply_t *rmp;
@@ -589,6 +715,7 @@ done:
REPLY_MACRO (VL_API_APPLICATION_DETACH_REPLY);
}
+/* ### WILL BE DEPRECATED POST 20.01 ### */
static void
vl_api_bind_uri_t_handler (vl_api_bind_uri_t * mp)
{
@@ -629,6 +756,7 @@ done:
}
}
+/* ### WILL BE DEPRECATED POST 20.01 ### */
static void
vl_api_unbind_uri_t_handler (vl_api_unbind_uri_t * mp)
{
@@ -660,6 +788,7 @@ done:
REPLY_MACRO (VL_API_UNBIND_URI_REPLY);
}
+/* ### WILL BE DEPRECATED POST 20.01 ### */
static void
vl_api_connect_uri_t_handler (vl_api_connect_uri_t * mp)
{
@@ -701,6 +830,7 @@ done:
REPLY_MACRO (VL_API_CONNECT_URI_REPLY);
}
+/* ### WILL BE DEPRECATED POST 20.01 ### */
static void
vl_api_disconnect_session_t_handler (vl_api_disconnect_session_t * mp)
{
@@ -731,6 +861,7 @@ done:
REPLY_MACRO2 (VL_API_DISCONNECT_SESSION_REPLY, rmp->handle = mp->handle);
}
+/* ### WILL BE DEPRECATED POST 20.01 ### */
static void
vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t *
mp)
@@ -762,6 +893,7 @@ vl_api_map_another_segment_reply_t_handler (vl_api_map_another_segment_reply_t
clib_warning ("not implemented");
}
+/* ### WILL BE DEPRECATED POST 20.01 ### */
static void
vl_api_bind_sock_t_handler (vl_api_bind_sock_t * mp)
{
@@ -811,6 +943,7 @@ done:
}
}
+/* ### WILL BE DEPRECATED POST 20.01 ### */
static void
vl_api_unbind_sock_t_handler (vl_api_unbind_sock_t * mp)
{
@@ -839,35 +972,14 @@ vl_api_unbind_sock_t_handler (vl_api_unbind_sock_t * mp)
done:
REPLY_MACRO (VL_API_UNBIND_SOCK_REPLY);
- /*
- * Send reply over msg queue
- */
- svm_msg_q_msg_t _msg, *msg = &_msg;
- session_unlisten_reply_msg_t *ump;
- svm_msg_q_t *app_mq;
- session_event_t *evt;
-
- if (!app)
- return;
-
app_wrk = application_get_worker (app, a->wrk_map_index);
if (!app_wrk)
return;
- app_mq = app_wrk->event_queue;
- if (mq_try_lock_and_alloc_msg (app_mq, msg))
- return;
-
- evt = svm_msg_q_msg_data (app_mq, msg);
- clib_memset (evt, 0, sizeof (*evt));
- evt->event_type = SESSION_CTRL_EVT_UNLISTEN_REPLY;
- ump = (session_unlisten_reply_msg_t *) evt->data;
- ump->context = mp->context;
- ump->handle = mp->handle;
- ump->retval = rv;
- svm_msg_q_add_and_unlock (app_mq, msg);
+ mq_send_unlisten_reply (app_wrk, mp->handle, mp->context, rv);
}
+/* ### WILL BE DEPRECATED POST 20.01 ### */
static void
vl_api_connect_sock_t_handler (vl_api_connect_sock_t * mp)
{
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index 1d662a20e3c..ad18637a952 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -25,12 +25,202 @@
#include <vnet/session/session_debug.h>
#include <svm/queue.h>
-static void session_mq_accepted_reply_handler (void *data);
+#define app_check_thread_and_barrier(_fn, _arg) \
+ if (!vlib_thread_is_main_w_barrier ()) \
+ { \
+ vlib_rpc_call_main_thread (_fn, (u8 *) _arg, sizeof(*_arg)); \
+ return; \
+ }
static void
-accepted_notify_cb (void *data, u32 data_len)
+session_mq_listen_handler (void *data)
{
- session_mq_accepted_reply_handler (data);
+ session_listen_msg_t *mp = (session_listen_msg_t *) data;
+ vnet_listen_args_t _a, *a = &_a;
+ app_worker_t *app_wrk;
+ application_t *app;
+ int rv;
+
+ app_check_thread_and_barrier (session_mq_listen_handler, mp);
+
+ app = application_lookup (mp->client_index);
+ if (!app)
+ return;
+
+ clib_memset (a, 0, sizeof (*a));
+ a->sep.is_ip4 = mp->is_ip4;
+ clib_memcpy_fast (&a->sep.ip, &mp->ip, sizeof (mp->ip));
+ a->sep.port = mp->port;
+ a->sep.fib_index = mp->vrf;
+ a->sep.sw_if_index = ENDPOINT_INVALID_INDEX;
+ a->sep.transport_proto = mp->proto;
+ a->app_index = app->app_index;
+ a->wrk_map_index = mp->wrk_index;
+
+ if ((rv = vnet_listen (a)))
+ clib_warning ("listen returned: %d", rv);
+
+ app_wrk = application_get_worker (app, mp->wrk_index);
+ mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
+ return;
+}
+
+static void
+session_mq_listen_uri_handler (void *data)
+{
+ session_listen_uri_msg_t *mp = (session_listen_uri_msg_t *) data;
+ vnet_listen_args_t _a, *a = &_a;
+ app_worker_t *app_wrk;
+ application_t *app;
+ int rv;
+
+ app_check_thread_and_barrier (session_mq_listen_uri_handler, mp);
+
+ app = application_lookup (mp->client_index);
+ if (!app)
+ return;
+
+ clib_memset (a, 0, sizeof (*a));
+ a->uri = (char *) mp->uri;
+ a->app_index = app->app_index;
+ rv = vnet_bind_uri (a);
+
+ app_wrk = application_get_worker (app, 0);
+ mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
+}
+
+static void
+session_mq_connect_handler (void *data)
+{
+ session_connect_msg_t *mp = (session_connect_msg_t *) data;
+ vnet_connect_args_t _a, *a = &_a;
+ app_worker_t *app_wrk;
+ application_t *app;
+ int rv;
+
+ app_check_thread_and_barrier (session_mq_connect_handler, mp);
+
+ app = application_lookup (mp->client_index);
+ if (!app)
+ return;
+
+ clib_memset (a, 0, sizeof (*a));
+ a->sep.is_ip4 = mp->is_ip4;
+ clib_memcpy_fast (&a->sep.ip, &mp->ip, sizeof (mp->ip));
+ a->sep.port = mp->port;
+ a->sep.transport_proto = mp->proto;
+ a->sep.peer.fib_index = mp->vrf;
+ a->sep.peer.sw_if_index = ENDPOINT_INVALID_INDEX;
+ a->sep_ext.parent_handle = mp->parent_handle;
+ if (mp->hostname_len)
+ {
+ vec_validate (a->sep_ext.hostname, mp->hostname_len - 1);
+ clib_memcpy_fast (a->sep_ext.hostname, mp->hostname, mp->hostname_len);
+ }
+ a->api_context = mp->context;
+ a->app_index = app->app_index;
+ a->wrk_map_index = mp->wrk_index;
+
+ if ((rv = vnet_connect (a)))
+ {
+ clib_warning ("connect returned: %U", format_vnet_api_errno, rv);
+ app_wrk = application_get_worker (app, mp->wrk_index);
+ mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0,
+ /* is_fail */ 1);
+ }
+
+ vec_free (a->sep_ext.hostname);
+}
+
+static void
+session_mq_connect_uri_handler (void *data)
+{
+ session_connect_uri_msg_t *mp = (session_connect_uri_msg_t *) data;
+ vnet_connect_args_t _a, *a = &_a;
+ app_worker_t *app_wrk;
+ application_t *app;
+ int rv;
+
+ app_check_thread_and_barrier (session_mq_connect_uri_handler, mp);
+
+ app = application_lookup (mp->client_index);
+ if (!app)
+ return;
+
+ clib_memset (a, 0, sizeof (*a));
+ a->uri = (char *) mp->uri;
+ a->api_context = mp->context;
+ a->app_index = app->app_index;
+ if ((rv = vnet_connect_uri (a)))
+ {
+ clib_warning ("connect_uri returned: %d", rv);
+ app_wrk = application_get_worker (app, 0 /* default wrk only */ );
+ mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0,
+ /* is_fail */ 1);
+ }
+}
+
+static void
+session_mq_disconnect_handler (void *data)
+{
+ session_disconnect_msg_t *mp = (session_disconnect_msg_t *) data;
+ vnet_disconnect_args_t _a, *a = &_a;
+ application_t *app;
+
+ app = application_lookup (mp->client_index);
+ if (!app)
+ return;
+
+ a->app_index = app->app_index;
+ a->handle = mp->handle;
+ vnet_disconnect_session (a);
+}
+
+static void
+app_mq_detach_handler (void *data)
+{
+ session_app_detach_msg_t *mp = (session_app_detach_msg_t *) data;
+ vnet_app_detach_args_t _a, *a = &_a;
+ application_t *app;
+
+ app_check_thread_and_barrier (app_mq_detach_handler, mp);
+
+ app = application_lookup (mp->client_index);
+ if (!app)
+ return;
+
+ a->app_index = app->app_index;
+ a->api_client_index = mp->client_index;
+ vnet_application_detach (a);
+}
+
+static void
+session_mq_unlisten_handler (void *data)
+{
+ session_unlisten_msg_t *mp = (session_unlisten_msg_t *) data;
+ vnet_unlisten_args_t _a, *a = &_a;
+ app_worker_t *app_wrk;
+ application_t *app;
+ int rv;
+
+ app_check_thread_and_barrier (session_mq_unlisten_handler, mp);
+
+ app = application_lookup (mp->client_index);
+ if (!app)
+ return;
+
+ clib_memset (a, 0, sizeof (*a));
+ a->app_index = app->app_index;
+ a->handle = mp->handle;
+ a->wrk_map_index = mp->wrk_index;
+ if ((rv = vnet_unlisten (a)))
+ clib_warning ("unlisten returned: %d", rv);
+
+ app_wrk = application_get_worker (app, a->wrk_map_index);
+ if (!app_wrk)
+ return;
+
+ mq_send_unlisten_reply (app_wrk, mp->handle, mp->context, rv);
}
static void
@@ -56,8 +246,8 @@ session_mq_accepted_reply_handler (void *data)
if (vlib_num_workers () && vlib_get_thread_index () != 0
&& session_thread_from_handle (mp->handle) == 0)
{
- vl_api_rpc_call_main_thread (accepted_notify_cb, data,
- sizeof (session_accepted_reply_msg_t));
+ vlib_rpc_call_main_thread (session_mq_accepted_reply_handler,
+ (u8 *) mp, sizeof (*mp));
return;
}
@@ -859,14 +1049,93 @@ session_event_get_session (session_event_t * e, u8 thread_index)
}
always_inline void
-session_event_dispatch (session_worker_t * wrk, vlib_node_runtime_t * node,
- session_evt_elt_t * elt, u32 thread_index,
- int *n_tx_packets)
+session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
+{
+ clib_llist_index_t ei;
+ void (*fp) (void *);
+ session_event_t *e;
+ session_t *s;
+
+ ei = clib_llist_entry_index (wrk->event_elts, elt);
+ e = &elt->evt;
+
+ switch (e->event_type)
+ {
+ case SESSION_CTRL_EVT_RPC:
+ fp = e->rpc_args.fp;
+ (*fp) (e->rpc_args.arg);
+ break;
+ case SESSION_CTRL_EVT_CLOSE:
+ s = session_get_from_handle_if_valid (e->session_handle);
+ if (PREDICT_FALSE (!s))
+ break;
+ session_transport_close (s);
+ break;
+ case SESSION_CTRL_EVT_RESET:
+ s = session_get_from_handle_if_valid (e->session_handle);
+ if (PREDICT_FALSE (!s))
+ break;
+ session_transport_reset (s);
+ break;
+ case SESSION_CTRL_EVT_LISTEN:
+ session_mq_listen_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_LISTEN_URI:
+ session_mq_listen_uri_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_UNLISTEN:
+ session_mq_unlisten_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_CONNECT:
+ session_mq_connect_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_CONNECT_URI:
+ session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_DISCONNECT:
+ session_mq_disconnect_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_DISCONNECTED:
+ session_mq_disconnected_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_ACCEPTED_REPLY:
+ session_mq_accepted_reply_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
+ session_mq_disconnected_reply_handler (session_evt_ctrl_data (wrk,
+ elt));
+ break;
+ case SESSION_CTRL_EVT_RESET_REPLY:
+ session_mq_reset_reply_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_WORKER_UPDATE:
+ session_mq_worker_update_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_APP_DETACH:
+ app_mq_detach_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ default:
+ clib_warning ("unhandled event type %d", e->event_type);
+ }
+
+ /* Regrab elements in case pool moved */
+ elt = pool_elt_at_index (wrk->event_elts, ei);
+ if (!clib_llist_elt_is_linked (elt, evt_list))
+ {
+ if (e->event_type >= SESSION_CTRL_EVT_BOUND)
+ session_evt_ctrl_data_free (wrk, elt);
+ session_evt_elt_free (wrk, elt);
+ }
+}
+
+always_inline void
+session_event_dispatch_io (session_worker_t * wrk, vlib_node_runtime_t * node,
+ session_evt_elt_t * elt, u32 thread_index,
+ int *n_tx_packets)
{
session_main_t *smm = &session_main;
app_worker_t *app_wrk;
clib_llist_index_t ei;
- void (*fp) (void *);
session_event_t *e;
session_t *s;
@@ -896,18 +1165,6 @@ session_event_dispatch (session_worker_t * wrk, vlib_node_runtime_t * node,
transport_app_rx_evt (session_get_transport_proto (s),
s->connection_index, s->thread_index);
break;
- case SESSION_CTRL_EVT_CLOSE:
- s = session_get_from_handle_if_valid (e->session_handle);
- if (PREDICT_FALSE (!s))
- break;
- session_transport_close (s);
- break;
- case SESSION_CTRL_EVT_RESET:
- s = session_get_from_handle_if_valid (e->session_handle);
- if (PREDICT_FALSE (!s))
- break;
- session_transport_reset (s);
- break;
case SESSION_IO_EVT_BUILTIN_RX:
s = session_event_get_session (e, thread_index);
if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING))
@@ -922,27 +1179,6 @@ session_event_dispatch (session_worker_t * wrk, vlib_node_runtime_t * node,
if (PREDICT_TRUE (s != 0))
session_tx_fifo_dequeue_internal (wrk, node, elt, n_tx_packets);
break;
- case SESSION_CTRL_EVT_RPC:
- fp = e->rpc_args.fp;
- (*fp) (e->rpc_args.arg);
- break;
- case SESSION_CTRL_EVT_DISCONNECTED:
- session_mq_disconnected_handler (e->data);
- break;
- case SESSION_CTRL_EVT_ACCEPTED_REPLY:
- session_mq_accepted_reply_handler (e->data);
- break;
- case SESSION_CTRL_EVT_CONNECTED_REPLY:
- break;
- case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
- session_mq_disconnected_reply_handler (e->data);
- break;
- case SESSION_CTRL_EVT_RESET_REPLY:
- session_mq_reset_reply_handler (e->data);
- break;
- case SESSION_CTRL_EVT_WORKER_UPDATE:
- session_mq_worker_update_handler (e->data);
- break;
default:
clib_warning ("unhandled event type %d", e->event_type);
}
@@ -953,6 +1189,43 @@ session_event_dispatch (session_worker_t * wrk, vlib_node_runtime_t * node,
session_evt_elt_free (wrk, elt);
}
+/* *INDENT-OFF* */
+static const u32 session_evt_msg_sizes[] = {
+#define _(symc, sym) \
+ [SESSION_CTRL_EVT_ ## symc] = sizeof (session_ ## sym ##_msg_t),
+ foreach_session_ctrl_evt
+#undef _
+};
+/* *INDENT-ON* */
+
+always_inline void
+session_evt_add_to_list (session_worker_t * wrk, session_event_t * evt)
+{
+ session_evt_elt_t *elt;
+
+ if (evt->event_type >= SESSION_CTRL_EVT_RPC)
+ {
+ elt = session_evt_alloc_ctrl (wrk);
+ if (evt->event_type >= SESSION_CTRL_EVT_BOUND)
+ {
+ elt->evt.ctrl_data_index = session_evt_ctrl_data_alloc (wrk);
+ elt->evt.event_type = evt->event_type;
+ clib_memcpy_fast (session_evt_ctrl_data (wrk, elt), evt->data,
+ session_evt_msg_sizes[evt->event_type]);
+ }
+ else
+ {
+ /* Internal control events fit into io events footprint */
+ clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
+ }
+ }
+ else
+ {
+ elt = session_evt_alloc_new (wrk);
+ clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
+ }
+}
+
static uword
session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_frame_t * frame)
@@ -990,14 +1263,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
{
svm_msg_q_sub_w_lock (mq, msg);
evt = svm_msg_q_msg_data (mq, msg);
- if (evt->event_type > SESSION_IO_EVT_BUILTIN_TX)
- elt = session_evt_alloc_ctrl (wrk);
- else
- elt = session_evt_alloc_new (wrk);
- /* Works because reply messages are smaller than a session evt.
- * If we ever need to support bigger messages this needs to be
- * fixed */
- clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
+ session_evt_add_to_list (wrk, evt);
svm_msg_q_free_msg (mq, msg);
}
svm_msg_q_unlock (mq);
@@ -1012,7 +1278,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
/* *INDENT-OFF* */
clib_llist_foreach_safe (wrk->event_elts, evt_list, ctrl_he, elt, ({
clib_llist_remove (wrk->event_elts, evt_list, elt);
- session_event_dispatch (wrk, node, elt, thread_index, &n_tx_packets);
+ session_event_dispatch_ctrl (wrk, elt);
}));
/* *INDENT-ON* */
@@ -1037,7 +1303,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
continue;
}
- session_event_dispatch (wrk, node, elt, thread_index, &n_tx_packets);
+ session_event_dispatch_io (wrk, node, elt, thread_index, &n_tx_packets);
}));
/* *INDENT-ON* */
@@ -1054,7 +1320,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
clib_llist_pop_first (wrk->event_elts, evt_list, elt, old_he);
ei = clib_llist_entry_index (wrk->event_elts, elt);
- session_event_dispatch (wrk, node, elt, thread_index, &n_tx_packets);
+ session_event_dispatch_io (wrk, node, elt, thread_index, &n_tx_packets);
old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
if (n_tx_packets >= VLIB_FRAME_SIZE || ei == old_ti)
diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h
index f9472ba3828..52a79e3beb5 100644
--- a/src/vnet/session/session_types.h
+++ b/src/vnet/session/session_types.h
@@ -290,21 +290,48 @@ typedef enum
SESSION_IO_EVT_BUILTIN_TX,
SESSION_CTRL_EVT_RPC,
SESSION_CTRL_EVT_CLOSE,
+ SESSION_CTRL_EVT_RESET,
SESSION_CTRL_EVT_BOUND,
SESSION_CTRL_EVT_UNLISTEN_REPLY,
SESSION_CTRL_EVT_ACCEPTED,
SESSION_CTRL_EVT_ACCEPTED_REPLY,
SESSION_CTRL_EVT_CONNECTED,
- SESSION_CTRL_EVT_CONNECTED_REPLY,
SESSION_CTRL_EVT_DISCONNECTED,
SESSION_CTRL_EVT_DISCONNECTED_REPLY,
- SESSION_CTRL_EVT_RESET,
SESSION_CTRL_EVT_RESET_REPLY,
SESSION_CTRL_EVT_REQ_WORKER_UPDATE,
SESSION_CTRL_EVT_WORKER_UPDATE,
SESSION_CTRL_EVT_WORKER_UPDATE_REPLY,
+ SESSION_CTRL_EVT_DISCONNECT,
+ SESSION_CTRL_EVT_CONNECT,
+ SESSION_CTRL_EVT_CONNECT_URI,
+ SESSION_CTRL_EVT_LISTEN,
+ SESSION_CTRL_EVT_LISTEN_URI,
+ SESSION_CTRL_EVT_UNLISTEN,
+ SESSION_CTRL_EVT_APP_DETACH,
} session_evt_type_t;
+#define foreach_session_ctrl_evt \
+ _(LISTEN, listen) \
+ _(LISTEN_URI, listen_uri) \
+ _(BOUND, bound) \
+ _(UNLISTEN, unlisten) \
+ _(UNLISTEN_REPLY, unlisten_reply) \
+ _(ACCEPTED, accepted) \
+ _(ACCEPTED_REPLY, accepted_reply) \
+ _(CONNECT, connect) \
+ _(CONNECT_URI, connect_uri) \
+ _(CONNECTED, connected) \
+ _(DISCONNECT, disconnect) \
+ _(DISCONNECTED, disconnected) \
+ _(DISCONNECTED_REPLY, disconnected_reply) \
+ _(RESET_REPLY, reset_reply) \
+ _(REQ_WORKER_UPDATE, req_worker_update) \
+ _(WORKER_UPDATE, worker_update) \
+ _(WORKER_UPDATE_REPLY, worker_update_reply) \
+ _(APP_DETACH, app_detach) \
+
+
/* Deprecated and will be removed. Use types above */
#define FIFO_EVENT_APP_RX SESSION_IO_EVT_RX
#define FIFO_EVENT_APP_TX SESSION_IO_EVT_TX
@@ -334,6 +361,7 @@ typedef struct
u32 session_index;
session_handle_t session_handle;
session_rpc_args_t rpc_args;
+ u32 ctrl_data_index;
struct
{
u8 data[0];