From 623eb5676689f6874716857a1085d0efd4974c4c Mon Sep 17 00:00:00 2001 From: Florin Coras Date: Sun, 3 Feb 2019 19:28:34 -0800 Subject: session: cleanup part 2 Move app worker logic to app_worker.c Change-Id: Ic5e5735b2884f006c064d023f491aa6888114810 Signed-off-by: Florin Coras --- src/vnet/session/application.c | 1200 ++-------------------------------------- 1 file changed, 57 insertions(+), 1143 deletions(-) (limited to 'src/vnet/session/application.c') diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c index 268816561a7..3380d0bca4a 100644 --- a/src/vnet/session/application.c +++ b/src/vnet/session/application.c @@ -497,104 +497,8 @@ application_listener_select_worker (session_t * ls, u8 is_local) return application_get_worker (app, wrk_index); } -app_worker_t * -app_worker_alloc (application_t * app) -{ - app_worker_t *app_wrk; - pool_get (app_main.workers, app_wrk); - clib_memset (app_wrk, 0, sizeof (*app_wrk)); - app_wrk->wrk_index = app_wrk - app_main.workers; - app_wrk->app_index = app->app_index; - app_wrk->wrk_map_index = ~0; - app_wrk->connects_seg_manager = APP_INVALID_SEGMENT_MANAGER_INDEX; - app_wrk->first_segment_manager = APP_INVALID_SEGMENT_MANAGER_INDEX; - app_wrk->local_segment_manager = APP_INVALID_SEGMENT_MANAGER_INDEX; - APP_DBG ("New app %v worker %u", app_get_name (app), app_wrk->wrk_index); - return app_wrk; -} - -app_worker_t * -app_worker_get (u32 wrk_index) -{ - return pool_elt_at_index (app_main.workers, wrk_index); -} - -app_worker_t * -app_worker_get_if_valid (u32 wrk_index) -{ - if (pool_is_free_index (app_main.workers, wrk_index)) - return 0; - return pool_elt_at_index (app_main.workers, wrk_index); -} - -void -app_worker_free (app_worker_t * app_wrk) -{ - application_t *app = application_get (app_wrk->app_index); - vnet_unbind_args_t _a, *a = &_a; - u64 handle, *handles = 0; - segment_manager_t *sm; - u32 sm_index; - int i; - - /* - * Listener cleanup - */ - - /* *INDENT-OFF* */ - hash_foreach (handle, sm_index, app_wrk->listeners_table, - ({ - vec_add1 (handles, handle); - sm = segment_manager_get (sm_index); - sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX; - })); - /* *INDENT-ON* */ - - for (i = 0; i < vec_len (handles); i++) - { - a->app_index = app->app_index; - a->wrk_map_index = app_wrk->wrk_map_index; - a->handle = handles[i]; - /* seg manager is removed when unbind completes */ - vnet_unbind (a); - } - - /* - * Connects segment manager cleanup - */ - - if (app_wrk->connects_seg_manager != APP_INVALID_SEGMENT_MANAGER_INDEX) - { - sm = segment_manager_get (app_wrk->connects_seg_manager); - sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX; - segment_manager_init_del (sm); - } - - /* If first segment manager is used by a listener */ - if (app_wrk->first_segment_manager != APP_INVALID_SEGMENT_MANAGER_INDEX - && app_wrk->first_segment_manager != app_wrk->connects_seg_manager) - { - sm = segment_manager_get (app_wrk->first_segment_manager); - sm->first_is_protected = 0; - sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX; - /* .. and has no fifos, e.g. it might be used for redirected sessions, - * remove it */ - if (!segment_manager_has_fifos (sm)) - segment_manager_del (sm); - } - - /* - * Local sessions - */ - app_worker_local_sessions_free (app_wrk); - - pool_put (app_main.workers, app_wrk); - if (CLIB_DEBUG) - clib_memset (app_wrk, 0xfe, sizeof (*app_wrk)); -} - int -app_worker_alloc_and_init (application_t * app, app_worker_t ** wrk) +application_alloc_worker_and_init (application_t * app, app_worker_t ** wrk) { app_worker_map_t *wrk_map; app_worker_t *app_wrk; @@ -641,160 +545,6 @@ app_worker_alloc_and_init (application_t * app, app_worker_t ** wrk) return 0; } -application_t * -app_worker_get_app (u32 wrk_index) -{ - app_worker_t *app_wrk; - app_wrk = app_worker_get_if_valid (wrk_index); - if (!app_wrk) - return 0; - return application_get_if_valid (app_wrk->app_index); -} - -static segment_manager_t * -app_worker_alloc_segment_manager (app_worker_t * app_wrk) -{ - segment_manager_t *sm = 0; - - /* If the first segment manager is not in use, don't allocate a new one */ - if (app_wrk->first_segment_manager != APP_INVALID_SEGMENT_MANAGER_INDEX - && app_wrk->first_segment_manager_in_use == 0) - { - sm = segment_manager_get (app_wrk->first_segment_manager); - app_wrk->first_segment_manager_in_use = 1; - return sm; - } - - sm = segment_manager_new (); - sm->app_wrk_index = app_wrk->wrk_index; - - return sm; -} - -int -app_worker_start_listen (app_worker_t * app_wrk, session_t * ls) -{ - segment_manager_t *sm; - - /* Allocate segment manager. All sessions derived out of a listen session - * have fifos allocated by the same segment manager. */ - if (!(sm = app_worker_alloc_segment_manager (app_wrk))) - return -1; - - /* Add to app's listener table. Useful to find all child listeners - * when app goes down, although, just for unbinding this is not needed */ - hash_set (app_wrk->listeners_table, listen_session_get_handle (ls), - segment_manager_index (sm)); - - if (!ls->rx_fifo - && session_transport_service_type (ls) == TRANSPORT_SERVICE_CL) - { - if (session_alloc_fifos (sm, ls)) - return -1; - } - return 0; -} - -int -app_worker_stop_listen (app_worker_t * app_wrk, session_handle_t handle) -{ - segment_manager_t *sm; - uword *sm_indexp; - - sm_indexp = hash_get (app_wrk->listeners_table, handle); - if (PREDICT_FALSE (!sm_indexp)) - { - clib_warning ("listener handle was removed %llu!", handle); - return -1; - } - - sm = segment_manager_get (*sm_indexp); - if (app_wrk->first_segment_manager == *sm_indexp) - { - /* Delete sessions but don't remove segment manager */ - app_wrk->first_segment_manager_in_use = 0; - segment_manager_del_sessions (sm); - } - else - { - segment_manager_init_del (sm); - } - hash_unset (app_wrk->listeners_table, handle); - - return 0; -} - -int -app_worker_own_session (app_worker_t * app_wrk, session_t * s) -{ - segment_manager_t *sm; - svm_fifo_t *rxf, *txf; - - if (s->session_state == SESSION_STATE_LISTENING) - { - app_worker_t *old_wrk = app_worker_get (s->app_wrk_index); - u64 lsh = listen_session_get_handle (s); - app_listener_t *app_listener; - application_t *app; - - if (!old_wrk) - return -1; - - hash_unset (old_wrk->listeners_table, lsh); - if (!(sm = app_worker_alloc_segment_manager (app_wrk))) - return -1; - - hash_set (app_wrk->listeners_table, lsh, segment_manager_index (sm)); - s->app_wrk_index = app_wrk->wrk_index; - - app = application_get (old_wrk->app_index); - if (!app) - return -1; - - app_listener = app_listener_get (app, s->listener_db_index); - app_listener->workers = clib_bitmap_set (app_listener->workers, - app_wrk->wrk_map_index, 1); - app_listener->workers = clib_bitmap_set (app_listener->workers, - old_wrk->wrk_map_index, 0); - return 0; - } - - s->app_wrk_index = app_wrk->wrk_index; - - rxf = s->rx_fifo; - txf = s->tx_fifo; - - if (!rxf || !txf) - return 0; - - s->rx_fifo = 0; - s->tx_fifo = 0; - - sm = app_worker_get_or_alloc_connect_segment_manager (app_wrk); - if (session_alloc_fifos (sm, s)) - return -1; - - if (!svm_fifo_is_empty (rxf)) - { - clib_memcpy_fast (s->rx_fifo->data, rxf->data, rxf->nitems); - s->rx_fifo->head = rxf->head; - s->rx_fifo->tail = rxf->tail; - s->rx_fifo->cursize = rxf->cursize; - } - - if (!svm_fifo_is_empty (txf)) - { - clib_memcpy_fast (s->tx_fifo->data, txf->data, txf->nitems); - s->tx_fifo->head = txf->head; - s->tx_fifo->tail = txf->tail; - s->tx_fifo->cursize = txf->cursize; - } - - segment_manager_dealloc_fifos (rxf->segment_index, rxf, txf); - - return 0; -} - /** * Start listening local transport endpoint for requested transport. * @@ -877,6 +627,39 @@ err: return -1; } +int +application_change_listener_owner (session_t * s, app_worker_t * app_wrk) +{ + app_worker_t *old_wrk = app_worker_get (s->app_wrk_index); + app_listener_t *app_listener; + application_t *app; + + if (!old_wrk) + return -1; + + hash_unset (old_wrk->listeners_table, listen_session_get_handle (s)); + if (session_transport_service_type (s) == TRANSPORT_SERVICE_CL + && s->rx_fifo) + segment_manager_dealloc_fifos (s->rx_fifo->segment_index, s->rx_fifo, + s->tx_fifo); + + if (app_worker_start_listen (app_wrk, s)) + return -1; + + s->app_wrk_index = app_wrk->wrk_index; + + app = application_get (old_wrk->app_index); + if (!app) + return -1; + + app_listener = app_listener_get (app, s->listener_db_index); + app_listener->workers = clib_bitmap_set (app_listener->workers, + app_wrk->wrk_map_index, 1); + app_listener->workers = clib_bitmap_set (app_listener->workers, + old_wrk->wrk_map_index, 0); + return 0; +} + /** * Stop listening on session associated to handle * @@ -923,61 +706,6 @@ application_stop_listen (u32 app_index, u32 app_wrk_index, return 0; } -int -app_worker_open_session (app_worker_t * app, session_endpoint_t * sep, - u32 api_context) -{ - int rv; - - /* Make sure we have a segment manager for connects */ - app_worker_alloc_connects_segment_manager (app); - - if ((rv = session_open (app->wrk_index, sep, api_context))) - return rv; - - return 0; -} - -int -app_worker_alloc_connects_segment_manager (app_worker_t * app_wrk) -{ - segment_manager_t *sm; - - if (app_wrk->connects_seg_manager == APP_INVALID_SEGMENT_MANAGER_INDEX) - { - sm = app_worker_alloc_segment_manager (app_wrk); - if (sm == 0) - return -1; - app_wrk->connects_seg_manager = segment_manager_index (sm); - } - return 0; -} - -segment_manager_t * -app_worker_get_connect_segment_manager (app_worker_t * app) -{ - ASSERT (app->connects_seg_manager != (u32) ~ 0); - return segment_manager_get (app->connects_seg_manager); -} - -segment_manager_t * -app_worker_get_or_alloc_connect_segment_manager (app_worker_t * app_wrk) -{ - if (app_wrk->connects_seg_manager == (u32) ~ 0) - app_worker_alloc_connects_segment_manager (app_wrk); - return segment_manager_get (app_wrk->connects_seg_manager); -} - -segment_manager_t * -app_worker_get_listen_segment_manager (app_worker_t * app, - session_t * listener) -{ - uword *smp; - smp = hash_get (app->listeners_table, listen_session_get_handle (listener)); - ASSERT (smp != 0); - return segment_manager_get (*smp); -} - clib_error_t * vnet_app_worker_add_del (vnet_app_worker_add_del_args_t * a) { @@ -995,7 +723,7 @@ vnet_app_worker_add_del (vnet_app_worker_add_del_args_t * a) if (a->is_add) { - if ((rv = app_worker_alloc_and_init (app, &app_wrk))) + if ((rv = application_alloc_worker_and_init (app, &app_wrk))) return clib_error_return_code (0, rv, 0, "app wrk init: %d", rv); /* Map worker api index to the app */ @@ -1030,25 +758,6 @@ vnet_app_worker_add_del (vnet_app_worker_add_del_args_t * a) return 0; } -segment_manager_t * -application_get_local_segment_manager (app_worker_t * app) -{ - return segment_manager_get (app->local_segment_manager); -} - -segment_manager_t * -application_get_local_segment_manager_w_session (app_worker_t * app, - local_session_t * ls) -{ - session_t *listener; - if (application_local_session_listener_has_transport (ls)) - { - listener = listen_session_get (ls->listener_index); - return app_worker_get_listen_segment_manager (app, listener); - } - return segment_manager_get (app->local_segment_manager); -} - int application_is_proxy (application_t * app) { @@ -1085,78 +794,6 @@ application_use_mq_for_ctrl (application_t * app) return app->flags & APP_OPTIONS_FLAGS_USE_MQ_FOR_CTRL_MSGS; } -/** - * Send an API message to the external app, to map new segment - */ -int -app_worker_add_segment_notify (u32 app_wrk_index, u64 segment_handle) -{ - app_worker_t *app_wrk = app_worker_get (app_wrk_index); - application_t *app = application_get (app_wrk->app_index); - return app->cb_fns.add_segment_callback (app_wrk->api_client_index, - segment_handle); -} - -u32 -application_n_listeners (app_worker_t * app) -{ - return hash_elts (app->listeners_table); -} - -session_t * -app_worker_first_listener (app_worker_t * app, u8 fib_proto, - u8 transport_proto) -{ - session_t *listener; - u64 handle; - u32 sm_index; - u8 sst; - - sst = session_type_from_proto_and_ip (transport_proto, - fib_proto == FIB_PROTOCOL_IP4); - - /* *INDENT-OFF* */ - hash_foreach (handle, sm_index, app->listeners_table, ({ - listener = listen_session_get_from_handle (handle); - if (listener->session_type == sst - && listener->enqueue_epoch != SESSION_PROXY_LISTENER_INDEX) - return listener; - })); - /* *INDENT-ON* */ - - return 0; -} - -u8 -app_worker_application_is_builtin (app_worker_t * app_wrk) -{ - return app_wrk->app_is_builtin; -} - -session_t * -application_proxy_listener (app_worker_t * app, u8 fib_proto, - u8 transport_proto) -{ - session_t *listener; - u64 handle; - u32 sm_index; - u8 sst; - - sst = session_type_from_proto_and_ip (transport_proto, - fib_proto == FIB_PROTOCOL_IP4); - - /* *INDENT-OFF* */ - hash_foreach (handle, sm_index, app->listeners_table, ({ - listener = listen_session_get_from_handle (handle); - if (listener->session_type == sst - && listener->enqueue_epoch == SESSION_PROXY_LISTENER_INDEX) - return listener; - })); - /* *INDENT-ON* */ - - return 0; -} - static clib_error_t * application_start_stop_proxy_fib_proto (application_t * app, u8 fib_proto, u8 transport_proto, u8 is_start) @@ -1188,7 +825,7 @@ application_start_stop_proxy_fib_proto (application_t * app, u8 fib_proto, } else { - s = application_proxy_listener (app_wrk, fib_proto, transport_proto); + s = app_worker_proxy_listener (app_wrk, fib_proto, transport_proto); ASSERT (s); } @@ -1302,216 +939,29 @@ application_get_segment_manager_properties (u32 app_index) return &app->sm_properties; } -static inline int -app_enqueue_evt (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, u8 lock) +local_session_t * +application_local_listen_session_alloc (application_t * app) { - if (PREDICT_FALSE (svm_msg_q_is_full (mq))) - { - clib_warning ("evt q full"); - svm_msg_q_free_msg (mq, msg); - if (lock) - svm_msg_q_unlock (mq); - return -1; - } - - if (lock) - { - svm_msg_q_add_and_unlock (mq, msg); - return 0; - } + local_session_t *ll; + pool_get (app->local_listen_sessions, ll); + clib_memset (ll, 0, sizeof (*ll)); + return ll; +} - /* Even when not locking the ring, we must wait for queue mutex */ - if (svm_msg_q_add (mq, msg, SVM_Q_WAIT)) - { - clib_warning ("msg q add returned"); - return -1; - } - return 0; +u32 +application_local_listener_index (application_t * app, local_session_t * ll) +{ + return (ll - app->local_listen_sessions); } -static inline int -app_send_io_evt_rx (app_worker_t * app_wrk, session_t * s, u8 lock) +void +application_local_listen_session_free (application_t * app, + local_session_t * ll) { - session_event_t *evt; - svm_msg_q_msg_t msg; - svm_msg_q_t *mq; - - if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY - && s->session_state != SESSION_STATE_LISTENING)) - { - /* Session is closed so app will never clean up. Flush rx fifo */ - if (s->session_state == SESSION_STATE_CLOSED) - svm_fifo_dequeue_drop_all (s->rx_fifo); - return 0; - } - - if (app_worker_application_is_builtin (app_wrk)) - { - application_t *app = application_get (app_wrk->app_index); - return app->cb_fns.builtin_app_rx_callback (s); - } - - if (svm_fifo_has_event (s->rx_fifo) || svm_fifo_is_empty (s->rx_fifo)) - return 0; - - mq = app_wrk->event_queue; - if (lock) - svm_msg_q_lock (mq); - - if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING))) - { - clib_warning ("evt q rings full"); - if (lock) - svm_msg_q_unlock (mq); - return -1; - } - - msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING); - ASSERT (!svm_msg_q_msg_is_invalid (&msg)); - - evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg); - evt->fifo = s->rx_fifo; - evt->event_type = FIFO_EVENT_APP_RX; - - (void) svm_fifo_set_event (s->rx_fifo); - - if (app_enqueue_evt (mq, &msg, lock)) - return -1; - return 0; -} - -static inline int -app_send_io_evt_tx (app_worker_t * app_wrk, session_t * s, u8 lock) -{ - svm_msg_q_t *mq; - session_event_t *evt; - svm_msg_q_msg_t msg; - - if (app_worker_application_is_builtin (app_wrk)) - return 0; - - mq = app_wrk->event_queue; - if (lock) - svm_msg_q_lock (mq); - - if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING))) - { - clib_warning ("evt q rings full"); - if (lock) - svm_msg_q_unlock (mq); - return -1; - } - - msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING); - ASSERT (!svm_msg_q_msg_is_invalid (&msg)); - - evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg); - evt->event_type = FIFO_EVENT_APP_TX; - evt->fifo = s->tx_fifo; - - return app_enqueue_evt (mq, &msg, lock); -} - -/* *INDENT-OFF* */ -typedef int (app_send_evt_handler_fn) (app_worker_t *app, - session_t *s, - u8 lock); -static app_send_evt_handler_fn * const app_send_evt_handler_fns[3] = { - app_send_io_evt_rx, - 0, - app_send_io_evt_tx, -}; -/* *INDENT-ON* */ - -/** - * Send event to application - * - * Logic from queue perspective is non-blocking. If there's - * not enough space to enqueue a message, we return. - */ -int -app_worker_send_event (app_worker_t * app, session_t * s, u8 evt_type) -{ - ASSERT (app && evt_type <= FIFO_EVENT_APP_TX); - return app_send_evt_handler_fns[evt_type] (app, s, 0 /* lock */ ); -} - -/** - * Send event to application - * - * Logic from queue perspective is blocking. However, if queue is full, - * we return. - */ -int -app_worker_lock_and_send_event (app_worker_t * app, session_t * s, - u8 evt_type) -{ - return app_send_evt_handler_fns[evt_type] (app, s, 1 /* lock */ ); -} - -local_session_t * -application_local_session_alloc (app_worker_t * app_wrk) -{ - local_session_t *s; - pool_get (app_wrk->local_sessions, s); - clib_memset (s, 0, sizeof (*s)); - s->app_wrk_index = app_wrk->wrk_index; - s->session_index = s - app_wrk->local_sessions; - s->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE, 0); - return s; -} - -void -application_local_session_free (app_worker_t * app, local_session_t * s) -{ - pool_put (app->local_sessions, s); - if (CLIB_DEBUG) - clib_memset (s, 0xfc, sizeof (*s)); -} - -local_session_t * -application_get_local_session (app_worker_t * app_wrk, u32 session_index) -{ - if (pool_is_free_index (app_wrk->local_sessions, session_index)) - return 0; - return pool_elt_at_index (app_wrk->local_sessions, session_index); -} - -local_session_t * -application_get_local_session_from_handle (session_handle_t handle) -{ - app_worker_t *server_wrk; - u32 session_index, server_wrk_index; - local_session_parse_handle (handle, &server_wrk_index, &session_index); - server_wrk = app_worker_get_if_valid (server_wrk_index); - if (!server_wrk) - return 0; - return application_get_local_session (server_wrk, session_index); -} - -local_session_t * -application_local_listen_session_alloc (application_t * app) -{ - local_session_t *ll; - pool_get (app->local_listen_sessions, ll); - clib_memset (ll, 0, sizeof (*ll)); - return ll; -} - -u32 -application_local_listener_index (application_t * app, local_session_t * ll) -{ - return (ll - app->local_listen_sessions); -} - -void -application_local_listen_session_free (application_t * app, - local_session_t * ll) -{ - pool_put (app->local_listen_sessions, ll); - if (CLIB_DEBUG) - clib_memset (ll, 0xfb, sizeof (*ll)); -} + pool_put (app->local_listen_sessions, ll); + if (CLIB_DEBUG) + clib_memset (ll, 0xfb, sizeof (*ll)); +} int application_start_local_listen (application_t * app, @@ -1591,7 +1041,7 @@ application_stop_local_listen (u32 app_index, u32 wrk_map_index, table_index = application_local_session_table (server); /* We have both local and global table binds. Figure from global what - * the sep we should be cleaning up is. + * sep we should be cleaning up. */ if (!session_handle_is_local (lh)) { @@ -1632,7 +1082,7 @@ application_stop_local_listen (u32 app_index, u32 wrk_map_index, /* *INDENT-OFF* */ pool_foreach (ls, server_wrk->local_sessions, ({ if (ls->listener_index == ll->session_index) - application_local_session_disconnect (server_wrk->app_index, ls); + app_worker_local_session_disconnect (server_wrk->app_index, ls); })); /* *INDENT-ON* */ @@ -1648,356 +1098,6 @@ application_stop_local_listen (u32 app_index, u32 wrk_map_index, return 0; } -static void -application_local_session_fix_eventds (svm_msg_q_t * sq, svm_msg_q_t * cq) -{ - int fd; - - /* - * segment manager initializes only the producer eventds, since vpp is - * typically the producer. But for local sessions, we also pass to the - * apps the mqs they listen on for events from peer apps, so they are also - * consumer fds. - */ - fd = svm_msg_q_get_producer_eventfd (sq); - svm_msg_q_set_consumer_eventfd (sq, fd); - fd = svm_msg_q_get_producer_eventfd (cq); - svm_msg_q_set_consumer_eventfd (cq, fd); -} - -int -application_local_session_connect (app_worker_t * client_wrk, - app_worker_t * server_wrk, - local_session_t * ll, u32 opaque) -{ - u32 seg_size, evt_q_sz, evt_q_elts, margin = 16 << 10; - u32 round_rx_fifo_sz, round_tx_fifo_sz, sm_index; - segment_manager_properties_t *props, *cprops; - int rv, has_transport, seg_index; - svm_fifo_segment_private_t *seg; - application_t *server, *client; - segment_manager_t *sm; - local_session_t *ls; - svm_msg_q_t *sq, *cq; - u64 segment_handle; - - ls = application_local_session_alloc (server_wrk); - server = application_get (server_wrk->app_index); - client = application_get (client_wrk->app_index); - - props = application_segment_manager_properties (server); - cprops = application_segment_manager_properties (client); - evt_q_elts = props->evt_q_size + cprops->evt_q_size; - evt_q_sz = segment_manager_evt_q_expected_size (evt_q_elts); - round_rx_fifo_sz = 1 << max_log2 (props->rx_fifo_size); - round_tx_fifo_sz = 1 << max_log2 (props->tx_fifo_size); - seg_size = round_rx_fifo_sz + round_tx_fifo_sz + evt_q_sz + margin; - - has_transport = session_has_transport ((session_t *) ll); - if (!has_transport) - { - /* Local sessions don't have backing transport */ - ls->port = ll->port; - sm = application_get_local_segment_manager (server_wrk); - } - else - { - session_t *sl = (session_t *) ll; - transport_connection_t *tc; - tc = listen_session_get_transport (sl); - ls->port = tc->lcl_port; - sm = app_worker_get_listen_segment_manager (server_wrk, sl); - } - - seg_index = segment_manager_add_segment (sm, seg_size); - if (seg_index < 0) - { - clib_warning ("failed to add new cut-through segment"); - return seg_index; - } - seg = segment_manager_get_segment_w_lock (sm, seg_index); - sq = segment_manager_alloc_queue (seg, props); - cq = segment_manager_alloc_queue (seg, cprops); - - if (props->use_mq_eventfd) - application_local_session_fix_eventds (sq, cq); - - ls->server_evt_q = pointer_to_uword (sq); - ls->client_evt_q = pointer_to_uword (cq); - rv = segment_manager_try_alloc_fifos (seg, props->rx_fifo_size, - props->tx_fifo_size, - &ls->rx_fifo, &ls->tx_fifo); - if (rv) - { - clib_warning ("failed to add fifos in cut-through segment"); - segment_manager_segment_reader_unlock (sm); - goto failed; - } - sm_index = segment_manager_index (sm); - ls->rx_fifo->ct_session_index = ls->session_index; - ls->tx_fifo->ct_session_index = ls->session_index; - ls->rx_fifo->segment_manager = sm_index; - ls->tx_fifo->segment_manager = sm_index; - ls->rx_fifo->segment_index = seg_index; - ls->tx_fifo->segment_index = seg_index; - ls->svm_segment_index = seg_index; - ls->listener_index = ll->session_index; - ls->client_wrk_index = client_wrk->wrk_index; - ls->client_opaque = opaque; - ls->listener_session_type = ll->session_type; - ls->session_state = SESSION_STATE_READY; - - segment_handle = segment_manager_segment_handle (sm, seg); - if ((rv = server->cb_fns.add_segment_callback (server_wrk->api_client_index, - segment_handle))) - { - clib_warning ("failed to notify server of new segment"); - segment_manager_segment_reader_unlock (sm); - goto failed; - } - segment_manager_segment_reader_unlock (sm); - if ((rv = server->cb_fns.session_accept_callback ((session_t *) ls))) - { - clib_warning ("failed to send accept cut-through notify to server"); - goto failed; - } - if (server->flags & APP_OPTIONS_FLAGS_IS_BUILTIN) - application_local_session_connect_notify (ls); - - return 0; - -failed: - if (!has_transport) - segment_manager_del_segment (sm, seg); - return rv; -} - -static u64 -application_client_local_connect_key (local_session_t * ls) -{ - return (((u64) ls->app_wrk_index) << 32 | (u64) ls->session_index); -} - -static void -application_client_local_connect_key_parse (u64 key, u32 * app_wrk_index, - u32 * session_index) -{ - *app_wrk_index = key >> 32; - *session_index = key & 0xFFFFFFFF; -} - -int -application_local_session_connect_notify (local_session_t * ls) -{ - svm_fifo_segment_private_t *seg; - app_worker_t *client_wrk, *server_wrk; - segment_manager_t *sm; - application_t *client; - int rv, is_fail = 0; - u64 segment_handle; - u64 client_key; - - client_wrk = app_worker_get (ls->client_wrk_index); - server_wrk = app_worker_get (ls->app_wrk_index); - client = application_get (client_wrk->app_index); - - sm = application_get_local_segment_manager_w_session (server_wrk, ls); - seg = segment_manager_get_segment_w_lock (sm, ls->svm_segment_index); - segment_handle = segment_manager_segment_handle (sm, seg); - if ((rv = client->cb_fns.add_segment_callback (client_wrk->api_client_index, - segment_handle))) - { - clib_warning ("failed to notify client %u of new segment", - ls->client_wrk_index); - segment_manager_segment_reader_unlock (sm); - application_local_session_disconnect (ls->client_wrk_index, ls); - is_fail = 1; - } - else - { - segment_manager_segment_reader_unlock (sm); - } - - client->cb_fns.session_connected_callback (client_wrk->wrk_index, - ls->client_opaque, - (session_t *) ls, is_fail); - - client_key = application_client_local_connect_key (ls); - hash_set (client_wrk->local_connects, client_key, client_key); - return 0; -} - -int -application_local_session_cleanup (app_worker_t * client_wrk, - app_worker_t * server_wrk, - local_session_t * ls) -{ - svm_fifo_segment_private_t *seg; - session_t *listener; - segment_manager_t *sm; - u64 client_key; - u8 has_transport; - - /* Retrieve listener transport type as it is the one that decides where - * the fifos are allocated */ - has_transport = application_local_session_listener_has_transport (ls); - if (!has_transport) - sm = application_get_local_segment_manager_w_session (server_wrk, ls); - else - { - listener = listen_session_get (ls->listener_index); - sm = app_worker_get_listen_segment_manager (server_wrk, listener); - } - - seg = segment_manager_get_segment (sm, ls->svm_segment_index); - if (client_wrk) - { - client_key = application_client_local_connect_key (ls); - hash_unset (client_wrk->local_connects, client_key); - } - - if (!has_transport) - { - application_t *server = application_get (server_wrk->app_index); - u64 segment_handle = segment_manager_segment_handle (sm, seg); - server->cb_fns.del_segment_callback (server_wrk->api_client_index, - segment_handle); - if (client_wrk) - { - application_t *client = application_get (client_wrk->app_index); - client->cb_fns.del_segment_callback (client_wrk->api_client_index, - segment_handle); - } - segment_manager_del_segment (sm, seg); - } - - application_local_session_free (server_wrk, ls); - - return 0; -} - -int -application_local_session_disconnect (u32 app_index, local_session_t * ls) -{ - app_worker_t *client_wrk, *server_wrk; - u8 is_server = 0, is_client = 0; - application_t *app; - - app = application_get_if_valid (app_index); - if (!app) - return 0; - - client_wrk = app_worker_get_if_valid (ls->client_wrk_index); - server_wrk = app_worker_get (ls->app_wrk_index); - - if (server_wrk->app_index == app_index) - is_server = 1; - else if (client_wrk && client_wrk->app_index == app_index) - is_client = 1; - - if (!is_server && !is_client) - { - clib_warning ("app %u is neither client nor server for session 0x%lx", - app_index, application_local_session_handle (ls)); - return VNET_API_ERROR_INVALID_VALUE; - } - - if (ls->session_state == SESSION_STATE_CLOSED) - return application_local_session_cleanup (client_wrk, server_wrk, ls); - - if (app_index == ls->client_wrk_index) - { - mq_send_local_session_disconnected_cb (ls->app_wrk_index, ls); - } - else - { - if (!client_wrk) - { - return application_local_session_cleanup (client_wrk, server_wrk, - ls); - } - else if (ls->session_state < SESSION_STATE_READY) - { - application_t *client = application_get (client_wrk->app_index); - client->cb_fns.session_connected_callback (client_wrk->wrk_index, - ls->client_opaque, - (session_t *) ls, - 1 /* is_fail */ ); - ls->session_state = SESSION_STATE_CLOSED; - return application_local_session_cleanup (client_wrk, server_wrk, - ls); - } - else - { - mq_send_local_session_disconnected_cb (client_wrk->wrk_index, ls); - } - } - - ls->session_state = SESSION_STATE_CLOSED; - - return 0; -} - -int -application_local_session_disconnect_w_index (u32 app_wrk_index, u32 ls_index) -{ - app_worker_t *app_wrk; - local_session_t *ls; - app_wrk = app_worker_get (app_wrk_index); - ls = application_get_local_session (app_wrk, ls_index); - return application_local_session_disconnect (app_wrk_index, ls); -} - -void -app_worker_local_sessions_free (app_worker_t * app_wrk) -{ - u32 index, server_wrk_index, session_index; - u64 handle, *handles = 0; - app_worker_t *server_wrk; - segment_manager_t *sm; - local_session_t *ls; - int i; - - /* - * Local sessions - */ - if (app_wrk->local_sessions) - { - /* *INDENT-OFF* */ - pool_foreach (ls, app_wrk->local_sessions, ({ - application_local_session_disconnect (app_wrk->wrk_index, ls); - })); - /* *INDENT-ON* */ - } - - /* - * Local connects - */ - vec_reset_length (handles); - /* *INDENT-OFF* */ - hash_foreach (handle, index, app_wrk->local_connects, ({ - vec_add1 (handles, handle); - })); - /* *INDENT-ON* */ - - for (i = 0; i < vec_len (handles); i++) - { - application_client_local_connect_key_parse (handles[i], - &server_wrk_index, - &session_index); - server_wrk = app_worker_get_if_valid (server_wrk_index); - if (server_wrk) - { - ls = application_get_local_session (server_wrk, session_index); - application_local_session_disconnect (app_wrk->wrk_index, ls); - } - } - - sm = segment_manager_get (app_wrk->local_segment_manager); - sm->app_wrk_index = SEGMENT_MANAGER_INVALID_APP_INDEX; - segment_manager_del (sm); -} - clib_error_t * vnet_app_add_tls_cert (vnet_app_add_tls_cert_args_t * a) { @@ -2022,45 +1122,6 @@ vnet_app_add_tls_key (vnet_app_add_tls_key_args_t * a) return 0; } -u8 * -format_app_worker_listener (u8 * s, va_list * args) -{ - app_worker_t *app_wrk = va_arg (*args, app_worker_t *); - u64 handle = va_arg (*args, u64); - u32 sm_index = va_arg (*args, u32); - int verbose = va_arg (*args, int); - session_t *listener; - const u8 *app_name; - u8 *str; - - if (!app_wrk) - { - if (verbose) - s = format (s, "%-40s%-25s%=10s%-15s%-15s%-10s", "Connection", "App", - "Wrk", "API Client", "ListenerID", "SegManager"); - else - s = format (s, "%-40s%-25s%=10s", "Connection", "App", "Wrk"); - - return s; - } - - app_name = application_name_from_index (app_wrk->app_index); - listener = listen_session_get_from_handle (handle); - str = format (0, "%U", format_stream_session, listener, verbose); - - if (verbose) - { - char buf[32]; - sprintf (buf, "%u(%u)", app_wrk->wrk_map_index, app_wrk->wrk_index); - s = format (s, "%-40s%-25s%=10s%-15u%-15u%-10u", str, app_name, - buf, app_wrk->api_client_index, handle, sm_index); - } - else - s = format (s, "%-40s%-25s%=10u", str, app_name, app_wrk->wrk_map_index); - - return s; -} - static void application_format_listeners (application_t * app, int verbose) { @@ -2090,69 +1151,6 @@ application_format_listeners (application_t * app, int verbose) /* *INDENT-ON* */ } -static void -app_worker_format_connects (app_worker_t * app_wrk, int verbose) -{ - svm_fifo_segment_private_t *fifo_segment; - vlib_main_t *vm = vlib_get_main (); - segment_manager_t *sm; - const u8 *app_name; - u8 *s = 0; - - /* Header */ - if (!app_wrk) - { - if (verbose) - vlib_cli_output (vm, "%-40s%-20s%-15s%-10s", "Connection", "App", - "API Client", "SegManager"); - else - vlib_cli_output (vm, "%-40s%-20s", "Connection", "App"); - return; - } - - if (app_wrk->connects_seg_manager == (u32) ~ 0) - return; - - app_name = application_name_from_index (app_wrk->app_index); - - /* Across all fifo segments */ - sm = segment_manager_get (app_wrk->connects_seg_manager); - - /* *INDENT-OFF* */ - segment_manager_foreach_segment_w_lock (fifo_segment, sm, ({ - svm_fifo_t *fifo; - u8 *str; - - fifo = svm_fifo_segment_get_fifo_list (fifo_segment); - while (fifo) - { - u32 session_index, thread_index; - session_t *session; - - session_index = fifo->master_session_index; - thread_index = fifo->master_thread_index; - - session = session_get (session_index, thread_index); - str = format (0, "%U", format_stream_session, session, verbose); - - if (verbose) - s = format (s, "%-40s%-20s%-15u%-10u", str, app_name, - app_wrk->api_client_index, app_wrk->connects_seg_manager); - else - s = format (s, "%-40s%-20s", str, app_name); - - vlib_cli_output (vm, "%v", s); - vec_reset_length (s); - vec_free (str); - - fifo = fifo->next; - } - vec_free (s); - })); - /* *INDENT-ON* */ - -} - static void application_format_connects (application_t * app, int verbose) { @@ -2173,40 +1171,6 @@ application_format_connects (application_t * app, int verbose) /* *INDENT-ON* */ } -static void -app_worker_format_local_sessions (app_worker_t * app_wrk, int verbose) -{ - vlib_main_t *vm = vlib_get_main (); - local_session_t *ls; - transport_proto_t tp; - u8 *conn = 0; - - /* Header */ - if (app_wrk == 0) - { - vlib_cli_output (vm, "%-40s%-15s%-20s", "Connection", "ServerApp", - "ClientApp"); - return; - } - - if (!pool_elts (app_wrk->local_sessions) - && !pool_elts (app_wrk->local_connects)) - return; - - /* *INDENT-OFF* */ - pool_foreach (ls, app_wrk->local_sessions, ({ - tp = session_type_transport_proto(ls->listener_session_type); - conn = format (0, "[L][%U] *:%u", format_transport_proto_short, tp, - ls->port); - vlib_cli_output (vm, "%-40v%-15u%-20u", conn, ls->app_wrk_index, - ls->client_wrk_index); - vec_reset_length (conn); - })); - /* *INDENT-ON* */ - - vec_free (conn); -} - static void application_format_local_sessions (application_t * app, int verbose) { @@ -2248,43 +1212,6 @@ application_format_local_sessions (application_t * app, int verbose) /* *INDENT-ON* */ } -static void -app_worker_format_local_connects (app_worker_t * app, int verbose) -{ - vlib_main_t *vm = vlib_get_main (); - u32 app_wrk_index, session_index; - app_worker_t *server_wrk; - local_session_t *ls; - u64 client_key; - u64 value; - - /* Header */ - if (app == 0) - { - if (verbose) - vlib_cli_output (vm, "%-40s%-15s%-20s%-10s", "Connection", "App", - "Peer App", "SegManager"); - else - vlib_cli_output (vm, "%-40s%-15s%-20s", "Connection", "App", - "Peer App"); - return; - } - - if (!app->local_connects) - return; - - /* *INDENT-OFF* */ - hash_foreach (client_key, value, app->local_connects, ({ - application_client_local_connect_key_parse (client_key, &app_wrk_index, - &session_index); - server_wrk = app_worker_get (app_wrk_index); - ls = application_get_local_session (server_wrk, session_index); - vlib_cli_output (vm, "%-40s%-15s%-20s", "TODO", ls->app_wrk_index, - ls->client_wrk_index); - })); - /* *INDENT-ON* */ -} - static void application_format_local_connects (application_t * app, int verbose) { @@ -2305,19 +1232,6 @@ application_format_local_connects (application_t * app, int verbose) /* *INDENT-ON* */ } -u8 * -format_application_worker (u8 * s, va_list * args) -{ - app_worker_t *app_wrk = va_arg (*args, app_worker_t *); - u32 indent = 1; - - s = format (s, "%U wrk-index %u app-index %u map-index %u " - "api-client-index %d\n", format_white_space, indent, - app_wrk->wrk_index, app_wrk->app_index, app_wrk->wrk_map_index, - app_wrk->api_client_index); - return s; -} - u8 * format_application (u8 * s, va_list * args) { @@ -2355,7 +1269,7 @@ format_application (u8 * s, va_list * args) /* *INDENT-OFF* */ pool_foreach (wrk_map, app->worker_maps, ({ app_wrk = app_worker_get (wrk_map->wrk_index); - s = format (s, "%U", format_application_worker, app_wrk); + s = format (s, "%U", format_app_worker, app_wrk); })); /* *INDENT-ON* */ -- cgit 1.2.3-korg