diff options
author | liuyacan <liuyacan@corp.netease.com> | 2021-05-09 03:50:40 +0000 |
---|---|---|
committer | Florin Coras <florin.coras@gmail.com> | 2021-05-12 04:45:07 +0000 |
commit | 534468e9f768ae7465ef722520dadfd916cdc9fb (patch) | |
tree | 7433d66e807340a2b5e0abbe152b6b944f32675d /src/vnet/session | |
parent | 7b2917fbe2a9ec17f69ca94fcbae534927915834 (diff) |
session: support half-close connection
Some app(e.g. Envoy) may call shutdown() instead of close() when
draining connection.
Type: improvement
Signed-off-by: liuyacan <liuyacan@corp.netease.com>
Change-Id: I9543b9ca3caa87b10b134fd1fc4019124e41e4d2
Diffstat (limited to 'src/vnet/session')
-rw-r--r-- | src/vnet/session/application.c | 21 | ||||
-rw-r--r-- | src/vnet/session/application_interface.h | 14 | ||||
-rw-r--r-- | src/vnet/session/session.c | 43 | ||||
-rw-r--r-- | src/vnet/session/session.h | 2 | ||||
-rw-r--r-- | src/vnet/session/session_node.c | 25 | ||||
-rw-r--r-- | src/vnet/session/session_types.h | 3 | ||||
-rw-r--r-- | src/vnet/session/transport.c | 7 | ||||
-rw-r--r-- | src/vnet/session/transport.h | 3 |
8 files changed, 114 insertions, 4 deletions
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c index 8abec7797ec..83106ef172e 100644 --- a/src/vnet/session/application.c +++ b/src/vnet/session/application.c @@ -1385,6 +1385,27 @@ vnet_unlisten (vnet_unlisten_args_t * a) } int +vnet_shutdown_session (vnet_shutdown_args_t *a) +{ + app_worker_t *app_wrk; + session_t *s; + + s = session_get_from_handle_if_valid (a->handle); + if (!s) + return SESSION_E_NOSESSION; + + app_wrk = app_worker_get (s->app_wrk_index); + if (app_wrk->app_index != a->app_index) + return SESSION_E_OWNER; + + /* We're peeking into another's thread pool. Make sure */ + ASSERT (s->session_index == session_index_from_handle (a->handle)); + + session_half_close (s); + return 0; +} + +int vnet_disconnect_session (vnet_disconnect_args_t * a) { app_worker_t *app_wrk; diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h index 961561547d7..d3bfb3b03dd 100644 --- a/src/vnet/session/application_interface.h +++ b/src/vnet/session/application_interface.h @@ -141,6 +141,12 @@ typedef struct _vnet_connect_args u32 api_context; } vnet_connect_args_t; +typedef struct _vnet_shutdown_args_t +{ + session_handle_t handle; + u32 app_index; +} vnet_shutdown_args_t; + typedef struct _vnet_disconnect_args_t { session_handle_t handle; @@ -266,6 +272,7 @@ int vnet_application_detach (vnet_app_detach_args_t * a); int vnet_listen (vnet_listen_args_t * a); int vnet_connect (vnet_connect_args_t * a); int vnet_unlisten (vnet_unlisten_args_t * a); +int vnet_shutdown_session (vnet_shutdown_args_t *a); int vnet_disconnect_session (vnet_disconnect_args_t * a); int vnet_app_add_cert_key_pair (vnet_app_add_cert_key_pair_args_t * a); @@ -426,6 +433,13 @@ typedef struct session_connected_msg_ transport_endpoint_t lcl; } __clib_packed session_connected_msg_t; +typedef struct session_shutdown_msg_ +{ + u32 client_index; + u32 context; + session_handle_t handle; +} __clib_packed session_shutdown_msg_t; + typedef struct session_disconnect_msg_ { u32 client_index; diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 1fac5ed2bb9..eba9f64ca22 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -97,9 +97,10 @@ session_send_io_evt_to_thread_custom (void *data, u32 thread_index, int session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type) { - /* only events supported are disconnect and reset */ - ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE - || evt_type == SESSION_CTRL_EVT_RESET); + /* only events supported are disconnect, shutdown and reset */ + ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE || + evt_type == SESSION_CTRL_EVT_HALF_CLOSE || + evt_type == SESSION_CTRL_EVT_RESET); return session_send_evt_to_thread (s, 0, s->thread_index, evt_type); } @@ -1087,7 +1088,9 @@ session_transport_closed_notify (transport_connection_t * tc) return; /* Transport thinks that app requested close but it actually didn't. - * Can happen for tcp if fin and rst are received in close succession. */ + * Can happen for tcp: + * 1)if fin and rst are received in close succession. + * 2)if app shutdown the connection. */ if (s->session_state == SESSION_STATE_READY) { session_transport_closing_notify (tc); @@ -1399,6 +1402,20 @@ session_stop_listen (session_t * s) } /** + * Initialize session half-closing procedure. + * + * Note that half-closing will not change the state of the session. + */ +void +session_half_close (session_t *s) +{ + if (!s) + return; + + session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_HALF_CLOSE); +} + +/** * Initialize session closing procedure. * * Request is always sent to session node to ensure that all outstanding @@ -1439,6 +1456,24 @@ session_reset (session_t * s) } /** + * Notify transport the session can be half-disconnected. + * + * Must be called from the session's thread. + */ +void +session_transport_half_close (session_t *s) +{ + /* Only READY session can be half-closed */ + if (s->session_state != SESSION_STATE_READY) + { + return; + } + + transport_half_close (session_get_transport_proto (s), s->connection_index, + s->thread_index); +} + +/** * Notify transport the session can be disconnected. This should eventually * result in a delete notification that allows us to cleanup session state. * Called for both active/passive disconnects. diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index 1a59d7df403..17a870dfcf0 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -455,8 +455,10 @@ session_clone_safe (u32 session_index, u32 thread_index) int session_open (u32 app_index, session_endpoint_t * tep, u32 opaque); int session_listen (session_t * s, session_endpoint_cfg_t * sep); int session_stop_listen (session_t * s); +void session_half_close (session_t *s); void session_close (session_t * s); void session_reset (session_t * s); +void session_transport_half_close (session_t *s); void session_transport_close (session_t * s); void session_transport_reset (session_t * s); void session_transport_cleanup (session_t * s); diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index d30df33a5e7..b68ff53dd7a 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -195,6 +195,22 @@ session_mq_connect_uri_handler (void *data) } static void +session_mq_shutdown_handler (void *data) +{ + session_shutdown_msg_t *mp = (session_shutdown_msg_t *) data; + vnet_shutdown_args_t _a, *a = &_a; + application_t *app; + + app = application_lookup (mp->client_index); + if (!app) + return; + + a->app_index = app->app_index; + a->handle = mp->handle; + vnet_shutdown_session (a); +} + +static void session_mq_disconnect_handler (void *data) { session_disconnect_msg_t *mp = (session_disconnect_msg_t *) data; @@ -1287,6 +1303,12 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt) fp = e->rpc_args.fp; (*fp) (e->rpc_args.arg); break; + case SESSION_CTRL_EVT_HALF_CLOSE: + s = session_get_from_handle_if_valid (e->session_handle); + if (PREDICT_FALSE (!s)) + break; + session_transport_half_close (s); + break; case SESSION_CTRL_EVT_CLOSE: s = session_get_from_handle_if_valid (e->session_handle); if (PREDICT_FALSE (!s)) @@ -1314,6 +1336,9 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt) case SESSION_CTRL_EVT_CONNECT_URI: session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt)); break; + case SESSION_CTRL_EVT_SHUTDOWN: + session_mq_shutdown_handler (session_evt_ctrl_data (wrk, elt)); + break; case SESSION_CTRL_EVT_DISCONNECT: session_mq_disconnect_handler (session_evt_ctrl_data (wrk, elt)); break; diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h index 2c3db5f18b5..821aac9394d 100644 --- a/src/vnet/session/session_types.h +++ b/src/vnet/session/session_types.h @@ -331,6 +331,7 @@ typedef enum SESSION_IO_EVT_BUILTIN_RX, SESSION_IO_EVT_BUILTIN_TX, SESSION_CTRL_EVT_RPC, + SESSION_CTRL_EVT_HALF_CLOSE, SESSION_CTRL_EVT_CLOSE, SESSION_CTRL_EVT_RESET, SESSION_CTRL_EVT_BOUND, @@ -344,6 +345,7 @@ typedef enum SESSION_CTRL_EVT_REQ_WORKER_UPDATE, SESSION_CTRL_EVT_WORKER_UPDATE, SESSION_CTRL_EVT_WORKER_UPDATE_REPLY, + SESSION_CTRL_EVT_SHUTDOWN, SESSION_CTRL_EVT_DISCONNECT, SESSION_CTRL_EVT_CONNECT, SESSION_CTRL_EVT_CONNECT_URI, @@ -371,6 +373,7 @@ typedef enum _ (CONNECT, connect) \ _ (CONNECT_URI, connect_uri) \ _ (CONNECTED, connected) \ + _ (SHUTDOWN, shutdown) \ _ (DISCONNECT, disconnect) \ _ (DISCONNECTED, disconnected) \ _ (DISCONNECTED_REPLY, disconnected_reply) \ diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c index 18ae3ca5188..2c88a4c4931 100644 --- a/src/vnet/session/transport.c +++ b/src/vnet/session/transport.c @@ -318,6 +318,13 @@ transport_connect (transport_proto_t tp, transport_endpoint_cfg_t * tep) } void +transport_half_close (transport_proto_t tp, u32 conn_index, u8 thread_index) +{ + if (tp_vfts[tp].half_close) + tp_vfts[tp].half_close (conn_index, thread_index); +} + +void transport_close (transport_proto_t tp, u32 conn_index, u8 thread_index) { tp_vfts[tp].close (conn_index, thread_index); diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h index 67583d23be0..447552c539e 100644 --- a/src/vnet/session/transport.h +++ b/src/vnet/session/transport.h @@ -74,6 +74,7 @@ typedef struct _transport_proto_vft u32 (*start_listen) (u32 session_index, transport_endpoint_t * lcl); u32 (*stop_listen) (u32 conn_index); int (*connect) (transport_endpoint_cfg_t * rmt); + void (*half_close) (u32 conn_index, u32 thread_index); void (*close) (u32 conn_index, u32 thread_index); void (*reset) (u32 conn_index, u32 thread_index); void (*cleanup) (u32 conn_index, u32 thread_index); @@ -134,6 +135,8 @@ do { \ } while (0) int transport_connect (transport_proto_t tp, transport_endpoint_cfg_t * tep); +void transport_half_close (transport_proto_t tp, u32 conn_index, + u8 thread_index); void transport_close (transport_proto_t tp, u32 conn_index, u8 thread_index); void transport_reset (transport_proto_t tp, u32 conn_index, u8 thread_index); u32 transport_start_listen (transport_proto_t tp, u32 session_index, |