diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/vcl/vcl_locked.c | 9 | ||||
-rw-r--r-- | src/vnet/session/application.c | 55 | ||||
-rw-r--r-- | src/vnet/session/application.h | 5 | ||||
-rw-r--r-- | src/vnet/session/application_worker.c | 74 | ||||
-rw-r--r-- | src/vnet/session/session.c | 13 | ||||
-rw-r--r-- | src/vnet/session/session.h | 3 | ||||
-rw-r--r-- | src/vnet/session/session_api.c | 21 | ||||
-rw-r--r-- | src/vnet/udp/udp_input.c | 31 |
8 files changed, 171 insertions, 40 deletions
diff --git a/src/vcl/vcl_locked.c b/src/vcl/vcl_locked.c index 563d97ef804..412db8def3b 100644 --- a/src/vcl/vcl_locked.c +++ b/src/vcl/vcl_locked.c @@ -847,13 +847,14 @@ vls_share_session (vls_worker_t * vls_wrk, vcl_locked_session_t * vls) vls_shared_data_pool_runlock (); - if (s->rx_fifo) + if (s->session_state == VCL_STATE_LISTEN) { - vcl_session_share_fifos (s, s->rx_fifo, s->tx_fifo); + s->session_state = VCL_STATE_LISTEN_NO_MQ; + s->rx_fifo = s->tx_fifo = 0; } - else if (s->session_state == VCL_STATE_LISTEN) + else if (s->rx_fifo) { - s->session_state = VCL_STATE_LISTEN_NO_MQ; + vcl_session_share_fifos (s, s->rx_fifo, s->tx_fifo); } } diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c index fdd5a0a67cd..5c8efe1c438 100644 --- a/src/vnet/session/application.c +++ b/src/vnet/session/application.c @@ -52,6 +52,7 @@ static void app_listener_free (application_t * app, app_listener_t * app_listener) { clib_bitmap_free (app_listener->workers); + vec_free (app_listener->cl_listeners); if (CLIB_DEBUG) clib_memset (app_listener, 0xfa, sizeof (*app_listener)); pool_put (app->listeners, app_listener); @@ -321,6 +322,13 @@ app_listener_get_local_session (app_listener_t * al) return listen_session_get (al->local_index); } +session_t * +app_listener_get_wrk_cl_session (app_listener_t *al, u32 wrk_map_index) +{ + u32 si = vec_elt (al->cl_listeners, wrk_map_index); + return session_get (si, 0 /* listener thread */); +} + static app_worker_map_t * app_worker_map_alloc (application_t * app) { @@ -1017,6 +1025,53 @@ application_listener_select_worker (session_t * ls) return app_listener_select_worker (app, al); } +always_inline u32 +app_listener_cl_flow_hash (session_dgram_hdr_t *hdr) +{ + u32 hash = 0; + + if (hdr->is_ip4) + { + hash = clib_crc32c_u32 (hash, hdr->rmt_ip.ip4.as_u32); + hash = clib_crc32c_u32 (hash, hdr->lcl_ip.ip4.as_u32); + hash = clib_crc32c_u16 (hash, hdr->rmt_port); + hash = clib_crc32c_u16 (hash, hdr->lcl_port); + } + else + { + hash = clib_crc32c_u64 (hash, hdr->rmt_ip.ip6.as_u64[0]); + hash = clib_crc32c_u64 (hash, hdr->rmt_ip.ip6.as_u64[1]); + hash = clib_crc32c_u64 (hash, hdr->lcl_ip.ip6.as_u64[0]); + hash = clib_crc32c_u64 (hash, hdr->lcl_ip.ip6.as_u64[1]); + hash = clib_crc32c_u16 (hash, hdr->rmt_port); + hash = clib_crc32c_u16 (hash, hdr->lcl_port); + } + + return hash; +} + +session_t * +app_listener_select_wrk_cl_session (session_t *ls, session_dgram_hdr_t *hdr) +{ + u32 wrk_map_index = 0; + application_t *app; + app_listener_t *al; + + app = application_get (ls->app_index); + al = app_listener_get (app, ls->al_index); + /* Crude test to check if only worker 0 is set */ + if (al->workers[0] != 1) + { + u32 hash = app_listener_cl_flow_hash (hdr); + hash %= vec_len (al->workers) * sizeof (uword); + wrk_map_index = clib_bitmap_next_set (al->workers, hash); + if (wrk_map_index == ~0) + wrk_map_index = clib_bitmap_first_set (al->workers); + } + + return app_listener_get_wrk_cl_session (al, wrk_map_index); +} + int application_alloc_worker_and_init (application_t * app, app_worker_t ** wrk) { diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h index 5505d91ea09..7c63b90ac5a 100644 --- a/src/vnet/session/application.h +++ b/src/vnet/session/application.h @@ -106,6 +106,8 @@ typedef struct app_listener_ session_handle_t ls_handle; /**< session handle of the local or global listening session that also identifies the app listener */ + u32 *cl_listeners; /**< vector that maps app workers to their + cl sessions with fifos */ } app_listener_t; typedef enum app_rx_mq_flags_ @@ -254,6 +256,8 @@ void app_listener_cleanup (app_listener_t * app_listener); session_handle_t app_listener_handle (app_listener_t * app_listener); app_listener_t *app_listener_lookup (application_t * app, session_endpoint_cfg_t * sep); +session_t *app_listener_select_wrk_cl_session (session_t *ls, + session_dgram_hdr_t *hdr); /** * Get app listener handle for listening session @@ -280,6 +284,7 @@ app_listener_t *app_listener_get_w_handle (session_handle_t handle); app_listener_t *app_listener_get_w_session (session_t * ls); session_t *app_listener_get_session (app_listener_t * al); session_t *app_listener_get_local_session (app_listener_t * al); +session_t *app_listener_get_wrk_cl_session (app_listener_t *al, u32 wrk_index); application_t *application_get (u32 index); application_t *application_get_if_valid (u32 index); diff --git a/src/vnet/session/application_worker.c b/src/vnet/session/application_worker.c index 960d916133f..ea5a4db7fd5 100644 --- a/src/vnet/session/application_worker.c +++ b/src/vnet/session/application_worker.c @@ -186,12 +186,67 @@ app_worker_alloc_session_fifos (segment_manager_t * sm, session_t * s) } int +app_worker_alloc_wrk_cl_session (app_worker_t *app_wrk, session_t *ls) +{ + svm_fifo_t *rx_fifo = 0, *tx_fifo = 0; + segment_manager_t *sm; + session_handle_t lsh; + app_listener_t *al; + session_t *s; + + al = app_listener_get_w_session (ls); + sm = app_worker_get_listen_segment_manager (app_wrk, ls); + lsh = session_handle (ls); + + s = session_alloc (0 /* listener on main worker */); + session_set_state (s, SESSION_STATE_LISTENING); + s->flags |= SESSION_F_IS_CLESS; + s->app_wrk_index = app_wrk->wrk_index; + ls = session_get_from_handle (lsh); + s->session_type = ls->session_type; + s->connection_index = ls->connection_index; + + segment_manager_alloc_session_fifos (sm, s->thread_index, &rx_fifo, + &tx_fifo); + + rx_fifo->shr->master_session_index = s->session_index; + rx_fifo->master_thread_index = s->thread_index; + + tx_fifo->shr->master_session_index = s->session_index; + tx_fifo->master_thread_index = s->thread_index; + + s->rx_fifo = rx_fifo; + s->tx_fifo = tx_fifo; + + vec_validate (al->cl_listeners, app_wrk->wrk_map_index); + al->cl_listeners[app_wrk->wrk_map_index] = s->session_index; + + return 0; +} + +void +app_worker_free_wrk_cl_session (app_worker_t *app_wrk, session_t *ls) +{ + app_listener_t *al; + session_t *s; + + al = app_listener_get_w_session (ls); + + s = app_listener_get_wrk_cl_session (al, app_wrk->wrk_map_index); + segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo); + session_free (s); + + al->cl_listeners[app_wrk->wrk_map_index] = SESSION_INVALID_INDEX; +} + +int app_worker_init_listener (app_worker_t * app_wrk, session_t * ls) { segment_manager_t *sm; /* Allocate segment manager. All sessions derived out of a listen session - * have fifos allocated by the same segment manager. */ + * have fifos allocated by the same segment manager. + * TODO(fcoras): limit memory consumption by cless listeners */ if (!(sm = app_worker_alloc_segment_manager (app_wrk))) return SESSION_E_ALLOC; @@ -202,12 +257,9 @@ app_worker_init_listener (app_worker_t * app_wrk, session_t * ls) hash_set (app_wrk->listeners_table, listen_session_get_handle (ls), segment_manager_index (sm)); - if (transport_connection_is_cless (session_get_transport (ls))) - { - if (ls->rx_fifo) - return SESSION_E_NOSUPPORT; - return app_worker_alloc_session_fifos (sm, ls); - } + if (ls->flags & SESSION_F_IS_CLESS) + return app_worker_alloc_wrk_cl_session (app_wrk, ls); + return 0; } @@ -276,12 +328,8 @@ app_worker_stop_listen_session (app_worker_t * app_wrk, session_t * ls) if (PREDICT_FALSE (!sm_indexp)) return; - /* Dealloc fifos, if any (dgram listeners) */ - if (ls->rx_fifo) - { - segment_manager_dealloc_fifos (ls->rx_fifo, ls->tx_fifo); - ls->tx_fifo = ls->rx_fifo = 0; - } + if (ls->flags & SESSION_F_IS_CLESS) + app_worker_free_wrk_cl_session (app_wrk, ls); /* Try to cleanup segment manager */ sm = segment_manager_get (*sm_indexp); diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 2afc37a77f8..0d2e1b1615e 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -867,11 +867,22 @@ session_enqueue_dgram_connection (session_t *s, session_dgram_hdr_t *hdr, } int +session_enqueue_dgram_connection2 (session_t *s, session_dgram_hdr_t *hdr, + vlib_buffer_t *b, u8 proto, u8 queue_event) +{ + return session_enqueue_dgram_connection_inline (s, hdr, b, proto, + queue_event, 1 /* is_cl */); +} + +int session_enqueue_dgram_connection_cl (session_t *s, session_dgram_hdr_t *hdr, vlib_buffer_t *b, u8 proto, u8 queue_event) { - return session_enqueue_dgram_connection_inline (s, hdr, b, proto, + session_t *awls; + + awls = app_listener_select_wrk_cl_session (s, hdr); + return session_enqueue_dgram_connection_inline (awls, hdr, b, proto, queue_event, 1 /* is_cl */); } diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index ebaad5a93a7..78158d5f3ed 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -495,6 +495,9 @@ int session_enqueue_dgram_connection (session_t * s, session_dgram_hdr_t * hdr, vlib_buffer_t * b, u8 proto, u8 queue_event); +int session_enqueue_dgram_connection2 (session_t *s, session_dgram_hdr_t *hdr, + vlib_buffer_t *b, u8 proto, + u8 queue_event); int session_enqueue_dgram_connection_cl (session_t *s, session_dgram_hdr_t *hdr, vlib_buffer_t *b, u8 proto, diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c index 0574a58c723..84d44e9fa3e 100644 --- a/src/vnet/session/session_api.c +++ b/src/vnet/session/session_api.c @@ -276,7 +276,7 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context, session_handle_t handle, int rv) { session_bound_msg_t m = { 0 }; - transport_endpoint_t tep; + transport_connection_t *ltc; fifo_segment_t *eq_seg; app_worker_t *app_wrk; application_t *app; @@ -298,23 +298,24 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context, else ls = app_listener_get_local_session (al); - session_get_endpoint (ls, &tep, 1 /* is_lcl */); - m.lcl_port = tep.port; - m.lcl_is_ip4 = tep.is_ip4; - clib_memcpy_fast (m.lcl_ip, &tep.ip, sizeof (tep.ip)); + ltc = session_get_transport (ls); + m.lcl_port = ltc->lcl_port; + m.lcl_is_ip4 = ltc->is_ip4; + clib_memcpy_fast (m.lcl_ip, <c->lcl_ip, sizeof (m.lcl_ip)); app = application_get (app_wrk->app_index); eq_seg = application_get_rx_mqs_segment (app); m.vpp_evt_q = fifo_segment_msg_q_offset (eq_seg, ls->thread_index); m.mq_index = ls->thread_index; - if (session_transport_service_type (ls) == TRANSPORT_SERVICE_CL && - ls->rx_fifo) + if (transport_connection_is_cless (ltc)) { + session_t *wrk_ls; m.mq_index = transport_cl_thread (); m.vpp_evt_q = fifo_segment_msg_q_offset (eq_seg, m.mq_index); - m.rx_fifo = fifo_segment_fifo_offset (ls->rx_fifo); - m.tx_fifo = fifo_segment_fifo_offset (ls->tx_fifo); - m.segment_handle = session_segment_handle (ls); + wrk_ls = app_listener_get_wrk_cl_session (al, app_wrk->wrk_map_index); + m.rx_fifo = fifo_segment_fifo_offset (wrk_ls->rx_fifo); + m.tx_fifo = fifo_segment_fifo_offset (wrk_ls->tx_fifo); + m.segment_handle = session_segment_handle (wrk_ls); } snd_msg: diff --git a/src/vnet/udp/udp_input.c b/src/vnet/udp/udp_input.c index 66f6229efa6..9cabfdf73cd 100644 --- a/src/vnet/udp/udp_input.c +++ b/src/vnet/udp/udp_input.c @@ -136,39 +136,46 @@ udp_connection_enqueue (udp_connection_t * uc0, session_t * s0, int wrote0; if (!(uc0->flags & UDP_CONN_F_CONNECTED)) - clib_spinlock_lock (&uc0->rx_lock); + { + clib_spinlock_lock (&uc0->rx_lock); + + wrote0 = session_enqueue_dgram_connection_cl ( + s0, hdr0, b, TRANSPORT_PROTO_UDP, queue_event); + + clib_spinlock_unlock (&uc0->rx_lock); + + /* Expect cl udp enqueue to fail because fifo enqueue */ + if (PREDICT_FALSE (wrote0 == 0)) + *error0 = UDP_ERROR_FIFO_FULL; + + return; + } if (svm_fifo_max_enqueue_prod (s0->rx_fifo) < hdr0->data_length + sizeof (session_dgram_hdr_t)) { *error0 = UDP_ERROR_FIFO_FULL; - goto unlock_rx_lock; + return; } /* If session is owned by another thread and rx event needed, * enqueue event now while we still have the peeker lock */ if (s0->thread_index != thread_index) { - wrote0 = session_enqueue_dgram_connection_cl ( + wrote0 = session_enqueue_dgram_connection2 ( s0, hdr0, b, TRANSPORT_PROTO_UDP, - /* queue event */ queue_event && !svm_fifo_has_event (s0->rx_fifo)); + queue_event && !svm_fifo_has_event (s0->rx_fifo)); } else { - wrote0 = session_enqueue_dgram_connection (s0, hdr0, b, - TRANSPORT_PROTO_UDP, - queue_event); + wrote0 = session_enqueue_dgram_connection ( + s0, hdr0, b, TRANSPORT_PROTO_UDP, queue_event); } /* In some rare cases, session_enqueue_dgram_connection can fail because a * chunk cannot be allocated in the RX FIFO */ if (PREDICT_FALSE (wrote0 == 0)) *error0 = UDP_ERROR_FIFO_NOMEM; - -unlock_rx_lock: - - if (!(uc0->flags & UDP_CONN_F_CONNECTED)) - clib_spinlock_unlock (&uc0->rx_lock); } always_inline session_t * |