diff options
Diffstat (limited to 'src/plugins/hs_apps/echo_server.c')
-rw-r--r-- | src/plugins/hs_apps/echo_server.c | 217 |
1 files changed, 110 insertions, 107 deletions
diff --git a/src/plugins/hs_apps/echo_server.c b/src/plugins/hs_apps/echo_server.c index eeaf2d70088..f4f9acf1d0b 100644 --- a/src/plugins/hs_apps/echo_server.c +++ b/src/plugins/hs_apps/echo_server.c @@ -24,15 +24,26 @@ static void es_set_echo_rx_callbacks (u8 no_echo); typedef struct { - /* - * Server app parameters - */ - svm_msg_q_t **vpp_queue; - svm_queue_t *vl_input_queue; /**< Sever's event queue */ + CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); + u32 session_index; + u64 vpp_session_handle; + u32 vpp_session_index; + u32 rx_retries; + u8 byte_index; +} es_session_t; + +typedef struct +{ + CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); + es_session_t *sessions; + u8 *rx_buf; /**< Per-thread RX buffer */ + svm_msg_q_t *vpp_event_queue; + u32 thread_index; +} es_worker_t; +typedef struct +{ u32 app_index; /**< Server app index */ - u32 my_client_index; /**< API client handle */ - u32 node_index; /**< process node index for event scheduling */ /* * Config params @@ -51,11 +62,8 @@ typedef struct /* * Test state */ + es_worker_t *wrk; int (*rx_callback) (session_t *session); - u64 **session_handles; - u8 **rx_buf; /**< Per-thread RX buffer */ - u32 **rx_retries; - u8 byte_index; u8 transport_proto; u64 listener_handle; /**< Session handle of the root listener */ u64 ctrl_listener_handle; @@ -77,6 +85,28 @@ echo_server_main_t echo_server_main; #define es_cli(_fmt, _args...) vlib_cli_output (vm, _fmt, ##_args) +static inline es_worker_t * +es_worker_get (u32 thread_index) +{ + return vec_elt_at_index (echo_server_main.wrk, thread_index); +} + +static inline es_session_t * +es_session_alloc (es_worker_t *wrk) +{ + es_session_t *es; + + pool_get_zero (wrk->sessions, es); + es->session_index = es - wrk->sessions; + return es; +} + +static inline es_session_t * +es_session_get (es_worker_t *wrk, u32 es_index) +{ + return pool_elt_at_index (wrk->sessions, es_index); +} + int quic_echo_server_qsession_accept_callback (session_t * s) { @@ -91,6 +121,18 @@ echo_server_ctrl_session_accept_callback (session_t *s) return 0; } +static void +es_session_alloc_and_init (session_t *s) +{ + es_session_t *es; + es_worker_t *wrk = es_worker_get (s->thread_index); + + es = es_session_alloc (wrk); + es->vpp_session_index = s->session_index; + es->vpp_session_handle = session_handle (s); + s->opaque = es->session_index; +} + int quic_echo_server_session_accept_callback (session_t * s) { @@ -104,12 +146,8 @@ quic_echo_server_session_accept_callback (session_t * s) es_dbg ("SSESSION %u accept w/opaque %d", s->session_index, s->opaque); - esm->vpp_queue[s->thread_index] = - session_main_get_vpp_event_queue (s->thread_index); s->session_state = SESSION_STATE_READY; - ASSERT (vec_len (esm->rx_retries) > s->thread_index); - vec_validate (esm->rx_retries[s->thread_index], s->session_index); - esm->rx_retries[s->thread_index][s->session_index] = 0; + es_session_alloc_and_init (s); return 0; } @@ -121,16 +159,8 @@ echo_server_session_accept_callback (session_t * s) if (PREDICT_FALSE (esm->ctrl_listener_handle == s->listener_handle)) return echo_server_ctrl_session_accept_callback (s); - esm->vpp_queue[s->thread_index] = - session_main_get_vpp_event_queue (s->thread_index); s->session_state = SESSION_STATE_READY; - ASSERT (vec_len (esm->rx_retries) > s->thread_index); - vec_validate (esm->rx_retries[s->thread_index], s->session_index); - esm->rx_retries[s->thread_index][s->session_index] = 0; - if (session_get_transport_proto (s) == TRANSPORT_PROTO_UDP) - { - vec_add1 (esm->session_handles[s->thread_index], session_handle (s)); - } + es_session_alloc_and_init (s); return 0; } @@ -183,8 +213,7 @@ es_foreach_thread (void *fp) { echo_server_main_t *esm = &echo_server_main; uword thread_index; - for (thread_index = 0; thread_index < vec_len (esm->session_handles); - thread_index++) + for (thread_index = 0; thread_index < vec_len (esm->wrk); thread_index++) { session_send_rpc_evt_to_thread (thread_index, fp, uword_to_pointer (thread_index, void *)); @@ -195,32 +224,25 @@ static int es_wrk_prealloc_sessions (void *args) { echo_server_main_t *esm = &echo_server_main; - u32 thread_index = pointer_to_uword (args); - - vec_validate (esm->rx_retries[thread_index], esm->cfg.num_test_sessions); + u32 sessions_per_wrk, n_wrks, thread_index; + thread_index = pointer_to_uword (args); + es_worker_t *wrk = es_worker_get (thread_index); + n_wrks = vlib_num_workers () ? vlib_num_workers () : 1; + sessions_per_wrk = esm->cfg.num_test_sessions / n_wrks; + pool_alloc (wrk->sessions, 1.1 * sessions_per_wrk); return 0; } static int echo_server_setup_test (hs_test_cfg_t *c) { - echo_server_main_t *esm = &echo_server_main; - if (c->test == HS_TEST_TYPE_UNI) es_set_echo_rx_callbacks (1 /* no echo */); else es_set_echo_rx_callbacks (0 /* no echo */); es_foreach_thread (es_wrk_prealloc_sessions); - - if (c->test_bytes && c->num_test_sessions > 1) - { - es_err ("test bytes not supported for more sessions; turning it off"); - c->test_bytes = 0; - } - esm->byte_index = 0; - return 0; } @@ -250,23 +272,24 @@ es_test_cmd_sync (echo_server_main_t *esm, session_t *s) } static int -es_wrk_cleanup_session (void *args) +es_wrk_cleanup_sessions (void *args) { echo_server_main_t *esm = &echo_server_main; - u32 thread_index = pointer_to_uword (args); - session_handle_t *session_handles, *sh; vnet_disconnect_args_t _a = {}, *a = &_a; + u32 thread_index = pointer_to_uword (args); + es_session_t *es; + es_worker_t *wrk; + wrk = es_worker_get (thread_index); a->app_index = esm->app_index; - session_handles = esm->session_handles[thread_index]; - - vec_foreach (sh, session_handles) + pool_foreach (es, wrk->sessions) { - a->handle = sh[0]; + a->handle = es->vpp_session_handle; vnet_disconnect_session (a); } - vec_reset_length (session_handles); + pool_free (wrk->sessions); + return 0; } @@ -290,7 +313,7 @@ echo_server_rx_ctrl_callback (session_t *s) { case HS_TEST_TYPE_ECHO: case HS_TEST_TYPE_NONE: - es_foreach_thread (es_wrk_cleanup_session); + es_foreach_thread (es_wrk_cleanup_sessions); echo_server_ctrl_reply (s); break; case HS_TEST_TYPE_UNI: @@ -328,19 +351,18 @@ echo_server_builtin_server_rx_callback_no_echo (session_t * s) } static void -es_test_bytes (echo_server_main_t *esm, int actual_transfer) +es_test_bytes (es_worker_t *wrk, es_session_t *es, int actual_transfer) { int i; - u32 my_thread_id = vlib_get_thread_index (); for (i = 0; i < actual_transfer; i++) { - if (esm->rx_buf[my_thread_id][i] != ((esm->byte_index + i) & 0xff)) + if (wrk->rx_buf[i] != ((es->byte_index + i) & 0xff)) { - es_err ("at %lld expected %d got %d", esm->byte_index + i, - (esm->byte_index + i) & 0xff, esm->rx_buf[my_thread_id][i]); + es_err ("at %lld expected %d got %d", es->byte_index + i, + (es->byte_index + i) & 0xff, wrk->rx_buf[i]); } } - esm->byte_index += actual_transfer; + es->byte_index += actual_transfer; } int @@ -352,6 +374,8 @@ echo_server_rx_callback (session_t * s) echo_server_main_t *esm = &echo_server_main; u32 thread_index = vlib_get_thread_index (); app_session_transport_t at; + es_worker_t *wrk; + es_session_t *es; ASSERT (s->thread_index == thread_index); @@ -364,7 +388,9 @@ echo_server_rx_callback (session_t * s) if (PREDICT_FALSE (esm->ctrl_listener_handle == s->listener_handle)) return echo_server_rx_ctrl_callback (s); + wrk = es_worker_get (thread_index); max_enqueue = svm_fifo_max_enqueue_prod (tx_fifo); + if (!esm->is_dgram) { max_dequeue = svm_fifo_max_dequeue_cons (rx_fifo); @@ -374,18 +400,15 @@ echo_server_rx_callback (session_t * s) session_dgram_pre_hdr_t ph; svm_fifo_peek (rx_fifo, 0, sizeof (ph), (u8 *) & ph); max_dequeue = ph.data_length - ph.data_offset; - if (!esm->vpp_queue[s->thread_index]) - { - svm_msg_q_t *mq; - mq = session_main_get_vpp_event_queue (s->thread_index); - esm->vpp_queue[s->thread_index] = mq; - } + ASSERT (wrk->vpp_event_queue); max_enqueue -= sizeof (session_dgram_hdr_t); } if (PREDICT_FALSE (max_dequeue == 0)) return 0; + es = es_session_get (wrk, s->opaque); + /* Number of bytes we're going to copy */ max_transfer = clib_min (max_dequeue, max_enqueue); @@ -404,40 +427,35 @@ echo_server_rx_callback (session_t * s) SESSION_IO_EVT_BUILTIN_RX)) es_err ("failed to enqueue self-tap"); - vec_validate (esm->rx_retries[s->thread_index], s->session_index); - if (esm->rx_retries[thread_index][s->session_index] == 500000) + if (es->rx_retries == 500000) { es_err ("session stuck: %U", format_session, s, 2); } - if (esm->rx_retries[thread_index][s->session_index] < 500001) - esm->rx_retries[thread_index][s->session_index]++; + if (es->rx_retries < 500001) + es->rx_retries++; } return 0; } - vec_validate (esm->rx_buf[thread_index], max_transfer); + vec_validate (wrk->rx_buf, max_transfer); if (!esm->is_dgram) { - actual_transfer = app_recv_stream_raw (rx_fifo, - esm->rx_buf[thread_index], - max_transfer, - 0 /* don't clear event */ , - 0 /* peek */ ); + actual_transfer = + app_recv_stream_raw (rx_fifo, wrk->rx_buf, max_transfer, + 0 /* don't clear event */, 0 /* peek */); } else { - actual_transfer = app_recv_dgram_raw (rx_fifo, - esm->rx_buf[thread_index], - max_transfer, &at, - 0 /* don't clear event */ , - 0 /* peek */ ); + actual_transfer = + app_recv_dgram_raw (rx_fifo, wrk->rx_buf, max_transfer, &at, + 0 /* don't clear event */, 0 /* peek */); } ASSERT (actual_transfer == max_transfer); if (esm->cfg.test_bytes) { - es_test_bytes (esm, actual_transfer); + es_test_bytes (wrk, es, actual_transfer); } /* @@ -446,19 +464,15 @@ echo_server_rx_callback (session_t * s) if (!esm->is_dgram) { - n_written = app_send_stream_raw (tx_fifo, - esm->vpp_queue[thread_index], - esm->rx_buf[thread_index], - actual_transfer, SESSION_IO_EVT_TX, - 1 /* do_evt */ , 0); + n_written = app_send_stream_raw (tx_fifo, wrk->vpp_event_queue, + wrk->rx_buf, actual_transfer, + SESSION_IO_EVT_TX, 1 /* do_evt */, 0); } else { - n_written = app_send_dgram_raw (tx_fifo, &at, - esm->vpp_queue[s->thread_index], - esm->rx_buf[thread_index], - actual_transfer, SESSION_IO_EVT_TX, - 1 /* do_evt */ , 0); + n_written = app_send_dgram_raw (tx_fifo, &at, wrk->vpp_event_queue, + wrk->rx_buf, actual_transfer, + SESSION_IO_EVT_TX, 1 /* do_evt */, 0); } if (n_written != max_transfer) @@ -631,27 +645,18 @@ echo_server_create (vlib_main_t * vm, u8 * appns_id, u64 appns_flags, { echo_server_main_t *esm = &echo_server_main; vlib_thread_main_t *vtm = vlib_get_thread_main (); - u32 num_threads; - int i; + es_worker_t *wrk; + + esm->rcv_buffer_size = clib_max (esm->rcv_buffer_size, esm->fifo_size); + vec_validate (esm->wrk, vtm->n_threads); - num_threads = 1 /* main thread */ + vtm->n_threads; - vec_validate (echo_server_main.vpp_queue, num_threads - 1); - vec_validate (esm->rx_buf, num_threads - 1); - vec_validate (esm->rx_retries, num_threads - 1); - vec_validate (esm->session_handles, num_threads - 1); - for (i = 0; i < vec_len (esm->rx_retries); i++) + vec_foreach (wrk, esm->wrk) { - vec_validate (esm->rx_retries[i], - pool_elts (session_main.wrk[i].sessions)); - vec_validate (esm->session_handles[i], - pool_elts (session_main.wrk[i].sessions)); - clib_memset (esm->session_handles[i], ~0, - sizeof (u64) * vec_len (esm->session_handles[i])); - vec_reset_length (esm->session_handles[i]); + wrk->thread_index = wrk - esm->wrk; + vec_validate (wrk->rx_buf, esm->rcv_buffer_size); + wrk->vpp_event_queue = + session_main_get_vpp_event_queue (wrk->thread_index); } - esm->rcv_buffer_size = clib_max (esm->rcv_buffer_size, esm->fifo_size); - for (i = 0; i < num_threads; i++) - vec_validate (esm->rx_buf[i], esm->rcv_buffer_size); if (echo_server_attach (appns_id, appns_flags, appns_secret)) { @@ -798,8 +803,6 @@ VLIB_CLI_COMMAND (echo_server_create_command, static) = { clib_error_t * echo_server_main_init (vlib_main_t * vm) { - echo_server_main_t *esm = &echo_server_main; - esm->my_client_index = ~0; return 0; } |