summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorliuyacan <liuyacan@corp.netease.com>2021-05-09 03:50:40 +0000
committerFlorin Coras <florin.coras@gmail.com>2021-05-12 04:45:07 +0000
commit534468e9f768ae7465ef722520dadfd916cdc9fb (patch)
tree7433d66e807340a2b5e0abbe152b6b944f32675d /src
parent7b2917fbe2a9ec17f69ca94fcbae534927915834 (diff)
session: support half-close connection
Some app(e.g. Envoy) may call shutdown() instead of close() when draining connection. Type: improvement Signed-off-by: liuyacan <liuyacan@corp.netease.com> Change-Id: I9543b9ca3caa87b10b134fd1fc4019124e41e4d2
Diffstat (limited to 'src')
-rw-r--r--src/vcl/ldp.c2
-rw-r--r--src/vcl/vcl_locked.c18
-rw-r--r--src/vcl/vcl_locked.h1
-rw-r--r--src/vcl/vcl_private.h1
-rw-r--r--src/vcl/vppcom.c56
-rw-r--r--src/vcl/vppcom.h1
-rw-r--r--src/vnet/session/application.c21
-rw-r--r--src/vnet/session/application_interface.h14
-rw-r--r--src/vnet/session/session.c43
-rw-r--r--src/vnet/session/session.h2
-rw-r--r--src/vnet/session/session_node.c25
-rw-r--r--src/vnet/session/session_types.h3
-rw-r--r--src/vnet/session/transport.c7
-rw-r--r--src/vnet/session/transport.h3
-rw-r--r--src/vnet/tcp/tcp.c26
15 files changed, 217 insertions, 6 deletions
diff --git a/src/vcl/ldp.c b/src/vcl/ldp.c
index 64a4e7c77db..f27f6ba8e97 100644
--- a/src/vcl/ldp.c
+++ b/src/vcl/ldp.c
@@ -2219,6 +2219,8 @@ shutdown (int fd, int how)
if (flags == SHUT_RDWR)
rv = close (fd);
+ else if (flags == SHUT_WR)
+ rv = vls_shutdown (vlsh);
}
else
{
diff --git a/src/vcl/vcl_locked.c b/src/vcl/vcl_locked.c
index 757c0fc45a7..69f492b8694 100644
--- a/src/vcl/vcl_locked.c
+++ b/src/vcl/vcl_locked.c
@@ -1313,6 +1313,24 @@ vls_close (vls_handle_t vlsh)
return rv;
}
+int
+vls_shutdown (vls_handle_t vlsh)
+{
+ vcl_locked_session_t *vls;
+ int rv;
+
+ vls_mt_detect ();
+ if (!(vls = vls_get_w_dlock (vlsh)))
+ return VPPCOM_EBADFD;
+
+ vls_mt_guard (vls, VLS_MT_OP_SPOOL);
+ rv = vppcom_session_shutdown (vls_to_sh (vls));
+ vls_mt_unguard ();
+ vls_get_and_unlock (vlsh);
+
+ return rv;
+}
+
vls_handle_t
vls_epoll_create (void)
{
diff --git a/src/vcl/vcl_locked.h b/src/vcl/vcl_locked.h
index 11b71eee4af..3adcf62bc77 100644
--- a/src/vcl/vcl_locked.h
+++ b/src/vcl/vcl_locked.h
@@ -26,6 +26,7 @@
typedef int vls_handle_t;
vls_handle_t vls_create (uint8_t proto, uint8_t is_nonblocking);
+int vls_shutdown (vls_handle_t vlsh);
int vls_close (vls_handle_t vlsh);
int vls_bind (vls_handle_t vlsh, vppcom_endpt_t * ep);
int vls_listen (vls_handle_t vlsh, int q_len);
diff --git a/src/vcl/vcl_private.h b/src/vcl/vcl_private.h
index 6060ef82357..956f077b880 100644
--- a/src/vcl/vcl_private.h
+++ b/src/vcl/vcl_private.h
@@ -137,6 +137,7 @@ typedef enum vcl_session_flags_
VCL_SESSION_F_IS_VEP = 1 << 1,
VCL_SESSION_F_IS_VEP_SESSION = 1 << 2,
VCL_SESSION_F_HAS_RX_EVT = 1 << 3,
+ VCL_SESSION_F_SHUTDOWN = 1 << 4,
} __clib_packed vcl_session_flags_t;
typedef struct vcl_session_
diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c
index 96a207b741f..0713a7b2d33 100644
--- a/src/vcl/vppcom.c
+++ b/src/vcl/vppcom.c
@@ -260,6 +260,23 @@ vcl_send_session_unlisten (vcl_worker_t * wrk, vcl_session_t * s)
}
static void
+vcl_send_session_shutdown (vcl_worker_t *wrk, vcl_session_t *s)
+{
+ app_session_evt_t _app_evt, *app_evt = &_app_evt;
+ session_shutdown_msg_t *mp;
+ svm_msg_q_t *mq;
+
+ /* Send to thread that owns the session */
+ mq = s->vpp_evt_q;
+ app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_SHUTDOWN);
+ mp = (session_shutdown_msg_t *) app_evt->evt->data;
+ memset (mp, 0, sizeof (*mp));
+ mp->client_index = wrk->api_client_handle;
+ mp->handle = s->vpp_handle;
+ app_send_ctrl_evt_to_vpp (mq, app_evt);
+}
+
+static void
vcl_send_session_disconnect (vcl_worker_t * wrk, vcl_session_t * s)
{
app_session_evt_t _app_evt, *app_evt = &_app_evt;
@@ -789,6 +806,42 @@ vcl_session_disconnected_handler (vcl_worker_t * wrk,
return session;
}
+int
+vppcom_session_shutdown (uint32_t session_handle)
+{
+ vcl_worker_t *wrk = vcl_worker_get_current ();
+ vcl_session_t *session;
+ vcl_session_state_t state;
+ u64 vpp_handle;
+
+ session = vcl_session_get_w_handle (wrk, session_handle);
+ if (PREDICT_FALSE (!session))
+ return VPPCOM_EBADFD;
+
+ vpp_handle = session->vpp_handle;
+ state = session->session_state;
+
+ VDBG (1, "session %u [0x%llx] state 0x%x (%s)", session->session_index,
+ vpp_handle, state, vppcom_session_state_str (state));
+
+ if (PREDICT_FALSE (state == VCL_STATE_LISTEN))
+ {
+ VDBG (0, "ERROR: Cannot shutdown a listen socket!");
+ return VPPCOM_EBADFD;
+ }
+
+ if (PREDICT_TRUE (state == VCL_STATE_READY))
+ {
+ VDBG (1, "session %u [0x%llx]: sending shutdown...",
+ session->session_index, vpp_handle);
+
+ vcl_send_session_shutdown (wrk, session);
+ session->flags |= VCL_SESSION_F_SHUTDOWN;
+ }
+
+ return VPPCOM_OK;
+}
+
static int
vppcom_session_disconnect (u32 session_handle)
{
@@ -2101,7 +2154,8 @@ vppcom_session_write_inline (vcl_worker_t * wrk, vcl_session_t * s, void *buf,
return VPPCOM_EBADFD;
}
- if (PREDICT_FALSE (!vcl_session_is_open (s)))
+ if (PREDICT_FALSE (!vcl_session_is_open (s) ||
+ s->flags & VCL_SESSION_F_SHUTDOWN))
{
VDBG (1, "session %u [0x%llx]: is not open! state 0x%x (%s)",
s->session_index, s->vpp_handle, s->session_state,
diff --git a/src/vcl/vppcom.h b/src/vcl/vppcom.h
index ae4888566c7..72e5d46bc8f 100644
--- a/src/vcl/vppcom.h
+++ b/src/vcl/vppcom.h
@@ -172,6 +172,7 @@ extern int vppcom_app_create (const char *app_name);
extern void vppcom_app_destroy (void);
extern int vppcom_session_create (uint8_t proto, uint8_t is_nonblocking);
+extern int vppcom_session_shutdown (uint32_t session_handle);
extern int vppcom_session_close (uint32_t session_handle);
extern int vppcom_session_bind (uint32_t session_handle, vppcom_endpt_t * ep);
extern int vppcom_session_listen (uint32_t session_handle, uint32_t q_len);
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index 8abec7797ec..83106ef172e 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -1385,6 +1385,27 @@ vnet_unlisten (vnet_unlisten_args_t * a)
}
int
+vnet_shutdown_session (vnet_shutdown_args_t *a)
+{
+ app_worker_t *app_wrk;
+ session_t *s;
+
+ s = session_get_from_handle_if_valid (a->handle);
+ if (!s)
+ return SESSION_E_NOSESSION;
+
+ app_wrk = app_worker_get (s->app_wrk_index);
+ if (app_wrk->app_index != a->app_index)
+ return SESSION_E_OWNER;
+
+ /* We're peeking into another's thread pool. Make sure */
+ ASSERT (s->session_index == session_index_from_handle (a->handle));
+
+ session_half_close (s);
+ return 0;
+}
+
+int
vnet_disconnect_session (vnet_disconnect_args_t * a)
{
app_worker_t *app_wrk;
diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h
index 961561547d7..d3bfb3b03dd 100644
--- a/src/vnet/session/application_interface.h
+++ b/src/vnet/session/application_interface.h
@@ -141,6 +141,12 @@ typedef struct _vnet_connect_args
u32 api_context;
} vnet_connect_args_t;
+typedef struct _vnet_shutdown_args_t
+{
+ session_handle_t handle;
+ u32 app_index;
+} vnet_shutdown_args_t;
+
typedef struct _vnet_disconnect_args_t
{
session_handle_t handle;
@@ -266,6 +272,7 @@ int vnet_application_detach (vnet_app_detach_args_t * a);
int vnet_listen (vnet_listen_args_t * a);
int vnet_connect (vnet_connect_args_t * a);
int vnet_unlisten (vnet_unlisten_args_t * a);
+int vnet_shutdown_session (vnet_shutdown_args_t *a);
int vnet_disconnect_session (vnet_disconnect_args_t * a);
int vnet_app_add_cert_key_pair (vnet_app_add_cert_key_pair_args_t * a);
@@ -426,6 +433,13 @@ typedef struct session_connected_msg_
transport_endpoint_t lcl;
} __clib_packed session_connected_msg_t;
+typedef struct session_shutdown_msg_
+{
+ u32 client_index;
+ u32 context;
+ session_handle_t handle;
+} __clib_packed session_shutdown_msg_t;
+
typedef struct session_disconnect_msg_
{
u32 client_index;
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 1fac5ed2bb9..eba9f64ca22 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -97,9 +97,10 @@ session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
int
session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
{
- /* only events supported are disconnect and reset */
- ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE
- || evt_type == SESSION_CTRL_EVT_RESET);
+ /* only events supported are disconnect, shutdown and reset */
+ ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE ||
+ evt_type == SESSION_CTRL_EVT_HALF_CLOSE ||
+ evt_type == SESSION_CTRL_EVT_RESET);
return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
}
@@ -1087,7 +1088,9 @@ session_transport_closed_notify (transport_connection_t * tc)
return;
/* Transport thinks that app requested close but it actually didn't.
- * Can happen for tcp if fin and rst are received in close succession. */
+ * Can happen for tcp:
+ * 1)if fin and rst are received in close succession.
+ * 2)if app shutdown the connection. */
if (s->session_state == SESSION_STATE_READY)
{
session_transport_closing_notify (tc);
@@ -1399,6 +1402,20 @@ session_stop_listen (session_t * s)
}
/**
+ * Initialize session half-closing procedure.
+ *
+ * Note that half-closing will not change the state of the session.
+ */
+void
+session_half_close (session_t *s)
+{
+ if (!s)
+ return;
+
+ session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_HALF_CLOSE);
+}
+
+/**
* Initialize session closing procedure.
*
* Request is always sent to session node to ensure that all outstanding
@@ -1439,6 +1456,24 @@ session_reset (session_t * s)
}
/**
+ * Notify transport the session can be half-disconnected.
+ *
+ * Must be called from the session's thread.
+ */
+void
+session_transport_half_close (session_t *s)
+{
+ /* Only READY session can be half-closed */
+ if (s->session_state != SESSION_STATE_READY)
+ {
+ return;
+ }
+
+ transport_half_close (session_get_transport_proto (s), s->connection_index,
+ s->thread_index);
+}
+
+/**
* Notify transport the session can be disconnected. This should eventually
* result in a delete notification that allows us to cleanup session state.
* Called for both active/passive disconnects.
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index 1a59d7df403..17a870dfcf0 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -455,8 +455,10 @@ session_clone_safe (u32 session_index, u32 thread_index)
int session_open (u32 app_index, session_endpoint_t * tep, u32 opaque);
int session_listen (session_t * s, session_endpoint_cfg_t * sep);
int session_stop_listen (session_t * s);
+void session_half_close (session_t *s);
void session_close (session_t * s);
void session_reset (session_t * s);
+void session_transport_half_close (session_t *s);
void session_transport_close (session_t * s);
void session_transport_reset (session_t * s);
void session_transport_cleanup (session_t * s);
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index d30df33a5e7..b68ff53dd7a 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -195,6 +195,22 @@ session_mq_connect_uri_handler (void *data)
}
static void
+session_mq_shutdown_handler (void *data)
+{
+ session_shutdown_msg_t *mp = (session_shutdown_msg_t *) data;
+ vnet_shutdown_args_t _a, *a = &_a;
+ application_t *app;
+
+ app = application_lookup (mp->client_index);
+ if (!app)
+ return;
+
+ a->app_index = app->app_index;
+ a->handle = mp->handle;
+ vnet_shutdown_session (a);
+}
+
+static void
session_mq_disconnect_handler (void *data)
{
session_disconnect_msg_t *mp = (session_disconnect_msg_t *) data;
@@ -1287,6 +1303,12 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
fp = e->rpc_args.fp;
(*fp) (e->rpc_args.arg);
break;
+ case SESSION_CTRL_EVT_HALF_CLOSE:
+ s = session_get_from_handle_if_valid (e->session_handle);
+ if (PREDICT_FALSE (!s))
+ break;
+ session_transport_half_close (s);
+ break;
case SESSION_CTRL_EVT_CLOSE:
s = session_get_from_handle_if_valid (e->session_handle);
if (PREDICT_FALSE (!s))
@@ -1314,6 +1336,9 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
case SESSION_CTRL_EVT_CONNECT_URI:
session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt));
break;
+ case SESSION_CTRL_EVT_SHUTDOWN:
+ session_mq_shutdown_handler (session_evt_ctrl_data (wrk, elt));
+ break;
case SESSION_CTRL_EVT_DISCONNECT:
session_mq_disconnect_handler (session_evt_ctrl_data (wrk, elt));
break;
diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h
index 2c3db5f18b5..821aac9394d 100644
--- a/src/vnet/session/session_types.h
+++ b/src/vnet/session/session_types.h
@@ -331,6 +331,7 @@ typedef enum
SESSION_IO_EVT_BUILTIN_RX,
SESSION_IO_EVT_BUILTIN_TX,
SESSION_CTRL_EVT_RPC,
+ SESSION_CTRL_EVT_HALF_CLOSE,
SESSION_CTRL_EVT_CLOSE,
SESSION_CTRL_EVT_RESET,
SESSION_CTRL_EVT_BOUND,
@@ -344,6 +345,7 @@ typedef enum
SESSION_CTRL_EVT_REQ_WORKER_UPDATE,
SESSION_CTRL_EVT_WORKER_UPDATE,
SESSION_CTRL_EVT_WORKER_UPDATE_REPLY,
+ SESSION_CTRL_EVT_SHUTDOWN,
SESSION_CTRL_EVT_DISCONNECT,
SESSION_CTRL_EVT_CONNECT,
SESSION_CTRL_EVT_CONNECT_URI,
@@ -371,6 +373,7 @@ typedef enum
_ (CONNECT, connect) \
_ (CONNECT_URI, connect_uri) \
_ (CONNECTED, connected) \
+ _ (SHUTDOWN, shutdown) \
_ (DISCONNECT, disconnect) \
_ (DISCONNECTED, disconnected) \
_ (DISCONNECTED_REPLY, disconnected_reply) \
diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c
index 18ae3ca5188..2c88a4c4931 100644
--- a/src/vnet/session/transport.c
+++ b/src/vnet/session/transport.c
@@ -318,6 +318,13 @@ transport_connect (transport_proto_t tp, transport_endpoint_cfg_t * tep)
}
void
+transport_half_close (transport_proto_t tp, u32 conn_index, u8 thread_index)
+{
+ if (tp_vfts[tp].half_close)
+ tp_vfts[tp].half_close (conn_index, thread_index);
+}
+
+void
transport_close (transport_proto_t tp, u32 conn_index, u8 thread_index)
{
tp_vfts[tp].close (conn_index, thread_index);
diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h
index 67583d23be0..447552c539e 100644
--- a/src/vnet/session/transport.h
+++ b/src/vnet/session/transport.h
@@ -74,6 +74,7 @@ typedef struct _transport_proto_vft
u32 (*start_listen) (u32 session_index, transport_endpoint_t * lcl);
u32 (*stop_listen) (u32 conn_index);
int (*connect) (transport_endpoint_cfg_t * rmt);
+ void (*half_close) (u32 conn_index, u32 thread_index);
void (*close) (u32 conn_index, u32 thread_index);
void (*reset) (u32 conn_index, u32 thread_index);
void (*cleanup) (u32 conn_index, u32 thread_index);
@@ -134,6 +135,8 @@ do { \
} while (0)
int transport_connect (transport_proto_t tp, transport_endpoint_cfg_t * tep);
+void transport_half_close (transport_proto_t tp, u32 conn_index,
+ u8 thread_index);
void transport_close (transport_proto_t tp, u32 conn_index, u8 thread_index);
void transport_reset (transport_proto_t tp, u32 conn_index, u8 thread_index);
u32 transport_start_listen (transport_proto_t tp, u32 session_index,
diff --git a/src/vnet/tcp/tcp.c b/src/vnet/tcp/tcp.c
index 90b832cd73d..16bf9451709 100644
--- a/src/vnet/tcp/tcp.c
+++ b/src/vnet/tcp/tcp.c
@@ -354,7 +354,6 @@ tcp_program_cleanup (tcp_worker_ctx_t * wrk, tcp_connection_t * tc)
* 2) TIME_WAIT (active close) whereby after 2MSL the 2MSL timer triggers
* and cleanup is called.
*
- * N.B. Half-close connections are not supported
*/
void
tcp_connection_close (tcp_connection_t * tc)
@@ -426,6 +425,30 @@ tcp_connection_close (tcp_connection_t * tc)
}
static void
+tcp_session_half_close (u32 conn_index, u32 thread_index)
+{
+ tcp_worker_ctx_t *wrk;
+ tcp_connection_t *tc;
+
+ tc = tcp_connection_get (conn_index, thread_index);
+ wrk = tcp_get_worker (tc->c_thread_index);
+
+ /* If the connection is not in ESTABLISHED state, ignore it */
+ if (tc->state != TCP_STATE_ESTABLISHED)
+ return;
+ if (!transport_max_tx_dequeue (&tc->connection))
+ tcp_send_fin (tc);
+ else
+ tc->flags |= TCP_CONN_FINPNDG;
+ tcp_connection_set_state (tc, TCP_STATE_FIN_WAIT_1);
+ /* Set a timer in case the peer stops responding. Otherwise the
+ * connection will be stuck here forever. */
+ ASSERT (tc->timers[TCP_TIMER_WAITCLOSE] == TCP_TIMER_HANDLE_INVALID);
+ tcp_timer_set (&wrk->timer_wheel, tc, TCP_TIMER_WAITCLOSE,
+ tcp_cfg.finwait1_time);
+}
+
+static void
tcp_session_close (u32 conn_index, u32 thread_index)
{
tcp_connection_t *tc;
@@ -1316,6 +1339,7 @@ const static transport_proto_vft_t tcp_proto = {
.get_half_open = tcp_half_open_session_get_transport,
.attribute = tcp_session_attribute,
.connect = tcp_session_open,
+ .half_close = tcp_session_half_close,
.close = tcp_session_close,
.cleanup = tcp_session_cleanup,
.cleanup_ho = tcp_session_cleanup_ho,
node: VPP node. :param interface: Interface name or sw_if_index to set/unset policer classify. :param ip4_table_index: IP4 classify table index (~0 to skip). (Default value = ~0) :param ip6_table_index: IP6 classify table index (~0 to skip). (Default value = ~0) :param l2_table_index: L2 classify table index (~0 to skip). (Default value = ~0) :param is_add: Set if True, else unset. :type node: dict :type interface: str or int :type ip4_table_index: int :type ip6_table_index: int :type l2_table_index: int :type is_add: bool """ if isinstance(interface, str): sw_if_index = Topology.get_interface_sw_index(node, interface) else: sw_if_index = interface cmd = u"policer_classify_set_interface" args = dict( is_add=is_add, sw_if_index=int(sw_if_index), ip4_table_index=int(ip4_table_index), ip6_table_index=int(ip6_table_index), l2_table_index=int(l2_table_index) ) err_msg = f"Failed to set/unset policer classify interface " \ f"{interface} on host {node[u'host']}" with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) @staticmethod def policer_classify_get_precolor(precolor): """Return policer pre-color numeric value. :param precolor: Policer pre-color name. :type precolor: str :returns: Policer pre-color numeric value. :rtype: int """ return getattr(PolicerPreColor, precolor.upper()).value @staticmethod def get_dscp_num_value(dscp): """Return DSCP numeric value. :param dscp: DSCP name. :type dscp: str :returns: DSCP numeric value. :rtype: int """ return getattr(IpDscp, f"IP_API_DSCP_{dscp.upper()}").value