diff options
Diffstat (limited to 'vlib/example/mc_test.c')
-rw-r--r-- | vlib/example/mc_test.c | 370 |
1 files changed, 370 insertions, 0 deletions
diff --git a/vlib/example/mc_test.c b/vlib/example/mc_test.c new file mode 100644 index 00000000000..2a7fe9867fe --- /dev/null +++ b/vlib/example/mc_test.c @@ -0,0 +1,370 @@ +/* + * mc_test.c: test program for vlib mc + * + * Copyright (c) 2010 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <vlib/vlib.h> +#include <vlib/unix/mc_socket.h> +#include <vppinfra/random.h> + +typedef struct { + u32 min_n_msg_bytes; + u32 max_n_msg_bytes; + u32 tx_serial; + u32 rx_serial; + u32 seed; + u32 verbose; + u32 validate; + u32 window_size; + f64 min_delay, max_delay; + f64 n_packets_to_send; +} mc_test_main_t; + +always_inline u32 +choose_msg_size (mc_test_main_t * tm) +{ + u32 r = tm->min_n_msg_bytes; + if (tm->max_n_msg_bytes > tm->min_n_msg_bytes) + r += random_u32 (&tm->seed) % (1 + tm->max_n_msg_bytes - tm->min_n_msg_bytes); + return r; +} + +static mc_test_main_t mc_test_main; + +static void serialize_test_msg (serialize_main_t * m, va_list * va) +{ + mc_test_main_t * tm = &mc_test_main; + u32 n_bytes = choose_msg_size (tm); + u8 * msg; + int i; + serialize_integer (m, n_bytes, sizeof (n_bytes)); + msg = serialize_get (m, n_bytes); + for (i = 0; i < n_bytes; i++) + msg[i] = i + tm->tx_serial; + tm->tx_serial += n_bytes; +} + +static void unserialize_test_msg (serialize_main_t * m, va_list * va) +{ + mc_test_main_t * tm = &mc_test_main; + u32 i, n_bytes, dump_msg = tm->verbose; + u8 * p; + unserialize_integer (m, &n_bytes, sizeof (n_bytes)); + p = unserialize_get (m, n_bytes); + if (tm->validate) + for (i = 0; i < n_bytes; i++) + if (p[i] != ((tm->rx_serial + i) & 0xff)) + { + clib_warning ("corrupt msg at offset %d", i); + dump_msg = 1; + break; + } + if (dump_msg) + clib_warning ("got %d bytes, %U", n_bytes, format_hex_bytes, p, n_bytes); + tm->rx_serial += n_bytes; +} + +MC_SERIALIZE_MSG (test_msg, static) = { + .name = "test_msg", + .serialize = serialize_test_msg, + .unserialize = unserialize_test_msg, +}; + +#define SERIALIZE 1 + +#define EVENT_JOIN_STREAM 10 +#define EVENT_SEND_DATA 11 + +static void test_rx_callback (mc_main_t * mcm, + mc_stream_t * stream, + mc_peer_id_t peer_id, + u32 buffer_index) +{ + if (SERIALIZE) + { + return mc_unserialize (mcm, stream, buffer_index); + } + else + { +#if DEBUG > 1 + vlib_main_t * vm = mcm->vlib_main; + vlib_buffer_t * b = vlib_get_buffer (vm, buffer_index); + u8 * dp = vlib_buffer_get_current (b); + + fformat(stdout, "RX from %U %U\n", + stream->transport->format_peer_id, peer_id, + format_hex_bytes, dp, tm->n_msg_bytes); + +#endif + } +} + +static u8 * +test_snapshot_callback (mc_main_t * mcm, + u8 * data_vector, + u32 last_global_sequence_processed) +{ + if (SERIALIZE) + { + serialize_main_t m; + + /* Append serialized data to data vector. */ + serialize_open_vector (&m, data_vector); + m.stream.current_buffer_index = vec_len (data_vector); + + return serialize_close_vector (&m); + } + else + return format (data_vector, + "snapshot, last global seq 0x%x", + last_global_sequence_processed); +} + +static void +test_handle_snapshot_callback (mc_main_t * mcm, + u8 * data, + u32 n_data_bytes) +{ + if (SERIALIZE) + { + serialize_main_t s; + unserialize_open_data (&s, data, n_data_bytes); + } + else + clib_warning ("snapshot `%*s'", n_data_bytes, data); +} + +static mc_socket_main_t mc_socket_main; + +static uword +mc_test_process (vlib_main_t * vm, + vlib_node_runtime_t * node, + vlib_frame_t * f) +{ + mc_test_main_t * tm = &mc_test_main; + mc_socket_main_t * msm = &mc_socket_main; + mc_main_t *mcm = &msm->mc_main; + uword event_type, *event_data = 0; + u32 data_serial=0, stream_index; + f64 delay; + mc_stream_config_t config; + clib_error_t * error; + int i; + char *intfcs[] = { "eth1", "eth0", "ce" }; + + memset (&config, 0, sizeof (config)); + config.name = "test"; + config.window_size = tm->window_size; + config.rx_buffer = test_rx_callback; + config.catchup_snapshot = test_snapshot_callback; + config.catchup = test_handle_snapshot_callback; + stream_index = ~0; + + msm->multicast_tx_ip4_address_host_byte_order = 0xefff0100; + msm->base_multicast_udp_port_host_byte_order = 0xffab; + + error = mc_socket_main_init (&mc_socket_main, intfcs, ARRAY_LEN(intfcs)); + if (error) + { + clib_error_report (error); + exit (1); + } + + mcm->we_can_be_relay_master = 1; + + while (1) + { + vlib_process_wait_for_event (vm); + event_type = vlib_process_get_events (vm, &event_data); + + switch (event_type) + { + case EVENT_JOIN_STREAM: + stream_index = mc_stream_join (mcm, &config); + break; + + case EVENT_SEND_DATA: { + f64 times[2]; + + if (stream_index == ~0) + stream_index = mc_stream_join (mcm, &config); + + times[0] = vlib_time_now (vm); + for (i = 0; i < event_data[0]; i++) + { + u32 bi; + if (SERIALIZE) + { + mc_serialize_stream (mcm, stream_index, &test_msg, data_serial); + } + else + { + u8 * mp; + mp = mc_get_vlib_buffer (vm, sizeof (mp[0]), &bi); + mp[0] = data_serial; + mc_stream_send (mcm, stream_index, bi); + } + if (tm->min_delay > 0) + { + delay = tm->min_delay + random_f64 (&tm->seed) * (tm->max_delay - tm->min_delay); + vlib_process_suspend (vm, delay); + } + data_serial++; + } + times[1] = vlib_time_now (vm); + clib_warning ("done sending %d; %.4e per sec", + event_data[0], + (f64) event_data[0] / (times[1] - times[0])); + break; + } + + default: + clib_warning ("bug"); + break; + } + + if (event_data) + _vec_len (event_data) = 0; + } +} + +VLIB_REGISTER_NODE (mc_test_process_node,static) = { + .function = mc_test_process, + .type = VLIB_NODE_TYPE_PROCESS, + .name = "mc-test-process", +}; + +static clib_error_t * +mc_test_command (vlib_main_t * vm, + unformat_input_t * input, + vlib_cli_command_t * cmd) +{ + f64 npkts = 10; + + if (unformat (input, "join")) + { + vlib_cli_output (vm, "Join stream...\n"); + vlib_process_signal_event (vm, mc_test_process_node.index, + EVENT_JOIN_STREAM, 0); + return 0; + } + else if (unformat (input, "send %f", &npkts) + || unformat(input, "send")) + { + vlib_process_signal_event (vm, mc_test_process_node.index, + EVENT_SEND_DATA, (uword) npkts); + vlib_cli_output (vm, "Send %.0f pkts...\n", npkts); + + return 0; + } + else + return unformat_parse_error (input); +} + +VLIB_CLI_COMMAND (test_mc_command, static) = { + .path = "test mc", + .short_help = "Test mc command", + .function = mc_test_command, +}; + +static clib_error_t * +mc_show_command (vlib_main_t * vm, + unformat_input_t * input, + vlib_cli_command_t * cmd) +{ + mc_main_t *mcm = &mc_socket_main.mc_main; + vlib_cli_output (vm, "%U", format_mc_main, mcm); + return 0; +} + +VLIB_CLI_COMMAND (show_mc_command, static) = { + .path = "show mc", + .short_help = "Show mc command", + .function = mc_show_command, +}; + +static clib_error_t * +mc_clear_command (vlib_main_t * vm, + unformat_input_t * input, + vlib_cli_command_t * cmd) +{ + mc_main_t * mcm = &mc_socket_main.mc_main; + mc_clear_stream_stats (mcm); + return 0; +} + +VLIB_CLI_COMMAND (clear_mc_command, static) = { + .path = "clear mc", + .short_help = "Clear mc command", + .function = mc_clear_command, +}; + +static clib_error_t * +mc_config (vlib_main_t * vm, unformat_input_t * input) +{ + mc_test_main_t * tm = &mc_test_main; + mc_socket_main_t * msm = &mc_socket_main; + clib_error_t * error = 0; + + tm->min_n_msg_bytes = 4; + tm->max_n_msg_bytes = 4; + tm->window_size = 8; + tm->seed = getpid (); + tm->verbose = 0; + tm->validate = 1; + tm->min_delay = 10e-6; + tm->max_delay = 10e-3; + tm->n_packets_to_send = 0; + while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT) + { + if (unformat (input, "interface %s", &msm->multicast_interface_name)) + ; + + else if (unformat (input, "n-bytes %d", &tm->max_n_msg_bytes)) + tm->min_n_msg_bytes = tm->max_n_msg_bytes; + else if (unformat (input, "max-n-bytes %d", &tm->max_n_msg_bytes)) + ; + else if (unformat (input, "min-n-bytes %d", &tm->min_n_msg_bytes)) + ; + else if (unformat (input, "seed %d", &tm->seed)) + ; + else if (unformat (input, "window %d", &tm->window_size)) + ; + else if (unformat (input, "verbose")) + tm->verbose = 1; + else if (unformat (input, "no-validate")) + tm->validate = 0; + else if (unformat (input, "min-delay %f", &tm->min_delay)) + ; + else if (unformat (input, "max-delay %f", &tm->max_delay)) + ; + else if (unformat (input, "no-delay")) + tm->min_delay = tm->max_delay = 0; + else if (unformat (input, "n-packets %f", &tm->n_packets_to_send)) + ; + + else + return clib_error_return (0, "unknown input `%U'", + format_unformat_error, input); + } + + if (tm->n_packets_to_send > 0) + vlib_process_signal_event (vm, mc_test_process_node.index, + EVENT_SEND_DATA, (uword) tm->n_packets_to_send); + + return error; +} + +VLIB_CONFIG_FUNCTION (mc_config, "mc"); |