diff options
Diffstat (limited to 'src/plugins/hs_apps/echo_server.c')
-rw-r--r-- | src/plugins/hs_apps/echo_server.c | 330 |
1 files changed, 273 insertions, 57 deletions
diff --git a/src/plugins/hs_apps/echo_server.c b/src/plugins/hs_apps/echo_server.c index 178e9eef4fb..eeaf2d70088 100644 --- a/src/plugins/hs_apps/echo_server.c +++ b/src/plugins/hs_apps/echo_server.c @@ -13,16 +13,14 @@ * limitations under the License. */ +#include <hs_apps/hs_test.h> #include <vnet/vnet.h> #include <vlibmemory/api.h> #include <vnet/session/application.h> #include <vnet/session/application_interface.h> #include <vnet/session/session.h> -#define ECHO_SERVER_DBG (0) -#define DBG(_fmt, _args...) \ - if (ECHO_SERVER_DBG) \ - clib_warning (_fmt, ##_args) +static void es_set_echo_rx_callbacks (u8 no_echo); typedef struct { @@ -39,7 +37,7 @@ typedef struct /* * Config params */ - u8 no_echo; /**< Don't echo traffic */ + hs_test_cfg_t cfg; u32 fifo_size; /**< Fifo size */ u32 rcv_buffer_size; /**< Rcv buffer size */ u32 prealloc_fifos; /**< Preallocate fifos */ @@ -53,21 +51,43 @@ typedef struct /* * Test state */ + int (*rx_callback) (session_t *session); + u64 **session_handles; u8 **rx_buf; /**< Per-thread RX buffer */ - u64 byte_index; u32 **rx_retries; + u8 byte_index; u8 transport_proto; u64 listener_handle; /**< Session handle of the root listener */ + u64 ctrl_listener_handle; vlib_main_t *vlib_main; } echo_server_main_t; echo_server_main_t echo_server_main; +#define es_err(_fmt, _args...) clib_warning (_fmt, ##_args); + +#define es_dbg(_fmt, _args...) \ + do \ + { \ + if (PREDICT_FALSE (echo_server_main.cfg.verbose)) \ + es_err (_fmt, ##_args); \ + } \ + while (0) + +#define es_cli(_fmt, _args...) vlib_cli_output (vm, _fmt, ##_args) + int quic_echo_server_qsession_accept_callback (session_t * s) { - DBG ("QSession %u accept w/opaque %d", s->session_index, s->opaque); + es_dbg ("QSession %u accept w/opaque %d", s->session_index, s->opaque); + return 0; +} + +static int +echo_server_ctrl_session_accept_callback (session_t *s) +{ + s->session_state = SESSION_STATE_READY; return 0; } @@ -75,14 +95,18 @@ int quic_echo_server_session_accept_callback (session_t * s) { echo_server_main_t *esm = &echo_server_main; + + if (PREDICT_FALSE (esm->ctrl_listener_handle == s->listener_handle)) + return echo_server_ctrl_session_accept_callback (s); + if (s->listener_handle == esm->listener_handle) return quic_echo_server_qsession_accept_callback (s); - DBG ("SSESSION %u accept w/opaque %d", s->session_index, s->opaque); + + es_dbg ("SSESSION %u accept w/opaque %d", s->session_index, s->opaque); esm->vpp_queue[s->thread_index] = session_main_get_vpp_event_queue (s->thread_index); s->session_state = SESSION_STATE_READY; - esm->byte_index = 0; ASSERT (vec_len (esm->rx_retries) > s->thread_index); vec_validate (esm->rx_retries[s->thread_index], s->session_index); esm->rx_retries[s->thread_index][s->session_index] = 0; @@ -93,13 +117,20 @@ int echo_server_session_accept_callback (session_t * s) { echo_server_main_t *esm = &echo_server_main; + + if (PREDICT_FALSE (esm->ctrl_listener_handle == s->listener_handle)) + return echo_server_ctrl_session_accept_callback (s); + esm->vpp_queue[s->thread_index] = session_main_get_vpp_event_queue (s->thread_index); s->session_state = SESSION_STATE_READY; - esm->byte_index = 0; ASSERT (vec_len (esm->rx_retries) > s->thread_index); vec_validate (esm->rx_retries[s->thread_index], s->session_index); esm->rx_retries[s->thread_index][s->session_index] = 0; + if (session_get_transport_proto (s) == TRANSPORT_PROTO_UDP) + { + vec_add1 (esm->session_handles[s->thread_index], session_handle (s)); + } return 0; } @@ -119,7 +150,7 @@ echo_server_session_reset_callback (session_t * s) { echo_server_main_t *esm = &echo_server_main; vnet_disconnect_args_t _a = { 0 }, *a = &_a; - clib_warning ("Reset session %U", format_session, s, 2); + es_dbg ("Reset session %U", format_session, s, 2); a->handle = session_handle (s); a->app_index = esm->app_index; vnet_disconnect_session (a); @@ -129,7 +160,7 @@ int echo_server_session_connected_callback (u32 app_index, u32 api_context, session_t * s, session_error_t err) { - clib_warning ("called..."); + es_err ("called..."); return -1; } @@ -143,26 +174,142 @@ echo_server_add_segment_callback (u32 client_index, u64 segment_handle) int echo_server_redirect_connect_callback (u32 client_index, void *mp) { - clib_warning ("called..."); + es_err ("called..."); return -1; } -void -test_bytes (echo_server_main_t * esm, int actual_transfer) +static void +es_foreach_thread (void *fp) { - int i; - u32 my_thread_id = vlib_get_thread_index (); + echo_server_main_t *esm = &echo_server_main; + uword thread_index; + for (thread_index = 0; thread_index < vec_len (esm->session_handles); + thread_index++) + { + session_send_rpc_evt_to_thread (thread_index, fp, + uword_to_pointer (thread_index, void *)); + } +} - for (i = 0; i < actual_transfer; i++) +static int +es_wrk_prealloc_sessions (void *args) +{ + echo_server_main_t *esm = &echo_server_main; + u32 thread_index = pointer_to_uword (args); + + vec_validate (esm->rx_retries[thread_index], esm->cfg.num_test_sessions); + + return 0; +} + +static int +echo_server_setup_test (hs_test_cfg_t *c) +{ + echo_server_main_t *esm = &echo_server_main; + + if (c->test == HS_TEST_TYPE_UNI) + es_set_echo_rx_callbacks (1 /* no echo */); + else + es_set_echo_rx_callbacks (0 /* no echo */); + + es_foreach_thread (es_wrk_prealloc_sessions); + + if (c->test_bytes && c->num_test_sessions > 1) { - if (esm->rx_buf[my_thread_id][i] != ((esm->byte_index + i) & 0xff)) + es_err ("test bytes not supported for more sessions; turning it off"); + c->test_bytes = 0; + } + esm->byte_index = 0; + + return 0; +} + +static void +echo_server_ctrl_reply (session_t *s) +{ + echo_server_main_t *esm = &echo_server_main; + int rv; + + rv = svm_fifo_enqueue (s->tx_fifo, sizeof (esm->cfg), (u8 *) &esm->cfg); + ASSERT (rv == sizeof (esm->cfg)); + session_send_io_evt_to_thread_custom (&s->session_index, s->thread_index, + SESSION_IO_EVT_TX); +} + +static int +es_test_cmd_sync (echo_server_main_t *esm, session_t *s) +{ + int rv; + + rv = echo_server_setup_test (&esm->cfg); + if (rv) + es_err ("setup test error!"); + + echo_server_ctrl_reply (s); + return 0; +} + +static int +es_wrk_cleanup_session (void *args) +{ + echo_server_main_t *esm = &echo_server_main; + u32 thread_index = pointer_to_uword (args); + session_handle_t *session_handles, *sh; + vnet_disconnect_args_t _a = {}, *a = &_a; + + a->app_index = esm->app_index; + + session_handles = esm->session_handles[thread_index]; + + vec_foreach (sh, session_handles) + { + a->handle = sh[0]; + vnet_disconnect_session (a); + } + vec_reset_length (session_handles); + return 0; +} + +static int +echo_server_rx_ctrl_callback (session_t *s) +{ + echo_server_main_t *esm = &echo_server_main; + int rv; + + rv = svm_fifo_dequeue (s->rx_fifo, sizeof (esm->cfg), (u8 *) &esm->cfg); + ASSERT (rv == sizeof (esm->cfg)); + + es_dbg ("control message received:"); + if (esm->cfg.verbose) + hs_test_cfg_dump (&esm->cfg, 0); + + switch (esm->cfg.cmd) + { + case HS_TEST_CMD_SYNC: + switch (esm->cfg.test) { - clib_warning ("at %lld expected %d got %d", esm->byte_index + i, - (esm->byte_index + i) & 0xff, - esm->rx_buf[my_thread_id][i]); + case HS_TEST_TYPE_ECHO: + case HS_TEST_TYPE_NONE: + es_foreach_thread (es_wrk_cleanup_session); + echo_server_ctrl_reply (s); + break; + case HS_TEST_TYPE_UNI: + case HS_TEST_TYPE_BI: + return es_test_cmd_sync (esm, s); + break; + default: + es_err ("unknown command type! %d", esm->cfg.cmd); } + break; + case HS_TEST_CMD_START: + case HS_TEST_CMD_STOP: + echo_server_ctrl_reply (s); + break; + default: + es_err ("unknown command! %d", esm->cfg.cmd); + break; } - esm->byte_index += actual_transfer; + return 0; } /* @@ -171,11 +318,31 @@ test_bytes (echo_server_main_t * esm, int actual_transfer) int echo_server_builtin_server_rx_callback_no_echo (session_t * s) { + echo_server_main_t *esm = &echo_server_main; + if (PREDICT_FALSE (esm->ctrl_listener_handle == s->listener_handle)) + return echo_server_rx_ctrl_callback (s); + svm_fifo_t *rx_fifo = s->rx_fifo; svm_fifo_dequeue_drop (rx_fifo, svm_fifo_max_dequeue_cons (rx_fifo)); return 0; } +static void +es_test_bytes (echo_server_main_t *esm, int actual_transfer) +{ + int i; + u32 my_thread_id = vlib_get_thread_index (); + for (i = 0; i < actual_transfer; i++) + { + if (esm->rx_buf[my_thread_id][i] != ((esm->byte_index + i) & 0xff)) + { + es_err ("at %lld expected %d got %d", esm->byte_index + i, + (esm->byte_index + i) & 0xff, esm->rx_buf[my_thread_id][i]); + } + } + esm->byte_index += actual_transfer; +} + int echo_server_rx_callback (session_t * s) { @@ -194,6 +361,9 @@ echo_server_rx_callback (session_t * s) ASSERT (rx_fifo->master_thread_index == thread_index); ASSERT (tx_fifo->master_thread_index == thread_index); + if (PREDICT_FALSE (esm->ctrl_listener_handle == s->listener_handle)) + return echo_server_rx_ctrl_callback (s); + max_enqueue = svm_fifo_max_enqueue_prod (tx_fifo); if (!esm->is_dgram) { @@ -232,12 +402,12 @@ echo_server_rx_callback (session_t * s) * to fail if that's the case */ if (session_send_io_evt_to_thread (rx_fifo, SESSION_IO_EVT_BUILTIN_RX)) - clib_warning ("failed to enqueue self-tap"); + es_err ("failed to enqueue self-tap"); vec_validate (esm->rx_retries[s->thread_index], s->session_index); if (esm->rx_retries[thread_index][s->session_index] == 500000) { - clib_warning ("session stuck: %U", format_session, s, 2); + es_err ("session stuck: %U", format_session, s, 2); } if (esm->rx_retries[thread_index][s->session_index] < 500001) esm->rx_retries[thread_index][s->session_index]++; @@ -264,7 +434,11 @@ echo_server_rx_callback (session_t * s) 0 /* peek */ ); } ASSERT (actual_transfer == max_transfer); - /* test_bytes (esm, actual_transfer); */ + + if (esm->cfg.test_bytes) + { + es_test_bytes (esm, actual_transfer); + } /* * Echo back @@ -288,7 +462,7 @@ echo_server_rx_callback (session_t * s) } if (n_written != max_transfer) - clib_warning ("short trout! written %u read %u", n_written, max_transfer); + es_err ("short trout! written %u read %u", n_written, max_transfer); if (PREDICT_FALSE (svm_fifo_max_dequeue_cons (rx_fifo))) goto rx_event; @@ -296,15 +470,32 @@ echo_server_rx_callback (session_t * s) return 0; } +int +echo_server_rx_callback_common (session_t *s) +{ + echo_server_main_t *esm = &echo_server_main; + return esm->rx_callback (s); +} + static session_cb_vft_t echo_server_session_cb_vft = { .session_accept_callback = echo_server_session_accept_callback, .session_disconnect_callback = echo_server_session_disconnect_callback, .session_connected_callback = echo_server_session_connected_callback, .add_segment_callback = echo_server_add_segment_callback, - .builtin_app_rx_callback = echo_server_rx_callback, + .builtin_app_rx_callback = echo_server_rx_callback_common, .session_reset_callback = echo_server_session_reset_callback }; +static void +es_set_echo_rx_callbacks (u8 no_echo) +{ + echo_server_main_t *esm = &echo_server_main; + if (no_echo) + esm->rx_callback = echo_server_builtin_server_rx_callback_no_echo; + else + esm->rx_callback = echo_server_rx_callback; +} + static int echo_server_attach (u8 * appns_id, u64 appns_flags, u64 appns_secret) { @@ -316,12 +507,8 @@ echo_server_attach (u8 * appns_id, u64 appns_flags, u64 appns_secret) clib_memset (a, 0, sizeof (*a)); clib_memset (options, 0, sizeof (options)); - if (esm->no_echo) - echo_server_session_cb_vft.builtin_app_rx_callback = - echo_server_builtin_server_rx_callback_no_echo; - else - echo_server_session_cb_vft.builtin_app_rx_callback = - echo_server_rx_callback; + esm->rx_callback = echo_server_rx_callback; + if (esm->transport_proto == TRANSPORT_PROTO_QUIC) echo_server_session_cb_vft.session_accept_callback = quic_echo_server_session_accept_callback; @@ -350,7 +537,7 @@ echo_server_attach (u8 * appns_id, u64 appns_flags, u64 appns_secret) if (vnet_application_attach (a)) { - clib_warning ("failed to attach server"); + es_err ("failed to attach server"); return -1; } esm->app_index = a->app_index; @@ -390,19 +577,35 @@ echo_client_transport_needs_crypto (transport_proto_t proto) } static int +echo_server_listen_ctrl () +{ + echo_server_main_t *esm = &echo_server_main; + vnet_listen_args_t _args = {}, *args = &_args; + session_error_t rv; + + if ((rv = parse_uri (esm->server_uri, &args->sep_ext))) + return -1; + args->sep_ext.transport_proto = TRANSPORT_PROTO_TCP; + args->app_index = esm->app_index; + + rv = vnet_listen (args); + esm->ctrl_listener_handle = args->handle; + return rv; +} + +static int echo_server_listen () { i32 rv; echo_server_main_t *esm = &echo_server_main; - vnet_listen_args_t _args = { 0 }, *args = &_args; - - args->sep_ext.app_wrk_index = 0; + vnet_listen_args_t _args = {}, *args = &_args; if ((rv = parse_uri (esm->server_uri, &args->sep_ext))) { return -1; } args->app_index = esm->app_index; + args->sep_ext.port = hs_make_data_port (args->sep_ext.port); if (echo_client_transport_needs_crypto (args->sep_ext.transport_proto)) { session_endpoint_alloc_ext_cfg (&args->sep_ext, @@ -435,23 +638,38 @@ echo_server_create (vlib_main_t * vm, u8 * appns_id, u64 appns_flags, vec_validate (echo_server_main.vpp_queue, num_threads - 1); vec_validate (esm->rx_buf, num_threads - 1); vec_validate (esm->rx_retries, num_threads - 1); + vec_validate (esm->session_handles, num_threads - 1); for (i = 0; i < vec_len (esm->rx_retries); i++) - vec_validate (esm->rx_retries[i], - pool_elts (session_main.wrk[i].sessions)); + { + vec_validate (esm->rx_retries[i], + pool_elts (session_main.wrk[i].sessions)); + vec_validate (esm->session_handles[i], + pool_elts (session_main.wrk[i].sessions)); + clib_memset (esm->session_handles[i], ~0, + sizeof (u64) * vec_len (esm->session_handles[i])); + vec_reset_length (esm->session_handles[i]); + } esm->rcv_buffer_size = clib_max (esm->rcv_buffer_size, esm->fifo_size); for (i = 0; i < num_threads; i++) vec_validate (esm->rx_buf[i], esm->rcv_buffer_size); if (echo_server_attach (appns_id, appns_flags, appns_secret)) { - clib_warning ("failed to attach server"); + es_err ("failed to attach server"); + return -1; + } + if (echo_server_listen_ctrl ()) + { + es_err ("failed to start listening on ctrl session"); + if (echo_server_detach ()) + es_err ("failed to detach"); return -1; } if (echo_server_listen ()) { - clib_warning ("failed to start listening"); + es_err ("failed to start listening"); if (echo_server_detach ()) - clib_warning ("failed to detach"); + es_err ("failed to detach"); return -1; } return 0; @@ -469,7 +687,6 @@ echo_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input, int rv, is_stop = 0; clib_error_t *error = 0; - esm->no_echo = 0; esm->fifo_size = 64 << 10; esm->rcv_buffer_size = 128 << 10; esm->prealloc_fifos = 0; @@ -482,10 +699,9 @@ echo_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input, { if (unformat (input, "uri %s", &esm->server_uri)) server_uri_set = 1; - else if (unformat (input, "no-echo")) - esm->no_echo = 1; - else if (unformat (input, "fifo-size %d", &esm->fifo_size)) - esm->fifo_size <<= 10; + else if (unformat (input, "fifo-size %U", unformat_memory_size, + &esm->fifo_size)) + ; else if (unformat (input, "rcv-buf-size %d", &esm->rcv_buffer_size)) ; else if (unformat (input, "prealloc-fifos %d", &esm->prealloc_fifos)) @@ -523,14 +739,14 @@ echo_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input, { if (esm->app_index == (u32) ~ 0) { - clib_warning ("server not running"); + es_cli ("server not running"); error = clib_error_return (0, "failed: server not running"); goto cleanup; } rv = echo_server_detach (); if (rv) { - clib_warning ("failed: detach"); + es_cli ("failed: detach"); error = clib_error_return (0, "failed: server detach %d", rv); goto cleanup; } @@ -541,7 +757,7 @@ echo_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input, if (!server_uri_set) { - clib_warning ("No uri provided! Using default: %s", default_uri); + es_cli ("No uri provided! Using default: %s", default_uri); esm->server_uri = (char *) format (0, "%s%c", default_uri, 0); } @@ -568,13 +784,13 @@ cleanup: } /* *INDENT-OFF* */ -VLIB_CLI_COMMAND (echo_server_create_command, static) = -{ +VLIB_CLI_COMMAND (echo_server_create_command, static) = { .path = "test echo server", - .short_help = "test echo server proto <proto> [no echo][fifo-size <mbytes>]" - "[rcv-buf-size <bytes>][prealloc-fifos <count>]" - "[private-segment-count <count>][private-segment-size <bytes[m|g]>]" - "[uri <tcp://ip/port>]", + .short_help = + "test echo server proto <proto> [fifo-size <mbytes>]" + "[rcv-buf-size <bytes>][prealloc-fifos <count>]" + "[private-segment-count <count>][private-segment-size <bytes[m|g]>]" + "[uri <tcp://ip/port>]", .function = echo_server_create_command_fn, }; /* *INDENT-ON* */ |