diff options
author | Florin Coras <fcoras@cisco.com> | 2018-02-08 15:10:09 -0800 |
---|---|---|
committer | Florin Coras <fcoras@cisco.com> | 2018-02-14 00:54:43 -0800 |
commit | f8f516a8b0ccab2f5d9796f90419bf2661c750af (patch) | |
tree | f02f6c01ed1bf33aeb4ebb5714af470e537f87c2 /src/vnet/session/application.c | |
parent | 7758bf68a03a32f17c07154172157f5bdf30e684 (diff) |
session: support local sessions and deprecate redirects
Memfd backed shared memory segments can only be negotiated over sockets.
For such scenarios, the existing redirect mechanism that establishes
cut-through sessions does not work anymore as the two peer application
do not share such a socket.
This patch adds support for local sessions, as opposed to sessions
backed by a transport connection, in a way that is almost transparent to
the two applications by reusing the existing binary api messages.
Moreover, all segment allocations are now entirely done through the
segment manager valloc, so segment overlaps due to independent
allocations previously required for redirects are completely avoided.
The one notable characteristic of local sessions (cut-through from app
perspective) notification messages is that they carry pointers to two
event queues, one for each app peer, instead of one. For
transport-backed sessions one of the queues can be inferred but for
local session they cannot.
Change-Id: Ia443fb63e2d9d8e43490275062a708f039038175
Signed-off-by: Florin Coras <fcoras@cisco.com>
Diffstat (limited to 'src/vnet/session/application.c')
-rw-r--r-- | src/vnet/session/application.c | 654 |
1 files changed, 595 insertions, 59 deletions
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c index 71fc93f960f..9020d1c2864 100644 --- a/src/vnet/session/application.c +++ b/src/vnet/session/application.c @@ -28,11 +28,6 @@ static application_t *app_pool; */ static uword *app_by_api_client_index; -/** - * Default application event queue size - */ -static u32 default_app_evt_queue_size = 128; - static u8 * app_get_name_from_reg_index (application_t * app) { @@ -138,6 +133,7 @@ application_new () app->index = application_get_index (app); app->connects_seg_manager = APP_INVALID_SEGMENT_MANAGER_INDEX; app->first_segment_manager = APP_INVALID_SEGMENT_MANAGER_INDEX; + app->local_segment_manager = APP_INVALID_SEGMENT_MANAGER_INDEX; if (CLIB_DEBUG > 1) clib_warning ("[%d] New app (%d)", getpid (), app->index); return app; @@ -147,8 +143,8 @@ void application_del (application_t * app) { vnet_unbind_args_t _a, *a = &_a; - segment_manager_t *sm; u64 handle, *handles = 0; + segment_manager_t *sm; u32 index; int i; @@ -207,6 +203,12 @@ application_del (application_t * app) segment_manager_del (sm); } } + + /* + * Local connections cleanup + */ + application_local_sessions_del (app); + application_table_del (app); pool_put (app_pool, app); } @@ -255,8 +257,8 @@ int application_init (application_t * app, u32 api_client_index, u64 * options, session_cb_vft_t * cb_fns) { - u32 app_evt_queue_size, first_seg_size, prealloc_fifo_pairs; ssvm_segment_type_t seg_type = SSVM_SEGMENT_MEMFD; + u32 first_seg_size, prealloc_fifo_pairs; segment_manager_properties_t *props; vl_api_registration_t *reg; segment_manager_t *sm; @@ -298,15 +300,14 @@ application_init (application_t * app, u32 api_client_index, u64 * options, props->rx_fifo_size = options[APP_OPTIONS_RX_FIFO_SIZE]; if (options[APP_OPTIONS_TX_FIFO_SIZE]) props->tx_fifo_size = options[APP_OPTIONS_TX_FIFO_SIZE]; + if (options[APP_OPTIONS_EVT_QUEUE_SIZE]) + props->evt_q_size = options[APP_OPTIONS_EVT_QUEUE_SIZE]; props->segment_type = seg_type; - app_evt_queue_size = options[APP_OPTIONS_EVT_QUEUE_SIZE] > 0 ? - options[APP_OPTIONS_EVT_QUEUE_SIZE] : default_app_evt_queue_size; first_seg_size = options[APP_OPTIONS_SEGMENT_SIZE]; prealloc_fifo_pairs = options[APP_OPTIONS_PREALLOC_FIFO_PAIRS]; - if ((rv = segment_manager_init (sm, first_seg_size, app_evt_queue_size, - prealloc_fifo_pairs))) + if ((rv = segment_manager_init (sm, first_seg_size, prealloc_fifo_pairs))) return rv; sm->first_is_protected = 1; @@ -319,6 +320,7 @@ application_init (application_t * app, u32 api_client_index, u64 * options, app->cb_fns = *cb_fns; app->ns_index = options[APP_OPTIONS_NAMESPACE]; app->listeners_table = hash_create (0, sizeof (u64)); + app->local_connects = hash_create (0, sizeof (u64)); app->proxied_transports = options[APP_OPTIONS_PROXY_TRANSPORT]; app->event_queue = segment_manager_event_queue (sm); @@ -333,6 +335,13 @@ application_init (application_t * app, u32 api_client_index, u64 * options, /* Add app to lookup by api_client_index table */ application_table_add (app); + /* + * Segment manager for local sessions + */ + sm = segment_manager_new (); + sm->app_index = app->index; + app->local_segment_manager = segment_manager_index (sm); + return 0; } @@ -388,11 +397,11 @@ application_alloc_segment_manager (application_t * app) */ int application_start_listen (application_t * srv, session_endpoint_t * sep, - u64 * res) + session_handle_t * res) { segment_manager_t *sm; stream_session_t *s; - u64 handle; + session_handle_t handle; session_type_t sst; sst = session_type_from_proto_and_ip (sep->transport_proto, sep->is_ip4); @@ -425,7 +434,7 @@ err: * Stop listening on session associated to handle */ int -application_stop_listen (application_t * srv, u64 handle) +application_stop_listen (application_t * srv, session_handle_t handle) { stream_session_t *listener; uword *indexp; @@ -499,6 +508,26 @@ application_get_listen_segment_manager (application_t * app, return segment_manager_get (*smp); } +segment_manager_t * +application_get_local_segment_manager (application_t * app) +{ + return segment_manager_get (app->local_segment_manager); +} + +segment_manager_t * +application_get_local_segment_manager_w_session (application_t * app, + local_session_t * ls) +{ + stream_session_t *listener; + if (application_local_session_listener_has_transport (ls)) + { + listener = listen_session_get (ls->listener_session_type, + ls->listener_index); + return application_get_listen_segment_manager (app, listener); + } + return segment_manager_get (app->local_segment_manager); +} + int application_is_proxy (application_t * app) { @@ -731,12 +760,407 @@ application_get_segment_manager_properties (u32 app_index) return &app->sm_properties; } +local_session_t * +application_alloc_local_session (application_t * app) +{ + local_session_t *s; + pool_get (app->local_sessions, s); + memset (s, 0, sizeof (*s)); + s->app_index = app->index; + s->session_index = s - app->local_sessions; + s->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE, 0); + return s; +} + +void +application_free_local_session (application_t * app, local_session_t * s) +{ + pool_put (app->local_sessions, s); + if (CLIB_DEBUG) + memset (s, 0xfc, sizeof (*s)); +} + +local_session_t * +application_get_local_session (application_t * app, u32 session_index) +{ + return pool_elt_at_index (app->local_sessions, session_index); +} + +local_session_t * +application_get_local_session_from_handle (session_handle_t handle) +{ + application_t *server; + u32 session_index, server_index; + local_session_parse_handle (handle, &server_index, &session_index); + server = application_get (server_index); + return application_get_local_session (server, session_index); +} + +always_inline void +application_local_listener_session_endpoint (local_session_t * ll, + session_endpoint_t * sep) +{ + sep->transport_proto = + session_type_transport_proto (ll->listener_session_type); + sep->port = ll->port; + sep->is_ip4 = ll->listener_session_type & 1; +} + +int +application_start_local_listen (application_t * server, + session_endpoint_t * sep, + session_handle_t * handle) +{ + session_handle_t lh; + local_session_t *ll; + u32 table_index; + + table_index = application_local_session_table (server); + + /* An exact sep match, as opposed to session_lookup_local_listener */ + lh = session_lookup_endpoint_listener (table_index, sep, 1); + if (lh != SESSION_INVALID_HANDLE) + return VNET_API_ERROR_ADDRESS_IN_USE; + + pool_get (server->local_listen_sessions, ll); + memset (ll, 0, sizeof (*ll)); + ll->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE, 0); + ll->app_index = server->index; + ll->session_index = ll - server->local_listen_sessions; + ll->port = sep->port; + /* Store the original session type for the unbind */ + ll->listener_session_type = + session_type_from_proto_and_ip (sep->transport_proto, sep->is_ip4); + + *handle = application_local_session_handle (ll); + session_lookup_add_session_endpoint (table_index, sep, *handle); + + return 0; +} + +/** + * Clean up local session table. If we have a listener session use it to + * find the port and proto. If not, the handle must be a local table handle + * so parse it. + */ +int +application_stop_local_listen (application_t * server, session_handle_t lh) +{ + session_endpoint_t sep = SESSION_ENDPOINT_NULL; + u32 table_index, ll_index, server_index; + stream_session_t *sl = 0; + local_session_t *ll, *ls; + + table_index = application_local_session_table (server); + + /* We have both local and global table binds. Figure from global what + * the sep we should be cleaning up is. + */ + if (!session_handle_is_local (lh)) + { + sl = listen_session_get_from_handle (lh); + if (!sl || listen_session_get_local_session_endpoint (sl, &sep)) + { + clib_warning ("broken listener"); + return -1; + } + lh = session_lookup_endpoint_listener (table_index, &sep, 0); + if (lh == SESSION_INVALID_HANDLE) + return -1; + } + + local_session_parse_handle (lh, &server_index, &ll_index); + ASSERT (server->index == server_index); + if (!(ll = application_get_local_listen_session (server, ll_index))) + { + clib_warning ("no local listener"); + return -1; + } + application_local_listener_session_endpoint (ll, &sep); + session_lookup_del_session_endpoint (table_index, &sep); + + /* *INDENT-OFF* */ + pool_foreach (ls, server->local_sessions, ({ + if (ls->listener_index == ll->session_index) + application_local_session_disconnect (server->index, ls); + })); + /* *INDENT-ON* */ + pool_put_index (server->local_listen_sessions, ll->session_index); + + return 0; +} + +int +application_local_session_connect (u32 table_index, application_t * client, + application_t * server, + local_session_t * ll, u32 opaque) +{ + u32 seg_size, evt_q_sz, evt_q_elts, margin = 16 << 10; + segment_manager_properties_t *props, *cprops; + int rv, has_transport, seg_index; + svm_fifo_segment_private_t *seg; + segment_manager_t *sm; + local_session_t *ls; + svm_queue_t *sq, *cq; + + ls = application_alloc_local_session (server); + + props = application_segment_manager_properties (server); + cprops = application_segment_manager_properties (client); + evt_q_elts = props->evt_q_size + cprops->evt_q_size; + evt_q_sz = evt_q_elts * sizeof (session_fifo_event_t); + seg_size = props->rx_fifo_size + props->tx_fifo_size + evt_q_sz + margin; + + has_transport = session_has_transport ((stream_session_t *) ll); + if (!has_transport) + { + /* Local sessions don't have backing transport */ + ls->port = ll->port; + sm = application_get_local_segment_manager (server); + } + else + { + stream_session_t *sl = (stream_session_t *) ll; + transport_connection_t *tc; + tc = listen_session_get_transport (sl); + ls->port = tc->lcl_port; + sm = application_get_listen_segment_manager (server, sl); + } + + seg_index = segment_manager_add_segment (sm, seg_size); + if (seg_index < 0) + { + clib_warning ("failed to add new cut-through segment"); + return seg_index; + } + seg = segment_manager_get_segment_w_lock (sm, seg_index); + sq = segment_manager_alloc_queue (seg, props->evt_q_size); + cq = segment_manager_alloc_queue (seg, cprops->evt_q_size); + ls->server_evt_q = pointer_to_uword (sq); + ls->client_evt_q = pointer_to_uword (cq); + rv = segment_manager_try_alloc_fifos (seg, props->rx_fifo_size, + props->tx_fifo_size, + &ls->server_rx_fifo, + &ls->server_tx_fifo); + if (rv) + { + clib_warning ("failed to add fifos in cut-through segment"); + segment_manager_segment_reader_unlock (sm); + goto failed; + } + ls->server_rx_fifo->master_session_index = ls->session_index; + ls->server_tx_fifo->master_session_index = ls->session_index; + ls->server_rx_fifo->master_thread_index = ~0; + ls->server_tx_fifo->master_thread_index = ~0; + ls->svm_segment_index = seg_index; + ls->listener_index = ll->session_index; + ls->client_index = client->index; + ls->client_opaque = opaque; + ls->listener_session_type = ll->session_type; + + if ((rv = server->cb_fns.add_segment_callback (server->api_client_index, + &seg->ssvm))) + { + clib_warning ("failed to notify server of new segment"); + segment_manager_segment_reader_unlock (sm); + goto failed; + } + segment_manager_segment_reader_unlock (sm); + if ((rv = server->cb_fns.session_accept_callback ((stream_session_t *) ls))) + { + clib_warning ("failed to send accept cut-through notify to server"); + goto failed; + } + if (server->flags & APP_OPTIONS_FLAGS_IS_BUILTIN) + application_local_session_connect_notify (ls); + + return 0; + +failed: + if (!has_transport) + segment_manager_del_segment (sm, seg); + return rv; +} + +static uword +application_client_local_connect_key (local_session_t * ls) +{ + return ((uword) ls->app_index << 32 | (uword) ls->session_index); +} + +static void +application_client_local_connect_key_parse (uword key, u32 * app_index, + u32 * session_index) +{ + *app_index = key >> 32; + *session_index = key & 0xFFFFFFFF; +} + +int +application_local_session_connect_notify (local_session_t * ls) +{ + svm_fifo_segment_private_t *seg; + application_t *client, *server; + segment_manager_t *sm; + int rv, is_fail = 0; + uword client_key; + + client = application_get (ls->client_index); + server = application_get (ls->app_index); + sm = application_get_local_segment_manager_w_session (server, ls); + seg = segment_manager_get_segment_w_lock (sm, ls->svm_segment_index); + if ((rv = client->cb_fns.add_segment_callback (client->api_client_index, + &seg->ssvm))) + { + clib_warning ("failed to notify client %u of new segment", + ls->client_index); + segment_manager_segment_reader_unlock (sm); + application_local_session_disconnect (ls->client_index, ls); + is_fail = 1; + } + else + { + segment_manager_segment_reader_unlock (sm); + } + + client->cb_fns.session_connected_callback (client->index, ls->client_opaque, + (stream_session_t *) ls, + is_fail); + + client_key = application_client_local_connect_key (ls); + hash_set (client->local_connects, client_key, client_key); + return 0; +} + +int +application_local_session_disconnect (u32 app_index, local_session_t * ls) +{ + svm_fifo_segment_private_t *seg; + application_t *client, *server; + segment_manager_t *sm; + uword client_key; + + client = application_get_if_valid (ls->client_index); + server = application_get (ls->app_index); + + if (ls->session_state == SESSION_STATE_CLOSED) + { + cleanup: + client_key = application_client_local_connect_key (ls); + sm = application_get_local_segment_manager_w_session (server, ls); + seg = segment_manager_get_segment (sm, ls->svm_segment_index); + + if (client) + { + hash_unset (client->local_connects, client_key); + client->cb_fns.del_segment_callback (client->api_client_index, + &seg->ssvm); + } + + server->cb_fns.del_segment_callback (server->api_client_index, + &seg->ssvm); + segment_manager_del_segment (sm, seg); + application_free_local_session (server, ls); + return 0; + } + + if (app_index == ls->client_index) + { + send_local_session_disconnect_callback (ls->app_index, ls); + } + else + { + if (!client) + { + goto cleanup; + } + else if (ls->session_state < SESSION_STATE_READY) + { + client->cb_fns.session_connected_callback (client->index, + ls->client_opaque, + (stream_session_t *) ls, + 1 /* is_fail */ ); + ls->session_state = SESSION_STATE_CLOSED; + goto cleanup; + } + else + { + send_local_session_disconnect_callback (ls->client_index, ls); + } + } + + ls->session_state = SESSION_STATE_CLOSED; + + return 0; +} + +void +application_local_sessions_del (application_t * app) +{ + u32 index, server_index, session_index, table_index; + segment_manager_t *sm; + u64 handle, *handles = 0; + local_session_t *ls, *ll; + application_t *server; + session_endpoint_t sep; + int i; + + /* + * Local listens. Don't bother with local sessions, we clean them lower + */ + table_index = application_local_session_table (app); + /* *INDENT-OFF* */ + pool_foreach (ll, app->local_listen_sessions, ({ + application_local_listener_session_endpoint (ll, &sep); + session_lookup_del_session_endpoint (table_index, &sep); + })); + /* *INDENT-ON* */ + + /* + * Local sessions + */ + if (app->local_sessions) + { + /* *INDENT-OFF* */ + pool_foreach (ls, app->local_sessions, ({ + application_local_session_disconnect (app->index, ls); + })); + /* *INDENT-ON* */ + } + + /* + * Local connects + */ + vec_reset_length (handles); + /* *INDENT-OFF* */ + hash_foreach (handle, index, app->local_connects, ({ + vec_add1 (handles, handle); + })); + /* *INDENT-ON* */ + + for (i = 0; i < vec_len (handles); i++) + { + application_client_local_connect_key_parse (handles[i], &server_index, + &session_index); + server = application_get_if_valid (server_index); + if (server) + { + ls = application_get_local_session (server, session_index); + application_local_session_disconnect (app->index, ls); + } + } + + sm = segment_manager_get (app->local_segment_manager); + sm->app_index = SEGMENT_MANAGER_INVALID_APP_INDEX; + segment_manager_del (sm); +} + u8 * format_application_listener (u8 * s, va_list * args) { application_t *app = va_arg (*args, application_t *); u64 handle = va_arg (*args, u64); - u32 index = va_arg (*args, u32); + u32 sm_index = va_arg (*args, u32); int verbose = va_arg (*args, int); stream_session_t *listener; u8 *app_name, *str; @@ -759,7 +1183,7 @@ format_application_listener (u8 * s, va_list * args) if (verbose) { s = format (s, "%-40s%-20s%-15u%-15u%-10u", str, app_name, - app->api_client_index, handle, index); + app->api_client_index, handle, sm_index); } else s = format (s, "%-40s%-20s", str, app_name); @@ -832,6 +1256,76 @@ application_format_connects (application_t * app, int verbose) vec_free (app_name); } +void +application_format_local_sessions (application_t * app, int verbose) +{ + vlib_main_t *vm = vlib_get_main (); + local_session_t *ls; + transport_proto_t tp; + u8 *conn = 0; + + /* Header */ + if (app == 0) + { + vlib_cli_output (vm, "%-40s%-15s%-20s", "Connection", "ServerApp", + "ClientApp"); + return; + } + + /* *INDENT-OFF* */ + pool_foreach (ls, app->local_listen_sessions, ({ + tp = session_type_transport_proto(ls->listener_session_type); + conn = format (0, "[L][%U] *:%u", format_transport_proto_short, tp, + ls->port); + vlib_cli_output (vm, "%-40v%-15u%-20s", conn, ls->app_index, "*"); + vec_reset_length (conn); + })); + pool_foreach (ls, app->local_sessions, ({ + tp = session_type_transport_proto(ls->listener_session_type); + conn = format (0, "[L][%U] *:%u", format_transport_proto_short, tp, + ls->port); + vlib_cli_output (vm, "%-40v%-15u%-20u", conn, ls->app_index, + ls->client_index); + vec_reset_length (conn); + })); + /* *INDENT-ON* */ + + vec_free (conn); +} + +void +application_format_local_connects (application_t * app, int verbose) +{ + vlib_main_t *vm = vlib_get_main (); + u32 app_index, session_index; + application_t *server; + local_session_t *ls; + uword client_key; + u64 value; + + /* Header */ + if (app == 0) + { + if (verbose) + vlib_cli_output (vm, "%-40s%-15s%-20s%-10s", "Connection", "App", + "Peer App", "SegManager"); + else + vlib_cli_output (vm, "%-40s%-15s%-20s", "Connection", "App", + "Peer App"); + return; + } + + /* *INDENT-OFF* */ + hash_foreach (client_key, value, app->local_connects, ({ + application_client_local_connect_key_parse (client_key, &app_index, + &session_index); + server = application_get (app_index); + ls = application_get_local_session (server, session_index); + vlib_cli_output (vm, "%-40s%-15s%-20s", "TODO", ls->app_index, ls->client_index); + })); + /* *INDENT-ON* */ +} + u8 * format_application (u8 * s, va_list * args) { @@ -869,13 +1363,92 @@ format_application (u8 * s, va_list * args) return s; } + +void +application_format_all_listeners (vlib_main_t * vm, int do_local, int verbose) +{ + application_t *app; + u32 sm_index; + u64 handle; + + if (!pool_elts (app_pool)) + { + vlib_cli_output (vm, "No active server bindings"); + return; + } + + if (do_local) + { + application_format_local_sessions (0, verbose); + /* *INDENT-OFF* */ + pool_foreach (app, app_pool, ({ + if (!pool_elts (app->local_sessions) + && !pool_elts(app->local_connects)) + continue; + application_format_local_sessions (app, verbose); + })); + /* *INDENT-ON* */ + } + else + { + vlib_cli_output (vm, "%U", format_application_listener, 0 /* header */ , + 0, 0, verbose); + + /* *INDENT-OFF* */ + pool_foreach (app, app_pool, ({ + if (hash_elts (app->listeners_table) == 0) + continue; + hash_foreach (handle, sm_index, app->listeners_table, ({ + vlib_cli_output (vm, "%U", format_application_listener, app, + handle, sm_index, verbose); + })); + })); + /* *INDENT-ON* */ + } +} + +void +application_format_all_clients (vlib_main_t * vm, int do_local, int verbose) +{ + application_t *app; + + if (!pool_elts (app_pool)) + { + vlib_cli_output (vm, "No active apps"); + return; + } + + if (do_local) + { + application_format_local_connects (0, verbose); + + /* *INDENT-OFF* */ + pool_foreach (app, app_pool, ({ + if (app->local_connects) + application_format_local_connects (app, verbose); + })); + /* *INDENT-ON* */ + } + else + { + application_format_connects (0, verbose); + + /* *INDENT-OFF* */ + pool_foreach (app, app_pool, ({ + if (app->connects_seg_manager == (u32)~0) + continue; + application_format_connects (app, verbose); + })); + /* *INDENT-ON* */ + } +} + static clib_error_t * show_app_command_fn (vlib_main_t * vm, unformat_input_t * input, vlib_cli_command_t * cmd) { + int do_server = 0, do_client = 0, do_local = 0; application_t *app; - int do_server = 0; - int do_client = 0; int verbose = 0; session_cli_return_if_not_enabled (); @@ -886,6 +1459,8 @@ show_app_command_fn (vlib_main_t * vm, unformat_input_t * input, do_server = 1; else if (unformat (input, "client")) do_client = 1; + else if (unformat (input, "local")) + do_local = 1; else if (unformat (input, "verbose")) verbose = 1; else @@ -893,49 +1468,10 @@ show_app_command_fn (vlib_main_t * vm, unformat_input_t * input, } if (do_server) - { - u64 handle; - u32 index; - if (pool_elts (app_pool)) - { - vlib_cli_output (vm, "%U", format_application_listener, - 0 /* header */ , 0, 0, verbose); - - /* *INDENT-OFF* */ - pool_foreach (app, app_pool, ({ - /* App's listener sessions */ - if (hash_elts (app->listeners_table) == 0) - continue; - hash_foreach (handle, index, app->listeners_table, ({ - vlib_cli_output (vm, "%U", format_application_listener, app, - handle, index, verbose); - })); - })); - /* *INDENT-ON* */ - - } - else - vlib_cli_output (vm, "No active server bindings"); - } + application_format_all_listeners (vm, do_local, verbose); if (do_client) - { - if (pool_elts (app_pool)) - { - application_format_connects (0, verbose); - - /* *INDENT-OFF* */ - pool_foreach (app, app_pool, - ({ - if (app->connects_seg_manager == (u32)~0) - continue; - application_format_connects (app, verbose); - })); - /* *INDENT-ON* */ - } - else - vlib_cli_output (vm, "No active client bindings"); - } + application_format_all_clients (vm, do_local, verbose); /* Print app related info */ if (!do_server && !do_client) |