From d50ff7fba167035b10def282b5ec166fa7899948 Mon Sep 17 00:00:00 2001 From: Florin Coras Date: Thu, 16 Apr 2020 04:30:22 +0000 Subject: session tcp: track half open in app wrk Type: improvement Do extra checks when establishing an active connect and cleanup pending connects if application detaches. Signed-off-by: Florin Coras Change-Id: Ibe9349db57b313ba2aa5ea3960ef5cf755f5098a --- src/vnet/session/application.h | 10 ++++++ src/vnet/session/application_worker.c | 62 +++++++++++++++++++++++++++++++++++ src/vnet/session/session.c | 45 ++++++++++++++++++++----- src/vnet/session/session.h | 4 +++ src/vnet/session/session_types.h | 16 +++++++-- src/vnet/session/transport.c | 7 ++++ src/vnet/session/transport.h | 2 ++ src/vnet/session/transport_types.h | 2 ++ 8 files changed, 138 insertions(+), 10 deletions(-) (limited to 'src/vnet/session') diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h index 00b60c33469..377ea1ef4ac 100644 --- a/src/vnet/session/application.h +++ b/src/vnet/session/application.h @@ -63,6 +63,9 @@ typedef struct app_worker_ u32 api_client_index; u8 app_is_builtin; + + /** Per transport proto hash tables of half-open connection handles */ + uword **half_open_table; } app_worker_t; typedef struct app_worker_map_ @@ -251,6 +254,13 @@ int app_worker_accept_notify (app_worker_t * app_wrk, session_t * s); int app_worker_init_connected (app_worker_t * app_wrk, session_t * s); int app_worker_connect_notify (app_worker_t * app_wrk, session_t * s, session_error_t err, u32 opaque); +int app_worker_add_half_open (app_worker_t * app_wrk, transport_proto_t tp, + session_handle_t ho_handle, + session_handle_t wrk_handle); +int app_worker_del_half_open (app_worker_t * app_wrk, transport_proto_t tp, + session_handle_t ho_handle); +u64 app_worker_lookup_half_open (app_worker_t * app_wrk, transport_proto_t tp, + session_handle_t ho_handle); int app_worker_close_notify (app_worker_t * app_wrk, session_t * s); int app_worker_transport_closed_notify (app_worker_t * app_wrk, session_t * s); diff --git a/src/vnet/session/application_worker.c b/src/vnet/session/application_worker.c index 47b156732ef..c67aa88db05 100644 --- a/src/vnet/session/application_worker.c +++ b/src/vnet/session/application_worker.c @@ -75,6 +75,8 @@ app_worker_free (app_worker_t * app_wrk) })); /* *INDENT-ON* */ + hash_free (app_wrk->listeners_table); + for (i = 0; i < vec_len (handles); i++) { a->app_index = app->app_index; @@ -83,6 +85,7 @@ app_worker_free (app_worker_t * app_wrk) /* seg manager is removed when unbind completes */ (void) vnet_unlisten (a); } + vec_reset_length (handles); /* * Connects segment manager cleanup @@ -96,6 +99,31 @@ app_worker_free (app_worker_t * app_wrk) segment_manager_init_free (sm); } + /* + * Half-open cleanup + */ + + for (i = 0; i < vec_len (app_wrk->half_open_table); i++) + { + if (!app_wrk->half_open_table[i]) + continue; + + /* *INDENT-OFF* */ + hash_foreach (handle, sm_index, app_wrk->half_open_table[i], ({ + vec_add1 (handles, handle); + })); + /* *INDENT-ON* */ + + for (i = 0; i < vec_len (handles); i++) + session_cleanup_half_open (i, handles[i]); + + hash_free (app_wrk->half_open_table[i]); + vec_reset_length (handles); + } + + vec_free (app_wrk->half_open_table); + vec_free (handles); + /* If first segment manager is used by a listener */ if (app_wrk->first_segment_manager != APP_INVALID_SEGMENT_MANAGER_INDEX && app_wrk->first_segment_manager != app_wrk->connects_seg_manager) @@ -334,6 +362,40 @@ app_worker_connect_notify (app_worker_t * app_wrk, session_t * s, s, err); } +int +app_worker_add_half_open (app_worker_t * app_wrk, transport_proto_t tp, + session_handle_t ho_handle, + session_handle_t wrk_handle) +{ + ASSERT (vlib_get_thread_index () == 0); + vec_validate (app_wrk->half_open_table, tp); + hash_set (app_wrk->half_open_table[tp], ho_handle, wrk_handle); + return 0; +} + +int +app_worker_del_half_open (app_worker_t * app_wrk, transport_proto_t tp, + session_handle_t ho_handle) +{ + ASSERT (vlib_get_thread_index () == 0); + hash_unset (app_wrk->half_open_table[tp], ho_handle); + return 0; +} + +u64 +app_worker_lookup_half_open (app_worker_t * app_wrk, transport_proto_t tp, + session_handle_t ho_handle) +{ + u64 *ho_wrk_handlep; + + /* No locking because all updates are done from main thread */ + ho_wrk_handlep = hash_get (app_wrk->half_open_table[tp], ho_handle); + if (!ho_wrk_handlep) + return SESSION_INVALID_HANDLE; + + return *ho_wrk_handlep; +} + int app_worker_close_notify (app_worker_t * app_wrk, session_t * s) { diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index ed940d593c5..a709302c8a7 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -282,6 +282,20 @@ session_delete (session_t * s) session_free_w_fifos (s); } +void +session_cleanup_half_open (transport_proto_t tp, session_handle_t ho_handle) +{ + transport_cleanup_half_open (tp, session_handle_index (ho_handle)); +} + +void +session_half_open_delete_notify (transport_proto_t tp, + session_handle_t ho_handle) +{ + app_worker_t *app_wrk = app_worker_get (session_handle_data (ho_handle)); + app_worker_del_half_open (app_wrk, tp, ho_handle); +} + session_t * session_alloc_for_connection (transport_connection_t * tc) { @@ -738,10 +752,10 @@ int session_stream_connect_notify (transport_connection_t * tc, session_error_t err) { + session_handle_t ho_handle, wrk_handle; u32 opaque = 0, new_ti, new_si; app_worker_t *app_wrk; session_t *s = 0; - u64 ho_handle; /* * Find connection handle and cleanup half-open table @@ -757,11 +771,19 @@ session_stream_connect_notify (transport_connection_t * tc, /* Get the app's index from the handle we stored when opening connection * and the opaque (api_context for external apps) from transport session * index */ - app_wrk = app_worker_get_if_valid (ho_handle >> 32); + app_wrk = app_worker_get_if_valid (session_handle_data (ho_handle)); if (!app_wrk) return -1; - opaque = tc->s_index; + wrk_handle = app_worker_lookup_half_open (app_wrk, tc->proto, ho_handle); + if (wrk_handle == SESSION_INVALID_HANDLE) + return -1; + + /* Make sure this is the same half-open index */ + if (session_handle_index (wrk_handle) != session_handle_index (ho_handle)) + return -1; + + opaque = session_handle_data (wrk_handle); if (err) return app_worker_connect_notify (app_wrk, s, err, opaque); @@ -1183,7 +1205,7 @@ session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) { transport_connection_t *tc; transport_endpoint_cfg_t *tep; - u64 handle; + u64 handle, wrk_handle; int rv; tep = session_endpoint_to_transport_cfg (rmt); @@ -1203,13 +1225,20 @@ session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) * is needed when the connect notify comes and we have to notify the * external app */ - handle = (((u64) app_wrk_index) << 32) | (u64) tc->c_index; + handle = session_make_handle (tc->c_index, app_wrk_index); session_lookup_add_half_open (tc, handle); - /* Store api_context (opaque) for when the reply comes. Not the nicest - * thing but better than allocating a separate half-open pool. + /* Store the half-open handle in the connection. Transport will use it + * when cleaning up @ref session_half_open_delete_notify + */ + tc->s_ho_handle = handle; + + /* Track the half-open connections in case we want to forcefully + * clean them up @ref session_cleanup_half_open */ - tc->s_index = opaque; + wrk_handle = session_make_handle (tc->c_index, opaque); + app_worker_add_half_open (app_worker_get (app_wrk_index), + rmt->transport_proto, handle, wrk_handle); return 0; } diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index 956bff068b4..5e0a832d796 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -293,6 +293,8 @@ session_evt_alloc_old (session_worker_t * wrk) session_t *session_alloc (u32 thread_index); void session_free (session_t * s); void session_free_w_fifos (session_t * s); +void session_cleanup_half_open (transport_proto_t tp, + session_handle_t ho_handle); u8 session_is_valid (u32 si, u8 thread_index); always_inline session_t * @@ -462,6 +464,8 @@ int session_dgram_connect_notify (transport_connection_t * tc, int session_stream_accept_notify (transport_connection_t * tc); void session_transport_closing_notify (transport_connection_t * tc); void session_transport_delete_notify (transport_connection_t * tc); +void session_half_open_delete_notify (transport_proto_t tp, + session_handle_t ho_handle); void session_transport_closed_notify (transport_connection_t * tc); void session_transport_reset_notify (transport_connection_t * tc); int session_stream_accept (transport_connection_t * tc, u32 listener_index, diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h index cdaaad04ca1..784312d9182 100644 --- a/src/vnet/session/session_types.h +++ b/src/vnet/session/session_types.h @@ -309,9 +309,21 @@ session_parse_handle (session_handle_t handle, u32 * index, } static inline session_handle_t -session_make_handle (u32 session_index, u32 thread_index) +session_make_handle (u32 session_index, u32 data) { - return (((u64) thread_index << 32) | (u64) session_index); + return (((u64) data << 32) | (u64) session_index); +} + +always_inline u32 +session_handle_index (session_handle_t ho_handle) +{ + return (ho_handle & 0xffffffff); +} + +always_inline u32 +session_handle_data (session_handle_t ho_handle) +{ + return (ho_handle >> 32); } typedef enum diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c index d2c65949519..9dd495c7d61 100644 --- a/src/vnet/session/transport.c +++ b/src/vnet/session/transport.c @@ -303,6 +303,13 @@ transport_cleanup (transport_proto_t tp, u32 conn_index, u8 thread_index) tp_vfts[tp].cleanup (conn_index, thread_index); } +void +transport_cleanup_half_open (transport_proto_t tp, u32 conn_index) +{ + if (tp_vfts[tp].cleanup) + tp_vfts[tp].cleanup_ho (conn_index); +} + int transport_connect (transport_proto_t tp, transport_endpoint_cfg_t * tep) { diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h index 954db49022c..9c873b12caf 100644 --- a/src/vnet/session/transport.h +++ b/src/vnet/session/transport.h @@ -76,6 +76,7 @@ typedef struct _transport_proto_vft void (*close) (u32 conn_index, u32 thread_index); void (*reset) (u32 conn_index, u32 thread_index); void (*cleanup) (u32 conn_index, u32 thread_index); + void (*cleanup_ho) (u32 conn_index); clib_error_t *(*enable) (vlib_main_t * vm, u8 is_en); /* @@ -137,6 +138,7 @@ u32 transport_start_listen (transport_proto_t tp, u32 session_index, u32 transport_stop_listen (transport_proto_t tp, u32 conn_index); void transport_cleanup (transport_proto_t tp, u32 conn_index, u8 thread_index); +void transport_cleanup_half_open (transport_proto_t tp, u32 conn_index); void transport_get_endpoint (transport_proto_t tp, u32 conn_index, u32 thread_index, transport_endpoint_t * tep, u8 is_lcl); diff --git a/src/vnet/session/transport_types.h b/src/vnet/session/transport_types.h index 28043b56fe6..d76970f8dca 100644 --- a/src/vnet/session/transport_types.h +++ b/src/vnet/session/transport_types.h @@ -144,6 +144,8 @@ typedef struct _transport_connection #define c_stats connection.stats #define c_pacer connection.pacer #define c_flags connection.flags +#define s_ho_handle pacer.bucket +#define c_s_ho_handle connection.pacer.bucket } transport_connection_t; STATIC_ASSERT (STRUCT_OFFSET_OF (transport_connection_t, s_index) -- cgit 1.2.3-korg