diff options
Diffstat (limited to 'src/vnet/session/session.c')
-rw-r--r-- | src/vnet/session/session.c | 517 |
1 files changed, 243 insertions, 274 deletions
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 228234ceefc..b494041f9eb 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -240,18 +240,26 @@ session_is_valid (u32 si, u8 thread_index) || s->session_state <= SESSION_STATE_LISTENING) return 1; - if (s->session_state == SESSION_STATE_CONNECTING && + if ((s->session_state == SESSION_STATE_CONNECTING || + s->session_state == SESSION_STATE_TRANSPORT_CLOSED) && (s->flags & SESSION_F_HALF_OPEN)) return 1; tc = session_get_transport (s); - if (s->connection_index != tc->c_index - || s->thread_index != tc->thread_index || tc->s_index != si) + if (s->connection_index != tc->c_index || + s->thread_index != tc->thread_index || tc->s_index != si) return 0; return 1; } +void +session_cleanup (session_t *s) +{ + segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo); + session_free (s); +} + static void session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf) { @@ -259,16 +267,21 @@ session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf) app_wrk = app_worker_get_if_valid (s->app_wrk_index); if (!app_wrk) - return; + { + if (ntf == SESSION_CLEANUP_TRANSPORT) + return; + + session_cleanup (s); + return; + } app_worker_cleanup_notify (app_wrk, s, ntf); } void -session_free_w_fifos (session_t * s) +session_program_cleanup (session_t *s) { + ASSERT (s->session_state == SESSION_STATE_TRANSPORT_DELETED); session_cleanup_notify (s, SESSION_CLEANUP_SESSION); - segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo); - session_free (s); } /** @@ -285,7 +298,7 @@ session_delete (session_t * s) if ((rv = session_lookup_del_session (s))) clib_warning ("session %u hash delete rv %d", s->session_index, rv); - session_free_w_fifos (s); + session_program_cleanup (s); } void @@ -307,7 +320,7 @@ session_cleanup_half_open (session_handle_t ho_handle) transport_cleanup (session_get_transport_proto (ho), ho->connection_index, ho->app_index /* overloaded */); } - else + else if (ho->session_state != SESSION_STATE_TRANSPORT_DELETED) { /* Cleanup half-open session lookup table if need be */ if (ho->session_state != SESSION_STATE_TRANSPORT_CLOSED) @@ -333,7 +346,8 @@ session_half_open_free (session_t *ho) app_wrk = app_worker_get_if_valid (ho->app_wrk_index); if (app_wrk) app_worker_del_half_open (app_wrk, ho); - session_free (ho); + else + session_free (ho); } static void @@ -354,6 +368,7 @@ session_half_open_delete_notify (transport_connection_t *tc) if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP)) session_lookup_del_half_open (tc); } + session_set_state (ho, SESSION_STATE_TRANSPORT_DELETED); /* Notification from ctrl thread accepted without rpc */ if (tc->thread_index == transport_cl_thread ()) @@ -558,10 +573,158 @@ session_fifo_tuning (session_t * s, svm_fifo_t * f, } } +void +session_wrk_program_app_wrk_evts (session_worker_t *wrk, u32 app_wrk_index) +{ + u8 need_interrupt; + + ASSERT ((wrk - session_main.wrk) == vlib_get_thread_index ()); + need_interrupt = clib_bitmap_is_zero (wrk->app_wrks_pending_ntf); + wrk->app_wrks_pending_ntf = + clib_bitmap_set (wrk->app_wrks_pending_ntf, app_wrk_index, 1); + + if (need_interrupt) + vlib_node_set_interrupt_pending (wrk->vm, session_input_node.index); +} + +always_inline void +session_program_io_event (app_worker_t *app_wrk, session_t *s, + session_evt_type_t et, u8 is_cl) +{ + if (is_cl) + { + /* Special events for connectionless sessions */ + et += SESSION_IO_EVT_BUILTIN_RX - SESSION_IO_EVT_RX; + + ASSERT (s->thread_index == 0); + session_event_t evt = { + .event_type = et, + .session_handle = session_handle (s), + }; + + app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt); + } + else + { + app_worker_add_event (app_wrk, s, et); + } +} + +static inline int +session_notify_subscribers (u32 app_index, session_t *s, svm_fifo_t *f, + session_evt_type_t evt_type) +{ + app_worker_t *app_wrk; + application_t *app; + u8 is_cl; + int i; + + app = application_get (app_index); + if (!app) + return -1; + + is_cl = s->thread_index != vlib_get_thread_index (); + for (i = 0; i < f->shr->n_subscribers; i++) + { + app_wrk = application_get_worker (app, f->shr->subscribers[i]); + if (!app_wrk) + continue; + session_program_io_event (app_wrk, s, evt_type, is_cl ? 1 : 0); + } + + return 0; +} + +always_inline int +session_enqueue_notify_inline (session_t *s, u8 is_cl) +{ + app_worker_t *app_wrk; + + app_wrk = app_worker_get_if_valid (s->app_wrk_index); + if (PREDICT_FALSE (!app_wrk)) + return -1; + + session_program_io_event (app_wrk, s, SESSION_IO_EVT_RX, is_cl); + + if (PREDICT_FALSE (svm_fifo_n_subscribers (s->rx_fifo))) + return session_notify_subscribers (app_wrk->app_index, s, s->rx_fifo, + SESSION_IO_EVT_RX); + + return 0; +} + +int +session_enqueue_notify (session_t *s) +{ + return session_enqueue_notify_inline (s, 0 /* is_cl */); +} + +int +session_enqueue_notify_cl (session_t *s) +{ + return session_enqueue_notify_inline (s, 1 /* is_cl */); +} + +int +session_dequeue_notify (session_t *s) +{ + app_worker_t *app_wrk; + u8 is_cl; + + /* Unset as soon as event is requested */ + svm_fifo_clear_deq_ntf (s->tx_fifo); + + app_wrk = app_worker_get_if_valid (s->app_wrk_index); + if (PREDICT_FALSE (!app_wrk)) + return -1; + + is_cl = s->session_state == SESSION_STATE_LISTENING || + s->session_state == SESSION_STATE_OPENED; + session_program_io_event (app_wrk, s, SESSION_IO_EVT_TX, is_cl ? 1 : 0); + + if (PREDICT_FALSE (svm_fifo_n_subscribers (s->tx_fifo))) + return session_notify_subscribers (app_wrk->app_index, s, s->tx_fifo, + SESSION_IO_EVT_TX); + + return 0; +} + +/** + * Flushes queue of sessions that are to be notified of new data + * enqueued events. + * + * @param transport_proto transport protocol for which queue to be flushed + * @param thread_index Thread index for which the flush is to be performed. + * @return 0 on success or a positive number indicating the number of + * failures due to API queue being full. + */ +void +session_main_flush_enqueue_events (transport_proto_t transport_proto, + u32 thread_index) +{ + session_worker_t *wrk = session_main_get_worker (thread_index); + session_handle_t *handles; + session_t *s; + u32 i; + + handles = wrk->session_to_enqueue[transport_proto]; + + for (i = 0; i < vec_len (handles); i++) + { + s = session_get_from_handle (handles[i]); + session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, + 0 /* TODO/not needed */); + session_enqueue_notify_inline (s, + s->thread_index != thread_index ? 1 : 0); + } + + vec_reset_length (handles); + wrk->session_to_enqueue[transport_proto] = handles; +} + /* - * Enqueue data for delivery to session peer. Does not notify peer of enqueue - * event but on request can queue notification events for later delivery by - * calling stream_server_flush_enqueue_events(). + * Enqueue data for delivery to app. If requested, it queues app notification + * event for later delivery. * * @param tc Transport connection which is to be enqueued data * @param b Buffer to be enqueued @@ -610,15 +773,14 @@ session_enqueue_stream_connection (transport_connection_t * tc, if (queue_event) { - /* Queue RX event on this fifo. Eventually these will need to be flushed - * by calling stream_server_flush_enqueue_events () */ - session_worker_t *wrk; - - wrk = session_main_get_worker (s->thread_index); + /* Queue RX event on this fifo. Eventually these will need to be + * flushed by calling @ref session_main_flush_enqueue_events () */ if (!(s->flags & SESSION_F_RX_EVT)) { + session_worker_t *wrk = session_main_get_worker (s->thread_index); + ASSERT (s->thread_index == vlib_get_thread_index ()); s->flags |= SESSION_F_RX_EVT; - vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index); + vec_add1 (wrk->session_to_enqueue[tc->proto], session_handle (s)); } session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0); @@ -627,10 +789,11 @@ session_enqueue_stream_connection (transport_connection_t * tc, return enqueued; } -int -session_enqueue_dgram_connection (session_t * s, - session_dgram_hdr_t * hdr, - vlib_buffer_t * b, u8 proto, u8 queue_event) +always_inline int +session_enqueue_dgram_connection_inline (session_t *s, + session_dgram_hdr_t *hdr, + vlib_buffer_t *b, u8 proto, + u8 queue_event, u32 is_cl) { int rv; @@ -639,12 +802,10 @@ session_enqueue_dgram_connection (session_t * s, if (PREDICT_TRUE (!(b->flags & VLIB_BUFFER_NEXT_PRESENT))) { - /* *INDENT-OFF* */ svm_fifo_seg_t segs[2] = { { (u8 *) hdr, sizeof (*hdr) }, { vlib_buffer_get_current (b), b->current_length } }; - /* *INDENT-ON* */ rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, 2, 0 /* allow_partial */ ); @@ -676,15 +837,16 @@ session_enqueue_dgram_connection (session_t * s, if (queue_event && rv > 0) { - /* Queue RX event on this fifo. Eventually these will need to be flushed - * by calling stream_server_flush_enqueue_events () */ - session_worker_t *wrk; - - wrk = session_main_get_worker (s->thread_index); + /* Queue RX event on this fifo. Eventually these will need to be + * flushed by calling @ref session_main_flush_enqueue_events () */ if (!(s->flags & SESSION_F_RX_EVT)) { + u32 thread_index = + is_cl ? vlib_get_thread_index () : s->thread_index; + session_worker_t *wrk = session_main_get_worker (thread_index); + ASSERT (s->thread_index == vlib_get_thread_index () || is_cl); s->flags |= SESSION_F_RX_EVT; - vec_add1 (wrk->session_to_enqueue[proto], s->session_index); + vec_add1 (wrk->session_to_enqueue[proto], session_handle (s)); } session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0); @@ -693,6 +855,23 @@ session_enqueue_dgram_connection (session_t * s, } int +session_enqueue_dgram_connection (session_t *s, session_dgram_hdr_t *hdr, + vlib_buffer_t *b, u8 proto, u8 queue_event) +{ + return session_enqueue_dgram_connection_inline (s, hdr, b, proto, + queue_event, 0 /* is_cl */); +} + +int +session_enqueue_dgram_connection_cl (session_t *s, session_dgram_hdr_t *hdr, + vlib_buffer_t *b, u8 proto, + u8 queue_event) +{ + return session_enqueue_dgram_connection_inline (s, hdr, b, proto, + queue_event, 1 /* is_cl */); +} + +int session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer, u32 offset, u32 max_bytes) { @@ -715,189 +894,6 @@ session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes) return rv; } -static inline int -session_notify_subscribers (u32 app_index, session_t * s, - svm_fifo_t * f, session_evt_type_t evt_type) -{ - app_worker_t *app_wrk; - application_t *app; - int i; - - app = application_get (app_index); - if (!app) - return -1; - - for (i = 0; i < f->shr->n_subscribers; i++) - { - app_wrk = application_get_worker (app, f->shr->subscribers[i]); - if (!app_wrk) - continue; - if (app_worker_lock_and_send_event (app_wrk, s, evt_type)) - return -1; - } - - return 0; -} - -/** - * Notify session peer that new data has been enqueued. - * - * @param s Stream session for which the event is to be generated. - * @param lock Flag to indicate if call should lock message queue. - * - * @return 0 on success or negative number if failed to send notification. - */ -static inline int -session_enqueue_notify_inline (session_t * s) -{ - app_worker_t *app_wrk; - u32 session_index; - u8 n_subscribers; - u32 thread_index; - - session_index = s->session_index; - thread_index = s->thread_index; - n_subscribers = svm_fifo_n_subscribers (s->rx_fifo); - - app_wrk = app_worker_get_if_valid (s->app_wrk_index); - if (PREDICT_FALSE (!app_wrk)) - { - SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index); - return 0; - } - - SESSION_EVT (SESSION_EVT_ENQ, s, svm_fifo_max_dequeue_prod (s->rx_fifo)); - - s->flags &= ~SESSION_F_RX_EVT; - - /* Application didn't confirm accept yet */ - if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING)) - return 0; - - if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s, - SESSION_IO_EVT_RX))) - return -1; - - if (PREDICT_FALSE (n_subscribers)) - { - s = session_get (session_index, thread_index); - return session_notify_subscribers (app_wrk->app_index, s, - s->rx_fifo, SESSION_IO_EVT_RX); - } - - return 0; -} - -int -session_enqueue_notify (session_t * s) -{ - return session_enqueue_notify_inline (s); -} - -static void -session_enqueue_notify_rpc (void *arg) -{ - u32 session_index = pointer_to_uword (arg); - session_t *s; - - s = session_get_if_valid (session_index, vlib_get_thread_index ()); - if (!s) - return; - - session_enqueue_notify (s); -} - -/** - * Like session_enqueue_notify, but can be called from a thread that does not - * own the session. - */ -void -session_enqueue_notify_thread (session_handle_t sh) -{ - u32 thread_index = session_thread_from_handle (sh); - u32 session_index = session_index_from_handle (sh); - - /* - * Pass session index (u32) as opposed to handle (u64) in case pointers - * are not 64-bit. - */ - session_send_rpc_evt_to_thread (thread_index, - session_enqueue_notify_rpc, - uword_to_pointer (session_index, void *)); -} - -int -session_dequeue_notify (session_t * s) -{ - app_worker_t *app_wrk; - - svm_fifo_clear_deq_ntf (s->tx_fifo); - - app_wrk = app_worker_get_if_valid (s->app_wrk_index); - if (PREDICT_FALSE (!app_wrk)) - return -1; - - if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s, - SESSION_IO_EVT_TX))) - return -1; - - if (PREDICT_FALSE (s->tx_fifo->shr->n_subscribers)) - return session_notify_subscribers (app_wrk->app_index, s, - s->tx_fifo, SESSION_IO_EVT_TX); - - return 0; -} - -/** - * Flushes queue of sessions that are to be notified of new data - * enqueued events. - * - * @param thread_index Thread index for which the flush is to be performed. - * @return 0 on success or a positive number indicating the number of - * failures due to API queue being full. - */ -int -session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index) -{ - session_worker_t *wrk = session_main_get_worker (thread_index); - session_t *s; - int i, errors = 0; - u32 *indices; - - indices = wrk->session_to_enqueue[transport_proto]; - - for (i = 0; i < vec_len (indices); i++) - { - s = session_get_if_valid (indices[i], thread_index); - if (PREDICT_FALSE (!s)) - { - errors++; - continue; - } - - session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, - 0 /* TODO/not needed */ ); - - if (PREDICT_FALSE (session_enqueue_notify_inline (s))) - errors++; - } - - vec_reset_length (indices); - wrk->session_to_enqueue[transport_proto] = indices; - - return errors; -} - -int -session_main_flush_all_enqueue_events (u8 transport_proto) -{ - vlib_thread_main_t *vtm = vlib_get_thread_main (); - int i, errors = 0; - for (i = 0; i < 1 + vtm->n_threads; i++) - errors += session_main_flush_enqueue_events (transport_proto, i); - return errors; -} - int session_stream_connect_notify (transport_connection_t * tc, session_error_t err) @@ -951,43 +947,20 @@ session_stream_connect_notify (transport_connection_t * tc, return 0; } -typedef union session_switch_pool_reply_args_ -{ - struct - { - u32 session_index; - u16 thread_index; - u8 is_closed; - }; - u64 as_u64; -} session_switch_pool_reply_args_t; - -STATIC_ASSERT (sizeof (session_switch_pool_reply_args_t) <= sizeof (uword), - "switch pool reply args size"); - static void -session_switch_pool_reply (void *arg) +session_switch_pool_closed_rpc (void *arg) { - session_switch_pool_reply_args_t rargs; + session_handle_t sh; session_t *s; - rargs.as_u64 = pointer_to_uword (arg); - s = session_get_if_valid (rargs.session_index, rargs.thread_index); + sh = pointer_to_uword (arg); + s = session_get_from_handle_if_valid (sh); if (!s) return; - /* Session closed during migration. Clean everything up */ - if (rargs.is_closed) - { - transport_cleanup (session_get_transport_proto (s), s->connection_index, - s->thread_index); - segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo); - session_free (s); - return; - } - - /* Notify app that it has data on the new session */ - session_enqueue_notify (s); + transport_cleanup (session_get_transport_proto (s), s->connection_index, + s->thread_index); + session_cleanup (s); } typedef struct _session_switch_pool_args @@ -1005,8 +978,7 @@ static void session_switch_pool (void *cb_args) { session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args; - session_switch_pool_reply_args_t rargs; - session_handle_t new_sh; + session_handle_t sh, new_sh; segment_manager_t *sm; app_worker_t *app_wrk; session_t *s; @@ -1014,37 +986,32 @@ session_switch_pool (void *cb_args) ASSERT (args->thread_index == vlib_get_thread_index ()); s = session_get (args->session_index, args->thread_index); - /* Check if session closed during migration */ - rargs.is_closed = s->session_state >= SESSION_STATE_TRANSPORT_CLOSING; + app_wrk = app_worker_get_if_valid (s->app_wrk_index); + if (!app_wrk) + goto app_closed; - transport_cleanup (session_get_transport_proto (s), s->connection_index, - s->thread_index); + /* Cleanup fifo segment slice state for fifos */ + sm = app_worker_get_connect_segment_manager (app_wrk); + segment_manager_detach_fifo (sm, &s->rx_fifo); + segment_manager_detach_fifo (sm, &s->tx_fifo); - app_wrk = app_worker_get_if_valid (s->app_wrk_index); - if (app_wrk) - { - /* Cleanup fifo segment slice state for fifos */ - sm = app_worker_get_connect_segment_manager (app_wrk); - segment_manager_detach_fifo (sm, &s->rx_fifo); - segment_manager_detach_fifo (sm, &s->tx_fifo); + /* Check if session closed during migration */ + if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING) + goto app_closed; - /* Notify app, using old session, about the migration event */ - if (!rargs.is_closed) - { - new_sh = session_make_handle (args->new_session_index, - args->new_thread_index); - app_worker_migrate_notify (app_wrk, s, new_sh); - } - } + new_sh = + session_make_handle (args->new_session_index, args->new_thread_index); + app_worker_migrate_notify (app_wrk, s, new_sh); - /* Trigger app read and fifo updates on the new thread */ - rargs.session_index = args->new_session_index; - rargs.thread_index = args->new_thread_index; - session_send_rpc_evt_to_thread (args->new_thread_index, - session_switch_pool_reply, - uword_to_pointer (rargs.as_u64, void *)); + clib_mem_free (cb_args); + return; - session_free (s); +app_closed: + /* Session closed during migration. Clean everything up */ + sh = session_handle (s); + session_send_rpc_evt_to_thread (args->new_thread_index, + session_switch_pool_closed_rpc, + uword_to_pointer (sh, void *)); clib_mem_free (cb_args); } @@ -1184,6 +1151,7 @@ session_transport_delete_notify (transport_connection_t * tc) break; case SESSION_STATE_CLOSED: session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT); + session_set_state (s, SESSION_STATE_TRANSPORT_DELETED); session_delete (s); break; default: @@ -1633,7 +1601,7 @@ session_transport_close (session_t * s) session_set_state (s, SESSION_STATE_CLOSED); /* If transport is already deleted, just free the session */ else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED) - session_free_w_fifos (s); + session_program_cleanup (s); return; } @@ -1660,7 +1628,7 @@ session_transport_reset (session_t * s) if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED) session_set_state (s, SESSION_STATE_CLOSED); else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED) - session_free_w_fifos (s); + session_program_cleanup (s); return; } @@ -2157,6 +2125,7 @@ session_node_enable_disable (u8 is_en) if (!sm->poll_main) continue; } + vlib_node_set_state (vm, session_input_node.index, mstate); vlib_node_set_state (vm, session_queue_node.index, state); } |