aboutsummaryrefslogtreecommitdiffstats
path: root/src/plugins/hs_apps/echo_server.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugins/hs_apps/echo_server.c')
-rw-r--r--src/plugins/hs_apps/echo_server.c330
1 files changed, 273 insertions, 57 deletions
diff --git a/src/plugins/hs_apps/echo_server.c b/src/plugins/hs_apps/echo_server.c
index 178e9eef4fb..eeaf2d70088 100644
--- a/src/plugins/hs_apps/echo_server.c
+++ b/src/plugins/hs_apps/echo_server.c
@@ -13,16 +13,14 @@
* limitations under the License.
*/
+#include <hs_apps/hs_test.h>
#include <vnet/vnet.h>
#include <vlibmemory/api.h>
#include <vnet/session/application.h>
#include <vnet/session/application_interface.h>
#include <vnet/session/session.h>
-#define ECHO_SERVER_DBG (0)
-#define DBG(_fmt, _args...) \
- if (ECHO_SERVER_DBG) \
- clib_warning (_fmt, ##_args)
+static void es_set_echo_rx_callbacks (u8 no_echo);
typedef struct
{
@@ -39,7 +37,7 @@ typedef struct
/*
* Config params
*/
- u8 no_echo; /**< Don't echo traffic */
+ hs_test_cfg_t cfg;
u32 fifo_size; /**< Fifo size */
u32 rcv_buffer_size; /**< Rcv buffer size */
u32 prealloc_fifos; /**< Preallocate fifos */
@@ -53,21 +51,43 @@ typedef struct
/*
* Test state
*/
+ int (*rx_callback) (session_t *session);
+ u64 **session_handles;
u8 **rx_buf; /**< Per-thread RX buffer */
- u64 byte_index;
u32 **rx_retries;
+ u8 byte_index;
u8 transport_proto;
u64 listener_handle; /**< Session handle of the root listener */
+ u64 ctrl_listener_handle;
vlib_main_t *vlib_main;
} echo_server_main_t;
echo_server_main_t echo_server_main;
+#define es_err(_fmt, _args...) clib_warning (_fmt, ##_args);
+
+#define es_dbg(_fmt, _args...) \
+ do \
+ { \
+ if (PREDICT_FALSE (echo_server_main.cfg.verbose)) \
+ es_err (_fmt, ##_args); \
+ } \
+ while (0)
+
+#define es_cli(_fmt, _args...) vlib_cli_output (vm, _fmt, ##_args)
+
int
quic_echo_server_qsession_accept_callback (session_t * s)
{
- DBG ("QSession %u accept w/opaque %d", s->session_index, s->opaque);
+ es_dbg ("QSession %u accept w/opaque %d", s->session_index, s->opaque);
+ return 0;
+}
+
+static int
+echo_server_ctrl_session_accept_callback (session_t *s)
+{
+ s->session_state = SESSION_STATE_READY;
return 0;
}
@@ -75,14 +95,18 @@ int
quic_echo_server_session_accept_callback (session_t * s)
{
echo_server_main_t *esm = &echo_server_main;
+
+ if (PREDICT_FALSE (esm->ctrl_listener_handle == s->listener_handle))
+ return echo_server_ctrl_session_accept_callback (s);
+
if (s->listener_handle == esm->listener_handle)
return quic_echo_server_qsession_accept_callback (s);
- DBG ("SSESSION %u accept w/opaque %d", s->session_index, s->opaque);
+
+ es_dbg ("SSESSION %u accept w/opaque %d", s->session_index, s->opaque);
esm->vpp_queue[s->thread_index] =
session_main_get_vpp_event_queue (s->thread_index);
s->session_state = SESSION_STATE_READY;
- esm->byte_index = 0;
ASSERT (vec_len (esm->rx_retries) > s->thread_index);
vec_validate (esm->rx_retries[s->thread_index], s->session_index);
esm->rx_retries[s->thread_index][s->session_index] = 0;
@@ -93,13 +117,20 @@ int
echo_server_session_accept_callback (session_t * s)
{
echo_server_main_t *esm = &echo_server_main;
+
+ if (PREDICT_FALSE (esm->ctrl_listener_handle == s->listener_handle))
+ return echo_server_ctrl_session_accept_callback (s);
+
esm->vpp_queue[s->thread_index] =
session_main_get_vpp_event_queue (s->thread_index);
s->session_state = SESSION_STATE_READY;
- esm->byte_index = 0;
ASSERT (vec_len (esm->rx_retries) > s->thread_index);
vec_validate (esm->rx_retries[s->thread_index], s->session_index);
esm->rx_retries[s->thread_index][s->session_index] = 0;
+ if (session_get_transport_proto (s) == TRANSPORT_PROTO_UDP)
+ {
+ vec_add1 (esm->session_handles[s->thread_index], session_handle (s));
+ }
return 0;
}
@@ -119,7 +150,7 @@ echo_server_session_reset_callback (session_t * s)
{
echo_server_main_t *esm = &echo_server_main;
vnet_disconnect_args_t _a = { 0 }, *a = &_a;
- clib_warning ("Reset session %U", format_session, s, 2);
+ es_dbg ("Reset session %U", format_session, s, 2);
a->handle = session_handle (s);
a->app_index = esm->app_index;
vnet_disconnect_session (a);
@@ -129,7 +160,7 @@ int
echo_server_session_connected_callback (u32 app_index, u32 api_context,
session_t * s, session_error_t err)
{
- clib_warning ("called...");
+ es_err ("called...");
return -1;
}
@@ -143,26 +174,142 @@ echo_server_add_segment_callback (u32 client_index, u64 segment_handle)
int
echo_server_redirect_connect_callback (u32 client_index, void *mp)
{
- clib_warning ("called...");
+ es_err ("called...");
return -1;
}
-void
-test_bytes (echo_server_main_t * esm, int actual_transfer)
+static void
+es_foreach_thread (void *fp)
{
- int i;
- u32 my_thread_id = vlib_get_thread_index ();
+ echo_server_main_t *esm = &echo_server_main;
+ uword thread_index;
+ for (thread_index = 0; thread_index < vec_len (esm->session_handles);
+ thread_index++)
+ {
+ session_send_rpc_evt_to_thread (thread_index, fp,
+ uword_to_pointer (thread_index, void *));
+ }
+}
- for (i = 0; i < actual_transfer; i++)
+static int
+es_wrk_prealloc_sessions (void *args)
+{
+ echo_server_main_t *esm = &echo_server_main;
+ u32 thread_index = pointer_to_uword (args);
+
+ vec_validate (esm->rx_retries[thread_index], esm->cfg.num_test_sessions);
+
+ return 0;
+}
+
+static int
+echo_server_setup_test (hs_test_cfg_t *c)
+{
+ echo_server_main_t *esm = &echo_server_main;
+
+ if (c->test == HS_TEST_TYPE_UNI)
+ es_set_echo_rx_callbacks (1 /* no echo */);
+ else
+ es_set_echo_rx_callbacks (0 /* no echo */);
+
+ es_foreach_thread (es_wrk_prealloc_sessions);
+
+ if (c->test_bytes && c->num_test_sessions > 1)
{
- if (esm->rx_buf[my_thread_id][i] != ((esm->byte_index + i) & 0xff))
+ es_err ("test bytes not supported for more sessions; turning it off");
+ c->test_bytes = 0;
+ }
+ esm->byte_index = 0;
+
+ return 0;
+}
+
+static void
+echo_server_ctrl_reply (session_t *s)
+{
+ echo_server_main_t *esm = &echo_server_main;
+ int rv;
+
+ rv = svm_fifo_enqueue (s->tx_fifo, sizeof (esm->cfg), (u8 *) &esm->cfg);
+ ASSERT (rv == sizeof (esm->cfg));
+ session_send_io_evt_to_thread_custom (&s->session_index, s->thread_index,
+ SESSION_IO_EVT_TX);
+}
+
+static int
+es_test_cmd_sync (echo_server_main_t *esm, session_t *s)
+{
+ int rv;
+
+ rv = echo_server_setup_test (&esm->cfg);
+ if (rv)
+ es_err ("setup test error!");
+
+ echo_server_ctrl_reply (s);
+ return 0;
+}
+
+static int
+es_wrk_cleanup_session (void *args)
+{
+ echo_server_main_t *esm = &echo_server_main;
+ u32 thread_index = pointer_to_uword (args);
+ session_handle_t *session_handles, *sh;
+ vnet_disconnect_args_t _a = {}, *a = &_a;
+
+ a->app_index = esm->app_index;
+
+ session_handles = esm->session_handles[thread_index];
+
+ vec_foreach (sh, session_handles)
+ {
+ a->handle = sh[0];
+ vnet_disconnect_session (a);
+ }
+ vec_reset_length (session_handles);
+ return 0;
+}
+
+static int
+echo_server_rx_ctrl_callback (session_t *s)
+{
+ echo_server_main_t *esm = &echo_server_main;
+ int rv;
+
+ rv = svm_fifo_dequeue (s->rx_fifo, sizeof (esm->cfg), (u8 *) &esm->cfg);
+ ASSERT (rv == sizeof (esm->cfg));
+
+ es_dbg ("control message received:");
+ if (esm->cfg.verbose)
+ hs_test_cfg_dump (&esm->cfg, 0);
+
+ switch (esm->cfg.cmd)
+ {
+ case HS_TEST_CMD_SYNC:
+ switch (esm->cfg.test)
{
- clib_warning ("at %lld expected %d got %d", esm->byte_index + i,
- (esm->byte_index + i) & 0xff,
- esm->rx_buf[my_thread_id][i]);
+ case HS_TEST_TYPE_ECHO:
+ case HS_TEST_TYPE_NONE:
+ es_foreach_thread (es_wrk_cleanup_session);
+ echo_server_ctrl_reply (s);
+ break;
+ case HS_TEST_TYPE_UNI:
+ case HS_TEST_TYPE_BI:
+ return es_test_cmd_sync (esm, s);
+ break;
+ default:
+ es_err ("unknown command type! %d", esm->cfg.cmd);
}
+ break;
+ case HS_TEST_CMD_START:
+ case HS_TEST_CMD_STOP:
+ echo_server_ctrl_reply (s);
+ break;
+ default:
+ es_err ("unknown command! %d", esm->cfg.cmd);
+ break;
}
- esm->byte_index += actual_transfer;
+ return 0;
}
/*
@@ -171,11 +318,31 @@ test_bytes (echo_server_main_t * esm, int actual_transfer)
int
echo_server_builtin_server_rx_callback_no_echo (session_t * s)
{
+ echo_server_main_t *esm = &echo_server_main;
+ if (PREDICT_FALSE (esm->ctrl_listener_handle == s->listener_handle))
+ return echo_server_rx_ctrl_callback (s);
+
svm_fifo_t *rx_fifo = s->rx_fifo;
svm_fifo_dequeue_drop (rx_fifo, svm_fifo_max_dequeue_cons (rx_fifo));
return 0;
}
+static void
+es_test_bytes (echo_server_main_t *esm, int actual_transfer)
+{
+ int i;
+ u32 my_thread_id = vlib_get_thread_index ();
+ for (i = 0; i < actual_transfer; i++)
+ {
+ if (esm->rx_buf[my_thread_id][i] != ((esm->byte_index + i) & 0xff))
+ {
+ es_err ("at %lld expected %d got %d", esm->byte_index + i,
+ (esm->byte_index + i) & 0xff, esm->rx_buf[my_thread_id][i]);
+ }
+ }
+ esm->byte_index += actual_transfer;
+}
+
int
echo_server_rx_callback (session_t * s)
{
@@ -194,6 +361,9 @@ echo_server_rx_callback (session_t * s)
ASSERT (rx_fifo->master_thread_index == thread_index);
ASSERT (tx_fifo->master_thread_index == thread_index);
+ if (PREDICT_FALSE (esm->ctrl_listener_handle == s->listener_handle))
+ return echo_server_rx_ctrl_callback (s);
+
max_enqueue = svm_fifo_max_enqueue_prod (tx_fifo);
if (!esm->is_dgram)
{
@@ -232,12 +402,12 @@ echo_server_rx_callback (session_t * s)
* to fail if that's the case */
if (session_send_io_evt_to_thread (rx_fifo,
SESSION_IO_EVT_BUILTIN_RX))
- clib_warning ("failed to enqueue self-tap");
+ es_err ("failed to enqueue self-tap");
vec_validate (esm->rx_retries[s->thread_index], s->session_index);
if (esm->rx_retries[thread_index][s->session_index] == 500000)
{
- clib_warning ("session stuck: %U", format_session, s, 2);
+ es_err ("session stuck: %U", format_session, s, 2);
}
if (esm->rx_retries[thread_index][s->session_index] < 500001)
esm->rx_retries[thread_index][s->session_index]++;
@@ -264,7 +434,11 @@ echo_server_rx_callback (session_t * s)
0 /* peek */ );
}
ASSERT (actual_transfer == max_transfer);
- /* test_bytes (esm, actual_transfer); */
+
+ if (esm->cfg.test_bytes)
+ {
+ es_test_bytes (esm, actual_transfer);
+ }
/*
* Echo back
@@ -288,7 +462,7 @@ echo_server_rx_callback (session_t * s)
}
if (n_written != max_transfer)
- clib_warning ("short trout! written %u read %u", n_written, max_transfer);
+ es_err ("short trout! written %u read %u", n_written, max_transfer);
if (PREDICT_FALSE (svm_fifo_max_dequeue_cons (rx_fifo)))
goto rx_event;
@@ -296,15 +470,32 @@ echo_server_rx_callback (session_t * s)
return 0;
}
+int
+echo_server_rx_callback_common (session_t *s)
+{
+ echo_server_main_t *esm = &echo_server_main;
+ return esm->rx_callback (s);
+}
+
static session_cb_vft_t echo_server_session_cb_vft = {
.session_accept_callback = echo_server_session_accept_callback,
.session_disconnect_callback = echo_server_session_disconnect_callback,
.session_connected_callback = echo_server_session_connected_callback,
.add_segment_callback = echo_server_add_segment_callback,
- .builtin_app_rx_callback = echo_server_rx_callback,
+ .builtin_app_rx_callback = echo_server_rx_callback_common,
.session_reset_callback = echo_server_session_reset_callback
};
+static void
+es_set_echo_rx_callbacks (u8 no_echo)
+{
+ echo_server_main_t *esm = &echo_server_main;
+ if (no_echo)
+ esm->rx_callback = echo_server_builtin_server_rx_callback_no_echo;
+ else
+ esm->rx_callback = echo_server_rx_callback;
+}
+
static int
echo_server_attach (u8 * appns_id, u64 appns_flags, u64 appns_secret)
{
@@ -316,12 +507,8 @@ echo_server_attach (u8 * appns_id, u64 appns_flags, u64 appns_secret)
clib_memset (a, 0, sizeof (*a));
clib_memset (options, 0, sizeof (options));
- if (esm->no_echo)
- echo_server_session_cb_vft.builtin_app_rx_callback =
- echo_server_builtin_server_rx_callback_no_echo;
- else
- echo_server_session_cb_vft.builtin_app_rx_callback =
- echo_server_rx_callback;
+ esm->rx_callback = echo_server_rx_callback;
+
if (esm->transport_proto == TRANSPORT_PROTO_QUIC)
echo_server_session_cb_vft.session_accept_callback =
quic_echo_server_session_accept_callback;
@@ -350,7 +537,7 @@ echo_server_attach (u8 * appns_id, u64 appns_flags, u64 appns_secret)
if (vnet_application_attach (a))
{
- clib_warning ("failed to attach server");
+ es_err ("failed to attach server");
return -1;
}
esm->app_index = a->app_index;
@@ -390,19 +577,35 @@ echo_client_transport_needs_crypto (transport_proto_t proto)
}
static int
+echo_server_listen_ctrl ()
+{
+ echo_server_main_t *esm = &echo_server_main;
+ vnet_listen_args_t _args = {}, *args = &_args;
+ session_error_t rv;
+
+ if ((rv = parse_uri (esm->server_uri, &args->sep_ext)))
+ return -1;
+ args->sep_ext.transport_proto = TRANSPORT_PROTO_TCP;
+ args->app_index = esm->app_index;
+
+ rv = vnet_listen (args);
+ esm->ctrl_listener_handle = args->handle;
+ return rv;
+}
+
+static int
echo_server_listen ()
{
i32 rv;
echo_server_main_t *esm = &echo_server_main;
- vnet_listen_args_t _args = { 0 }, *args = &_args;
-
- args->sep_ext.app_wrk_index = 0;
+ vnet_listen_args_t _args = {}, *args = &_args;
if ((rv = parse_uri (esm->server_uri, &args->sep_ext)))
{
return -1;
}
args->app_index = esm->app_index;
+ args->sep_ext.port = hs_make_data_port (args->sep_ext.port);
if (echo_client_transport_needs_crypto (args->sep_ext.transport_proto))
{
session_endpoint_alloc_ext_cfg (&args->sep_ext,
@@ -435,23 +638,38 @@ echo_server_create (vlib_main_t * vm, u8 * appns_id, u64 appns_flags,
vec_validate (echo_server_main.vpp_queue, num_threads - 1);
vec_validate (esm->rx_buf, num_threads - 1);
vec_validate (esm->rx_retries, num_threads - 1);
+ vec_validate (esm->session_handles, num_threads - 1);
for (i = 0; i < vec_len (esm->rx_retries); i++)
- vec_validate (esm->rx_retries[i],
- pool_elts (session_main.wrk[i].sessions));
+ {
+ vec_validate (esm->rx_retries[i],
+ pool_elts (session_main.wrk[i].sessions));
+ vec_validate (esm->session_handles[i],
+ pool_elts (session_main.wrk[i].sessions));
+ clib_memset (esm->session_handles[i], ~0,
+ sizeof (u64) * vec_len (esm->session_handles[i]));
+ vec_reset_length (esm->session_handles[i]);
+ }
esm->rcv_buffer_size = clib_max (esm->rcv_buffer_size, esm->fifo_size);
for (i = 0; i < num_threads; i++)
vec_validate (esm->rx_buf[i], esm->rcv_buffer_size);
if (echo_server_attach (appns_id, appns_flags, appns_secret))
{
- clib_warning ("failed to attach server");
+ es_err ("failed to attach server");
+ return -1;
+ }
+ if (echo_server_listen_ctrl ())
+ {
+ es_err ("failed to start listening on ctrl session");
+ if (echo_server_detach ())
+ es_err ("failed to detach");
return -1;
}
if (echo_server_listen ())
{
- clib_warning ("failed to start listening");
+ es_err ("failed to start listening");
if (echo_server_detach ())
- clib_warning ("failed to detach");
+ es_err ("failed to detach");
return -1;
}
return 0;
@@ -469,7 +687,6 @@ echo_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
int rv, is_stop = 0;
clib_error_t *error = 0;
- esm->no_echo = 0;
esm->fifo_size = 64 << 10;
esm->rcv_buffer_size = 128 << 10;
esm->prealloc_fifos = 0;
@@ -482,10 +699,9 @@ echo_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
{
if (unformat (input, "uri %s", &esm->server_uri))
server_uri_set = 1;
- else if (unformat (input, "no-echo"))
- esm->no_echo = 1;
- else if (unformat (input, "fifo-size %d", &esm->fifo_size))
- esm->fifo_size <<= 10;
+ else if (unformat (input, "fifo-size %U", unformat_memory_size,
+ &esm->fifo_size))
+ ;
else if (unformat (input, "rcv-buf-size %d", &esm->rcv_buffer_size))
;
else if (unformat (input, "prealloc-fifos %d", &esm->prealloc_fifos))
@@ -523,14 +739,14 @@ echo_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
{
if (esm->app_index == (u32) ~ 0)
{
- clib_warning ("server not running");
+ es_cli ("server not running");
error = clib_error_return (0, "failed: server not running");
goto cleanup;
}
rv = echo_server_detach ();
if (rv)
{
- clib_warning ("failed: detach");
+ es_cli ("failed: detach");
error = clib_error_return (0, "failed: server detach %d", rv);
goto cleanup;
}
@@ -541,7 +757,7 @@ echo_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
if (!server_uri_set)
{
- clib_warning ("No uri provided! Using default: %s", default_uri);
+ es_cli ("No uri provided! Using default: %s", default_uri);
esm->server_uri = (char *) format (0, "%s%c", default_uri, 0);
}
@@ -568,13 +784,13 @@ cleanup:
}
/* *INDENT-OFF* */
-VLIB_CLI_COMMAND (echo_server_create_command, static) =
-{
+VLIB_CLI_COMMAND (echo_server_create_command, static) = {
.path = "test echo server",
- .short_help = "test echo server proto <proto> [no echo][fifo-size <mbytes>]"
- "[rcv-buf-size <bytes>][prealloc-fifos <count>]"
- "[private-segment-count <count>][private-segment-size <bytes[m|g]>]"
- "[uri <tcp://ip/port>]",
+ .short_help =
+ "test echo server proto <proto> [fifo-size <mbytes>]"
+ "[rcv-buf-size <bytes>][prealloc-fifos <count>]"
+ "[private-segment-count <count>][private-segment-size <bytes[m|g]>]"
+ "[uri <tcp://ip/port>]",
.function = echo_server_create_command_fn,
};
/* *INDENT-ON* */