summaryrefslogtreecommitdiffstats
path: root/src/vnet/session
diff options
context:
space:
mode:
Diffstat (limited to 'src/vnet/session')
-rw-r--r--src/vnet/session/application.c6
-rw-r--r--src/vnet/session/application.h9
-rw-r--r--src/vnet/session/application_interface.h64
-rw-r--r--src/vnet/session/session.api94
-rw-r--r--src/vnet/session/session.c8
-rw-r--r--src/vnet/session/session.h32
-rwxr-xr-xsrc/vnet/session/session_api.c164
-rw-r--r--src/vnet/session/session_node.c372
-rw-r--r--src/vnet/session/session_types.h32
9 files changed, 675 insertions, 106 deletions
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index ab678888a71..d4f3d61ab61 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -951,6 +951,8 @@ vnet_listen (vnet_listen_args_t * a)
application_t *app;
int rv;
+ ASSERT (vlib_thread_is_main_w_barrier ());
+
app = application_get_if_valid (a->app_index);
if (!app)
return VNET_API_ERROR_APPLICATION_NOT_ATTACHED;
@@ -1001,6 +1003,8 @@ vnet_connect (vnet_connect_args_t * a)
app_worker_t *client_wrk;
application_t *client;
+ ASSERT (vlib_thread_is_main_w_barrier ());
+
if (session_endpoint_is_zero (&a->sep))
return VNET_API_ERROR_INVALID_VALUE;
@@ -1038,6 +1042,8 @@ vnet_unlisten (vnet_unlisten_args_t * a)
app_listener_t *al;
application_t *app;
+ ASSERT (vlib_thread_is_main_w_barrier ());
+
if (!(app = application_get_if_valid (a->app_index)))
return VNET_API_ERROR_APPLICATION_NOT_ATTACHED;
diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h
index 24c6fc3eeaf..9ec1055bbbc 100644
--- a/src/vnet/session/application.h
+++ b/src/vnet/session/application.h
@@ -284,6 +284,15 @@ int vnet_app_worker_add_del (vnet_app_worker_add_del_args_t * a);
uword unformat_application_proto (unformat_input_t * input, va_list * args);
+
+/* Needed while we support both bapi and mq ctrl messages */
+int mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
+ session_handle_t handle, int rv);
+int mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
+ session_t * s, u8 is_fail);
+void mq_send_unlisten_reply (app_worker_t * app_wrk, session_handle_t sh,
+ u32 context, int rv);
+
#endif /* SRC_VNET_SESSION_APPLICATION_H_ */
/*
diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h
index 6cc1af4d78b..f5f684e37da 100644
--- a/src/vnet/session/application_interface.h
+++ b/src/vnet/session/application_interface.h
@@ -261,6 +261,25 @@ typedef struct
#undef _
} app_session_t;
+typedef struct session_listen_msg_
+{
+ u32 client_index;
+ u32 context; /* Not needed but keeping it for compatibility with bapi */
+ u32 wrk_index;
+ u32 vrf;
+ u16 port;
+ u8 proto;
+ u8 is_ip4;
+ ip46_address_t ip;
+} __clib_packed session_listen_msg_t;
+
+typedef struct session_listen_uri_msg_
+{
+ u32 client_index;
+ u32 context;
+ u8 uri[56];
+} __clib_packed session_listen_uri_msg_t;
+
typedef struct session_bound_msg_
{
u32 context;
@@ -277,6 +296,14 @@ typedef struct session_bound_msg_
u8 segment_name[128];
} __clib_packed session_bound_msg_t;
+typedef struct session_unlisten_msg_
+{
+ u32 client_index;
+ u32 context;
+ u32 wrk_index;
+ session_handle_t handle;
+} __clib_packed session_unlisten_msg_t;
+
typedef struct session_unlisten_reply_msg_
{
u32 context;
@@ -303,9 +330,27 @@ typedef struct session_accepted_reply_msg_
u64 handle;
} __clib_packed session_accepted_reply_msg_t;
-/* Make sure this is not too large, otherwise it won't fit when dequeued in
- * the session queue node */
-STATIC_ASSERT (sizeof (session_accepted_reply_msg_t) <= 16, "accept reply");
+typedef struct session_connect_msg_
+{
+ u32 client_index;
+ u32 context;
+ u32 wrk_index;
+ u32 vrf;
+ u16 port;
+ u8 proto;
+ u8 is_ip4;
+ ip46_address_t ip;
+ u8 hostname_len;
+ u8 hostname[16];
+ u64 parent_handle;
+} __clib_packed session_connect_msg_t;
+
+typedef struct session_connect_uri_msg_
+{
+ u32 client_index;
+ u32 context;
+ u8 uri[56];
+} __clib_packed session_connect_uri_msg_t;
typedef struct session_connected_msg_
{
@@ -325,6 +370,13 @@ typedef struct session_connected_msg_
transport_endpoint_t lcl;
} __clib_packed session_connected_msg_t;
+typedef struct session_disconnect_msg_
+{
+ u32 client_index;
+ u32 context;
+ session_handle_t handle;
+} __clib_packed session_disconnect_msg_t;
+
typedef struct session_disconnected_msg_
{
u32 client_index;
@@ -375,6 +427,12 @@ typedef struct session_worker_update_reply_msg_
u64 segment_handle;
} __clib_packed session_worker_update_reply_msg_t;
+typedef struct session_app_detach_msg_
+{
+ u32 client_index;
+ u32 context;
+} session_app_detach_msg_t;
+
typedef struct app_session_event_
{
svm_msg_q_msg_t msg;
diff --git a/src/vnet/session/session.api b/src/vnet/session/session.api
index 533f65e85a2..52e050d3978 100644
--- a/src/vnet/session/session.api
+++ b/src/vnet/session/session.api
@@ -13,12 +13,13 @@
* limitations under the License.
*/
-option version = "1.6.0";
+option version = "1.7.0";
/** \brief client->vpp, attach application to session layer
+ ### WILL BE DEPRECATED POST 20.01 ###
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
- @param initial_segment_size - size of the initial shm segment to be
+ @param initial_segment_size - size of the initial shm segment to be
allocated
@param options - segment size, fifo sizes, etc.
@param namespace_id_len - length of the namespace id c-string
@@ -32,17 +33,18 @@ option version = "1.6.0";
u8 namespace_id_len;
u8 namespace_id [64];
};
-
+
/** \brief Application attach reply
+ ### WILL BE DEPRECATED POST 20.01 ###
@param context - sender context, to match reply w/ request
@param retval - return code for the request
- @param app_event_queue_address - vpp event queue address or 0 if this
+ @param app_event_queue_address - vpp event queue address or 0 if this
connection shouldn't send events
@param n_fds - number of fds exchanged
@param fd_flags - set of flags that indicate which fds are to be expected
- over the socket (set only if socket transport available)
+ over the socket (set only if socket transport available)
@param segment_size - size of first shm segment
- @param segment_name_length - length of segment name
+ @param segment_name_length - length of segment name
@param segment_name - name of segment client needs to attach to
@param app_index - index of the newly created app
@param segment_handle - handle for segment
@@ -60,6 +62,52 @@ define application_attach_reply {
u64 segment_handle;
};
+/** \brief Application attach to session layer
+ @param client_index - opaque cookie to identify the sender
+ @param context - sender context, to match reply w/ request
+ @param options - segment size, fifo sizes, etc.
+ @param namespace_id_len - length of the namespace id c-string
+ @param namespace_id - 0 terminted c-string
+*/
+ define app_attach {
+ u32 client_index;
+ u32 context;
+ u64 options[16];
+ u8 namespace_id_len;
+ u8 namespace_id[64];
+ };
+
+ /** \brief Application attach reply
+ @param context - sender context, to match reply w/ request
+ @param retval - return code for the request
+ @param app_mq - app message queue
+ @param vpp_ctrl_mq - vpp message queue for control events that should
+ be handled in main thread, i.e., bind/connect
+ @param vpp_ctrl_mq_thread_index - thread index of the ctrl mq
+ @param app_index - index of the newly created app
+ @param n_fds - number of fds exchanged
+ @param fd_flags - set of flags that indicate which fds are to be expected
+ over the socket (set only if socket transport available)
+ @param segment_size - size of first shm segment
+ @param segment_name_length - length of segment name
+ @param segment_name - name of segment client needs to attach to
+ @param segment_handle - handle for segment
+*/
+define app_attach_reply {
+ u32 context;
+ i32 retval;
+ u64 app_mq;
+ u64 vpp_ctrl_mq;
+ u8 vpp_ctrl_mq_thread;
+ u32 app_index;
+ u8 n_fds;
+ u8 fd_flags;
+ u32 segment_size;
+ u8 segment_name_length;
+ u8 segment_name[128];
+ u64 segment_handle;
+};
+
/** \brief Application add TLS certificate
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
@@ -89,6 +137,7 @@ autoreply define application_tls_key_add {
};
/** \brief client->vpp, attach application to session layer
+ ### WILL BE DEPRECATED POST 20.01 ###
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
*/
@@ -96,12 +145,12 @@ autoreply define application_detach {
u32 client_index;
u32 context;
};
-
+
/** \brief vpp->client, please map an additional shared memory segment
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
- @param fd_flags - set of flags that indicate which, if any, fds are
- to be expected over the socket. This is set only if
+ @param fd_flags - set of flags that indicate which, if any, fds are
+ to be expected over the socket. This is set only if
socket transport available
@param segment_size - size of the segment to be mapped
@param segment_name - name of the segment to be mapped
@@ -120,7 +169,7 @@ autoreply define map_another_segment {
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
@param segment_name - segment name
- @param segment_handle - handle of the segment to be unmapped
+ @param segment_handle - handle of the segment to be unmapped
*/
autoreply define unmap_segment {
u32 client_index;
@@ -129,6 +178,7 @@ autoreply define unmap_segment {
};
/** \brief Bind to a given URI
+ ### WILL BE DEPRECATED POST 20.01 ###
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
@param accept_cookie - sender accept cookie, to identify this bind flavor
@@ -144,6 +194,7 @@ autoreply define bind_uri {
};
/** \brief Unbind a given URI
+ ### WILL BE DEPRECATED POST 20.01 ###
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
@param uri - a URI, e.g. "tcp://0.0.0.0/0/80" [ipv4]
@@ -157,12 +208,13 @@ autoreply define unbind_uri {
};
/** \brief Connect to a given URI
+ ### WILL BE DEPRECATED POST 20.01 ###
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
- @param client_queue_address - binary API client queue address. Used by
+ @param client_queue_address - binary API client queue address. Used by
local server when connect was redirected.
@param options - socket options, fifo sizes, etc. passed by vpp to the
- server when redirecting connects
+ server when redirecting connects
@param uri - a URI, e.g. "tcp4://0.0.0.0/0/80"
"tcp6://::/0/80" [ipv6], etc.
*/
@@ -175,6 +227,7 @@ autoreply define connect_uri {
};
/** \brief bidirectional disconnect API
+ ### WILL BE DEPRECATED POST 20.01 ###
@param client_index - opaque cookie to identify the sender
client to vpp direction only
@param context - sender context, to match reply w/ request
@@ -187,6 +240,7 @@ define disconnect_session {
};
/** \brief bidirectional disconnect reply API
+ ### WILL BE DEPRECATED POST 20.01 ###
@param client_index - opaque cookie to identify the sender
client to vpp direction only
@param context - sender context, to match reply w/ request
@@ -200,13 +254,14 @@ define disconnect_session_reply {
};
/** \brief Bind to an ip:port pair for a given transport protocol
+ ### WILL BE DEPRECATED POST 20.01 ###
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
@param wrk_index - index of worker requesting the bind
@param vrf - bind namespace
@param is_ip4 - flag that is 1 if ip address family is IPv4
@param ip - ip address
- @param port - port
+ @param port - port
@param proto - protocol 0 - TCP 1 - UDP
@param options - socket options, fifo sizes, etc.
*/
@@ -222,7 +277,8 @@ autoreply define bind_sock {
u64 options[16];
};
-/** \brief Unbind
+/** \brief Unbind
+ ### WILL BE DEPRECATED POST 20.01 ###s
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
@param wrk_index - index of worker requesting the bind
@@ -236,16 +292,17 @@ autoreply define unbind_sock {
};
/** \brief Connect to a remote peer
+ ### WILL BE DEPRECATED POST 20.01 ###
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
@param wrk_index - worker that requests the connect
- @param client_queue_address - client's API queue address. Non-zero when
+ @param client_queue_address - client's API queue address. Non-zero when
used to perform redirects
@param options - socket options, fifo sizes, etc. when doing redirects
@param vrf - connection namespace
@param is_ip4 - flag that is 1 if ip address family is IPv4
@param ip - ip address
- @param port - port
+ @param port - port
@param proto - protocol 0 - TCP 1 - UDP
@param hostname-len - length of hostname
@param hostname - destination's hostname. If present, used by protocols
@@ -269,6 +326,7 @@ autoreply define connect_sock {
};
/** \brief ask app to add a new cut-through registration
+ ### WILL BE DEPRECATED POST 20.01 ###
@param client_index - opaque cookie to identify the sender
client to vpp direction only
@param context - sender context, to match reply w/ request
@@ -314,8 +372,8 @@ define app_worker_add_del
@param app_event_queue_address - vpp event queue address of new worker
@param n_fds - number of fds exchanged
@param fd_flags - set of flags that indicate which fds are to be expected
- over the socket (set only if socket transport available)
- @param segment_name_length - length of segment name
+ over the socket (set only if socket transport available)
+ @param segment_name_length - length of segment name
@param segment_name - name of segment client needs to attach to
@param segment_handle - handle for segment
*/
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 45292454e57..c6f9d0a75ae 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -92,10 +92,10 @@ session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
int
session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
{
- /* only event supported for now is disconnect */
- ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE);
- return session_send_evt_to_thread (s, 0, s->thread_index,
- SESSION_CTRL_EVT_CLOSE);
+ /* only events supported are disconnect and reset */
+ ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE
+ || evt_type == SESSION_CTRL_EVT_RESET);
+ return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
}
void
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index de44bed27c9..04fdebed791 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -62,12 +62,19 @@ typedef struct session_tx_context_
session_dgram_hdr_t hdr;
} session_tx_context_t;
+#define SESSION_CTRL_MSG_MAX_SIZE 64
+
typedef struct session_evt_elt
{
clib_llist_anchor_t evt_list;
session_event_t evt;
} session_evt_elt_t;
+typedef struct session_ctrl_evt_data_
+{
+ u8 data[SESSION_CTRL_MSG_MAX_SIZE];
+} session_evt_ctrl_data_t;
+
typedef struct session_worker_
{
CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
@@ -96,6 +103,9 @@ typedef struct session_worker_
/** Pool of session event list elements */
session_evt_elt_t *event_elts;
+ /** Pool of ctrl events data buffers */
+ session_evt_ctrl_data_t *ctrl_evts_data;
+
/** Head of control events list */
clib_llist_index_t ctrl_head;
@@ -207,6 +217,14 @@ session_evt_add_old (session_worker_t * wrk, session_evt_elt_t * elt)
pool_elt_at_index (wrk->event_elts, wrk->old_head));
}
+static inline u32
+session_evt_ctrl_data_alloc (session_worker_t * wrk)
+{
+ session_evt_ctrl_data_t *data;
+ pool_get (wrk->ctrl_evts_data, data);
+ return (data - wrk->ctrl_evts_data);
+}
+
static inline session_evt_elt_t *
session_evt_alloc_ctrl (session_worker_t * wrk)
{
@@ -217,6 +235,20 @@ session_evt_alloc_ctrl (session_worker_t * wrk)
return elt;
}
+static inline void *
+session_evt_ctrl_data (session_worker_t * wrk, session_evt_elt_t * elt)
+{
+ return (void *) (pool_elt_at_index (wrk->ctrl_evts_data,
+ elt->evt.ctrl_data_index));
+}
+
+static inline void
+session_evt_ctrl_data_free (session_worker_t * wrk, session_evt_elt_t * elt)
+{
+ ASSERT (elt->evt.event_type > SESSION_IO_EVT_BUILTIN_TX);
+ pool_put_index (wrk->ctrl_evts_data, elt->evt.ctrl_data_index);
+}
+
static inline session_evt_elt_t *
session_evt_alloc_new (session_worker_t * wrk)
{
diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c
index e3e3bb3c596..8f9ce3f5a1e 100755
--- a/src/vnet/session/session_api.c
+++ b/src/vnet/session/session_api.c
@@ -43,6 +43,7 @@
#define foreach_session_api_msg \
_(MAP_ANOTHER_SEGMENT_REPLY, map_another_segment_reply) \
_(APPLICATION_ATTACH, application_attach) \
+_(APP_ATTACH, app_attach) \
_(APPLICATION_DETACH, application_detach) \
_(BIND_URI, bind_uri) \
_(UNBIND_URI, unbind_uri) \
@@ -298,7 +299,7 @@ mq_send_session_reset_cb (session_t * s)
SESSION_CTRL_EVT_RESET);
}
-static int
+int
mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
session_t * s, u8 is_fail)
{
@@ -378,7 +379,7 @@ done:
return 0;
}
-static int
+int
mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
session_handle_t handle, int rv)
{
@@ -438,13 +439,35 @@ done:
return 0;
}
+void
+mq_send_unlisten_reply (app_worker_t * app_wrk, session_handle_t sh,
+ u32 context, int rv)
+{
+ svm_msg_q_msg_t _msg, *msg = &_msg;
+ session_unlisten_reply_msg_t *ump;
+ svm_msg_q_t *app_mq;
+ session_event_t *evt;
+
+ app_mq = app_wrk->event_queue;
+ if (mq_try_lock_and_alloc_msg (app_mq, msg))
+ return;
+
+ evt = svm_msg_q_msg_data (app_mq, msg);
+ clib_memset (evt, 0, sizeof (*evt));
+ evt->event_type = SESSION_CTRL_EVT_UNLISTEN_REPLY;
+ ump = (session_unlisten_reply_msg_t *) evt->data;
+ ump->context = context;
+ ump->handle = sh;
+ ump->retval = rv;
+ svm_msg_q_add_and_unlock (app_mq, msg);
+}
+
static void
mq_send_session_migrate_cb (session_t * s, session_handle_t new_sh)
{
clib_warning ("not supported");
}
-
static session_cb_vft_t session_mq_cb_vft = {
.session_accept_callback = mq_send_session_accepted_cb,
.session_disconnect_callback = mq_send_session_disconnected_cb,
@@ -466,6 +489,7 @@ vl_api_session_enable_disable_t_handler (vl_api_session_enable_disable_t * mp)
REPLY_MACRO (VL_API_SESSION_ENABLE_DISABLE_REPLY);
}
+/* ### WILL BE DEPRECATED POST 20.01 ### */
static void
vl_api_application_attach_t_handler (vl_api_application_attach_t * mp)
{
@@ -564,6 +588,108 @@ done:
}
static void
+vl_api_app_attach_t_handler (vl_api_app_attach_t * mp)
+{
+ int rv = 0, fds[SESSION_N_FD_TYPE], n_fds = 0;
+ vl_api_app_attach_reply_t *rmp;
+ ssvm_private_t *segp, *evt_q_segment;
+ vnet_app_attach_args_t _a, *a = &_a;
+ u8 fd_flags = 0, ctrl_thread;
+ vl_api_registration_t *reg;
+ svm_msg_q_t *ctrl_mq;
+
+ reg = vl_api_client_index_to_registration (mp->client_index);
+ if (!reg)
+ return;
+
+ if (session_main_is_enabled () == 0)
+ {
+ rv = VNET_API_ERROR_FEATURE_DISABLED;
+ goto done;
+ }
+
+ STATIC_ASSERT (sizeof (u64) * APP_OPTIONS_N_OPTIONS <=
+ sizeof (mp->options),
+ "Out of options, fix api message definition");
+
+ clib_memset (a, 0, sizeof (*a));
+ a->api_client_index = mp->client_index;
+ a->options = mp->options;
+ a->session_cb_vft = &session_mq_cb_vft;
+ if (mp->namespace_id_len > 64)
+ {
+ rv = VNET_API_ERROR_INVALID_VALUE;
+ goto done;
+ }
+
+ if (mp->namespace_id_len)
+ {
+ vec_validate (a->namespace_id, mp->namespace_id_len - 1);
+ clib_memcpy_fast (a->namespace_id, mp->namespace_id,
+ mp->namespace_id_len);
+ }
+
+ if ((rv = vnet_application_attach (a)))
+ {
+ clib_warning ("attach returned: %d", rv);
+ vec_free (a->namespace_id);
+ goto done;
+ }
+ vec_free (a->namespace_id);
+
+ /* Send event queues segment */
+ if ((evt_q_segment = session_main_get_evt_q_segment ()))
+ {
+ fd_flags |= SESSION_FD_F_VPP_MQ_SEGMENT;
+ fds[n_fds] = evt_q_segment->fd;
+ n_fds += 1;
+ }
+ /* Send fifo segment fd if needed */
+ if (ssvm_type (a->segment) == SSVM_SEGMENT_MEMFD)
+ {
+ fd_flags |= SESSION_FD_F_MEMFD_SEGMENT;
+ fds[n_fds] = a->segment->fd;
+ n_fds += 1;
+ }
+ if (a->options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD)
+ {
+ fd_flags |= SESSION_FD_F_MQ_EVENTFD;
+ fds[n_fds] = svm_msg_q_get_producer_eventfd (a->app_evt_q);
+ n_fds += 1;
+ }
+
+done:
+
+ ctrl_thread = vlib_num_workers ()? 1 : 0;
+ ctrl_mq = session_main_get_vpp_event_queue (ctrl_thread);
+ /* *INDENT-OFF* */
+ REPLY_MACRO2 (VL_API_APP_ATTACH_REPLY, ({
+ if (!rv)
+ {
+ segp = a->segment;
+ rmp->app_index = clib_host_to_net_u32 (a->app_index);
+ rmp->app_mq = pointer_to_uword (a->app_evt_q);
+ rmp->vpp_ctrl_mq = pointer_to_uword (ctrl_mq);
+ rmp->vpp_ctrl_mq_thread = ctrl_thread;
+ rmp->n_fds = n_fds;
+ rmp->fd_flags = fd_flags;
+ if (vec_len (segp->name))
+ {
+ memcpy (rmp->segment_name, segp->name, vec_len (segp->name));
+ rmp->segment_name_length = vec_len (segp->name);
+ }
+ rmp->segment_size = segp->ssvm_size;
+ rmp->segment_handle = clib_host_to_net_u64 (a->segment_handle);
+ }
+ }));
+ /* *INDENT-ON* */
+
+ if (n_fds)
+ session_send_fds (reg, fds, n_fds);
+}
+
+/* ### WILL BE DEPRECATED POST 20.01 ### */
+static void
vl_api_application_detach_t_handler (vl_api_application_detach_t * mp)
{
vl_api_application_detach_reply_t *rmp;
@@ -589,6 +715,7 @@ done:
REPLY_MACRO (VL_API_APPLICATION_DETACH_REPLY);
}
+/* ### WILL BE DEPRECATED POST 20.01 ### */
static void
vl_api_bind_uri_t_handler (vl_api_bind_uri_t * mp)
{
@@ -629,6 +756,7 @@ done:
}
}
+/* ### WILL BE DEPRECATED POST 20.01 ### */
static void
vl_api_unbind_uri_t_handler (vl_api_unbind_uri_t * mp)
{
@@ -660,6 +788,7 @@ done:
REPLY_MACRO (VL_API_UNBIND_URI_REPLY);
}
+/* ### WILL BE DEPRECATED POST 20.01 ### */
static void
vl_api_connect_uri_t_handler (vl_api_connect_uri_t * mp)
{
@@ -701,6 +830,7 @@ done:
REPLY_MACRO (VL_API_CONNECT_URI_REPLY);
}
+/* ### WILL BE DEPRECATED POST 20.01 ### */
static void
vl_api_disconnect_session_t_handler (vl_api_disconnect_session_t * mp)
{
@@ -731,6 +861,7 @@ done:
REPLY_MACRO2 (VL_API_DISCONNECT_SESSION_REPLY, rmp->handle = mp->handle);
}
+/* ### WILL BE DEPRECATED POST 20.01 ### */
static void
vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t *
mp)
@@ -762,6 +893,7 @@ vl_api_map_another_segment_reply_t_handler (vl_api_map_another_segment_reply_t
clib_warning ("not implemented");
}
+/* ### WILL BE DEPRECATED POST 20.01 ### */
static void
vl_api_bind_sock_t_handler (vl_api_bind_sock_t * mp)
{
@@ -811,6 +943,7 @@ done:
}
}
+/* ### WILL BE DEPRECATED POST 20.01 ### */
static void
vl_api_unbind_sock_t_handler (vl_api_unbind_sock_t * mp)
{
@@ -839,35 +972,14 @@ vl_api_unbind_sock_t_handler (vl_api_unbind_sock_t * mp)
done:
REPLY_MACRO (VL_API_UNBIND_SOCK_REPLY);
- /*
- * Send reply over msg queue
- */
- svm_msg_q_msg_t _msg, *msg = &_msg;
- session_unlisten_reply_msg_t *ump;
- svm_msg_q_t *app_mq;
- session_event_t *evt;
-
- if (!app)
- return;
-
app_wrk = application_get_worker (app, a->wrk_map_index);
if (!app_wrk)
return;
- app_mq = app_wrk->event_queue;
- if (mq_try_lock_and_alloc_msg (app_mq, msg))
- return;
-
- evt = svm_msg_q_msg_data (app_mq, msg);
- clib_memset (evt, 0, sizeof (*evt));
- evt->event_type = SESSION_CTRL_EVT_UNLISTEN_REPLY;
- ump = (session_unlisten_reply_msg_t *) evt->data;
- ump->context = mp->context;
- ump->handle = mp->handle;
- ump->retval = rv;
- svm_msg_q_add_and_unlock (app_mq, msg);
+ mq_send_unlisten_reply (app_wrk, mp->handle, mp->context, rv);
}
+/* ### WILL BE DEPRECATED POST 20.01 ### */
static void
vl_api_connect_sock_t_handler (vl_api_connect_sock_t * mp)
{
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index 1d662a20e3c..ad18637a952 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -25,12 +25,202 @@
#include <vnet/session/session_debug.h>
#include <svm/queue.h>
-static void session_mq_accepted_reply_handler (void *data);
+#define app_check_thread_and_barrier(_fn, _arg) \
+ if (!vlib_thread_is_main_w_barrier ()) \
+ { \
+ vlib_rpc_call_main_thread (_fn, (u8 *) _arg, sizeof(*_arg)); \
+ return; \
+ }
static void
-accepted_notify_cb (void *data, u32 data_len)
+session_mq_listen_handler (void *data)
{
- session_mq_accepted_reply_handler (data);
+ session_listen_msg_t *mp = (session_listen_msg_t *) data;
+ vnet_listen_args_t _a, *a = &_a;
+ app_worker_t *app_wrk;
+ application_t *app;
+ int rv;
+
+ app_check_thread_and_barrier (session_mq_listen_handler, mp);
+
+ app = application_lookup (mp->client_index);
+ if (!app)
+ return;
+
+ clib_memset (a, 0, sizeof (*a));
+ a->sep.is_ip4 = mp->is_ip4;
+ clib_memcpy_fast (&a->sep.ip, &mp->ip, sizeof (mp->ip));
+ a->sep.port = mp->port;
+ a->sep.fib_index = mp->vrf;
+ a->sep.sw_if_index = ENDPOINT_INVALID_INDEX;
+ a->sep.transport_proto = mp->proto;
+ a->app_index = app->app_index;
+ a->wrk_map_index = mp->wrk_index;
+
+ if ((rv = vnet_listen (a)))
+ clib_warning ("listen returned: %d", rv);
+
+ app_wrk = application_get_worker (app, mp->wrk_index);
+ mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
+ return;
+}
+
+static void
+session_mq_listen_uri_handler (void *data)
+{
+ session_listen_uri_msg_t *mp = (session_listen_uri_msg_t *) data;
+ vnet_listen_args_t _a, *a = &_a;
+ app_worker_t *app_wrk;
+ application_t *app;
+ int rv;
+
+ app_check_thread_and_barrier (session_mq_listen_uri_handler, mp);
+
+ app = application_lookup (mp->client_index);
+ if (!app)
+ return;
+
+ clib_memset (a, 0, sizeof (*a));
+ a->uri = (char *) mp->uri;
+ a->app_index = app->app_index;
+ rv = vnet_bind_uri (a);
+
+ app_wrk = application_get_worker (app, 0);
+ mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
+}
+
+static void
+session_mq_connect_handler (void *data)
+{
+ session_connect_msg_t *mp = (session_connect_msg_t *) data;
+ vnet_connect_args_t _a, *a = &_a;
+ app_worker_t *app_wrk;
+ application_t *app;
+ int rv;
+
+ app_check_thread_and_barrier (session_mq_connect_handler, mp);
+
+ app = application_lookup (mp->client_index);
+ if (!app)
+ return;
+
+ clib_memset (a, 0, sizeof (*a));
+ a->sep.is_ip4 = mp->is_ip4;
+ clib_memcpy_fast (&a->sep.ip, &mp->ip, sizeof (mp->ip));
+ a->sep.port = mp->port;
+ a->sep.transport_proto = mp->proto;
+ a->sep.peer.fib_index = mp->vrf;
+ a->sep.peer.sw_if_index = ENDPOINT_INVALID_INDEX;
+ a->sep_ext.parent_handle = mp->parent_handle;
+ if (mp->hostname_len)
+ {
+ vec_validate (a->sep_ext.hostname, mp->hostname_len - 1);
+ clib_memcpy_fast (a->sep_ext.hostname, mp->hostname, mp->hostname_len);
+ }
+ a->api_context = mp->context;
+ a->app_index = app->app_index;
+ a->wrk_map_index = mp->wrk_index;
+
+ if ((rv = vnet_connect (a)))
+ {
+ clib_warning ("connect returned: %U", format_vnet_api_errno, rv);
+ app_wrk = application_get_worker (app, mp->wrk_index);
+ mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0,
+ /* is_fail */ 1);
+ }
+
+ vec_free (a->sep_ext.hostname);
+}
+
+static void
+session_mq_connect_uri_handler (void *data)
+{
+ session_connect_uri_msg_t *mp = (session_connect_uri_msg_t *) data;
+ vnet_connect_args_t _a, *a = &_a;
+ app_worker_t *app_wrk;
+ application_t *app;
+ int rv;
+
+ app_check_thread_and_barrier (session_mq_connect_uri_handler, mp);
+
+ app = application_lookup (mp->client_index);
+ if (!app)
+ return;
+
+ clib_memset (a, 0, sizeof (*a));
+ a->uri = (char *) mp->uri;
+ a->api_context = mp->context;
+ a->app_index = app->app_index;
+ if ((rv = vnet_connect_uri (a)))
+ {
+ clib_warning ("connect_uri returned: %d", rv);
+ app_wrk = application_get_worker (app, 0 /* default wrk only */ );
+ mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0,
+ /* is_fail */ 1);
+ }
+}
+
+static void
+session_mq_disconnect_handler (void *data)
+{
+ session_disconnect_msg_t *mp = (session_disconnect_msg_t *) data;
+ vnet_disconnect_args_t _a, *a = &_a;
+ application_t *app;
+
+ app = application_lookup (mp->client_index);
+ if (!app)
+ return;
+
+ a->app_index = app->app_index;
+ a->handle = mp->handle;
+ vnet_disconnect_session (a);
+}
+
+static void
+app_mq_detach_handler (void *data)
+{
+ session_app_detach_msg_t *mp = (session_app_detach_msg_t *) data;
+ vnet_app_detach_args_t _a, *a = &_a;
+ application_t *app;
+
+ app_check_thread_and_barrier (app_mq_detach_handler, mp);
+
+ app = application_lookup (mp->client_index);
+ if (!app)
+ return;
+
+ a->app_index = app->app_index;
+ a->api_client_index = mp->client_index;
+ vnet_application_detach (a);
+}
+
+static void
+session_mq_unlisten_handler (void *data)
+{
+ session_unlisten_msg_t *mp = (session_unlisten_msg_t *) data;
+ vnet_unlisten_args_t _a, *a = &_a;
+ app_worker_t *app_wrk;
+ application_t *app;
+ int rv;
+
+ app_check_thread_and_barrier (session_mq_unlisten_handler, mp);
+
+ app = application_lookup (mp->client_index);
+ if (!app)
+ return;
+
+ clib_memset (a, 0, sizeof (*a));
+ a->app_index = app->app_index;
+ a->handle = mp->handle;
+ a->wrk_map_index = mp->wrk_index;
+ if ((rv = vnet_unlisten (a)))
+ clib_warning ("unlisten returned: %d", rv);
+
+ app_wrk = application_get_worker (app, a->wrk_map_index);
+ if (!app_wrk)
+ return;
+
+ mq_send_unlisten_reply (app_wrk, mp->handle, mp->context, rv);
}
static void
@@ -56,8 +246,8 @@ session_mq_accepted_reply_handler (void *data)
if (vlib_num_workers () && vlib_get_thread_index () != 0
&& session_thread_from_handle (mp->handle) == 0)
{
- vl_api_rpc_call_main_thread (accepted_notify_cb, data,
- sizeof (session_accepted_reply_msg_t));
+ vlib_rpc_call_main_thread (session_mq_accepted_reply_handler,
+ (u8 *) mp, sizeof (*mp));
return;
}
@@ -859,14 +1049,93 @@ session_event_get_session (session_event_t * e, u8 thread_index)
}
always_inline void
-session_event_dispatch (session_worker_t * wrk, vlib_node_runtime_t * node,
- session_evt_elt_t * elt, u32 thread_index,
- int *n_tx_packets)
+session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
+{
+ clib_llist_index_t ei;
+ void (*fp) (void *);
+ session_event_t *e;
+ session_t *s;
+
+ ei = clib_llist_entry_index (wrk->event_elts, elt);
+ e = &elt->evt;
+
+ switch (e->event_type)
+ {
+ case SESSION_CTRL_EVT_RPC:
+ fp = e->rpc_args.fp;
+ (*fp) (e->rpc_args.arg);
+ break;
+ case SESSION_CTRL_EVT_CLOSE:
+ s = session_get_from_handle_if_valid (e->session_handle);
+ if (PREDICT_FALSE (!s))
+ break;
+ session_transport_close (s);
+ break;
+ case SESSION_CTRL_EVT_RESET:
+ s = session_get_from_handle_if_valid (e->session_handle);
+ if (PREDICT_FALSE (!s))
+ break;
+ session_transport_reset (s);
+ break;
+ case SESSION_CTRL_EVT_LISTEN:
+ session_mq_listen_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_LISTEN_URI:
+ session_mq_listen_uri_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_UNLISTEN:
+ session_mq_unlisten_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_CONNECT:
+ session_mq_connect_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_CONNECT_URI:
+ session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_DISCONNECT:
+ session_mq_disconnect_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_DISCONNECTED:
+ session_mq_disconnected_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_ACCEPTED_REPLY:
+ session_mq_accepted_reply_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
+ session_mq_disconnected_reply_handler (session_evt_ctrl_data (wrk,
+ elt));
+ break;
+ case SESSION_CTRL_EVT_RESET_REPLY:
+ session_mq_reset_reply_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_WORKER_UPDATE:
+ session_mq_worker_update_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_APP_DETACH:
+ app_mq_detach_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ default:
+ clib_warning ("unhandled event type %d", e->event_type);
+ }
+
+ /* Regrab elements in case pool moved */
+ elt = pool_elt_at_index (wrk->event_elts, ei);
+ if (!clib_llist_elt_is_linked (elt, evt_list))
+ {
+ if (e->event_type >= SESSION_CTRL_EVT_BOUND)
+ session_evt_ctrl_data_free (wrk, elt);
+ session_evt_elt_free (wrk, elt);
+ }
+}
+
+always_inline void
+session_event_dispatch_io (session_worker_t * wrk, vlib_node_runtime_t * node,
+ session_evt_elt_t * elt, u32 thread_index,
+ int *n_tx_packets)
{
session_main_t *smm = &session_main;
app_worker_t *app_wrk;
clib_llist_index_t ei;
- void (*fp) (void *);
session_event_t *e;
session_t *s;
@@ -896,18 +1165,6 @@ session_event_dispatch (session_worker_t * wrk, vlib_node_runtime_t * node,
transport_app_rx_evt (session_get_transport_proto (s),
s->connection_index, s->thread_index);
break;
- case SESSION_CTRL_EVT_CLOSE:
- s = session_get_from_handle_if_valid (e->session_handle);
- if (PREDICT_FALSE (!s))
- break;
- session_transport_close (s);
- break;
- case SESSION_CTRL_EVT_RESET:
- s = session_get_from_handle_if_valid (e->session_handle);
- if (PREDICT_FALSE (!s))
- break;
- session_transport_reset (s);
- break;
case SESSION_IO_EVT_BUILTIN_RX:
s = session_event_get_session (e, thread_index);
if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING))
@@ -922,27 +1179,6 @@ session_event_dispatch (session_worker_t * wrk, vlib_node_runtime_t * node,
if (PREDICT_TRUE (s != 0))
session_tx_fifo_dequeue_internal (wrk, node, elt, n_tx_packets);
break;
- case SESSION_CTRL_EVT_RPC:
- fp = e->rpc_args.fp;
- (*fp) (e->rpc_args.arg);
- break;
- case SESSION_CTRL_EVT_DISCONNECTED:
- session_mq_disconnected_handler (e->data);
- break;
- case SESSION_CTRL_EVT_ACCEPTED_REPLY:
- session_mq_accepted_reply_handler (e->data);
- break;
- case SESSION_CTRL_EVT_CONNECTED_REPLY:
- break;
- case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
- session_mq_disconnected_reply_handler (e->data);
- break;
- case SESSION_CTRL_EVT_RESET_REPLY:
- session_mq_reset_reply_handler (e->data);
- break;
- case SESSION_CTRL_EVT_WORKER_UPDATE:
- session_mq_worker_update_handler (e->data);
- break;
default:
clib_warning ("unhandled event type %d", e->event_type);
}
@@ -953,6 +1189,43 @@ session_event_dispatch (session_worker_t * wrk, vlib_node_runtime_t * node,
session_evt_elt_free (wrk, elt);
}
+/* *INDENT-OFF* */
+static const u32 session_evt_msg_sizes[] = {
+#define _(symc, sym) \
+ [SESSION_CTRL_EVT_ ## symc] = sizeof (session_ ## sym ##_msg_t),
+ foreach_session_ctrl_evt
+#undef _
+};
+/* *INDENT-ON* */
+
+always_inline void
+session_evt_add_to_list (session_worker_t * wrk, session_event_t * evt)
+{
+ session_evt_elt_t *elt;
+
+ if (evt->event_type >= SESSION_CTRL_EVT_RPC)
+ {
+ elt = session_evt_alloc_ctrl (wrk);
+ if (evt->event_type >= SESSION_CTRL_EVT_BOUND)
+ {
+ elt->evt.ctrl_data_index = session_evt_ctrl_data_alloc (wrk);
+ elt->evt.event_type = evt->event_type;
+ clib_memcpy_fast (session_evt_ctrl_data (wrk, elt), evt->data,
+ session_evt_msg_sizes[evt->event_type]);
+ }
+ else
+ {
+ /* Internal control events fit into io events footprint */
+ clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
+ }
+ }
+ else
+ {
+ elt = session_evt_alloc_new (wrk);
+ clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
+ }
+}
+
static uword
session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_frame_t * frame)
@@ -990,14 +1263,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
{
svm_msg_q_sub_w_lock (mq, msg);
evt = svm_msg_q_msg_data (mq, msg);
- if (evt->event_type > SESSION_IO_EVT_BUILTIN_TX)
- elt = session_evt_alloc_ctrl (wrk);
- else
- elt = session_evt_alloc_new (wrk);
- /* Works because reply messages are smaller than a session evt.
- * If we ever need to support bigger messages this needs to be
- * fixed */
- clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
+ session_evt_add_to_list (wrk, evt);
svm_msg_q_free_msg (mq, msg);
}
svm_msg_q_unlock (mq);
@@ -1012,7 +1278,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
/* *INDENT-OFF* */
clib_llist_foreach_safe (wrk->event_elts, evt_list, ctrl_he, elt, ({
clib_llist_remove (wrk->event_elts, evt_list, elt);
- session_event_dispatch (wrk, node, elt, thread_index, &n_tx_packets);
+ session_event_dispatch_ctrl (wrk, elt);
}));
/* *INDENT-ON* */
@@ -1037,7 +1303,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
continue;
}
- session_event_dispatch (wrk, node, elt, thread_index, &n_tx_packets);
+ session_event_dispatch_io (wrk, node, elt, thread_index, &n_tx_packets);
}));
/* *INDENT-ON* */
@@ -1054,7 +1320,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
clib_llist_pop_first (wrk->event_elts, evt_list, elt, old_he);
ei = clib_llist_entry_index (wrk->event_elts, elt);
- session_event_dispatch (wrk, node, elt, thread_index, &n_tx_packets);
+ session_event_dispatch_io (wrk, node, elt, thread_index, &n_tx_packets);
old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
if (n_tx_packets >= VLIB_FRAME_SIZE || ei == old_ti)
diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h
index f9472ba3828..52a79e3beb5 100644
--- a/src/vnet/session/session_types.h
+++ b/src/vnet/session/session_types.h
@@ -290,21 +290,48 @@ typedef enum
SESSION_IO_EVT_BUILTIN_TX,
SESSION_CTRL_EVT_RPC,
SESSION_CTRL_EVT_CLOSE,
+ SESSION_CTRL_EVT_RESET,
SESSION_CTRL_EVT_BOUND,
SESSION_CTRL_EVT_UNLISTEN_REPLY,
SESSION_CTRL_EVT_ACCEPTED,
SESSION_CTRL_EVT_ACCEPTED_REPLY,
SESSION_CTRL_EVT_CONNECTED,
- SESSION_CTRL_EVT_CONNECTED_REPLY,
SESSION_CTRL_EVT_DISCONNECTED,
SESSION_CTRL_EVT_DISCONNECTED_REPLY,
- SESSION_CTRL_EVT_RESET,
SESSION_CTRL_EVT_RESET_REPLY,
SESSION_CTRL_EVT_REQ_WORKER_UPDATE,
SESSION_CTRL_EVT_WORKER_UPDATE,
SESSION_CTRL_EVT_WORKER_UPDATE_REPLY,
+ SESSION_CTRL_EVT_DISCONNECT,
+ SESSION_CTRL_EVT_CONNECT,
+ SESSION_CTRL_EVT_CONNECT_URI,
+ SESSION_CTRL_EVT_LISTEN,
+ SESSION_CTRL_EVT_LISTEN_URI,
+ SESSION_CTRL_EVT_UNLISTEN,
+ SESSION_CTRL_EVT_APP_DETACH,
} session_evt_type_t;
+#define foreach_session_ctrl_evt \
+ _(LISTEN, listen) \
+ _(LISTEN_URI, listen_uri) \
+ _(BOUND, bound) \
+ _(UNLISTEN, unlisten) \
+ _(UNLISTEN_REPLY, unlisten_reply) \
+ _(ACCEPTED, accepted) \
+ _(ACCEPTED_REPLY, accepted_reply) \
+ _(CONNECT, connect) \
+ _(CONNECT_URI, connect_uri) \
+ _(CONNECTED, connected) \
+ _(DISCONNECT, disconnect) \
+ _(DISCONNECTED, disconnected) \
+ _(DISCONNECTED_REPLY, disconnected_reply) \
+ _(RESET_REPLY, reset_reply) \
+ _(REQ_WORKER_UPDATE, req_worker_update) \
+ _(WORKER_UPDATE, worker_update) \
+ _(WORKER_UPDATE_REPLY, worker_update_reply) \
+ _(APP_DETACH, app_detach) \
+
+
/* Deprecated and will be removed. Use types above */
#define FIFO_EVENT_APP_RX SESSION_IO_EVT_RX
#define FIFO_EVENT_APP_TX SESSION_IO_EVT_TX
@@ -334,6 +361,7 @@ typedef struct
u32 session_index;
session_handle_t session_handle;
session_rpc_args_t rpc_args;
+ u32 ctrl_data_index;
struct
{
u8 data[0];