diff options
Diffstat (limited to 'src/vnet/session')
-rw-r--r-- | src/vnet/session/application.h | 2 | ||||
-rw-r--r-- | src/vnet/session/application_interface.c | 4 | ||||
-rw-r--r-- | src/vnet/session/application_interface.h | 30 | ||||
-rw-r--r-- | src/vnet/session/application_worker.c | 6 | ||||
-rw-r--r-- | src/vnet/session/session.c | 40 | ||||
-rw-r--r-- | src/vnet/session/session.h | 38 | ||||
-rw-r--r-- | src/vnet/session/session_api.c | 6 | ||||
-rw-r--r-- | src/vnet/session/session_input.c | 17 | ||||
-rw-r--r-- | src/vnet/session/session_lookup.c | 1 | ||||
-rw-r--r-- | src/vnet/session/session_node.c | 4 | ||||
-rw-r--r-- | src/vnet/session/session_types.h | 101 |
11 files changed, 147 insertions, 102 deletions
diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h index 5fea61bdab4..c68a911230f 100644 --- a/src/vnet/session/application.h +++ b/src/vnet/session/application.h @@ -342,7 +342,7 @@ session_error_t app_worker_start_listen (app_worker_t *app_wrk, int app_worker_stop_listen (app_worker_t * app_wrk, app_listener_t * al); int app_worker_init_accepted (session_t * s); int app_worker_listened_notify (app_worker_t *app_wrk, session_handle_t alsh, - u32 opaque, int err); + u32 opaque, session_error_t err); int app_worker_unlisten_reply (app_worker_t *app_wrk, session_handle_t sh, u32 opaque, session_error_t err); int app_worker_accept_notify (app_worker_t * app_wrk, session_t * s); diff --git a/src/vnet/session/application_interface.c b/src/vnet/session/application_interface.c index 86f3dcdece6..a62f914d43a 100644 --- a/src/vnet/session/application_interface.c +++ b/src/vnet/session/application_interface.c @@ -106,8 +106,8 @@ parse_uri (char *uri, session_endpoint_cfg_t *sep) return 0; } -int -vnet_bind_uri (vnet_listen_args_t * a) +session_error_t +vnet_bind_uri (vnet_listen_args_t *a) { session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL; int rv; diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h index 4eee17eeda8..f175e4a58c6 100644 --- a/src/vnet/session/application_interface.h +++ b/src/vnet/session/application_interface.h @@ -126,7 +126,7 @@ typedef struct _vnet_bind_args_t /* * Results */ - u64 handle; + session_handle_t handle; } vnet_listen_args_t; typedef struct _vnet_unlisten_args_t @@ -134,7 +134,7 @@ typedef struct _vnet_unlisten_args_t union { char *uri; - u64 handle; /**< Session handle */ + session_handle_t handle; /**< Session handle */ }; u32 app_index; /**< Owning application index */ u32 wrk_map_index; /**< App's local pool worker index */ @@ -356,7 +356,7 @@ STATIC_ASSERT (sizeof (session_listen_uri_msg_t) <= SESSION_CTRL_MSG_MAX_SIZE, typedef struct session_bound_msg_ { u32 context; - u64 handle; + session_handle_t handle; i32 retval; u8 lcl_is_ip4; u8 lcl_ip[16]; @@ -379,15 +379,15 @@ typedef struct session_unlisten_msg_ typedef struct session_unlisten_reply_msg_ { u32 context; - u64 handle; + session_handle_t handle; i32 retval; } __clib_packed session_unlisten_reply_msg_t; typedef struct session_accepted_msg_ { u32 context; - u64 listener_handle; - u64 handle; + session_handle_t listener_handle; + session_handle_t handle; uword server_rx_fifo; uword server_tx_fifo; u64 segment_handle; @@ -404,7 +404,7 @@ typedef struct session_accepted_reply_msg_ { u32 context; i32 retval; - u64 handle; + session_handle_t handle; } __clib_packed session_accepted_reply_msg_t; typedef struct session_connect_msg_ @@ -444,7 +444,7 @@ typedef struct session_connected_msg_ { u32 context; i32 retval; - u64 handle; + session_handle_t handle; uword server_rx_fifo; uword server_tx_fifo; u64 segment_handle; @@ -474,33 +474,33 @@ typedef struct session_disconnected_msg_ { u32 client_index; u32 context; - u64 handle; + session_handle_t handle; } __clib_packed session_disconnected_msg_t; typedef struct session_disconnected_reply_msg_ { u32 context; i32 retval; - u64 handle; + session_handle_t handle; } __clib_packed session_disconnected_reply_msg_t; typedef struct session_reset_msg_ { u32 client_index; u32 context; - u64 handle; + session_handle_t handle; } __clib_packed session_reset_msg_t; typedef struct session_reset_reply_msg_ { u32 context; i32 retval; - u64 handle; + session_handle_t handle; } __clib_packed session_reset_reply_msg_t; typedef struct session_req_worker_update_msg_ { - u64 session_handle; + session_handle_t session_handle; } __clib_packed session_req_worker_update_msg_t; /* NOTE: using u16 for wrk indices because message needs to fit in 18B */ @@ -509,12 +509,12 @@ typedef struct session_worker_update_msg_ u32 client_index; u16 wrk_index; u16 req_wrk_index; - u64 handle; + session_handle_t handle; } __clib_packed session_worker_update_msg_t; typedef struct session_worker_update_reply_msg_ { - u64 handle; + session_handle_t handle; uword rx_fifo; uword tx_fifo; u64 segment_handle; diff --git a/src/vnet/session/application_worker.c b/src/vnet/session/application_worker.c index c2d0d3b0cf3..befdb7c7002 100644 --- a/src/vnet/session/application_worker.c +++ b/src/vnet/session/application_worker.c @@ -58,10 +58,10 @@ void app_worker_free (app_worker_t * app_wrk) { application_t *app = application_get (app_wrk->app_index); + session_handle_t handle, *handles = 0, *sh; vnet_unlisten_args_t _a, *a = &_a; - u64 handle, *handles = 0, *sm_indices = 0; segment_manager_t *sm; - session_handle_t *sh; + u64 *sm_indices = 0; session_t *ls; u32 sm_index; int i; @@ -853,7 +853,7 @@ u8 * format_app_worker_listener (u8 * s, va_list * args) { app_worker_t *app_wrk = va_arg (*args, app_worker_t *); - u64 handle = va_arg (*args, u64); + session_handle_t handle = va_arg (*args, u64); u32 sm_index = va_arg (*args, u32); int verbose = va_arg (*args, int); session_t *listener; diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index e2fd1644fbd..67e7ee39001 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -97,6 +97,13 @@ session_send_io_evt_to_thread_custom (void *data, u32 thread_index, } int +session_program_tx_io_evt (session_handle_tu_t sh, session_evt_type_t evt_type) +{ + return session_send_evt_to_thread ((void *) &sh.session_index, 0, + (u32) sh.thread_index, evt_type); +} + +int session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type) { /* only events supported are disconnect, shutdown and reset */ @@ -266,7 +273,7 @@ session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf) app_worker_t *app_wrk; app_wrk = app_worker_get_if_valid (s->app_wrk_index); - if (!app_wrk) + if (PREDICT_FALSE (!app_wrk)) { if (ntf == SESSION_CLEANUP_TRANSPORT) return; @@ -1589,6 +1596,37 @@ session_reset (session_t * s) session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_RESET); } +void +session_detach_app (session_t *s) +{ + if (s->session_state < SESSION_STATE_TRANSPORT_CLOSING) + { + session_close (s); + } + else if (s->session_state < SESSION_STATE_TRANSPORT_DELETED) + { + transport_connection_t *tc; + + /* Transport is closing but it's not yet deleted. Confirm close and + * subsequently detach transport from session and enqueue a session + * cleanup notification. Transport closed and cleanup notifications are + * going to be dropped by session layer apis */ + transport_close (session_get_transport_proto (s), s->connection_index, + s->thread_index); + tc = session_get_transport (s); + tc->s_index = SESSION_INVALID_INDEX; + session_set_state (s, SESSION_STATE_TRANSPORT_DELETED); + session_cleanup_notify (s, SESSION_CLEANUP_SESSION); + } + else + { + session_cleanup_notify (s, SESSION_CLEANUP_SESSION); + } + + s->flags |= SESSION_F_APP_CLOSED; + s->app_wrk_index = APP_INVALID_INDEX; +} + /** * Notify transport the session can be half-disconnected. * diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index 78158d5f3ed..a5604bf8725 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -392,44 +392,37 @@ session_get_if_valid (u64 si, u32 thread_index) } always_inline session_t * -session_get_from_handle (session_handle_t handle) +session_get_from_handle (session_handle_tu_t handle) { session_main_t *smm = &session_main; - u32 session_index, thread_index; - session_parse_handle (handle, &session_index, &thread_index); - return pool_elt_at_index (smm->wrk[thread_index].sessions, session_index); + return pool_elt_at_index (smm->wrk[handle.thread_index].sessions, + handle.session_index); } always_inline session_t * -session_get_from_handle_if_valid (session_handle_t handle) +session_get_from_handle_if_valid (session_handle_tu_t handle) { - u32 session_index, thread_index; - session_parse_handle (handle, &session_index, &thread_index); - return session_get_if_valid (session_index, thread_index); + return session_get_if_valid (handle.session_index, handle.thread_index); } -u64 session_segment_handle (session_t * s); - /** * Get session from handle and avoid pool validation if no same thread * * Peekers are fine because pool grows with barrier (see @ref session_alloc) */ always_inline session_t * -session_get_from_handle_safe (u64 handle) +session_get_from_handle_safe (session_handle_tu_t handle) { - u32 thread_index = session_thread_from_handle (handle); - session_worker_t *wrk = &session_main.wrk[thread_index]; + session_worker_t *wrk = &session_main.wrk[handle.thread_index]; - if (thread_index == vlib_get_thread_index ()) + if (handle.thread_index == vlib_get_thread_index ()) { - return pool_elt_at_index (wrk->sessions, - session_index_from_handle (handle)); + return pool_elt_at_index (wrk->sessions, handle.session_index); } else { /* Don't use pool_elt_at index to avoid pool bitmap reallocs */ - return wrk->sessions + session_index_from_handle (handle); + return wrk->sessions + handle.session_index; } } @@ -455,17 +448,19 @@ int session_stop_listen (session_t * s); void session_half_close (session_t *s); void session_close (session_t * s); void session_reset (session_t * s); +void session_detach_app (session_t *s); void session_transport_half_close (session_t *s); void session_transport_close (session_t * s); void session_transport_reset (session_t * s); void session_transport_cleanup (session_t * s); -int session_send_io_evt_to_thread (svm_fifo_t * f, - session_evt_type_t evt_type); int session_enqueue_notify (session_t *s); int session_dequeue_notify (session_t * s); int session_enqueue_notify_cl (session_t *s); +int session_send_io_evt_to_thread (svm_fifo_t *f, session_evt_type_t evt_type); int session_send_io_evt_to_thread_custom (void *data, u32 thread_index, session_evt_type_t evt_type); +int session_program_tx_io_evt (session_handle_tu_t sh, + session_evt_type_t evt_type); void session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args); void session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp, @@ -478,6 +473,7 @@ void session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl); int session_transport_attribute (session_t *s, u8 is_get, transport_endpt_attr_t *attr); +u64 session_segment_handle (session_t *s); u8 *format_session (u8 * s, va_list * args); uword unformat_session (unformat_input_t * input, va_list * args); @@ -654,8 +650,8 @@ session_vlib_thread_is_cl_thread (void) * Listen sessions */ -always_inline u64 -listen_session_get_handle (session_t * s) +always_inline session_handle_t +listen_session_get_handle (session_t *s) { ASSERT (s->session_state == SESSION_STATE_LISTENING || session_get_transport_proto (s) == TRANSPORT_PROTO_QUIC); diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c index 2ecb464e38c..48eb932a2c9 100644 --- a/src/vnet/session/session_api.c +++ b/src/vnet/session/session_api.c @@ -711,9 +711,10 @@ done: VL_API_APP_WORKER_ADD_DEL_REPLY, ((!rv && mp->is_add) ? vec_len (args.segment->name) : 0), ({ rmp->is_add = mp->is_add; - rmp->wrk_index = clib_host_to_net_u32 (args.wrk_map_index); + rmp->wrk_index = mp->wrk_index; if (!rv && mp->is_add) { + rmp->wrk_index = clib_host_to_net_u32 (args.wrk_map_index); rmp->segment_handle = clib_host_to_net_u64 (args.segment_handle); rmp->app_event_queue_address = fifo_segment_msg_q_offset ((fifo_segment_t *) args.segment, 0); @@ -1465,10 +1466,11 @@ done: rmp = &msg.worker_add_del_reply; rmp->retval = rv; rmp->is_add = mp->is_add; + rmp->wrk_index = mp->wrk_index; rmp->api_client_handle = sapi_handle; - rmp->wrk_index = args.wrk_map_index; if (!rv && mp->is_add) { + rmp->wrk_index = args.wrk_map_index; rmp->segment_handle = args.segment_handle; /* No segment name and size. This supports only memfds */ rmp->app_event_queue_address = diff --git a/src/vnet/session/session_input.c b/src/vnet/session/session_input.c index 41e8beb8abc..73b777127fd 100644 --- a/src/vnet/session/session_input.c +++ b/src/vnet/session/session_input.c @@ -154,8 +154,7 @@ app_worker_flush_events_inline (app_worker_t *app_wrk, u32 thread_index, old_state = s->session_state; if (app->cb_fns.session_accept_callback (s)) { - session_close (s); - s->app_wrk_index = SESSION_INVALID_INDEX; + session_detach_app (s); break; } if (is_builtin) @@ -184,8 +183,7 @@ app_worker_flush_events_inline (app_worker_t *app_wrk, u32 thread_index, break; if (rv) { - session_close (s); - s->app_wrk_index = SESSION_INVALID_INDEX; + session_detach_app (s); break; } if (old_state >= SESSION_STATE_TRANSPORT_CLOSING) @@ -225,13 +223,20 @@ app_worker_flush_events_inline (app_worker_t *app_wrk, u32 thread_index, break; case SESSION_CTRL_EVT_TRANSPORT_CLOSED: s = session_get (evt->session_index, thread_index); + /* Notification enqueued before session was refused by app */ + if (PREDICT_FALSE (s->app_wrk_index == APP_INVALID_INDEX)) + break; if (app->cb_fns.session_transport_closed_callback) app->cb_fns.session_transport_closed_callback (s); break; case SESSION_CTRL_EVT_CLEANUP: s = session_get (evt->as_u64[0] & 0xffffffff, thread_index); - if (app->cb_fns.session_cleanup_callback) - app->cb_fns.session_cleanup_callback (s, evt->as_u64[0] >> 32); + /* Notification enqueued before session was refused by app */ + if (PREDICT_TRUE (s->app_wrk_index != APP_INVALID_INDEX)) + { + if (app->cb_fns.session_cleanup_callback) + app->cb_fns.session_cleanup_callback (s, evt->as_u64[0] >> 32); + } if (evt->as_u64[0] >> 32 != SESSION_CLEANUP_SESSION) break; uword_to_pointer (evt->as_u64[1], void (*) (session_t * s)) (s); diff --git a/src/vnet/session/session_lookup.c b/src/vnet/session/session_lookup.c index 9d028dbb28c..ff20bc2d835 100644 --- a/src/vnet/session/session_lookup.c +++ b/src/vnet/session/session_lookup.c @@ -1184,7 +1184,6 @@ session_lookup_connection_wt6 (u32 fib_index, ip6_address_t * lcl, rv = clib_bihash_search_inline_48_8 (&st->v6_session_hash, &kv6); if (rv == 0) { - ASSERT ((u32) (kv6.value >> 32) == thread_index); if (PREDICT_FALSE ((u32) (kv6.value >> 32) != thread_index)) { *result = SESSION_LOOKUP_RESULT_WRONG_THREAD; diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index 0b9a5b4c4c0..0ec158fb429 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -491,15 +491,13 @@ session_mq_reset_reply_handler (void *data) app_worker_t *app_wrk; session_t *s; application_t *app; - u32 index, thread_index; mp = (session_reset_reply_msg_t *) data; app = application_lookup (mp->context); if (!app) return; - session_parse_handle (mp->handle, &index, &thread_index); - s = session_get_if_valid (index, thread_index); + s = session_get_from_handle_if_valid (mp->handle); /* No session or not the right session */ if (!s || s->session_state < SESSION_STATE_TRANSPORT_CLOSING) diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h index 8ec972da832..5e650727d61 100644 --- a/src/vnet/session/session_types.h +++ b/src/vnet/session/session_types.h @@ -25,6 +25,19 @@ #define SESSION_CTRL_MSG_TX_MAX_SIZE 160 #define SESSION_NODE_FRAME_SIZE 128 +typedef u8 session_type_t; +typedef u64 session_handle_t; + +typedef union session_handle_tu_ +{ + session_handle_t handle; + struct + { + u32 session_index; + u32 thread_index; + }; +} __attribute__ ((__transparent_union__)) session_handle_tu_t; + #define foreach_session_endpoint_fields \ foreach_transport_endpoint_cfg_fields \ _(u8, transport_proto) \ @@ -125,9 +138,6 @@ session_endpoint_is_zero (session_endpoint_t * sep) return ip_is_zero (&sep->ip, sep->is_ip4); } -typedef u8 session_type_t; -typedef u64 session_handle_t; - typedef enum { SESSION_CLEANUP_TRANSPORT, @@ -144,19 +154,19 @@ typedef enum session_ft_action_ /* * Session states */ -#define foreach_session_state \ - _(CREATED, "created") \ - _(LISTENING, "listening") \ - _(CONNECTING, "connecting") \ - _(ACCEPTING, "accepting") \ - _(READY, "ready") \ - _(OPENED, "opened") \ - _(TRANSPORT_CLOSING, "transport-closing") \ - _(CLOSING, "closing") \ - _(APP_CLOSED, "app-closed") \ - _(TRANSPORT_CLOSED, "transport-closed") \ - _(CLOSED, "closed") \ - _(TRANSPORT_DELETED, "transport-deleted") \ +#define foreach_session_state \ + _ (CREATED, "created") \ + _ (LISTENING, "listening") \ + _ (CONNECTING, "connecting") \ + _ (ACCEPTING, "accepting") \ + _ (READY, "ready") \ + _ (OPENED, "opened") \ + _ (TRANSPORT_CLOSING, "transport-closing") \ + _ (CLOSING, "closing") \ + _ (APP_CLOSED, "app-closed") \ + _ (TRANSPORT_CLOSED, "transport-closed") \ + _ (CLOSED, "closed") \ + _ (TRANSPORT_DELETED, "transport-deleted") typedef enum { @@ -164,7 +174,7 @@ typedef enum foreach_session_state #undef _ SESSION_N_STATES, -} session_state_t; +} __clib_packed session_state_t; #define foreach_session_flag \ _ (RX_EVT, "rx-event") \ @@ -198,23 +208,30 @@ typedef struct session_ svm_fifo_t *rx_fifo; svm_fifo_t *tx_fifo; + union + { + session_handle_t handle; + struct + { + /** Index in thread pool where session was allocated */ + u32 session_index; + + /** Index of the thread that allocated the session */ + u32 thread_index; + }; + }; + /** Type built from transport and network protocol types */ session_type_t session_type; /** State in session layer state machine. See @ref session_state_t */ - volatile u8 session_state; - - /** Index in thread pool where session was allocated */ - u32 session_index; + volatile session_state_t session_state; /** Index of the app worker that owns the session */ u32 app_wrk_index; - /** Index of the thread that allocated the session */ - u8 thread_index; - /** Session flags. See @ref session_flags_t */ - u32 flags; + session_flags_t flags; /** Index of the transport connection associated to the session */ u32 connection_index; @@ -299,45 +316,35 @@ session_tx_is_dgram (session_t * s) always_inline session_handle_t session_handle (session_t * s) { - return ((u64) s->thread_index << 32) | (u64) s->session_index; + return s->handle; } always_inline u32 -session_index_from_handle (session_handle_t handle) +session_index_from_handle (session_handle_tu_t handle) { - return handle & 0xFFFFFFFF; + return handle.session_index; } always_inline u32 -session_thread_from_handle (session_handle_t handle) +session_thread_from_handle (session_handle_tu_t handle) { - return handle >> 32; + return handle.thread_index; } always_inline void -session_parse_handle (session_handle_t handle, u32 * index, - u32 * thread_index) +session_parse_handle (session_handle_tu_t handle, u32 *index, + u32 *thread_index) { - *index = session_index_from_handle (handle); - *thread_index = session_thread_from_handle (handle); + *index = handle.session_index; + *thread_index = handle.thread_index; } static inline session_handle_t session_make_handle (u32 session_index, u32 data) { - return (((u64) data << 32) | (u64) session_index); -} - -always_inline u32 -session_handle_index (session_handle_t ho_handle) -{ - return (ho_handle & 0xffffffff); -} - -always_inline u32 -session_handle_data (session_handle_t ho_handle) -{ - return (ho_handle >> 32); + return ((session_handle_tu_t){ .session_index = session_index, + .thread_index = data }) + .handle; } typedef enum |