From 623eb5676689f6874716857a1085d0efd4974c4c Mon Sep 17 00:00:00 2001 From: Florin Coras Date: Sun, 3 Feb 2019 19:28:34 -0800 Subject: session: cleanup part 2 Move app worker logic to app_worker.c Change-Id: Ic5e5735b2884f006c064d023f491aa6888114810 Signed-off-by: Florin Coras --- src/vnet/CMakeLists.txt | 1 + src/vnet/session/application.c | 1200 ++---------------------------- src/vnet/session/application.h | 112 +-- src/vnet/session/application_interface.c | 16 +- src/vnet/session/application_worker.c | 1116 +++++++++++++++++++++++++++ src/vnet/session/segment_manager.c | 2 +- src/vnet/session/session_api.c | 4 +- src/vnet/session/session_node.c | 4 +- 8 files changed, 1246 insertions(+), 1209 deletions(-) create mode 100644 src/vnet/session/application_worker.c (limited to 'src/vnet') diff --git a/src/vnet/CMakeLists.txt b/src/vnet/CMakeLists.txt index e14f0877bd5..72da325a08f 100644 --- a/src/vnet/CMakeLists.txt +++ b/src/vnet/CMakeLists.txt @@ -1038,6 +1038,7 @@ list(APPEND VNET_SOURCES session/session_node.c session/transport.c session/application.c + session/application_worker.c session/session_cli.c session/application_interface.c session/application_namespace.c diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c index 268816561a7..3380d0bca4a 100644 --- a/src/vnet/session/application.c +++ b/src/vnet/session/application.c @@ -497,104 +497,8 @@ application_listener_select_worker (session_t * ls, u8 is_local) return application_get_worker (app, wrk_index); } -app_worker_t * -app_worker_alloc (application_t * app) -{ - app_worker_t *app_wrk; - pool_get (app_main.workers, app_wrk); - clib_memset (app_wrk, 0, sizeof (*app_wrk)); - app_wrk->wrk_index = app_wrk - app_main.workers; - app_wrk->app_index = app->app_index; - app_wrk->wrk_map_index = ~0; - app_wrk->connects_seg_manager = APP_INVALID_SEGMENT_MANAGER_INDEX; - app_wrk->first_segment_manager = APP_INVALID_SEGMENT_MANAGER_INDEX; - app_wrk->local_segment_manager = APP_INVALID_SEGMENT_MANAGER_INDEX; - APP_DBG ("New app %v worker %u", app_get_name (app), app_wrk->wrk_index); - return app_wrk; -} - -app_worker_t * -app_worker_get (u32 wrk_index) -{ - return pool_elt_at_index (app_main.workers, wrk_index); -} - -app_worker_t * -app_worker_get_if_valid (u32 wrk_index) -{ - if (pool_is_free_index (app_main.workers, wrk_index)) - return 0; - return pool_elt_at_index (app_main.workers, wrk_index); -} - -void -app_worker_free (app_worker_t * app_wrk) -{ - application_t *app = application_get (app_wrk->app_index); - vnet_unbind_args_t _a, *a = &_a; - u64 handle, *handles = 0; - segment_manager_t *sm; - u32 sm_index; - int i; - - /* - * Listener cleanup - */ - - /* *INDENT-OFF* */ - hash_foreach (handle, sm_index, app_wrk->listeners_table, - ({ - vec_add1 (handles, handle); - sm = segment_manager_get (sm_index); - sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX; - })); - /* *INDENT-ON* */ - - for (i = 0; i < vec_len (handles); i++) - { - a->app_index = app->app_index; - a->wrk_map_index = app_wrk->wrk_map_index; - a->handle = handles[i]; - /* seg manager is removed when unbind completes */ - vnet_unbind (a); - } - - /* - * Connects segment manager cleanup - */ - - if (app_wrk->connects_seg_manager != APP_INVALID_SEGMENT_MANAGER_INDEX) - { - sm = segment_manager_get (app_wrk->connects_seg_manager); - sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX; - segment_manager_init_del (sm); - } - - /* If first segment manager is used by a listener */ - if (app_wrk->first_segment_manager != APP_INVALID_SEGMENT_MANAGER_INDEX - && app_wrk->first_segment_manager != app_wrk->connects_seg_manager) - { - sm = segment_manager_get (app_wrk->first_segment_manager); - sm->first_is_protected = 0; - sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX; - /* .. and has no fifos, e.g. it might be used for redirected sessions, - * remove it */ - if (!segment_manager_has_fifos (sm)) - segment_manager_del (sm); - } - - /* - * Local sessions - */ - app_worker_local_sessions_free (app_wrk); - - pool_put (app_main.workers, app_wrk); - if (CLIB_DEBUG) - clib_memset (app_wrk, 0xfe, sizeof (*app_wrk)); -} - int -app_worker_alloc_and_init (application_t * app, app_worker_t ** wrk) +application_alloc_worker_and_init (application_t * app, app_worker_t ** wrk) { app_worker_map_t *wrk_map; app_worker_t *app_wrk; @@ -641,160 +545,6 @@ app_worker_alloc_and_init (application_t * app, app_worker_t ** wrk) return 0; } -application_t * -app_worker_get_app (u32 wrk_index) -{ - app_worker_t *app_wrk; - app_wrk = app_worker_get_if_valid (wrk_index); - if (!app_wrk) - return 0; - return application_get_if_valid (app_wrk->app_index); -} - -static segment_manager_t * -app_worker_alloc_segment_manager (app_worker_t * app_wrk) -{ - segment_manager_t *sm = 0; - - /* If the first segment manager is not in use, don't allocate a new one */ - if (app_wrk->first_segment_manager != APP_INVALID_SEGMENT_MANAGER_INDEX - && app_wrk->first_segment_manager_in_use == 0) - { - sm = segment_manager_get (app_wrk->first_segment_manager); - app_wrk->first_segment_manager_in_use = 1; - return sm; - } - - sm = segment_manager_new (); - sm->app_wrk_index = app_wrk->wrk_index; - - return sm; -} - -int -app_worker_start_listen (app_worker_t * app_wrk, session_t * ls) -{ - segment_manager_t *sm; - - /* Allocate segment manager. All sessions derived out of a listen session - * have fifos allocated by the same segment manager. */ - if (!(sm = app_worker_alloc_segment_manager (app_wrk))) - return -1; - - /* Add to app's listener table. Useful to find all child listeners - * when app goes down, although, just for unbinding this is not needed */ - hash_set (app_wrk->listeners_table, listen_session_get_handle (ls), - segment_manager_index (sm)); - - if (!ls->rx_fifo - && session_transport_service_type (ls) == TRANSPORT_SERVICE_CL) - { - if (session_alloc_fifos (sm, ls)) - return -1; - } - return 0; -} - -int -app_worker_stop_listen (app_worker_t * app_wrk, session_handle_t handle) -{ - segment_manager_t *sm; - uword *sm_indexp; - - sm_indexp = hash_get (app_wrk->listeners_table, handle); - if (PREDICT_FALSE (!sm_indexp)) - { - clib_warning ("listener handle was removed %llu!", handle); - return -1; - } - - sm = segment_manager_get (*sm_indexp); - if (app_wrk->first_segment_manager == *sm_indexp) - { - /* Delete sessions but don't remove segment manager */ - app_wrk->first_segment_manager_in_use = 0; - segment_manager_del_sessions (sm); - } - else - { - segment_manager_init_del (sm); - } - hash_unset (app_wrk->listeners_table, handle); - - return 0; -} - -int -app_worker_own_session (app_worker_t * app_wrk, session_t * s) -{ - segment_manager_t *sm; - svm_fifo_t *rxf, *txf; - - if (s->session_state == SESSION_STATE_LISTENING) - { - app_worker_t *old_wrk = app_worker_get (s->app_wrk_index); - u64 lsh = listen_session_get_handle (s); - app_listener_t *app_listener; - application_t *app; - - if (!old_wrk) - return -1; - - hash_unset (old_wrk->listeners_table, lsh); - if (!(sm = app_worker_alloc_segment_manager (app_wrk))) - return -1; - - hash_set (app_wrk->listeners_table, lsh, segment_manager_index (sm)); - s->app_wrk_index = app_wrk->wrk_index; - - app = application_get (old_wrk->app_index); - if (!app) - return -1; - - app_listener = app_listener_get (app, s->listener_db_index); - app_listener->workers = clib_bitmap_set (app_listener->workers, - app_wrk->wrk_map_index, 1); - app_listener->workers = clib_bitmap_set (app_listener->workers, - old_wrk->wrk_map_index, 0); - return 0; - } - - s->app_wrk_index = app_wrk->wrk_index; - - rxf = s->rx_fifo; - txf = s->tx_fifo; - - if (!rxf || !txf) - return 0; - - s->rx_fifo = 0; - s->tx_fifo = 0; - - sm = app_worker_get_or_alloc_connect_segment_manager (app_wrk); - if (session_alloc_fifos (sm, s)) - return -1; - - if (!svm_fifo_is_empty (rxf)) - { - clib_memcpy_fast (s->rx_fifo->data, rxf->data, rxf->nitems); - s->rx_fifo->head = rxf->head; - s->rx_fifo->tail = rxf->tail; - s->rx_fifo->cursize = rxf->cursize; - } - - if (!svm_fifo_is_empty (txf)) - { - clib_memcpy_fast (s->tx_fifo->data, txf->data, txf->nitems); - s->tx_fifo->head = txf->head; - s->tx_fifo->tail = txf->tail; - s->tx_fifo->cursize = txf->cursize; - } - - segment_manager_dealloc_fifos (rxf->segment_index, rxf, txf); - - return 0; -} - /** * Start listening local transport endpoint for requested transport. * @@ -877,6 +627,39 @@ err: return -1; } +int +application_change_listener_owner (session_t * s, app_worker_t * app_wrk) +{ + app_worker_t *old_wrk = app_worker_get (s->app_wrk_index); + app_listener_t *app_listener; + application_t *app; + + if (!old_wrk) + return -1; + + hash_unset (old_wrk->listeners_table, listen_session_get_handle (s)); + if (session_transport_service_type (s) == TRANSPORT_SERVICE_CL + && s->rx_fifo) + segment_manager_dealloc_fifos (s->rx_fifo->segment_index, s->rx_fifo, + s->tx_fifo); + + if (app_worker_start_listen (app_wrk, s)) + return -1; + + s->app_wrk_index = app_wrk->wrk_index; + + app = application_get (old_wrk->app_index); + if (!app) + return -1; + + app_listener = app_listener_get (app, s->listener_db_index); + app_listener->workers = clib_bitmap_set (app_listener->workers, + app_wrk->wrk_map_index, 1); + app_listener->workers = clib_bitmap_set (app_listener->workers, + old_wrk->wrk_map_index, 0); + return 0; +} + /** * Stop listening on session associated to handle * @@ -923,61 +706,6 @@ application_stop_listen (u32 app_index, u32 app_wrk_index, return 0; } -int -app_worker_open_session (app_worker_t * app, session_endpoint_t * sep, - u32 api_context) -{ - int rv; - - /* Make sure we have a segment manager for connects */ - app_worker_alloc_connects_segment_manager (app); - - if ((rv = session_open (app->wrk_index, sep, api_context))) - return rv; - - return 0; -} - -int -app_worker_alloc_connects_segment_manager (app_worker_t * app_wrk) -{ - segment_manager_t *sm; - - if (app_wrk->connects_seg_manager == APP_INVALID_SEGMENT_MANAGER_INDEX) - { - sm = app_worker_alloc_segment_manager (app_wrk); - if (sm == 0) - return -1; - app_wrk->connects_seg_manager = segment_manager_index (sm); - } - return 0; -} - -segment_manager_t * -app_worker_get_connect_segment_manager (app_worker_t * app) -{ - ASSERT (app->connects_seg_manager != (u32) ~ 0); - return segment_manager_get (app->connects_seg_manager); -} - -segment_manager_t * -app_worker_get_or_alloc_connect_segment_manager (app_worker_t * app_wrk) -{ - if (app_wrk->connects_seg_manager == (u32) ~ 0) - app_worker_alloc_connects_segment_manager (app_wrk); - return segment_manager_get (app_wrk->connects_seg_manager); -} - -segment_manager_t * -app_worker_get_listen_segment_manager (app_worker_t * app, - session_t * listener) -{ - uword *smp; - smp = hash_get (app->listeners_table, listen_session_get_handle (listener)); - ASSERT (smp != 0); - return segment_manager_get (*smp); -} - clib_error_t * vnet_app_worker_add_del (vnet_app_worker_add_del_args_t * a) { @@ -995,7 +723,7 @@ vnet_app_worker_add_del (vnet_app_worker_add_del_args_t * a) if (a->is_add) { - if ((rv = app_worker_alloc_and_init (app, &app_wrk))) + if ((rv = application_alloc_worker_and_init (app, &app_wrk))) return clib_error_return_code (0, rv, 0, "app wrk init: %d", rv); /* Map worker api index to the app */ @@ -1030,25 +758,6 @@ vnet_app_worker_add_del (vnet_app_worker_add_del_args_t * a) return 0; } -segment_manager_t * -application_get_local_segment_manager (app_worker_t * app) -{ - return segment_manager_get (app->local_segment_manager); -} - -segment_manager_t * -application_get_local_segment_manager_w_session (app_worker_t * app, - local_session_t * ls) -{ - session_t *listener; - if (application_local_session_listener_has_transport (ls)) - { - listener = listen_session_get (ls->listener_index); - return app_worker_get_listen_segment_manager (app, listener); - } - return segment_manager_get (app->local_segment_manager); -} - int application_is_proxy (application_t * app) { @@ -1085,78 +794,6 @@ application_use_mq_for_ctrl (application_t * app) return app->flags & APP_OPTIONS_FLAGS_USE_MQ_FOR_CTRL_MSGS; } -/** - * Send an API message to the external app, to map new segment - */ -int -app_worker_add_segment_notify (u32 app_wrk_index, u64 segment_handle) -{ - app_worker_t *app_wrk = app_worker_get (app_wrk_index); - application_t *app = application_get (app_wrk->app_index); - return app->cb_fns.add_segment_callback (app_wrk->api_client_index, - segment_handle); -} - -u32 -application_n_listeners (app_worker_t * app) -{ - return hash_elts (app->listeners_table); -} - -session_t * -app_worker_first_listener (app_worker_t * app, u8 fib_proto, - u8 transport_proto) -{ - session_t *listener; - u64 handle; - u32 sm_index; - u8 sst; - - sst = session_type_from_proto_and_ip (transport_proto, - fib_proto == FIB_PROTOCOL_IP4); - - /* *INDENT-OFF* */ - hash_foreach (handle, sm_index, app->listeners_table, ({ - listener = listen_session_get_from_handle (handle); - if (listener->session_type == sst - && listener->enqueue_epoch != SESSION_PROXY_LISTENER_INDEX) - return listener; - })); - /* *INDENT-ON* */ - - return 0; -} - -u8 -app_worker_application_is_builtin (app_worker_t * app_wrk) -{ - return app_wrk->app_is_builtin; -} - -session_t * -application_proxy_listener (app_worker_t * app, u8 fib_proto, - u8 transport_proto) -{ - session_t *listener; - u64 handle; - u32 sm_index; - u8 sst; - - sst = session_type_from_proto_and_ip (transport_proto, - fib_proto == FIB_PROTOCOL_IP4); - - /* *INDENT-OFF* */ - hash_foreach (handle, sm_index, app->listeners_table, ({ - listener = listen_session_get_from_handle (handle); - if (listener->session_type == sst - && listener->enqueue_epoch == SESSION_PROXY_LISTENER_INDEX) - return listener; - })); - /* *INDENT-ON* */ - - return 0; -} - static clib_error_t * application_start_stop_proxy_fib_proto (application_t * app, u8 fib_proto, u8 transport_proto, u8 is_start) @@ -1188,7 +825,7 @@ application_start_stop_proxy_fib_proto (application_t * app, u8 fib_proto, } else { - s = application_proxy_listener (app_wrk, fib_proto, transport_proto); + s = app_worker_proxy_listener (app_wrk, fib_proto, transport_proto); ASSERT (s); } @@ -1302,216 +939,29 @@ application_get_segment_manager_properties (u32 app_index) return &app->sm_properties; } -static inline int -app_enqueue_evt (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, u8 lock) +local_session_t * +application_local_listen_session_alloc (application_t * app) { - if (PREDICT_FALSE (svm_msg_q_is_full (mq))) - { - clib_warning ("evt q full"); - svm_msg_q_free_msg (mq, msg); - if (lock) - svm_msg_q_unlock (mq); - return -1; - } - - if (lock) - { - svm_msg_q_add_and_unlock (mq, msg); - return 0; - } + local_session_t *ll; + pool_get (app->local_listen_sessions, ll); + clib_memset (ll, 0, sizeof (*ll)); + return ll; +} - /* Even when not locking the ring, we must wait for queue mutex */ - if (svm_msg_q_add (mq, msg, SVM_Q_WAIT)) - { - clib_warning ("msg q add returned"); - return -1; - } - return 0; +u32 +application_local_listener_index (application_t * app, local_session_t * ll) +{ + return (ll - app->local_listen_sessions); } -static inline int -app_send_io_evt_rx (app_worker_t * app_wrk, session_t * s, u8 lock) +void +application_local_listen_session_free (application_t * app, + local_session_t * ll) { - session_event_t *evt; - svm_msg_q_msg_t msg; - svm_msg_q_t *mq; - - if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY - && s->session_state != SESSION_STATE_LISTENING)) - { - /* Session is closed so app will never clean up. Flush rx fifo */ - if (s->session_state == SESSION_STATE_CLOSED) - svm_fifo_dequeue_drop_all (s->rx_fifo); - return 0; - } - - if (app_worker_application_is_builtin (app_wrk)) - { - application_t *app = application_get (app_wrk->app_index); - return app->cb_fns.builtin_app_rx_callback (s); - } - - if (svm_fifo_has_event (s->rx_fifo) || svm_fifo_is_empty (s->rx_fifo)) - return 0; - - mq = app_wrk->event_queue; - if (lock) - svm_msg_q_lock (mq); - - if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING))) - { - clib_warning ("evt q rings full"); - if (lock) - svm_msg_q_unlock (mq); - return -1; - } - - msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING); - ASSERT (!svm_msg_q_msg_is_invalid (&msg)); - - evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg); - evt->fifo = s->rx_fifo; - evt->event_type = FIFO_EVENT_APP_RX; - - (void) svm_fifo_set_event (s->rx_fifo); - - if (app_enqueue_evt (mq, &msg, lock)) - return -1; - return 0; -} - -static inline int -app_send_io_evt_tx (app_worker_t * app_wrk, session_t * s, u8 lock) -{ - svm_msg_q_t *mq; - session_event_t *evt; - svm_msg_q_msg_t msg; - - if (app_worker_application_is_builtin (app_wrk)) - return 0; - - mq = app_wrk->event_queue; - if (lock) - svm_msg_q_lock (mq); - - if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING))) - { - clib_warning ("evt q rings full"); - if (lock) - svm_msg_q_unlock (mq); - return -1; - } - - msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING); - ASSERT (!svm_msg_q_msg_is_invalid (&msg)); - - evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg); - evt->event_type = FIFO_EVENT_APP_TX; - evt->fifo = s->tx_fifo; - - return app_enqueue_evt (mq, &msg, lock); -} - -/* *INDENT-OFF* */ -typedef int (app_send_evt_handler_fn) (app_worker_t *app, - session_t *s, - u8 lock); -static app_send_evt_handler_fn * const app_send_evt_handler_fns[3] = { - app_send_io_evt_rx, - 0, - app_send_io_evt_tx, -}; -/* *INDENT-ON* */ - -/** - * Send event to application - * - * Logic from queue perspective is non-blocking. If there's - * not enough space to enqueue a message, we return. - */ -int -app_worker_send_event (app_worker_t * app, session_t * s, u8 evt_type) -{ - ASSERT (app && evt_type <= FIFO_EVENT_APP_TX); - return app_send_evt_handler_fns[evt_type] (app, s, 0 /* lock */ ); -} - -/** - * Send event to application - * - * Logic from queue perspective is blocking. However, if queue is full, - * we return. - */ -int -app_worker_lock_and_send_event (app_worker_t * app, session_t * s, - u8 evt_type) -{ - return app_send_evt_handler_fns[evt_type] (app, s, 1 /* lock */ ); -} - -local_session_t * -application_local_session_alloc (app_worker_t * app_wrk) -{ - local_session_t *s; - pool_get (app_wrk->local_sessions, s); - clib_memset (s, 0, sizeof (*s)); - s->app_wrk_index = app_wrk->wrk_index; - s->session_index = s - app_wrk->local_sessions; - s->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE, 0); - return s; -} - -void -application_local_session_free (app_worker_t * app, local_session_t * s) -{ - pool_put (app->local_sessions, s); - if (CLIB_DEBUG) - clib_memset (s, 0xfc, sizeof (*s)); -} - -local_session_t * -application_get_local_session (app_worker_t * app_wrk, u32 session_index) -{ - if (pool_is_free_index (app_wrk->local_sessions, session_index)) - return 0; - return pool_elt_at_index (app_wrk->local_sessions, session_index); -} - -local_session_t * -application_get_local_session_from_handle (session_handle_t handle) -{ - app_worker_t *server_wrk; - u32 session_index, server_wrk_index; - local_session_parse_handle (handle, &server_wrk_index, &session_index); - server_wrk = app_worker_get_if_valid (server_wrk_index); - if (!server_wrk) - return 0; - return application_get_local_session (server_wrk, session_index); -} - -local_session_t * -application_local_listen_session_alloc (application_t * app) -{ - local_session_t *ll; - pool_get (app->local_listen_sessions, ll); - clib_memset (ll, 0, sizeof (*ll)); - return ll; -} - -u32 -application_local_listener_index (application_t * app, local_session_t * ll) -{ - return (ll - app->local_listen_sessions); -} - -void -application_local_listen_session_free (application_t * app, - local_session_t * ll) -{ - pool_put (app->local_listen_sessions, ll); - if (CLIB_DEBUG) - clib_memset (ll, 0xfb, sizeof (*ll)); -} + pool_put (app->local_listen_sessions, ll); + if (CLIB_DEBUG) + clib_memset (ll, 0xfb, sizeof (*ll)); +} int application_start_local_listen (application_t * app, @@ -1591,7 +1041,7 @@ application_stop_local_listen (u32 app_index, u32 wrk_map_index, table_index = application_local_session_table (server); /* We have both local and global table binds. Figure from global what - * the sep we should be cleaning up is. + * sep we should be cleaning up. */ if (!session_handle_is_local (lh)) { @@ -1632,7 +1082,7 @@ application_stop_local_listen (u32 app_index, u32 wrk_map_index, /* *INDENT-OFF* */ pool_foreach (ls, server_wrk->local_sessions, ({ if (ls->listener_index == ll->session_index) - application_local_session_disconnect (server_wrk->app_index, ls); + app_worker_local_session_disconnect (server_wrk->app_index, ls); })); /* *INDENT-ON* */ @@ -1648,356 +1098,6 @@ application_stop_local_listen (u32 app_index, u32 wrk_map_index, return 0; } -static void -application_local_session_fix_eventds (svm_msg_q_t * sq, svm_msg_q_t * cq) -{ - int fd; - - /* - * segment manager initializes only the producer eventds, since vpp is - * typically the producer. But for local sessions, we also pass to the - * apps the mqs they listen on for events from peer apps, so they are also - * consumer fds. - */ - fd = svm_msg_q_get_producer_eventfd (sq); - svm_msg_q_set_consumer_eventfd (sq, fd); - fd = svm_msg_q_get_producer_eventfd (cq); - svm_msg_q_set_consumer_eventfd (cq, fd); -} - -int -application_local_session_connect (app_worker_t * client_wrk, - app_worker_t * server_wrk, - local_session_t * ll, u32 opaque) -{ - u32 seg_size, evt_q_sz, evt_q_elts, margin = 16 << 10; - u32 round_rx_fifo_sz, round_tx_fifo_sz, sm_index; - segment_manager_properties_t *props, *cprops; - int rv, has_transport, seg_index; - svm_fifo_segment_private_t *seg; - application_t *server, *client; - segment_manager_t *sm; - local_session_t *ls; - svm_msg_q_t *sq, *cq; - u64 segment_handle; - - ls = application_local_session_alloc (server_wrk); - server = application_get (server_wrk->app_index); - client = application_get (client_wrk->app_index); - - props = application_segment_manager_properties (server); - cprops = application_segment_manager_properties (client); - evt_q_elts = props->evt_q_size + cprops->evt_q_size; - evt_q_sz = segment_manager_evt_q_expected_size (evt_q_elts); - round_rx_fifo_sz = 1 << max_log2 (props->rx_fifo_size); - round_tx_fifo_sz = 1 << max_log2 (props->tx_fifo_size); - seg_size = round_rx_fifo_sz + round_tx_fifo_sz + evt_q_sz + margin; - - has_transport = session_has_transport ((session_t *) ll); - if (!has_transport) - { - /* Local sessions don't have backing transport */ - ls->port = ll->port; - sm = application_get_local_segment_manager (server_wrk); - } - else - { - session_t *sl = (session_t *) ll; - transport_connection_t *tc; - tc = listen_session_get_transport (sl); - ls->port = tc->lcl_port; - sm = app_worker_get_listen_segment_manager (server_wrk, sl); - } - - seg_index = segment_manager_add_segment (sm, seg_size); - if (seg_index < 0) - { - clib_warning ("failed to add new cut-through segment"); - return seg_index; - } - seg = segment_manager_get_segment_w_lock (sm, seg_index); - sq = segment_manager_alloc_queue (seg, props); - cq = segment_manager_alloc_queue (seg, cprops); - - if (props->use_mq_eventfd) - application_local_session_fix_eventds (sq, cq); - - ls->server_evt_q = pointer_to_uword (sq); - ls->client_evt_q = pointer_to_uword (cq); - rv = segment_manager_try_alloc_fifos (seg, props->rx_fifo_size, - props->tx_fifo_size, - &ls->rx_fifo, &ls->tx_fifo); - if (rv) - { - clib_warning ("failed to add fifos in cut-through segment"); - segment_manager_segment_reader_unlock (sm); - goto failed; - } - sm_index = segment_manager_index (sm); - ls->rx_fifo->ct_session_index = ls->session_index; - ls->tx_fifo->ct_session_index = ls->session_index; - ls->rx_fifo->segment_manager = sm_index; - ls->tx_fifo->segment_manager = sm_index; - ls->rx_fifo->segment_index = seg_index; - ls->tx_fifo->segment_index = seg_index; - ls->svm_segment_index = seg_index; - ls->listener_index = ll->session_index; - ls->client_wrk_index = client_wrk->wrk_index; - ls->client_opaque = opaque; - ls->listener_session_type = ll->session_type; - ls->session_state = SESSION_STATE_READY; - - segment_handle = segment_manager_segment_handle (sm, seg); - if ((rv = server->cb_fns.add_segment_callback (server_wrk->api_client_index, - segment_handle))) - { - clib_warning ("failed to notify server of new segment"); - segment_manager_segment_reader_unlock (sm); - goto failed; - } - segment_manager_segment_reader_unlock (sm); - if ((rv = server->cb_fns.session_accept_callback ((session_t *) ls))) - { - clib_warning ("failed to send accept cut-through notify to server"); - goto failed; - } - if (server->flags & APP_OPTIONS_FLAGS_IS_BUILTIN) - application_local_session_connect_notify (ls); - - return 0; - -failed: - if (!has_transport) - segment_manager_del_segment (sm, seg); - return rv; -} - -static u64 -application_client_local_connect_key (local_session_t * ls) -{ - return (((u64) ls->app_wrk_index) << 32 | (u64) ls->session_index); -} - -static void -application_client_local_connect_key_parse (u64 key, u32 * app_wrk_index, - u32 * session_index) -{ - *app_wrk_index = key >> 32; - *session_index = key & 0xFFFFFFFF; -} - -int -application_local_session_connect_notify (local_session_t * ls) -{ - svm_fifo_segment_private_t *seg; - app_worker_t *client_wrk, *server_wrk; - segment_manager_t *sm; - application_t *client; - int rv, is_fail = 0; - u64 segment_handle; - u64 client_key; - - client_wrk = app_worker_get (ls->client_wrk_index); - server_wrk = app_worker_get (ls->app_wrk_index); - client = application_get (client_wrk->app_index); - - sm = application_get_local_segment_manager_w_session (server_wrk, ls); - seg = segment_manager_get_segment_w_lock (sm, ls->svm_segment_index); - segment_handle = segment_manager_segment_handle (sm, seg); - if ((rv = client->cb_fns.add_segment_callback (client_wrk->api_client_index, - segment_handle))) - { - clib_warning ("failed to notify client %u of new segment", - ls->client_wrk_index); - segment_manager_segment_reader_unlock (sm); - application_local_session_disconnect (ls->client_wrk_index, ls); - is_fail = 1; - } - else - { - segment_manager_segment_reader_unlock (sm); - } - - client->cb_fns.session_connected_callback (client_wrk->wrk_index, - ls->client_opaque, - (session_t *) ls, is_fail); - - client_key = application_client_local_connect_key (ls); - hash_set (client_wrk->local_connects, client_key, client_key); - return 0; -} - -int -application_local_session_cleanup (app_worker_t * client_wrk, - app_worker_t * server_wrk, - local_session_t * ls) -{ - svm_fifo_segment_private_t *seg; - session_t *listener; - segment_manager_t *sm; - u64 client_key; - u8 has_transport; - - /* Retrieve listener transport type as it is the one that decides where - * the fifos are allocated */ - has_transport = application_local_session_listener_has_transport (ls); - if (!has_transport) - sm = application_get_local_segment_manager_w_session (server_wrk, ls); - else - { - listener = listen_session_get (ls->listener_index); - sm = app_worker_get_listen_segment_manager (server_wrk, listener); - } - - seg = segment_manager_get_segment (sm, ls->svm_segment_index); - if (client_wrk) - { - client_key = application_client_local_connect_key (ls); - hash_unset (client_wrk->local_connects, client_key); - } - - if (!has_transport) - { - application_t *server = application_get (server_wrk->app_index); - u64 segment_handle = segment_manager_segment_handle (sm, seg); - server->cb_fns.del_segment_callback (server_wrk->api_client_index, - segment_handle); - if (client_wrk) - { - application_t *client = application_get (client_wrk->app_index); - client->cb_fns.del_segment_callback (client_wrk->api_client_index, - segment_handle); - } - segment_manager_del_segment (sm, seg); - } - - application_local_session_free (server_wrk, ls); - - return 0; -} - -int -application_local_session_disconnect (u32 app_index, local_session_t * ls) -{ - app_worker_t *client_wrk, *server_wrk; - u8 is_server = 0, is_client = 0; - application_t *app; - - app = application_get_if_valid (app_index); - if (!app) - return 0; - - client_wrk = app_worker_get_if_valid (ls->client_wrk_index); - server_wrk = app_worker_get (ls->app_wrk_index); - - if (server_wrk->app_index == app_index) - is_server = 1; - else if (client_wrk && client_wrk->app_index == app_index) - is_client = 1; - - if (!is_server && !is_client) - { - clib_warning ("app %u is neither client nor server for session 0x%lx", - app_index, application_local_session_handle (ls)); - return VNET_API_ERROR_INVALID_VALUE; - } - - if (ls->session_state == SESSION_STATE_CLOSED) - return application_local_session_cleanup (client_wrk, server_wrk, ls); - - if (app_index == ls->client_wrk_index) - { - mq_send_local_session_disconnected_cb (ls->app_wrk_index, ls); - } - else - { - if (!client_wrk) - { - return application_local_session_cleanup (client_wrk, server_wrk, - ls); - } - else if (ls->session_state < SESSION_STATE_READY) - { - application_t *client = application_get (client_wrk->app_index); - client->cb_fns.session_connected_callback (client_wrk->wrk_index, - ls->client_opaque, - (session_t *) ls, - 1 /* is_fail */ ); - ls->session_state = SESSION_STATE_CLOSED; - return application_local_session_cleanup (client_wrk, server_wrk, - ls); - } - else - { - mq_send_local_session_disconnected_cb (client_wrk->wrk_index, ls); - } - } - - ls->session_state = SESSION_STATE_CLOSED; - - return 0; -} - -int -application_local_session_disconnect_w_index (u32 app_wrk_index, u32 ls_index) -{ - app_worker_t *app_wrk; - local_session_t *ls; - app_wrk = app_worker_get (app_wrk_index); - ls = application_get_local_session (app_wrk, ls_index); - return application_local_session_disconnect (app_wrk_index, ls); -} - -void -app_worker_local_sessions_free (app_worker_t * app_wrk) -{ - u32 index, server_wrk_index, session_index; - u64 handle, *handles = 0; - app_worker_t *server_wrk; - segment_manager_t *sm; - local_session_t *ls; - int i; - - /* - * Local sessions - */ - if (app_wrk->local_sessions) - { - /* *INDENT-OFF* */ - pool_foreach (ls, app_wrk->local_sessions, ({ - application_local_session_disconnect (app_wrk->wrk_index, ls); - })); - /* *INDENT-ON* */ - } - - /* - * Local connects - */ - vec_reset_length (handles); - /* *INDENT-OFF* */ - hash_foreach (handle, index, app_wrk->local_connects, ({ - vec_add1 (handles, handle); - })); - /* *INDENT-ON* */ - - for (i = 0; i < vec_len (handles); i++) - { - application_client_local_connect_key_parse (handles[i], - &server_wrk_index, - &session_index); - server_wrk = app_worker_get_if_valid (server_wrk_index); - if (server_wrk) - { - ls = application_get_local_session (server_wrk, session_index); - application_local_session_disconnect (app_wrk->wrk_index, ls); - } - } - - sm = segment_manager_get (app_wrk->local_segment_manager); - sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX; - segment_manager_del (sm); -} - clib_error_t * vnet_app_add_tls_cert (vnet_app_add_tls_cert_args_t * a) { @@ -2022,45 +1122,6 @@ vnet_app_add_tls_key (vnet_app_add_tls_key_args_t * a) return 0; } -u8 * -format_app_worker_listener (u8 * s, va_list * args) -{ - app_worker_t *app_wrk = va_arg (*args, app_worker_t *); - u64 handle = va_arg (*args, u64); - u32 sm_index = va_arg (*args, u32); - int verbose = va_arg (*args, int); - session_t *listener; - const u8 *app_name; - u8 *str; - - if (!app_wrk) - { - if (verbose) - s = format (s, "%-40s%-25s%=10s%-15s%-15s%-10s", "Connection", "App", - "Wrk", "API Client", "ListenerID", "SegManager"); - else - s = format (s, "%-40s%-25s%=10s", "Connection", "App", "Wrk"); - - return s; - } - - app_name = application_name_from_index (app_wrk->app_index); - listener = listen_session_get_from_handle (handle); - str = format (0, "%U", format_stream_session, listener, verbose); - - if (verbose) - { - char buf[32]; - sprintf (buf, "%u(%u)", app_wrk->wrk_map_index, app_wrk->wrk_index); - s = format (s, "%-40s%-25s%=10s%-15u%-15u%-10u", str, app_name, - buf, app_wrk->api_client_index, handle, sm_index); - } - else - s = format (s, "%-40s%-25s%=10u", str, app_name, app_wrk->wrk_map_index); - - return s; -} - static void application_format_listeners (application_t * app, int verbose) { @@ -2090,69 +1151,6 @@ application_format_listeners (application_t * app, int verbose) /* *INDENT-ON* */ } -static void -app_worker_format_connects (app_worker_t * app_wrk, int verbose) -{ - svm_fifo_segment_private_t *fifo_segment; - vlib_main_t *vm = vlib_get_main (); - segment_manager_t *sm; - const u8 *app_name; - u8 *s = 0; - - /* Header */ - if (!app_wrk) - { - if (verbose) - vlib_cli_output (vm, "%-40s%-20s%-15s%-10s", "Connection", "App", - "API Client", "SegManager"); - else - vlib_cli_output (vm, "%-40s%-20s", "Connection", "App"); - return; - } - - if (app_wrk->connects_seg_manager == (u32) ~ 0) - return; - - app_name = application_name_from_index (app_wrk->app_index); - - /* Across all fifo segments */ - sm = segment_manager_get (app_wrk->connects_seg_manager); - - /* *INDENT-OFF* */ - segment_manager_foreach_segment_w_lock (fifo_segment, sm, ({ - svm_fifo_t *fifo; - u8 *str; - - fifo = svm_fifo_segment_get_fifo_list (fifo_segment); - while (fifo) - { - u32 session_index, thread_index; - session_t *session; - - session_index = fifo->master_session_index; - thread_index = fifo->master_thread_index; - - session = session_get (session_index, thread_index); - str = format (0, "%U", format_stream_session, session, verbose); - - if (verbose) - s = format (s, "%-40s%-20s%-15u%-10u", str, app_name, - app_wrk->api_client_index, app_wrk->connects_seg_manager); - else - s = format (s, "%-40s%-20s", str, app_name); - - vlib_cli_output (vm, "%v", s); - vec_reset_length (s); - vec_free (str); - - fifo = fifo->next; - } - vec_free (s); - })); - /* *INDENT-ON* */ - -} - static void application_format_connects (application_t * app, int verbose) { @@ -2173,40 +1171,6 @@ application_format_connects (application_t * app, int verbose) /* *INDENT-ON* */ } -static void -app_worker_format_local_sessions (app_worker_t * app_wrk, int verbose) -{ - vlib_main_t *vm = vlib_get_main (); - local_session_t *ls; - transport_proto_t tp; - u8 *conn = 0; - - /* Header */ - if (app_wrk == 0) - { - vlib_cli_output (vm, "%-40s%-15s%-20s", "Connection", "ServerApp", - "ClientApp"); - return; - } - - if (!pool_elts (app_wrk->local_sessions) - && !pool_elts (app_wrk->local_connects)) - return; - - /* *INDENT-OFF* */ - pool_foreach (ls, app_wrk->local_sessions, ({ - tp = session_type_transport_proto(ls->listener_session_type); - conn = format (0, "[L][%U] *:%u", format_transport_proto_short, tp, - ls->port); - vlib_cli_output (vm, "%-40v%-15u%-20u", conn, ls->app_wrk_index, - ls->client_wrk_index); - vec_reset_length (conn); - })); - /* *INDENT-ON* */ - - vec_free (conn); -} - static void application_format_local_sessions (application_t * app, int verbose) { @@ -2248,43 +1212,6 @@ application_format_local_sessions (application_t * app, int verbose) /* *INDENT-ON* */ } -static void -app_worker_format_local_connects (app_worker_t * app, int verbose) -{ - vlib_main_t *vm = vlib_get_main (); - u32 app_wrk_index, session_index; - app_worker_t *server_wrk; - local_session_t *ls; - u64 client_key; - u64 value; - - /* Header */ - if (app == 0) - { - if (verbose) - vlib_cli_output (vm, "%-40s%-15s%-20s%-10s", "Connection", "App", - "Peer App", "SegManager"); - else - vlib_cli_output (vm, "%-40s%-15s%-20s", "Connection", "App", - "Peer App"); - return; - } - - if (!app->local_connects) - return; - - /* *INDENT-OFF* */ - hash_foreach (client_key, value, app->local_connects, ({ - application_client_local_connect_key_parse (client_key, &app_wrk_index, - &session_index); - server_wrk = app_worker_get (app_wrk_index); - ls = application_get_local_session (server_wrk, session_index); - vlib_cli_output (vm, "%-40s%-15s%-20s", "TODO", ls->app_wrk_index, - ls->client_wrk_index); - })); - /* *INDENT-ON* */ -} - static void application_format_local_connects (application_t * app, int verbose) { @@ -2305,19 +1232,6 @@ application_format_local_connects (application_t * app, int verbose) /* *INDENT-ON* */ } -u8 * -format_application_worker (u8 * s, va_list * args) -{ - app_worker_t *app_wrk = va_arg (*args, app_worker_t *); - u32 indent = 1; - - s = format (s, "%U wrk-index %u app-index %u map-index %u " - "api-client-index %d\n", format_white_space, indent, - app_wrk->wrk_index, app_wrk->app_index, app_wrk->wrk_map_index, - app_wrk->api_client_index); - return s; -} - u8 * format_application (u8 * s, va_list * args) { @@ -2355,7 +1269,7 @@ format_application (u8 * s, va_list * args) /* *INDENT-OFF* */ pool_foreach (wrk_map, app->worker_maps, ({ app_wrk = app_worker_get (wrk_map->wrk_index); - s = format (s, "%U", format_application_worker, app_wrk); + s = format (s, "%U", format_app_worker, app_wrk); })); /* *INDENT-ON* */ diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h index 0f8dbe3909b..54f998996e9 100644 --- a/src/vnet/session/application.h +++ b/src/vnet/session/application.h @@ -174,11 +174,6 @@ typedef struct app_main_ */ application_t *app_pool; - /** - * Pool of workers associated to apps - */ - app_worker_t *workers; - /** * Hash table of apps by api client index */ @@ -220,37 +215,6 @@ typedef struct _vnet_app_worker_add_del_args #define APP_NS_INVALID_INDEX ((u32)~0) #define APP_INVALID_SEGMENT_MANAGER_INDEX ((u32) ~0) -app_worker_t *app_worker_alloc (application_t * app); -int app_worker_alloc_and_init (application_t * app, app_worker_t ** wrk); -app_worker_t *app_worker_get (u32 wrk_index); -app_worker_t *app_worker_get_if_valid (u32 wrk_index); -application_t *app_worker_get_app (u32 wrk_index); -int app_worker_own_session (app_worker_t * app_wrk, session_t * s); -void app_worker_free (app_worker_t * app_wrk); -int app_worker_open_session (app_worker_t * app, session_endpoint_t * tep, - u32 api_context); -segment_manager_t *app_worker_get_listen_segment_manager (app_worker_t *, - session_t *); -segment_manager_t *app_worker_get_connect_segment_manager (app_worker_t *); -segment_manager_t - * app_worker_get_or_alloc_connect_segment_manager (app_worker_t *); -int app_worker_alloc_connects_segment_manager (app_worker_t * app); -int app_worker_add_segment_notify (u32 app_or_wrk, u64 segment_handle); -u32 app_worker_n_listeners (app_worker_t * app); -session_t *app_worker_first_listener (app_worker_t * app, - u8 fib_proto, u8 transport_proto); -u8 app_worker_application_is_builtin (app_worker_t * app_wrk); -int app_worker_send_event (app_worker_t * app, session_t * s, u8 evt); -int app_worker_lock_and_send_event (app_worker_t * app, session_t * s, - u8 evt_type); -clib_error_t *vnet_app_worker_add_del (vnet_app_worker_add_del_args_t * a); - -int application_start_listen (application_t * app, - session_endpoint_cfg_t * tep, - session_handle_t * handle); -int application_stop_listen (u32 app_index, u32 app_or_wrk, - session_handle_t handle); - application_t *application_alloc (void); int application_alloc_and_init (app_init_args_t * args); void application_free (application_t * app); @@ -263,7 +227,12 @@ app_worker_t *application_get_worker (application_t * app, u32 wrk_index); app_worker_t *application_get_default_worker (application_t * app); app_worker_t *application_listener_select_worker (session_t * ls, u8 is_local); - +int application_start_listen (application_t * app, + session_endpoint_cfg_t * tep, + session_handle_t * handle); +int application_stop_listen (u32 app_index, u32 app_or_wrk, + session_handle_t handle); +int application_change_listener_owner (session_t * s, app_worker_t * app_wrk); int application_is_proxy (application_t * app); int application_is_builtin (application_t * app); int application_is_builtin_proxy (application_t * app); @@ -282,17 +251,53 @@ segment_manager_properties_t *application_get_segment_manager_properties (u32 segment_manager_properties_t * application_segment_manager_properties (application_t * app); +/* + * App worker + */ + +app_worker_t *app_worker_alloc (application_t * app); +int application_alloc_worker_and_init (application_t * app, + app_worker_t ** wrk); +app_worker_t *app_worker_get (u32 wrk_index); +app_worker_t *app_worker_get_if_valid (u32 wrk_index); +application_t *app_worker_get_app (u32 wrk_index); +int app_worker_own_session (app_worker_t * app_wrk, session_t * s); +void app_worker_free (app_worker_t * app_wrk); +int app_worker_open_session (app_worker_t * app, session_endpoint_t * tep, + u32 api_context); +int app_worker_start_listen (app_worker_t * app_wrk, session_t * ls); +int app_worker_stop_listen (app_worker_t * app_wrk, session_handle_t handle); +segment_manager_t *app_worker_get_listen_segment_manager (app_worker_t *, + session_t *); +segment_manager_t *app_worker_get_connect_segment_manager (app_worker_t *); +segment_manager_t + * app_worker_get_or_alloc_connect_segment_manager (app_worker_t *); +int app_worker_alloc_connects_segment_manager (app_worker_t * app); +int app_worker_add_segment_notify (u32 app_or_wrk, u64 segment_handle); +u32 app_worker_n_listeners (app_worker_t * app); +session_t *app_worker_first_listener (app_worker_t * app, + u8 fib_proto, u8 transport_proto); +u8 app_worker_application_is_builtin (app_worker_t * app_wrk); +int app_worker_send_event (app_worker_t * app, session_t * s, u8 evt); +int app_worker_lock_and_send_event (app_worker_t * app, session_t * s, + u8 evt_type); +session_t *app_worker_proxy_listener (app_worker_t * app, u8 fib_proto, + u8 transport_proto); +u8 *format_app_worker (u8 * s, va_list * args); +u8 *format_app_worker_listener (u8 * s, va_list * args); +void app_worker_format_connects (app_worker_t * app_wrk, int verbose); +clib_error_t *vnet_app_worker_add_del (vnet_app_worker_add_del_args_t * a); + /* * Local session */ -local_session_t *application_local_session_alloc (app_worker_t * app); -void application_local_session_free (app_worker_t * app, - local_session_t * ls); -local_session_t *application_get_local_session (app_worker_t * app, - u32 session_index); -local_session_t *application_get_local_session_from_handle (session_handle_t - handle); +local_session_t *app_worker_local_session_alloc (app_worker_t * app); +void app_worker_local_session_free (app_worker_t * app, local_session_t * ls); +local_session_t *app_worker_get_local_session (app_worker_t * app, + u32 session_index); +local_session_t *app_worker_get_local_session_from_handle (session_handle_t + handle); local_session_t * application_get_local_listen_session_from_handle (session_handle_t lh); int application_start_local_listen (application_t * server, @@ -300,15 +305,16 @@ int application_start_local_listen (application_t * server, session_handle_t * handle); int application_stop_local_listen (u32 app_index, u32 app_or_wrk, session_handle_t lh); -int application_local_session_connect (app_worker_t * client, - app_worker_t * server, - local_session_t * ls, u32 opaque); -int application_local_session_connect_notify (local_session_t * ls); -int application_local_session_disconnect (u32 app_or_wrk, - local_session_t * ls); -int application_local_session_disconnect_w_index (u32 app_or_wrk, - u32 ls_index); -void app_worker_local_sessions_free (app_worker_t * app); +int app_worker_local_session_connect (app_worker_t * client, + app_worker_t * server, + local_session_t * ls, u32 opaque); +int app_worker_local_session_connect_notify (local_session_t * ls); +int app_worker_local_session_disconnect (u32 app_or_wrk, + local_session_t * ls); +int app_worker_local_session_disconnect_w_index (u32 app_or_wrk, + u32 ls_index); +void app_worker_format_local_sessions (app_worker_t * app_wrk, int verbose); +void app_worker_format_local_connects (app_worker_t * app, int verbose); always_inline u32 local_session_id (local_session_t * ls) diff --git a/src/vnet/session/application_interface.c b/src/vnet/session/application_interface.c index 0245f58d3b1..c00c4ded1c7 100644 --- a/src/vnet/session/application_interface.c +++ b/src/vnet/session/application_interface.c @@ -329,9 +329,9 @@ application_connect (vnet_connect_args_t * a) listener = (session_t *) ll; server_wrk = application_listener_select_worker (listener, 1 /* is_local */ ); - return application_local_session_connect (client_wrk, - server_wrk, ll, - a->api_context); + return app_worker_local_session_connect (client_wrk, + server_wrk, ll, + a->api_context); } } @@ -355,8 +355,8 @@ global_scope: server_wrk = application_listener_select_worker (listener, 0 /* is_local */ ); ll = (local_session_t *) listener; - return application_local_session_connect (client_wrk, server_wrk, ll, - a->api_context); + return app_worker_local_session_connect (client_wrk, server_wrk, ll, + a->api_context); } /* @@ -545,7 +545,7 @@ vnet_application_attach (vnet_app_attach_args_t * a) return clib_error_return_code (0, rv, 0, "app init: %d", rv); app = application_get (a->app_index); - if ((rv = app_worker_alloc_and_init (app, &app_wrk))) + if ((rv = application_alloc_worker_and_init (app, &app_wrk))) return clib_error_return_code (0, rv, 0, "app default wrk init: %d", rv); a->app_evt_q = app_wrk->event_queue; @@ -649,10 +649,10 @@ vnet_disconnect_session (vnet_disconnect_args_t * a) /* Disconnect reply came to worker 1 not main thread */ app_interface_check_thread_and_barrier (vnet_disconnect_session, a); - if (!(ls = application_get_local_session_from_handle (a->handle))) + if (!(ls = app_worker_get_local_session_from_handle (a->handle))) return 0; - return application_local_session_disconnect (a->app_index, ls); + return app_worker_local_session_disconnect (a->app_index, ls); } else { diff --git a/src/vnet/session/application_worker.c b/src/vnet/session/application_worker.c new file mode 100644 index 00000000000..89ee6057172 --- /dev/null +++ b/src/vnet/session/application_worker.c @@ -0,0 +1,1116 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +/** + * Pool of workers associated to apps + */ +static app_worker_t *app_workers; + +static inline u64 +application_client_local_connect_key (local_session_t * ls) +{ + return (((u64) ls->app_wrk_index) << 32 | (u64) ls->session_index); +} + +static inline void +application_client_local_connect_key_parse (u64 key, u32 * app_wrk_index, + u32 * session_index) +{ + *app_wrk_index = key >> 32; + *session_index = key & 0xFFFFFFFF; +} + +local_session_t * +app_worker_local_session_alloc (app_worker_t * app_wrk) +{ + local_session_t *s; + pool_get (app_wrk->local_sessions, s); + clib_memset (s, 0, sizeof (*s)); + s->app_wrk_index = app_wrk->wrk_index; + s->session_index = s - app_wrk->local_sessions; + s->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE, 0); + return s; +} + +void +app_worker_local_session_free (app_worker_t * app_wrk, local_session_t * s) +{ + pool_put (app_wrk->local_sessions, s); + if (CLIB_DEBUG) + clib_memset (s, 0xfc, sizeof (*s)); +} + +local_session_t * +app_worker_get_local_session (app_worker_t * app_wrk, u32 session_index) +{ + if (pool_is_free_index (app_wrk->local_sessions, session_index)) + return 0; + return pool_elt_at_index (app_wrk->local_sessions, session_index); +} + +local_session_t * +app_worker_get_local_session_from_handle (session_handle_t handle) +{ + app_worker_t *server_wrk; + u32 session_index, server_wrk_index; + local_session_parse_handle (handle, &server_wrk_index, &session_index); + server_wrk = app_worker_get_if_valid (server_wrk_index); + if (!server_wrk) + return 0; + return app_worker_get_local_session (server_wrk, session_index); +} + +void +app_worker_local_sessions_free (app_worker_t * app_wrk) +{ + u32 index, server_wrk_index, session_index; + u64 handle, *handles = 0; + app_worker_t *server_wrk; + segment_manager_t *sm; + local_session_t *ls; + int i; + + /* + * Local sessions + */ + if (app_wrk->local_sessions) + { + /* *INDENT-OFF* */ + pool_foreach (ls, app_wrk->local_sessions, ({ + app_worker_local_session_disconnect (app_wrk->wrk_index, ls); + })); + /* *INDENT-ON* */ + } + + /* + * Local connects + */ + vec_reset_length (handles); + /* *INDENT-OFF* */ + hash_foreach (handle, index, app_wrk->local_connects, ({ + vec_add1 (handles, handle); + })); + /* *INDENT-ON* */ + + for (i = 0; i < vec_len (handles); i++) + { + application_client_local_connect_key_parse (handles[i], + &server_wrk_index, + &session_index); + server_wrk = app_worker_get_if_valid (server_wrk_index); + if (server_wrk) + { + ls = app_worker_get_local_session (server_wrk, session_index); + app_worker_local_session_disconnect (app_wrk->wrk_index, ls); + } + } + + sm = segment_manager_get (app_wrk->local_segment_manager); + sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX; + segment_manager_del (sm); +} + +app_worker_t * +app_worker_alloc (application_t * app) +{ + app_worker_t *app_wrk; + pool_get (app_workers, app_wrk); + clib_memset (app_wrk, 0, sizeof (*app_wrk)); + app_wrk->wrk_index = app_wrk - app_workers; + app_wrk->app_index = app->app_index; + app_wrk->wrk_map_index = ~0; + app_wrk->connects_seg_manager = APP_INVALID_SEGMENT_MANAGER_INDEX; + app_wrk->first_segment_manager = APP_INVALID_SEGMENT_MANAGER_INDEX; + app_wrk->local_segment_manager = APP_INVALID_SEGMENT_MANAGER_INDEX; + APP_DBG ("New app %v worker %u", app_get_name (app), app_wrk->wrk_index); + return app_wrk; +} + +app_worker_t * +app_worker_get (u32 wrk_index) +{ + return pool_elt_at_index (app_workers, wrk_index); +} + +app_worker_t * +app_worker_get_if_valid (u32 wrk_index) +{ + if (pool_is_free_index (app_workers, wrk_index)) + return 0; + return pool_elt_at_index (app_workers, wrk_index); +} + +void +app_worker_free (app_worker_t * app_wrk) +{ + application_t *app = application_get (app_wrk->app_index); + vnet_unbind_args_t _a, *a = &_a; + u64 handle, *handles = 0; + segment_manager_t *sm; + u32 sm_index; + int i; + + /* + * Listener cleanup + */ + + /* *INDENT-OFF* */ + hash_foreach (handle, sm_index, app_wrk->listeners_table, + ({ + vec_add1 (handles, handle); + sm = segment_manager_get (sm_index); + sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX; + })); + /* *INDENT-ON* */ + + for (i = 0; i < vec_len (handles); i++) + { + a->app_index = app->app_index; + a->wrk_map_index = app_wrk->wrk_map_index; + a->handle = handles[i]; + /* seg manager is removed when unbind completes */ + vnet_unbind (a); + } + + /* + * Connects segment manager cleanup + */ + + if (app_wrk->connects_seg_manager != APP_INVALID_SEGMENT_MANAGER_INDEX) + { + sm = segment_manager_get (app_wrk->connects_seg_manager); + sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX; + segment_manager_init_del (sm); + } + + /* If first segment manager is used by a listener */ + if (app_wrk->first_segment_manager != APP_INVALID_SEGMENT_MANAGER_INDEX + && app_wrk->first_segment_manager != app_wrk->connects_seg_manager) + { + sm = segment_manager_get (app_wrk->first_segment_manager); + sm->first_is_protected = 0; + sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX; + /* .. and has no fifos, e.g. it might be used for redirected sessions, + * remove it */ + if (!segment_manager_has_fifos (sm)) + segment_manager_del (sm); + } + + /* + * Local sessions + */ + app_worker_local_sessions_free (app_wrk); + + pool_put (app_workers, app_wrk); + if (CLIB_DEBUG) + clib_memset (app_wrk, 0xfe, sizeof (*app_wrk)); +} + +application_t * +app_worker_get_app (u32 wrk_index) +{ + app_worker_t *app_wrk; + app_wrk = app_worker_get_if_valid (wrk_index); + if (!app_wrk) + return 0; + return application_get_if_valid (app_wrk->app_index); +} + +static segment_manager_t * +app_worker_alloc_segment_manager (app_worker_t * app_wrk) +{ + segment_manager_t *sm = 0; + + /* If the first segment manager is not in use, don't allocate a new one */ + if (app_wrk->first_segment_manager != APP_INVALID_SEGMENT_MANAGER_INDEX + && app_wrk->first_segment_manager_in_use == 0) + { + sm = segment_manager_get (app_wrk->first_segment_manager); + app_wrk->first_segment_manager_in_use = 1; + return sm; + } + + sm = segment_manager_new (); + sm->app_wrk_index = app_wrk->wrk_index; + + return sm; +} + +int +app_worker_start_listen (app_worker_t * app_wrk, session_t * ls) +{ + segment_manager_t *sm; + + /* Allocate segment manager. All sessions derived out of a listen session + * have fifos allocated by the same segment manager. */ + if (!(sm = app_worker_alloc_segment_manager (app_wrk))) + return -1; + + /* Add to app's listener table. Useful to find all child listeners + * when app goes down, although, just for unbinding this is not needed */ + hash_set (app_wrk->listeners_table, listen_session_get_handle (ls), + segment_manager_index (sm)); + + if (!ls->rx_fifo + && session_transport_service_type (ls) == TRANSPORT_SERVICE_CL) + { + if (session_alloc_fifos (sm, ls)) + return -1; + } + return 0; +} + +int +app_worker_stop_listen (app_worker_t * app_wrk, session_handle_t handle) +{ + segment_manager_t *sm; + uword *sm_indexp; + + sm_indexp = hash_get (app_wrk->listeners_table, handle); + if (PREDICT_FALSE (!sm_indexp)) + { + clib_warning ("listener handle was removed %llu!", handle); + return -1; + } + + sm = segment_manager_get (*sm_indexp); + if (app_wrk->first_segment_manager == *sm_indexp) + { + /* Delete sessions but don't remove segment manager */ + app_wrk->first_segment_manager_in_use = 0; + segment_manager_del_sessions (sm); + } + else + { + segment_manager_init_del (sm); + } + hash_unset (app_wrk->listeners_table, handle); + + return 0; +} + +int +app_worker_own_session (app_worker_t * app_wrk, session_t * s) +{ + segment_manager_t *sm; + svm_fifo_t *rxf, *txf; + + if (s->session_state == SESSION_STATE_LISTENING) + return application_change_listener_owner (s, app_wrk); + + s->app_wrk_index = app_wrk->wrk_index; + + rxf = s->rx_fifo; + txf = s->tx_fifo; + + if (!rxf || !txf) + return 0; + + s->rx_fifo = 0; + s->tx_fifo = 0; + + sm = app_worker_get_or_alloc_connect_segment_manager (app_wrk); + if (session_alloc_fifos (sm, s)) + return -1; + + if (!svm_fifo_is_empty (rxf)) + { + clib_memcpy_fast (s->rx_fifo->data, rxf->data, rxf->nitems); + s->rx_fifo->head = rxf->head; + s->rx_fifo->tail = rxf->tail; + s->rx_fifo->cursize = rxf->cursize; + } + + if (!svm_fifo_is_empty (txf)) + { + clib_memcpy_fast (s->tx_fifo->data, txf->data, txf->nitems); + s->tx_fifo->head = txf->head; + s->tx_fifo->tail = txf->tail; + s->tx_fifo->cursize = txf->cursize; + } + + segment_manager_dealloc_fifos (rxf->segment_index, rxf, txf); + + return 0; +} + +int +app_worker_open_session (app_worker_t * app, session_endpoint_t * sep, + u32 api_context) +{ + int rv; + + /* Make sure we have a segment manager for connects */ + app_worker_alloc_connects_segment_manager (app); + + if ((rv = session_open (app->wrk_index, sep, api_context))) + return rv; + + return 0; +} + +int +app_worker_alloc_connects_segment_manager (app_worker_t * app_wrk) +{ + segment_manager_t *sm; + + if (app_wrk->connects_seg_manager == APP_INVALID_SEGMENT_MANAGER_INDEX) + { + sm = app_worker_alloc_segment_manager (app_wrk); + if (sm == 0) + return -1; + app_wrk->connects_seg_manager = segment_manager_index (sm); + } + return 0; +} + +segment_manager_t * +app_worker_get_connect_segment_manager (app_worker_t * app) +{ + ASSERT (app->connects_seg_manager != (u32) ~ 0); + return segment_manager_get (app->connects_seg_manager); +} + +segment_manager_t * +app_worker_get_or_alloc_connect_segment_manager (app_worker_t * app_wrk) +{ + if (app_wrk->connects_seg_manager == (u32) ~ 0) + app_worker_alloc_connects_segment_manager (app_wrk); + return segment_manager_get (app_wrk->connects_seg_manager); +} + +segment_manager_t * +app_worker_get_listen_segment_manager (app_worker_t * app, + session_t * listener) +{ + uword *smp; + smp = hash_get (app->listeners_table, listen_session_get_handle (listener)); + ASSERT (smp != 0); + return segment_manager_get (*smp); +} + +session_t * +app_worker_first_listener (app_worker_t * app, u8 fib_proto, + u8 transport_proto) +{ + session_t *listener; + u64 handle; + u32 sm_index; + u8 sst; + + sst = session_type_from_proto_and_ip (transport_proto, + fib_proto == FIB_PROTOCOL_IP4); + + /* *INDENT-OFF* */ + hash_foreach (handle, sm_index, app->listeners_table, ({ + listener = listen_session_get_from_handle (handle); + if (listener->session_type == sst + && listener->enqueue_epoch != SESSION_PROXY_LISTENER_INDEX) + return listener; + })); + /* *INDENT-ON* */ + + return 0; +} + +session_t * +app_worker_proxy_listener (app_worker_t * app, u8 fib_proto, + u8 transport_proto) +{ + session_t *listener; + u64 handle; + u32 sm_index; + u8 sst; + + sst = session_type_from_proto_and_ip (transport_proto, + fib_proto == FIB_PROTOCOL_IP4); + + /* *INDENT-OFF* */ + hash_foreach (handle, sm_index, app->listeners_table, ({ + listener = listen_session_get_from_handle (handle); + if (listener->session_type == sst + && listener->enqueue_epoch == SESSION_PROXY_LISTENER_INDEX) + return listener; + })); + /* *INDENT-ON* */ + + return 0; +} + +/** + * Send an API message to the external app, to map new segment + */ +int +app_worker_add_segment_notify (u32 app_wrk_index, u64 segment_handle) +{ + app_worker_t *app_wrk = app_worker_get (app_wrk_index); + application_t *app = application_get (app_wrk->app_index); + return app->cb_fns.add_segment_callback (app_wrk->api_client_index, + segment_handle); +} + +u8 +app_worker_application_is_builtin (app_worker_t * app_wrk) +{ + return app_wrk->app_is_builtin; +} + +static inline int +app_enqueue_evt (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, u8 lock) +{ + if (PREDICT_FALSE (svm_msg_q_is_full (mq))) + { + clib_warning ("evt q full"); + svm_msg_q_free_msg (mq, msg); + if (lock) + svm_msg_q_unlock (mq); + return -1; + } + + if (lock) + { + svm_msg_q_add_and_unlock (mq, msg); + return 0; + } + + /* Even when not locking the ring, we must wait for queue mutex */ + if (svm_msg_q_add (mq, msg, SVM_Q_WAIT)) + { + clib_warning ("msg q add returned"); + return -1; + } + return 0; +} + +static inline int +app_send_io_evt_rx (app_worker_t * app_wrk, session_t * s, u8 lock) +{ + session_event_t *evt; + svm_msg_q_msg_t msg; + svm_msg_q_t *mq; + + if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY + && s->session_state != SESSION_STATE_LISTENING)) + { + /* Session is closed so app will never clean up. Flush rx fifo */ + if (s->session_state == SESSION_STATE_CLOSED) + svm_fifo_dequeue_drop_all (s->rx_fifo); + return 0; + } + + if (app_worker_application_is_builtin (app_wrk)) + { + application_t *app = application_get (app_wrk->app_index); + return app->cb_fns.builtin_app_rx_callback (s); + } + + if (svm_fifo_has_event (s->rx_fifo) || svm_fifo_is_empty (s->rx_fifo)) + return 0; + + mq = app_wrk->event_queue; + if (lock) + svm_msg_q_lock (mq); + + if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING))) + { + clib_warning ("evt q rings full"); + if (lock) + svm_msg_q_unlock (mq); + return -1; + } + + msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING); + ASSERT (!svm_msg_q_msg_is_invalid (&msg)); + + evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg); + evt->fifo = s->rx_fifo; + evt->event_type = FIFO_EVENT_APP_RX; + + (void) svm_fifo_set_event (s->rx_fifo); + + if (app_enqueue_evt (mq, &msg, lock)) + return -1; + return 0; +} + +static inline int +app_send_io_evt_tx (app_worker_t * app_wrk, session_t * s, u8 lock) +{ + svm_msg_q_t *mq; + session_event_t *evt; + svm_msg_q_msg_t msg; + + if (app_worker_application_is_builtin (app_wrk)) + return 0; + + mq = app_wrk->event_queue; + if (lock) + svm_msg_q_lock (mq); + + if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING))) + { + clib_warning ("evt q rings full"); + if (lock) + svm_msg_q_unlock (mq); + return -1; + } + + msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING); + ASSERT (!svm_msg_q_msg_is_invalid (&msg)); + + evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg); + evt->event_type = FIFO_EVENT_APP_TX; + evt->fifo = s->tx_fifo; + + return app_enqueue_evt (mq, &msg, lock); +} + +/* *INDENT-OFF* */ +typedef int (app_send_evt_handler_fn) (app_worker_t *app, + session_t *s, + u8 lock); +static app_send_evt_handler_fn * const app_send_evt_handler_fns[3] = { + app_send_io_evt_rx, + 0, + app_send_io_evt_tx, +}; +/* *INDENT-ON* */ + +/** + * Send event to application + * + * Logic from queue perspective is non-blocking. If there's + * not enough space to enqueue a message, we return. + */ +int +app_worker_send_event (app_worker_t * app, session_t * s, u8 evt_type) +{ + ASSERT (app && evt_type <= FIFO_EVENT_APP_TX); + return app_send_evt_handler_fns[evt_type] (app, s, 0 /* lock */ ); +} + +/** + * Send event to application + * + * Logic from queue perspective is blocking. However, if queue is full, + * we return. + */ +int +app_worker_lock_and_send_event (app_worker_t * app, session_t * s, + u8 evt_type) +{ + return app_send_evt_handler_fns[evt_type] (app, s, 1 /* lock */ ); +} + +segment_manager_t * +app_worker_get_local_segment_manager (app_worker_t * app_worker) +{ + return segment_manager_get (app_worker->local_segment_manager); +} + +segment_manager_t * +app_worker_get_local_segment_manager_w_session (app_worker_t * app_wrk, + local_session_t * ls) +{ + session_t *listener; + if (application_local_session_listener_has_transport (ls)) + { + listener = listen_session_get (ls->listener_index); + return app_worker_get_listen_segment_manager (app_wrk, listener); + } + return segment_manager_get (app_wrk->local_segment_manager); +} + +int +app_worker_local_session_cleanup (app_worker_t * client_wrk, + app_worker_t * server_wrk, + local_session_t * ls) +{ + svm_fifo_segment_private_t *seg; + session_t *listener; + segment_manager_t *sm; + u64 client_key; + u8 has_transport; + + /* Retrieve listener transport type as it is the one that decides where + * the fifos are allocated */ + has_transport = application_local_session_listener_has_transport (ls); + if (!has_transport) + sm = app_worker_get_local_segment_manager_w_session (server_wrk, ls); + else + { + listener = listen_session_get (ls->listener_index); + sm = app_worker_get_listen_segment_manager (server_wrk, listener); + } + + seg = segment_manager_get_segment (sm, ls->svm_segment_index); + if (client_wrk) + { + client_key = application_client_local_connect_key (ls); + hash_unset (client_wrk->local_connects, client_key); + } + + if (!has_transport) + { + application_t *server = application_get (server_wrk->app_index); + u64 segment_handle = segment_manager_segment_handle (sm, seg); + server->cb_fns.del_segment_callback (server_wrk->api_client_index, + segment_handle); + if (client_wrk) + { + application_t *client = application_get (client_wrk->app_index); + client->cb_fns.del_segment_callback (client_wrk->api_client_index, + segment_handle); + } + segment_manager_del_segment (sm, seg); + } + + app_worker_local_session_free (server_wrk, ls); + + return 0; +} + +static void +application_local_session_fix_eventds (svm_msg_q_t * sq, svm_msg_q_t * cq) +{ + int fd; + + /* + * segment manager initializes only the producer eventds, since vpp is + * typically the producer. But for local sessions, we also pass to the + * apps the mqs they listen on for events from peer apps, so they are also + * consumer fds. + */ + fd = svm_msg_q_get_producer_eventfd (sq); + svm_msg_q_set_consumer_eventfd (sq, fd); + fd = svm_msg_q_get_producer_eventfd (cq); + svm_msg_q_set_consumer_eventfd (cq, fd); +} + +int +app_worker_local_session_connect (app_worker_t * client_wrk, + app_worker_t * server_wrk, + local_session_t * ll, u32 opaque) +{ + u32 seg_size, evt_q_sz, evt_q_elts, margin = 16 << 10; + u32 round_rx_fifo_sz, round_tx_fifo_sz, sm_index; + segment_manager_properties_t *props, *cprops; + int rv, has_transport, seg_index; + svm_fifo_segment_private_t *seg; + application_t *server, *client; + segment_manager_t *sm; + local_session_t *ls; + svm_msg_q_t *sq, *cq; + u64 segment_handle; + + ls = app_worker_local_session_alloc (server_wrk); + server = application_get (server_wrk->app_index); + client = application_get (client_wrk->app_index); + + props = application_segment_manager_properties (server); + cprops = application_segment_manager_properties (client); + evt_q_elts = props->evt_q_size + cprops->evt_q_size; + evt_q_sz = segment_manager_evt_q_expected_size (evt_q_elts); + round_rx_fifo_sz = 1 << max_log2 (props->rx_fifo_size); + round_tx_fifo_sz = 1 << max_log2 (props->tx_fifo_size); + seg_size = round_rx_fifo_sz + round_tx_fifo_sz + evt_q_sz + margin; + + has_transport = session_has_transport ((session_t *) ll); + if (!has_transport) + { + /* Local sessions don't have backing transport */ + ls->port = ll->port; + sm = app_worker_get_local_segment_manager (server_wrk); + } + else + { + session_t *sl = (session_t *) ll; + transport_connection_t *tc; + tc = listen_session_get_transport (sl); + ls->port = tc->lcl_port; + sm = app_worker_get_listen_segment_manager (server_wrk, sl); + } + + seg_index = segment_manager_add_segment (sm, seg_size); + if (seg_index < 0) + { + clib_warning ("failed to add new cut-through segment"); + return seg_index; + } + seg = segment_manager_get_segment_w_lock (sm, seg_index); + sq = segment_manager_alloc_queue (seg, props); + cq = segment_manager_alloc_queue (seg, cprops); + + if (props->use_mq_eventfd) + application_local_session_fix_eventds (sq, cq); + + ls->server_evt_q = pointer_to_uword (sq); + ls->client_evt_q = pointer_to_uword (cq); + rv = segment_manager_try_alloc_fifos (seg, props->rx_fifo_size, + props->tx_fifo_size, + &ls->rx_fifo, &ls->tx_fifo); + if (rv) + { + clib_warning ("failed to add fifos in cut-through segment"); + segment_manager_segment_reader_unlock (sm); + goto failed; + } + sm_index = segment_manager_index (sm); + ls->rx_fifo->ct_session_index = ls->session_index; + ls->tx_fifo->ct_session_index = ls->session_index; + ls->rx_fifo->segment_manager = sm_index; + ls->tx_fifo->segment_manager = sm_index; + ls->rx_fifo->segment_index = seg_index; + ls->tx_fifo->segment_index = seg_index; + ls->svm_segment_index = seg_index; + ls->listener_index = ll->session_index; + ls->client_wrk_index = client_wrk->wrk_index; + ls->client_opaque = opaque; + ls->listener_session_type = ll->session_type; + ls->session_state = SESSION_STATE_READY; + + segment_handle = segment_manager_segment_handle (sm, seg); + if ((rv = server->cb_fns.add_segment_callback (server_wrk->api_client_index, + segment_handle))) + { + clib_warning ("failed to notify server of new segment"); + segment_manager_segment_reader_unlock (sm); + goto failed; + } + segment_manager_segment_reader_unlock (sm); + if ((rv = server->cb_fns.session_accept_callback ((session_t *) ls))) + { + clib_warning ("failed to send accept cut-through notify to server"); + goto failed; + } + if (server->flags & APP_OPTIONS_FLAGS_IS_BUILTIN) + app_worker_local_session_connect_notify (ls); + + return 0; + +failed: + if (!has_transport) + segment_manager_del_segment (sm, seg); + return rv; +} + +int +app_worker_local_session_connect_notify (local_session_t * ls) +{ + svm_fifo_segment_private_t *seg; + app_worker_t *client_wrk, *server_wrk; + segment_manager_t *sm; + application_t *client; + int rv, is_fail = 0; + u64 segment_handle; + u64 client_key; + + client_wrk = app_worker_get (ls->client_wrk_index); + server_wrk = app_worker_get (ls->app_wrk_index); + client = application_get (client_wrk->app_index); + + sm = app_worker_get_local_segment_manager_w_session (server_wrk, ls); + seg = segment_manager_get_segment_w_lock (sm, ls->svm_segment_index); + segment_handle = segment_manager_segment_handle (sm, seg); + if ((rv = client->cb_fns.add_segment_callback (client_wrk->api_client_index, + segment_handle))) + { + clib_warning ("failed to notify client %u of new segment", + ls->client_wrk_index); + segment_manager_segment_reader_unlock (sm); + app_worker_local_session_disconnect (ls->client_wrk_index, ls); + is_fail = 1; + } + else + { + segment_manager_segment_reader_unlock (sm); + } + + client->cb_fns.session_connected_callback (client_wrk->wrk_index, + ls->client_opaque, + (session_t *) ls, is_fail); + + client_key = application_client_local_connect_key (ls); + hash_set (client_wrk->local_connects, client_key, client_key); + return 0; +} + +int +app_worker_local_session_disconnect (u32 app_index, local_session_t * ls) +{ + app_worker_t *client_wrk, *server_wrk; + u8 is_server = 0, is_client = 0; + application_t *app; + + app = application_get_if_valid (app_index); + if (!app) + return 0; + + client_wrk = app_worker_get_if_valid (ls->client_wrk_index); + server_wrk = app_worker_get (ls->app_wrk_index); + + if (server_wrk->app_index == app_index) + is_server = 1; + else if (client_wrk && client_wrk->app_index == app_index) + is_client = 1; + + if (!is_server && !is_client) + { + clib_warning ("app %u is neither client nor server for session 0x%lx", + app_index, application_local_session_handle (ls)); + return VNET_API_ERROR_INVALID_VALUE; + } + + if (ls->session_state == SESSION_STATE_CLOSED) + return app_worker_local_session_cleanup (client_wrk, server_wrk, ls); + + if (app_index == ls->client_wrk_index) + { + mq_send_local_session_disconnected_cb (ls->app_wrk_index, ls); + } + else + { + if (!client_wrk) + { + return app_worker_local_session_cleanup (client_wrk, server_wrk, + ls); + } + else if (ls->session_state < SESSION_STATE_READY) + { + application_t *client = application_get (client_wrk->app_index); + client->cb_fns.session_connected_callback (client_wrk->wrk_index, + ls->client_opaque, + (session_t *) ls, + 1 /* is_fail */ ); + ls->session_state = SESSION_STATE_CLOSED; + return app_worker_local_session_cleanup (client_wrk, server_wrk, + ls); + } + else + { + mq_send_local_session_disconnected_cb (client_wrk->wrk_index, ls); + } + } + + ls->session_state = SESSION_STATE_CLOSED; + + return 0; +} + +int +app_worker_local_session_disconnect_w_index (u32 app_wrk_index, u32 ls_index) +{ + app_worker_t *app_wrk; + local_session_t *ls; + app_wrk = app_worker_get (app_wrk_index); + ls = app_worker_get_local_session (app_wrk, ls_index); + return app_worker_local_session_disconnect (app_wrk_index, ls); +} + +u8 * +format_app_worker_listener (u8 * s, va_list * args) +{ + app_worker_t *app_wrk = va_arg (*args, app_worker_t *); + u64 handle = va_arg (*args, u64); + u32 sm_index = va_arg (*args, u32); + int verbose = va_arg (*args, int); + session_t *listener; + const u8 *app_name; + u8 *str; + + if (!app_wrk) + { + if (verbose) + s = format (s, "%-40s%-25s%=10s%-15s%-15s%-10s", "Connection", "App", + "Wrk", "API Client", "ListenerID", "SegManager"); + else + s = format (s, "%-40s%-25s%=10s", "Connection", "App", "Wrk"); + + return s; + } + + app_name = application_name_from_index (app_wrk->app_index); + listener = listen_session_get_from_handle (handle); + str = format (0, "%U", format_stream_session, listener, verbose); + + if (verbose) + { + char buf[32]; + sprintf (buf, "%u(%u)", app_wrk->wrk_map_index, app_wrk->wrk_index); + s = format (s, "%-40s%-25s%=10s%-15u%-15u%-10u", str, app_name, + buf, app_wrk->api_client_index, handle, sm_index); + } + else + s = format (s, "%-40s%-25s%=10u", str, app_name, app_wrk->wrk_map_index); + + return s; +} + +u8 * +format_app_worker (u8 * s, va_list * args) +{ + app_worker_t *app_wrk = va_arg (*args, app_worker_t *); + u32 indent = 1; + + s = format (s, "%U wrk-index %u app-index %u map-index %u " + "api-client-index %d\n", format_white_space, indent, + app_wrk->wrk_index, app_wrk->app_index, app_wrk->wrk_map_index, + app_wrk->api_client_index); + return s; +} + +void +app_worker_format_connects (app_worker_t * app_wrk, int verbose) +{ + svm_fifo_segment_private_t *fifo_segment; + vlib_main_t *vm = vlib_get_main (); + segment_manager_t *sm; + const u8 *app_name; + u8 *s = 0; + + /* Header */ + if (!app_wrk) + { + if (verbose) + vlib_cli_output (vm, "%-40s%-20s%-15s%-10s", "Connection", "App", + "API Client", "SegManager"); + else + vlib_cli_output (vm, "%-40s%-20s", "Connection", "App"); + return; + } + + if (app_wrk->connects_seg_manager == (u32) ~ 0) + return; + + app_name = application_name_from_index (app_wrk->app_index); + + /* Across all fifo segments */ + sm = segment_manager_get (app_wrk->connects_seg_manager); + + /* *INDENT-OFF* */ + segment_manager_foreach_segment_w_lock (fifo_segment, sm, ({ + svm_fifo_t *fifo; + u8 *str; + + fifo = svm_fifo_segment_get_fifo_list (fifo_segment); + while (fifo) + { + u32 session_index, thread_index; + session_t *session; + + session_index = fifo->master_session_index; + thread_index = fifo->master_thread_index; + + session = session_get (session_index, thread_index); + str = format (0, "%U", format_stream_session, session, verbose); + + if (verbose) + s = format (s, "%-40s%-20s%-15u%-10u", str, app_name, + app_wrk->api_client_index, app_wrk->connects_seg_manager); + else + s = format (s, "%-40s%-20s", str, app_name); + + vlib_cli_output (vm, "%v", s); + vec_reset_length (s); + vec_free (str); + + fifo = fifo->next; + } + vec_free (s); + })); + /* *INDENT-ON* */ +} + +void +app_worker_format_local_sessions (app_worker_t * app_wrk, int verbose) +{ + vlib_main_t *vm = vlib_get_main (); + local_session_t *ls; + transport_proto_t tp; + u8 *conn = 0; + + /* Header */ + if (app_wrk == 0) + { + vlib_cli_output (vm, "%-40s%-15s%-20s", "Connection", "ServerApp", + "ClientApp"); + return; + } + + if (!pool_elts (app_wrk->local_sessions) + && !pool_elts (app_wrk->local_connects)) + return; + + /* *INDENT-OFF* */ + pool_foreach (ls, app_wrk->local_sessions, ({ + tp = session_type_transport_proto(ls->listener_session_type); + conn = format (0, "[L][%U] *:%u", format_transport_proto_short, tp, + ls->port); + vlib_cli_output (vm, "%-40v%-15u%-20u", conn, ls->app_wrk_index, + ls->client_wrk_index); + vec_reset_length (conn); + })); + /* *INDENT-ON* */ + + vec_free (conn); +} + +void +app_worker_format_local_connects (app_worker_t * app, int verbose) +{ + vlib_main_t *vm = vlib_get_main (); + u32 app_wrk_index, session_index; + app_worker_t *server_wrk; + local_session_t *ls; + u64 client_key; + u64 value; + + /* Header */ + if (app == 0) + { + if (verbose) + vlib_cli_output (vm, "%-40s%-15s%-20s%-10s", "Connection", "App", + "Peer App", "SegManager"); + else + vlib_cli_output (vm, "%-40s%-15s%-20s", "Connection", "App", + "Peer App"); + return; + } + + if (!app->local_connects) + return; + + /* *INDENT-OFF* */ + hash_foreach (client_key, value, app->local_connects, ({ + application_client_local_connect_key_parse (client_key, &app_wrk_index, + &session_index); + server_wrk = app_worker_get (app_wrk_index); + ls = app_worker_get_local_session (server_wrk, session_index); + vlib_cli_output (vm, "%-40s%-15s%-20s", "TODO", ls->app_wrk_index, + ls->client_wrk_index); + })); + /* *INDENT-ON* */ +} + +/* + * fd.io coding-style-patch-verification: ON + * + * Local Variables: + * eval: (c-set-style "gnu") + * End: + */ diff --git a/src/vnet/session/segment_manager.c b/src/vnet/session/segment_manager.c index a4438c7aeea..638f078e20d 100644 --- a/src/vnet/session/segment_manager.c +++ b/src/vnet/session/segment_manager.c @@ -399,7 +399,7 @@ segment_manager_del_sessions (segment_manager_t * sm) if (fifo->ct_session_index != SVM_FIFO_INVALID_SESSION_INDEX) { svm_fifo_t *next = fifo->next; - application_local_session_disconnect_w_index (sm->app_wrk_index, + app_worker_local_session_disconnect_w_index (sm->app_wrk_index, fifo->ct_session_index); fifo = next; continue; diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c index 6a0d77eed12..bd809367ce5 100755 --- a/src/vnet/session/session_api.c +++ b/src/vnet/session/session_api.c @@ -1144,14 +1144,14 @@ vl_api_accept_session_reply_t_handler (vl_api_accept_session_reply_t * mp) if (session_handle_is_local (mp->handle)) { - ls = application_get_local_session_from_handle (mp->handle); + ls = app_worker_get_local_session_from_handle (mp->handle); if (!ls || ls->app_wrk_index != mp->context) { clib_warning ("server %u doesn't own local handle %llu", mp->context, mp->handle); return; } - if (application_local_session_connect_notify (ls)) + if (app_worker_local_session_connect_notify (ls)) return; ls->session_state = SESSION_STATE_READY; } diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index fe7f652a98a..fa33ad37f1e 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -45,7 +45,7 @@ session_mq_accepted_reply_handler (void *data) if (session_handle_is_local (mp->handle)) { - ls = application_get_local_session_from_handle (mp->handle); + ls = app_worker_get_local_session_from_handle (mp->handle); if (!ls) { clib_warning ("unknown local handle 0x%lx", mp->handle); @@ -58,7 +58,7 @@ session_mq_accepted_reply_handler (void *data) mp->context, mp->handle); return; } - if (application_local_session_connect_notify (ls)) + if (app_worker_local_session_connect_notify (ls)) return; ls->session_state = SESSION_STATE_READY; } -- cgit 1.2.3-korg