diff options
author | Florin Coras <fcoras@cisco.com> | 2017-10-02 00:18:51 -0700 |
---|---|---|
committer | Dave Barach <openvpp@barachs.net> | 2017-10-16 21:41:11 +0000 |
commit | 3cbc04bea02fc60471dfe0c671ede3ca42c118c3 (patch) | |
tree | 6128beab7dfb01c6221da2f675078078170e75ac /src/vnet/session/session.h | |
parent | 0cb01bde499979066389975ba81670764914cbc2 (diff) |
udp: refactor udp code
Change-Id: I44d5c9df7c49b8d4d5677c6d319033b2da3e6b80
Signed-off-by: Florin Coras <fcoras@cisco.com>
Diffstat (limited to 'src/vnet/session/session.h')
-rw-r--r-- | src/vnet/session/session.h | 149 |
1 files changed, 126 insertions, 23 deletions
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index b1a03d213e9..bd854d4b4c5 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -105,7 +105,7 @@ typedef CLIB_PACKED (struct { rpc_args_t rpc_args; }; u8 event_type; - u16 event_id; + u8 postponed; }) session_fifo_event_t; /* *INDENT-ON* */ @@ -128,17 +128,21 @@ struct _session_manager_main /** Per worker thread session pools */ stream_session_t **sessions; + /** Per worker-thread count of threads peeking into the session pool */ + u32 *session_peekers; + + /** Per worker-thread rw peekers locks */ + clib_spinlock_t *peekers_readers_locks; + clib_spinlock_t *peekers_write_locks; + /** Pool of listen sessions. Same type as stream sessions to ease lookups */ stream_session_t *listen_sessions[SESSION_N_TYPES]; - /** Sparse vector to map dst port to stream server */ - u16 *stream_server_by_dst_port[SESSION_N_TYPES]; - - /** per-worker enqueue epoch counters */ - u8 *current_enqueue_epoch; + /** Per-proto, per-worker enqueue epoch counters */ + u8 *current_enqueue_epoch[TRANSPORT_N_PROTO]; - /** Per-worker thread vector of sessions to enqueue */ - u32 **session_indices_to_enqueue_by_thread; + /** Per-proto, per-worker thread vector of sessions to enqueue */ + u32 **session_to_enqueue[TRANSPORT_N_PROTO]; /** per-worker tx buffer free lists */ u32 **tx_buffers; @@ -149,6 +153,9 @@ struct _session_manager_main /** per-worker active event vectors */ session_fifo_event_t **pending_event_vector; + /** per-worker postponed disconnects */ + session_fifo_event_t **pending_disconnects; + /** vpp fifo event queue */ unix_shared_memory_queue_t **vpp_event_queues; @@ -213,6 +220,8 @@ stream_session_is_valid (u32 si, u8 thread_index) return 1; } +stream_session_t *session_alloc (u32 thread_index); + always_inline stream_session_t * session_get (u32 si, u32 thread_index) { @@ -221,7 +230,7 @@ session_get (u32 si, u32 thread_index) } always_inline stream_session_t * -stream_session_get_if_valid (u64 si, u32 thread_index) +session_get_if_valid (u64 si, u32 thread_index) { if (thread_index >= vec_len (session_manager_main.sessions)) return 0; @@ -234,7 +243,7 @@ stream_session_get_if_valid (u64 si, u32 thread_index) } always_inline u64 -stream_session_handle (stream_session_t * s) +session_handle (stream_session_t * s) { return ((u64) s->thread_index << 32) | (u64) s->session_index; } @@ -267,6 +276,66 @@ session_get_from_handle (u64 handle) session_index_from_handle (handle)); } +/** + * Acquires a lock that blocks a session pool from expanding. + * + * This is typically used for safely peeking into other threads' + * pools in order to clone elements. Lock should be dropped as soon + * as possible by calling @ref session_pool_remove_peeker. + * + * NOTE: Avoid using pool_elt_at_index while the lock is held because + * it may lead to free elt bitmap expansion/contraction! + */ +always_inline void +session_pool_add_peeker (u32 thread_index) +{ + session_manager_main_t *smm = &session_manager_main; + if (thread_index == vlib_get_thread_index ()) + return; + clib_spinlock_lock_if_init (&smm->peekers_readers_locks[thread_index]); + smm->session_peekers[thread_index] += 1; + if (smm->session_peekers[thread_index] == 1) + clib_spinlock_lock_if_init (&smm->peekers_write_locks[thread_index]); + clib_spinlock_unlock_if_init (&smm->peekers_readers_locks[thread_index]); +} + +always_inline void +session_pool_remove_peeker (u32 thread_index) +{ + session_manager_main_t *smm = &session_manager_main; + if (thread_index == vlib_get_thread_index ()) + return; + ASSERT (session_manager_main.session_peekers[thread_index] > 0); + clib_spinlock_lock_if_init (&smm->peekers_readers_locks[thread_index]); + smm->session_peekers[thread_index] -= 1; + if (smm->session_peekers[thread_index] == 0) + clib_spinlock_unlock_if_init (&smm->peekers_write_locks[thread_index]); + clib_spinlock_unlock_if_init (&smm->peekers_readers_locks[thread_index]); +} + +/** + * Get session from handle and 'lock' pool resize if not in same thread + * + * Caller should drop the peek 'lock' as soon as possible. + */ +always_inline stream_session_t * +session_get_from_handle_safe (u64 handle) +{ + session_manager_main_t *smm = &session_manager_main; + u32 thread_index = session_thread_from_handle (handle); + if (thread_index == vlib_get_thread_index ()) + { + return pool_elt_at_index (smm->sessions[thread_index], + session_index_from_handle (handle)); + } + else + { + session_pool_add_peeker (thread_index); + /* Don't use pool_elt_at index. See @ref session_pool_add_peeker */ + return smm->sessions[thread_index] + session_index_from_handle (handle); + } +} + always_inline stream_session_t * stream_session_listener_get (u8 sst, u64 si) { @@ -296,17 +365,52 @@ stream_session_rx_fifo_size (transport_connection_t * tc) return s->server_rx_fifo->nitems; } +always_inline u32 +session_get_index (stream_session_t * s) +{ + return (s - session_manager_main.sessions[s->thread_index]); +} + +always_inline stream_session_t * +session_clone_safe (u32 session_index, u32 thread_index) +{ + stream_session_t *old_s, *new_s; + u32 current_thread_index = vlib_get_thread_index (); + + /* If during the memcpy pool is reallocated AND the memory allocator + * decides to give the old chunk of memory to somebody in a hurry to + * scribble something on it, we have a problem. So add this thread as + * a session pool peeker. + */ + session_pool_add_peeker (thread_index); + new_s = session_alloc (current_thread_index); + old_s = session_manager_main.sessions[thread_index] + session_index; + clib_memcpy (new_s, old_s, sizeof (*new_s)); + session_pool_remove_peeker (thread_index); + new_s->thread_index = current_thread_index; + new_s->session_index = session_get_index (new_s); + return new_s; +} + +transport_connection_t *session_get_transport (stream_session_t * s); + u32 stream_session_tx_fifo_max_dequeue (transport_connection_t * tc); +stream_session_t *session_alloc (u32 thread_index); int -stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b, - u32 offset, u8 queue_event, u8 is_in_order); -int -stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer, - u32 offset, u32 max_bytes); +session_enqueue_stream_connection (transport_connection_t * tc, + vlib_buffer_t * b, u32 offset, + u8 queue_event, u8 is_in_order); +int session_enqueue_dgram_connection (stream_session_t * s, vlib_buffer_t * b, + u8 proto, u8 queue_event); +int stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer, + u32 offset, u32 max_bytes); u32 stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes); -int stream_session_connect_notify (transport_connection_t * tc, u8 is_fail); +int session_stream_connect_notify (transport_connection_t * tc, u8 is_fail); +int session_dgram_connect_notify (transport_connection_t * tc, + u32 old_thread_index, + stream_session_t ** new_session); void stream_session_init_fifos_pointers (transport_connection_t * tc, u32 rx_pointer, u32 tx_pointer); @@ -314,12 +418,9 @@ void stream_session_accept_notify (transport_connection_t * tc); void stream_session_disconnect_notify (transport_connection_t * tc); void stream_session_delete_notify (transport_connection_t * tc); void stream_session_reset_notify (transport_connection_t * tc); -int -stream_session_accept (transport_connection_t * tc, u32 listener_index, - u8 notify); -int -stream_session_open (u32 app_index, session_endpoint_t * tep, - transport_connection_t ** tc); +int stream_session_accept (transport_connection_t * tc, u32 listener_index, + u8 notify); +int session_open (u32 app_index, session_endpoint_t * tep, u32 opaque); int stream_session_listen (stream_session_t * s, session_endpoint_t * tep); int stream_session_stop_listen (stream_session_t * s); void stream_session_disconnect (stream_session_t * s); @@ -346,7 +447,7 @@ session_manager_get_vpp_event_queue (u32 thread_index) return session_manager_main.vpp_event_queues[thread_index]; } -int session_manager_flush_enqueue_events (u32 thread_index); +int session_manager_flush_enqueue_events (u8 proto, u32 thread_index); always_inline u64 listen_session_get_handle (stream_session_t * s) @@ -400,6 +501,8 @@ listen_session_del (stream_session_t * s) pool_put (session_manager_main.listen_sessions[s->session_type], s); } +transport_connection_t *listen_session_get_transport (stream_session_t * s); + int listen_session_get_local_session_endpoint (stream_session_t * listener, session_endpoint_t * sep); |