From c3a06556d1a4a63646d4cc7aa76274177a56c13f Mon Sep 17 00:00:00 2001 From: Dave Barach Date: Mon, 1 Oct 2018 09:25:32 -0400 Subject: API / CLI event-log tracing Add an "elog trace [api][cli][barrier]" debug CLI command. Removed the barrier elog test command. Remove unused reliable multicast code. Change-Id: Ib3ecde901b7c49fe92b313d0087cd7e776adcdce Signed-off-by: Dave Barach --- src/vlib/CMakeLists.txt | 3 - src/vlib/buffer_funcs.h | 1 + src/vlib/cli.c | 113 ++ src/vlib/main.h | 8 +- src/vlib/mc.c | 2609 ----------------------------------------------- src/vlib/mc.h | 695 ------------- src/vlib/threads.c | 10 +- src/vlib/threads.h | 1 + src/vlib/threads_cli.c | 23 - src/vlib/vlib.h | 1 - 10 files changed, 125 insertions(+), 3339 deletions(-) delete mode 100644 src/vlib/mc.c delete mode 100644 src/vlib/mc.h (limited to 'src/vlib') diff --git a/src/vlib/CMakeLists.txt b/src/vlib/CMakeLists.txt index b187f980401..72c73f3c2d8 100644 --- a/src/vlib/CMakeLists.txt +++ b/src/vlib/CMakeLists.txt @@ -43,7 +43,6 @@ add_vpp_library(vlib linux/vfio.c log.c main.c - mc.c node.c node_cli.c node_format.c @@ -55,7 +54,6 @@ add_vpp_library(vlib unix/cli.c unix/input.c unix/main.c - unix/mc_socket.c unix/plugin.c unix/util.c @@ -77,7 +75,6 @@ add_vpp_library(vlib linux/vfio.h log.h main.h - mc.h node_funcs.h node.h pci/pci_config.h diff --git a/src/vlib/buffer_funcs.h b/src/vlib/buffer_funcs.h index 5306af6e218..d8abdf31d79 100644 --- a/src/vlib/buffer_funcs.h +++ b/src/vlib/buffer_funcs.h @@ -41,6 +41,7 @@ #define included_vlib_buffer_funcs_h #include +#include /** \file vlib buffer access methods. diff --git a/src/vlib/cli.c b/src/vlib/cli.c index ef02d27d288..d7b2a4686d1 100644 --- a/src/vlib/cli.c +++ b/src/vlib/cli.c @@ -40,6 +40,7 @@ #include #include #include +#include #include #include @@ -583,6 +584,23 @@ vlib_cli_dispatch_sub_commands (vlib_main_t * vm, } else { + if (PREDICT_FALSE (vm->elog_trace_cli_commands)) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "cli-cmd: %s", + .format_args = "T4", + }; + /* *INDENT-ON* */ + struct + { + u32 c; + } *ed; + ed = ELOG_DATA (&vm->elog_main, e); + ed->c = elog_global_id_for_msg_name (c->path); + } + if (!c->is_mp_safe) vlib_worker_thread_barrier_sync (vm); @@ -591,6 +609,32 @@ vlib_cli_dispatch_sub_commands (vlib_main_t * vm, if (!c->is_mp_safe) vlib_worker_thread_barrier_release (vm); + if (PREDICT_FALSE (vm->elog_trace_cli_commands)) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "cli-cmd: %s %s", + .format_args = "T4T4", + }; + /* *INDENT-ON* */ + struct + { + u32 c, err; + } *ed; + ed = ELOG_DATA (&vm->elog_main, e); + ed->c = elog_global_id_for_msg_name (c->path); + if (c_error) + { + vec_add1 (c_error->what, 0); + ed->err = elog_global_id_for_msg_name + ((const char *) c_error->what); + _vec_len (c_error->what) -= 1; + } + else + ed->err = elog_global_id_for_msg_name ("OK"); + } + if (c_error) { error = @@ -1415,6 +1459,75 @@ VLIB_CLI_COMMAND (show_cli_command, static) = { }; /* *INDENT-ON* */ +static clib_error_t * +elog_trace_command_fn (vlib_main_t * vm, + unformat_input_t * input, vlib_cli_command_t * cmd) +{ + unformat_input_t _line_input, *line_input = &_line_input; + int enable = 1; + int api = 0, cli = 0, barrier = 0; + + if (!unformat_user (input, unformat_line_input, line_input)) + goto print_status; + + while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT) + { + if (unformat (line_input, "api")) + api = 1; + else if (unformat (line_input, "cli")) + cli = 1; + else if (unformat (line_input, "barrier")) + barrier = 1; + else if (unformat (line_input, "disable")) + enable = 0; + else if (unformat (line_input, "enable")) + enable = 1; + else + break; + } + unformat_free (line_input); + + vm->elog_trace_api_messages = api ? enable : vm->elog_trace_api_messages; + vm->elog_trace_cli_commands = cli ? enable : vm->elog_trace_cli_commands; + vlib_worker_threads->barrier_elog_enabled = + barrier ? enable : vlib_worker_threads->barrier_elog_enabled; + +print_status: + vlib_cli_output (vm, "Current status:"); + + vlib_cli_output + (vm, " Event log API message trace: %s\n CLI command trace: %s", + vm->elog_trace_api_messages ? "on" : "off", + vm->elog_trace_cli_commands ? "on" : "off"); + vlib_cli_output + (vm, " Barrier sync trace: %s", + vlib_worker_threads->barrier_elog_enabled ? "on" : "off"); + + return 0; +} + +/*? + * Control event logging of api, cli, and thread barrier events + * With no arguments, displays the current trace status. + * Name the event groups you wish to trace or stop tracing. + * + * @cliexpar + * @clistart + * elog trace api cli barrier + * elog trace api cli barrier disable + * elog trace + * @cliend + * @cliexcmd{elog trace [api][cli][barrier][disable]} +?*/ +/* *INDENT-OFF* */ +VLIB_CLI_COMMAND (elog_trace_command, static) = +{ + .path = "elog trace", + .short_help = "elog trace [api][cli][barrier][disable]", + .function = elog_trace_command_fn, +}; +/* *INDENT-ON* */ + static clib_error_t * vlib_cli_init (vlib_main_t * vm) { diff --git a/src/vlib/main.h b/src/vlib/main.h index d21227a5fdf..ce42b6ea442 100644 --- a/src/vlib/main.h +++ b/src/vlib/main.h @@ -148,9 +148,6 @@ typedef struct vlib_main_t struct vlib_node_runtime_t * node, vlib_frame_t * frame); - /* Multicast distribution. Set to zero for MC disabled. */ - mc_main_t *mc_main; - /* Stream index to use for distribution when MC is enabled. */ u32 mc_stream_index; @@ -159,6 +156,10 @@ typedef struct vlib_main_t /* Event logger. */ elog_main_t elog_main; + /* Event logger trace flags */ + int elog_trace_api_messages; + int elog_trace_cli_commands; + /* Node call and return event types. */ elog_event_type_t *node_call_elog_event_types; elog_event_type_t *node_return_elog_event_types; @@ -184,7 +185,6 @@ typedef struct vlib_main_t _vlib_init_function_list_elt_t *main_loop_exit_function_registrations; _vlib_init_function_list_elt_t *api_init_function_registrations; vlib_config_function_runtime_t *config_function_registrations; - mc_serialize_msg_t *mc_msg_registrations; /* mc_main is a pointer... */ /* control-plane API queue signal pending, length indication */ volatile u32 queue_signal_pending; diff --git a/src/vlib/mc.c b/src/vlib/mc.c deleted file mode 100644 index a289871f570..00000000000 --- a/src/vlib/mc.c +++ /dev/null @@ -1,2609 +0,0 @@ -/* - * mc.c: vlib reliable sequenced multicast distributed applications - * - * Copyright (c) 2010 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include - -/* - * 1 to enable msg id training wheels, which are useful for tracking - * down catchup and/or partitioned network problems - */ -#define MSG_ID_DEBUG 0 - -static format_function_t format_mc_stream_state; - -static u32 -elog_id_for_peer_id (mc_main_t * m, u64 peer_id) -{ - uword *p, r; - mhash_t *h = &m->elog_id_by_peer_id; - - if (!m->elog_id_by_peer_id.hash) - mhash_init (h, sizeof (uword), sizeof (mc_peer_id_t)); - - p = mhash_get (h, &peer_id); - if (p) - return p[0]; - r = elog_string (m->elog_main, "%U", m->transport.format_peer_id, peer_id); - mhash_set (h, &peer_id, r, /* old_value */ 0); - return r; -} - -static u32 -elog_id_for_msg_name (mc_main_t * m, char *msg_name) -{ - uword *p, r; - uword *h = m->elog_id_by_msg_name; - u8 *name_copy; - - if (!h) - h = m->elog_id_by_msg_name = hash_create_string (0, sizeof (uword)); - - p = hash_get_mem (h, msg_name); - if (p) - return p[0]; - r = elog_string (m->elog_main, "%s", msg_name); - - name_copy = format (0, "%s%c", msg_name, 0); - - hash_set_mem (h, name_copy, r); - m->elog_id_by_msg_name = h; - - return r; -} - -static void -elog_tx_msg (mc_main_t * m, u32 stream_id, u32 local_sequence, - u32 retry_count) -{ - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "tx-msg: stream %d local seq %d attempt %d", - .format_args = "i4i4i4", - }; - /* *INDENT-ON* */ - struct - { - u32 stream_id, local_sequence, retry_count; - } *ed; - ed = ELOG_DATA (m->elog_main, e); - ed->stream_id = stream_id; - ed->local_sequence = local_sequence; - ed->retry_count = retry_count; - } -} - -/* - * seq_cmp - * correctly compare two unsigned sequence numbers. - * This function works so long as x and y are within 2**(n-1) of each - * other, where n = bits(x, y). - * - * Magic decoder ring: - * seq_cmp == 0 => x and y are equal - * seq_cmp < 0 => x is "in the past" with respect to y - * seq_cmp > 0 => x is "in the future" with respect to y - */ -always_inline i32 -mc_seq_cmp (u32 x, u32 y) -{ - return (i32) x - (i32) y; -} - -void * -mc_get_vlib_buffer (vlib_main_t * vm, u32 n_bytes, u32 * bi_return) -{ - u32 n_alloc, bi = 0; - vlib_buffer_t *b; - - n_alloc = vlib_buffer_alloc (vm, &bi, 1); - ASSERT (n_alloc == 1); - - b = vlib_get_buffer (vm, bi); - b->current_length = n_bytes; - *bi_return = bi; - return (void *) b->data; -} - -static void -delete_peer_with_index (mc_main_t * mcm, mc_stream_t * s, - uword index, int notify_application) -{ - mc_stream_peer_t *p = pool_elt_at_index (s->peers, index); - ASSERT (p != 0); - if (s->config.peer_died && notify_application) - s->config.peer_died (mcm, s, p->id); - - s->all_peer_bitmap = clib_bitmap_andnoti (s->all_peer_bitmap, p - s->peers); - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "delete peer %s from all_peer_bitmap", - .format_args = "T4", - }; - /* *INDENT-ON* */ - struct - { - u32 peer; - } *ed = 0; - - ed = ELOG_DATA (mcm->elog_main, e); - ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64); - } - /* Do not delete the pool / hash table entries, or we lose sequence number state */ -} - -static mc_stream_peer_t * -get_or_create_peer_with_id (mc_main_t * mcm, - mc_stream_t * s, mc_peer_id_t id, int *created) -{ - uword *q = mhash_get (&s->peer_index_by_id, &id); - mc_stream_peer_t *p; - - if (q) - { - p = pool_elt_at_index (s->peers, q[0]); - goto done; - } - - pool_get (s->peers, p); - memset (p, 0, sizeof (p[0])); - p->id = id; - p->last_sequence_received = ~0; - mhash_set (&s->peer_index_by_id, &id, p - s->peers, /* old_value */ 0); - if (created) - *created = 1; - -done: - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "get_or_create %s peer %s stream %d seq %d", - .format_args = "t4T4i4i4", - .n_enum_strings = 2, - .enum_strings = { - "old", "new", - }, - }; - /* *INDENT-ON* */ - struct - { - u32 is_new, peer, stream_index, rx_sequence; - } *ed = 0; - - ed = ELOG_DATA (mcm->elog_main, e); - ed->is_new = q ? 0 : 1; - ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64); - ed->stream_index = s->index; - ed->rx_sequence = p->last_sequence_received; - } - /* $$$$ Enable or reenable this peer */ - s->all_peer_bitmap = clib_bitmap_ori (s->all_peer_bitmap, p - s->peers); - return p; -} - -static void -maybe_send_window_open_event (vlib_main_t * vm, mc_stream_t * stream) -{ - vlib_one_time_waiting_process_t *p; - - if (pool_elts (stream->retry_pool) >= stream->config.window_size) - return; - - vec_foreach (p, stream->procs_waiting_for_open_window) - vlib_signal_one_time_waiting_process (vm, p); - - if (stream->procs_waiting_for_open_window) - _vec_len (stream->procs_waiting_for_open_window) = 0; -} - -static void -mc_retry_free (mc_main_t * mcm, mc_stream_t * s, mc_retry_t * r) -{ - mc_retry_t record, *retp; - - if (r->unacked_by_peer_bitmap) - _vec_len (r->unacked_by_peer_bitmap) = 0; - - if (clib_fifo_elts (s->retired_fifo) >= 2 * s->config.window_size) - { - clib_fifo_sub1 (s->retired_fifo, record); - vlib_buffer_free_one (mcm->vlib_main, record.buffer_index); - } - - clib_fifo_add2 (s->retired_fifo, retp); - - retp->buffer_index = r->buffer_index; - retp->local_sequence = r->local_sequence; - - r->buffer_index = ~0; /* poison buffer index in this retry */ -} - -static void -mc_resend_retired (mc_main_t * mcm, mc_stream_t * s, u32 local_sequence) -{ - mc_retry_t *retry; - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "resend-retired: search for local seq %d", - .format_args = "i4", - }; - /* *INDENT-ON* */ - struct - { - u32 local_sequence; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->local_sequence = local_sequence; - } - - /* *INDENT-OFF* */ - clib_fifo_foreach (retry, s->retired_fifo, - ({ - if (retry->local_sequence == local_sequence) - { - elog_tx_msg (mcm, s->index, retry-> local_sequence, -13); - mcm->transport.tx_buffer (mcm->transport.opaque, - MC_TRANSPORT_USER_REQUEST_TO_RELAY, - retry->buffer_index); - return; - } - })); - /* *INDENT-ON* */ - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "resend-retired: FAILED search for local seq %d", - .format_args = "i4", - }; - /* *INDENT-ON* */ - struct - { - u32 local_sequence; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->local_sequence = local_sequence; - } -} - -static uword * -delete_retry_fifo_elt (mc_main_t * mcm, - mc_stream_t * stream, - mc_retry_t * r, uword * dead_peer_bitmap) -{ - mc_stream_peer_t *p; - - /* *INDENT-OFF* */ - pool_foreach (p, stream->peers, ({ - uword pi = p - stream->peers; - uword is_alive = 0 == clib_bitmap_get (r->unacked_by_peer_bitmap, pi); - - if (! is_alive) - dead_peer_bitmap = clib_bitmap_ori (dead_peer_bitmap, pi); - - if (MC_EVENT_LOGGING > 0) - { - ELOG_TYPE_DECLARE (e) = { - .format = "delete_retry_fifo_elt: peer %s is %s", - .format_args = "T4t4", - .n_enum_strings = 2, - .enum_strings = { "alive", "dead", }, - }; - struct { u32 peer, is_alive; } * ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64); - ed->is_alive = is_alive; - } - })); - /* *INDENT-ON* */ - - hash_unset (stream->retry_index_by_local_sequence, r->local_sequence); - mc_retry_free (mcm, stream, r); - - return dead_peer_bitmap; -} - -always_inline mc_retry_t * -prev_retry (mc_stream_t * s, mc_retry_t * r) -{ - return (r->prev_index != ~0 - ? pool_elt_at_index (s->retry_pool, r->prev_index) : 0); -} - -always_inline mc_retry_t * -next_retry (mc_stream_t * s, mc_retry_t * r) -{ - return (r->next_index != ~0 - ? pool_elt_at_index (s->retry_pool, r->next_index) : 0); -} - -always_inline void -remove_retry_from_pool (mc_stream_t * s, mc_retry_t * r) -{ - mc_retry_t *p = prev_retry (s, r); - mc_retry_t *n = next_retry (s, r); - - if (p) - p->next_index = r->next_index; - else - s->retry_head_index = r->next_index; - if (n) - n->prev_index = r->prev_index; - else - s->retry_tail_index = r->prev_index; - - pool_put_index (s->retry_pool, r - s->retry_pool); -} - -static void -check_retry (mc_main_t * mcm, mc_stream_t * s) -{ - mc_retry_t *r; - vlib_main_t *vm = mcm->vlib_main; - f64 now = vlib_time_now (vm); - uword *dead_peer_bitmap = 0; - u32 ri, ri_next; - - for (ri = s->retry_head_index; ri != ~0; ri = ri_next) - { - r = pool_elt_at_index (s->retry_pool, ri); - ri_next = r->next_index; - - if (now < r->sent_at + s->config.retry_interval) - continue; - - r->n_retries += 1; - if (r->n_retries > s->config.retry_limit) - { - dead_peer_bitmap = - delete_retry_fifo_elt (mcm, s, r, dead_peer_bitmap); - remove_retry_from_pool (s, r); - } - else - { - if (MC_EVENT_LOGGING > 0) - { - mc_stream_peer_t *p; - - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (t) = - { - .format = "resend local seq %d attempt %d", - .format_args = "i4i4", - }; - /* *INDENT-ON* */ - - /* *INDENT-OFF* */ - pool_foreach (p, s->peers, ({ - if (clib_bitmap_get (r->unacked_by_peer_bitmap, p - s->peers)) - { - ELOG_TYPE_DECLARE (ev) = { - .format = "resend: needed by peer %s local seq %d", - .format_args = "T4i4", - }; - struct { u32 peer, rx_sequence; } * ed; - ed = ELOG_DATA (mcm->elog_main, ev); - ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64); - ed->rx_sequence = r->local_sequence; - } - })); - /* *INDENT-ON* */ - - struct - { - u32 sequence; - u32 trail; - } *ed; - ed = ELOG_DATA (mcm->elog_main, t); - ed->sequence = r->local_sequence; - ed->trail = r->n_retries; - } - - r->sent_at = vlib_time_now (vm); - s->stats.n_retries += 1; - - elog_tx_msg (mcm, s->index, r->local_sequence, r->n_retries); - - mcm->transport.tx_buffer - (mcm->transport.opaque, - MC_TRANSPORT_USER_REQUEST_TO_RELAY, r->buffer_index); - } - } - - maybe_send_window_open_event (mcm->vlib_main, s); - - /* Delete any dead peers we've found. */ - if (!clib_bitmap_is_zero (dead_peer_bitmap)) - { - uword i; - - /* *INDENT-OFF* */ - clib_bitmap_foreach (i, dead_peer_bitmap, ({ - delete_peer_with_index (mcm, s, i, /* notify_application */ 1); - - /* Delete any references to just deleted peer in retry pool. */ - pool_foreach (r, s->retry_pool, ({ - r->unacked_by_peer_bitmap = - clib_bitmap_andnoti (r->unacked_by_peer_bitmap, i); - })); - })); -/* *INDENT-ON* */ - clib_bitmap_free (dead_peer_bitmap); - } -} - -always_inline mc_main_t * -mc_node_get_main (vlib_node_runtime_t * node) -{ - mc_main_t **p = (void *) node->runtime_data; - return p[0]; -} - -static uword -mc_retry_process (vlib_main_t * vm, - vlib_node_runtime_t * node, vlib_frame_t * f) -{ - mc_main_t *mcm = mc_node_get_main (node); - mc_stream_t *s; - - while (1) - { - vlib_process_suspend (vm, 1.0); - vec_foreach (s, mcm->stream_vector) - { - if (s->state != MC_STREAM_STATE_invalid) - check_retry (mcm, s); - } - } - return 0; /* not likely */ -} - -static void -send_join_or_leave_request (mc_main_t * mcm, u32 stream_index, u32 is_join) -{ - vlib_main_t *vm = mcm->vlib_main; - mc_msg_join_or_leave_request_t *mp; - u32 bi; - - mp = mc_get_vlib_buffer (vm, sizeof (mp[0]), &bi); - memset (mp, 0, sizeof (*mp)); - mp->type = MC_MSG_TYPE_join_or_leave_request; - mp->peer_id = mcm->transport.our_ack_peer_id; - mp->stream_index = stream_index; - mp->is_join = is_join; - - mc_byte_swap_msg_join_or_leave_request (mp); - - /* - * These msgs are unnumbered, unordered so send on the from-relay - * channel. - */ - mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi); -} - -static uword -mc_join_ager_process (vlib_main_t * vm, - vlib_node_runtime_t * node, vlib_frame_t * f) -{ - mc_main_t *mcm = mc_node_get_main (node); - - while (1) - { - if (mcm->joins_in_progress) - { - mc_stream_t *s; - vlib_one_time_waiting_process_t *p; - f64 now = vlib_time_now (vm); - - vec_foreach (s, mcm->stream_vector) - { - if (s->state != MC_STREAM_STATE_join_in_progress) - continue; - - if (now > s->join_timeout) - { - s->state = MC_STREAM_STATE_ready; - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "stream %d join timeout", - }; - /* *INDENT-ON* */ - ELOG (mcm->elog_main, e, s->index); - } - /* Make sure that this app instance exists as a stream peer, - or we may answer a catchup request with a NULL - all_peer_bitmap... */ - (void) get_or_create_peer_with_id - (mcm, s, mcm->transport.our_ack_peer_id, /* created */ 0); - - vec_foreach (p, s->procs_waiting_for_join_done) - vlib_signal_one_time_waiting_process (vm, p); - if (s->procs_waiting_for_join_done) - _vec_len (s->procs_waiting_for_join_done) = 0; - - mcm->joins_in_progress--; - ASSERT (mcm->joins_in_progress >= 0); - } - else - { - /* Resent join request which may have been lost. */ - send_join_or_leave_request (mcm, s->index, 1 /* is_join */ ); - - /* We're *not* alone, retry for as long as it takes */ - if (mcm->relay_state == MC_RELAY_STATE_SLAVE) - s->join_timeout = vlib_time_now (vm) + 2.0; - - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "stream %d resend join request", - }; - /* *INDENT-ON* */ - ELOG (mcm->elog_main, e, s->index); - } - } - } - } - - vlib_process_suspend (vm, .5); - } - - return 0; /* not likely */ -} - -static void -serialize_mc_register_stream_name (serialize_main_t * m, va_list * va) -{ - char *name = va_arg (*va, char *); - serialize_cstring (m, name); -} - -static void -elog_stream_name (char *buf, int n_buf_bytes, char *v) -{ - clib_memcpy (buf, v, clib_min (n_buf_bytes - 1, vec_len (v))); - buf[n_buf_bytes - 1] = 0; -} - -static void -unserialize_mc_register_stream_name (serialize_main_t * m, va_list * va) -{ - mc_main_t *mcm = va_arg (*va, mc_main_t *); - char *name; - mc_stream_t *s; - uword *p; - - unserialize_cstring (m, &name); - - if ((p = hash_get_mem (mcm->stream_index_by_name, name))) - { - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "stream index %d already named %s", - .format_args = "i4s16", - }; - /* *INDENT-ON* */ - struct - { - u32 stream_index; - char name[16]; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->stream_index = p[0]; - elog_stream_name (ed->name, sizeof (ed->name), name); - } - - vec_free (name); - return; - } - - vec_add2 (mcm->stream_vector, s, 1); - mc_stream_init (s); - s->state = MC_STREAM_STATE_name_known; - s->index = s - mcm->stream_vector; - s->config.name = name; - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "stream index %d named %s", - .format_args = "i4s16", - }; - /* *INDENT-ON* */ - struct - { - u32 stream_index; - char name[16]; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->stream_index = s->index; - elog_stream_name (ed->name, sizeof (ed->name), name); - } - - hash_set_mem (mcm->stream_index_by_name, name, s->index); - - p = hash_get (mcm->procs_waiting_for_stream_name_by_name, name); - if (p) - { - vlib_one_time_waiting_process_t *wp, **w; - w = pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool, p[0]); - vec_foreach (wp, w[0]) - vlib_signal_one_time_waiting_process (mcm->vlib_main, wp); - pool_put (mcm->procs_waiting_for_stream_name_pool, w); - hash_unset_mem (mcm->procs_waiting_for_stream_name_by_name, name); - } -} - -/* *INDENT-OFF* */ -MC_SERIALIZE_MSG (mc_register_stream_name_msg, static) = -{ - .name = "mc_register_stream_name", - .serialize = serialize_mc_register_stream_name, - .unserialize = unserialize_mc_register_stream_name, -}; -/* *INDENT-ON* */ - -void -mc_rx_buffer_unserialize (mc_main_t * mcm, - mc_stream_t * stream, - mc_peer_id_t peer_id, u32 buffer_index) -{ - return mc_unserialize (mcm, stream, buffer_index); -} - -static u8 * -mc_internal_catchup_snapshot (mc_main_t * mcm, - u8 * data_vector, - u32 last_global_sequence_processed) -{ - serialize_main_t m; - - /* Append serialized data to data vector. */ - serialize_open_vector (&m, data_vector); - m.stream.current_buffer_index = vec_len (data_vector); - - serialize (&m, serialize_mc_main, mcm); - return serialize_close_vector (&m); -} - -static void -mc_internal_catchup (mc_main_t * mcm, u8 * data, u32 n_data_bytes) -{ - serialize_main_t s; - - unserialize_open_data (&s, data, n_data_bytes); - - unserialize (&s, unserialize_mc_main, mcm); -} - -/* Overridden from the application layer, not actually used here */ -void mc_stream_join_process_hold (void) __attribute__ ((weak)); -void -mc_stream_join_process_hold (void) -{ -} - -static u32 -mc_stream_join_helper (mc_main_t * mcm, - mc_stream_config_t * config, u32 is_internal) -{ - mc_stream_t *s; - vlib_main_t *vm = mcm->vlib_main; - - s = 0; - if (!is_internal) - { - uword *p; - - /* Already have a stream with given name? */ - if ((s = mc_stream_by_name (mcm, config->name))) - { - /* Already joined and ready? */ - if (s->state == MC_STREAM_STATE_ready) - return s->index; - } - - /* First join MC internal stream. */ - if (!mcm->stream_vector - || (mcm->stream_vector[MC_STREAM_INDEX_INTERNAL].state - == MC_STREAM_STATE_invalid)) - { - static mc_stream_config_t c = { - .name = "mc-internal", - .rx_buffer = mc_rx_buffer_unserialize, - .catchup = mc_internal_catchup, - .catchup_snapshot = mc_internal_catchup_snapshot, - }; - - c.save_snapshot = config->save_snapshot; - - mc_stream_join_helper (mcm, &c, /* is_internal */ 1); - } - - /* If stream is still unknown register this name and wait for - sequenced message to name stream. This way all peers agree - on stream name to index mappings. */ - s = mc_stream_by_name (mcm, config->name); - if (!s) - { - vlib_one_time_waiting_process_t *wp, **w; - u8 *name_copy = format (0, "%s", config->name); - - mc_serialize_stream (mcm, - MC_STREAM_INDEX_INTERNAL, - &mc_register_stream_name_msg, config->name); - - /* Wait for this stream to be named. */ - p = - hash_get_mem (mcm->procs_waiting_for_stream_name_by_name, - name_copy); - if (p) - w = - pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool, - p[0]); - else - { - pool_get (mcm->procs_waiting_for_stream_name_pool, w); - if (!mcm->procs_waiting_for_stream_name_by_name) - mcm->procs_waiting_for_stream_name_by_name = hash_create_string ( /* elts */ 0, /* value size */ - sizeof - (uword)); - hash_set_mem (mcm->procs_waiting_for_stream_name_by_name, - name_copy, - w - mcm->procs_waiting_for_stream_name_pool); - w[0] = 0; - } - - vec_add2 (w[0], wp, 1); - vlib_current_process_wait_for_one_time_event (vm, wp); - vec_free (name_copy); - } - - /* Name should be known now. */ - s = mc_stream_by_name (mcm, config->name); - ASSERT (s != 0); - ASSERT (s->state == MC_STREAM_STATE_name_known); - } - - if (!s) - { - vec_add2 (mcm->stream_vector, s, 1); - mc_stream_init (s); - s->index = s - mcm->stream_vector; - } - - { - /* Save name since we could have already used it as hash key. */ - char *name_save = s->config.name; - - s->config = config[0]; - - if (name_save) - s->config.name = name_save; - } - - if (s->config.window_size == 0) - s->config.window_size = 8; - - if (s->config.retry_interval == 0.0) - s->config.retry_interval = 1.0; - - /* Sanity. */ - ASSERT (s->config.retry_interval < 30); - - if (s->config.retry_limit == 0) - s->config.retry_limit = 7; - - s->state = MC_STREAM_STATE_join_in_progress; - if (!s->peer_index_by_id.hash) - mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t)); - - /* If we don't hear from someone in 5 seconds, we're alone */ - s->join_timeout = vlib_time_now (vm) + 5.0; - mcm->joins_in_progress++; - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "stream index %d join request %s", - .format_args = "i4s16", - }; - /* *INDENT-ON* */ - struct - { - u32 stream_index; - char name[16]; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->stream_index = s->index; - elog_stream_name (ed->name, sizeof (ed->name), s->config.name); - } - - send_join_or_leave_request (mcm, s->index, 1 /* join */ ); - - vlib_current_process_wait_for_one_time_event_vector - (vm, &s->procs_waiting_for_join_done); - - if (MC_EVENT_LOGGING) - { - ELOG_TYPE (e, "join complete stream %d"); - ELOG (mcm->elog_main, e, s->index); - } - - return s->index; -} - -u32 -mc_stream_join (mc_main_t * mcm, mc_stream_config_t * config) -{ - return mc_stream_join_helper (mcm, config, /* is_internal */ 0); -} - -void -mc_stream_leave (mc_main_t * mcm, u32 stream_index) -{ - mc_stream_t *s = mc_stream_by_index (mcm, stream_index); - - if (!s) - return; - - if (MC_EVENT_LOGGING) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (t) = - { - .format = "leave-stream: %d",.format_args = "i4", - }; - /* *INDENT-ON* */ - struct - { - u32 index; - } *ed; - ed = ELOG_DATA (mcm->elog_main, t); - ed->index = stream_index; - } - - send_join_or_leave_request (mcm, stream_index, 0 /* is_join */ ); - mc_stream_free (s); - s->state = MC_STREAM_STATE_name_known; -} - -void -mc_msg_join_or_leave_request_handler (mc_main_t * mcm, - mc_msg_join_or_leave_request_t * req, - u32 buffer_index) -{ - mc_stream_t *s; - mc_msg_join_reply_t *rep; - u32 bi; - - mc_byte_swap_msg_join_or_leave_request (req); - - s = mc_stream_by_index (mcm, req->stream_index); - if (!s || s->state != MC_STREAM_STATE_ready) - return; - - /* If the peer is joining, create it */ - if (req->is_join) - { - mc_stream_t *this_s; - - /* We're not in a position to catch up a peer until all - stream joins are complete. */ - if (0) - { - /* XXX This is hard to test so we've. */ - vec_foreach (this_s, mcm->stream_vector) - { - if (this_s->state != MC_STREAM_STATE_ready - && this_s->state != MC_STREAM_STATE_name_known) - return; - } - } - else if (mcm->joins_in_progress > 0) - return; - - (void) get_or_create_peer_with_id (mcm, s, req->peer_id, - /* created */ 0); - - rep = mc_get_vlib_buffer (mcm->vlib_main, sizeof (rep[0]), &bi); - memset (rep, 0, sizeof (rep[0])); - rep->type = MC_MSG_TYPE_join_reply; - rep->stream_index = req->stream_index; - - mc_byte_swap_msg_join_reply (rep); - /* These two are already in network byte order... */ - rep->peer_id = mcm->transport.our_ack_peer_id; - rep->catchup_peer_id = mcm->transport.our_catchup_peer_id; - - mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi); - } - else - { - if (s->config.peer_died) - s->config.peer_died (mcm, s, req->peer_id); - } -} - -void -mc_msg_join_reply_handler (mc_main_t * mcm, - mc_msg_join_reply_t * mp, u32 buffer_index) -{ - mc_stream_t *s; - - mc_byte_swap_msg_join_reply (mp); - - s = mc_stream_by_index (mcm, mp->stream_index); - - if (!s || s->state != MC_STREAM_STATE_join_in_progress) - return; - - /* Switch to catchup state; next join reply - for this stream will be ignored. */ - s->state = MC_STREAM_STATE_catchup; - - mcm->joins_in_progress--; - mcm->transport.catchup_request_fun (mcm->transport.opaque, - mp->stream_index, mp->catchup_peer_id); -} - -void -mc_wait_for_stream_ready (mc_main_t * m, char *stream_name) -{ - mc_stream_t *s; - - while (1) - { - s = mc_stream_by_name (m, stream_name); - if (s) - break; - vlib_process_suspend (m->vlib_main, .1); - } - - /* It's OK to send a message in catchup and ready states. */ - if (s->state == MC_STREAM_STATE_catchup - || s->state == MC_STREAM_STATE_ready) - return; - - /* Otherwise we are waiting for a join to finish. */ - vlib_current_process_wait_for_one_time_event_vector - (m->vlib_main, &s->procs_waiting_for_join_done); -} - -u32 -mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index) -{ - mc_stream_t *s = mc_stream_by_index (mcm, stream_index); - vlib_main_t *vm = mcm->vlib_main; - mc_retry_t *r; - mc_msg_user_request_t *mp; - vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index); - u32 ri; - - if (!s) - return 0; - - if (s->state != MC_STREAM_STATE_ready) - vlib_current_process_wait_for_one_time_event_vector - (vm, &s->procs_waiting_for_join_done); - - while (pool_elts (s->retry_pool) >= s->config.window_size) - { - vlib_current_process_wait_for_one_time_event_vector - (vm, &s->procs_waiting_for_open_window); - } - - pool_get (s->retry_pool, r); - ri = r - s->retry_pool; - - r->prev_index = s->retry_tail_index; - r->next_index = ~0; - s->retry_tail_index = ri; - - if (r->prev_index == ~0) - s->retry_head_index = ri; - else - { - mc_retry_t *p = pool_elt_at_index (s->retry_pool, r->prev_index); - p->next_index = ri; - } - - vlib_buffer_advance (b, -sizeof (mp[0])); - mp = vlib_buffer_get_current (b); - - mp->peer_id = mcm->transport.our_ack_peer_id; - /* mp->transport.global_sequence set by relay agent. */ - mp->global_sequence = 0xdeadbeef; - mp->stream_index = s->index; - mp->local_sequence = s->our_local_sequence++; - mp->n_data_bytes = - vlib_buffer_index_length_in_chain (vm, buffer_index) - sizeof (mp[0]); - - r->buffer_index = buffer_index; - r->local_sequence = mp->local_sequence; - r->sent_at = vlib_time_now (vm); - r->n_retries = 0; - - /* Retry will be freed when all currently known peers have acked. */ - vec_validate (r->unacked_by_peer_bitmap, vec_len (s->all_peer_bitmap) - 1); - vec_copy (r->unacked_by_peer_bitmap, s->all_peer_bitmap); - - hash_set (s->retry_index_by_local_sequence, r->local_sequence, - r - s->retry_pool); - - elog_tx_msg (mcm, s->index, mp->local_sequence, r->n_retries); - - mc_byte_swap_msg_user_request (mp); - - mcm->transport.tx_buffer (mcm->transport.opaque, - MC_TRANSPORT_USER_REQUEST_TO_RELAY, buffer_index); - - s->user_requests_sent++; - - /* return amount of window remaining */ - return s->config.window_size - pool_elts (s->retry_pool); -} - -void -mc_msg_user_request_handler (mc_main_t * mcm, mc_msg_user_request_t * mp, - u32 buffer_index) -{ - vlib_main_t *vm = mcm->vlib_main; - mc_stream_t *s; - mc_stream_peer_t *peer; - i32 seq_cmp_result; - static int once = 0; - - mc_byte_swap_msg_user_request (mp); - - s = mc_stream_by_index (mcm, mp->stream_index); - - /* Not signed up for this stream? Turf-o-matic */ - if (!s || s->state != MC_STREAM_STATE_ready) - { - vlib_buffer_free_one (vm, buffer_index); - return; - } - - /* Find peer, including ourselves. */ - peer = get_or_create_peer_with_id (mcm, s, mp->peer_id, - /* created */ 0); - - seq_cmp_result = mc_seq_cmp (mp->local_sequence, - peer->last_sequence_received + 1); - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "rx-msg: peer %s stream %d rx seq %d seq_cmp %d", - .format_args = "T4i4i4i4", - }; - /* *INDENT-ON* */ - struct - { - u32 peer, stream_index, rx_sequence; - i32 seq_cmp_result; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64); - ed->stream_index = mp->stream_index; - ed->rx_sequence = mp->local_sequence; - ed->seq_cmp_result = seq_cmp_result; - } - - if (0 && mp->stream_index == 1 && once == 0) - { - once = 1; - ELOG_TYPE (e, "FAKE lost msg on stream 1"); - ELOG (mcm->elog_main, e, 0); - return; - } - - peer->last_sequence_received += seq_cmp_result == 0; - s->user_requests_received++; - - if (seq_cmp_result > 0) - peer->stats.n_msgs_from_future += 1; - - /* Send ack even if msg from future */ - if (1) - { - mc_msg_user_ack_t *rp; - u32 bi; - - rp = mc_get_vlib_buffer (vm, sizeof (rp[0]), &bi); - rp->peer_id = mcm->transport.our_ack_peer_id; - rp->stream_index = s->index; - rp->local_sequence = mp->local_sequence; - rp->seq_cmp_result = seq_cmp_result; - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "tx-ack: stream %d local seq %d", - .format_args = "i4i4", - }; - /* *INDENT-ON* */ - struct - { - u32 stream_index; - u32 local_sequence; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->stream_index = rp->stream_index; - ed->local_sequence = rp->local_sequence; - } - - mc_byte_swap_msg_user_ack (rp); - - mcm->transport.tx_ack (mcm->transport.opaque, mp->peer_id, bi); - /* Msg from past? If so, free the buffer... */ - if (seq_cmp_result < 0) - { - vlib_buffer_free_one (vm, buffer_index); - peer->stats.n_msgs_from_past += 1; - } - } - - if (seq_cmp_result == 0) - { - vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index); - switch (s->state) - { - case MC_STREAM_STATE_ready: - vlib_buffer_advance (b, sizeof (mp[0])); - s->config.rx_buffer (mcm, s, mp->peer_id, buffer_index); - - /* Stream vector can change address via rx callback for mc-internal - stream. */ - s = mc_stream_by_index (mcm, mp->stream_index); - ASSERT (s != 0); - s->last_global_sequence_processed = mp->global_sequence; - break; - - case MC_STREAM_STATE_catchup: - clib_fifo_add1 (s->catchup_fifo, buffer_index); - break; - - default: - clib_warning ("stream in unknown state %U", - format_mc_stream_state, s->state); - break; - } - } -} - -void -mc_msg_user_ack_handler (mc_main_t * mcm, mc_msg_user_ack_t * mp, - u32 buffer_index) -{ - vlib_main_t *vm = mcm->vlib_main; - uword *p; - mc_stream_t *s; - mc_stream_peer_t *peer; - mc_retry_t *r; - int peer_created = 0; - - mc_byte_swap_msg_user_ack (mp); - - s = mc_stream_by_index (mcm, mp->stream_index); - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (t) = - { - .format = "rx-ack: local seq %d peer %s seq_cmp_result %d", - .format_args = "i4T4i4", - }; - /* *INDENT-ON* */ - - struct - { - u32 local_sequence; - u32 peer; - i32 seq_cmp_result; - } *ed; - ed = ELOG_DATA (mcm->elog_main, t); - ed->local_sequence = mp->local_sequence; - ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64); - ed->seq_cmp_result = mp->seq_cmp_result; - } - - /* Unknown stream? */ - if (!s) - return; - - /* Find the peer which just ack'ed. */ - peer = get_or_create_peer_with_id (mcm, s, mp->peer_id, - /* created */ &peer_created); - - /* - * Peer reports message from the future. If it's not in the retry - * fifo, look for a retired message. - */ - if (mp->seq_cmp_result > 0) - { - p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence - - mp->seq_cmp_result); - if (p == 0) - mc_resend_retired (mcm, s, mp->local_sequence - mp->seq_cmp_result); - - /* Normal retry should fix it... */ - return; - } - - /* - * Pointer to the indicated retry fifo entry. - * Worth hashing because we could use a window size of 100 or 1000. - */ - p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence); - - /* - * Is this a duplicate ACK, received after we've retired the - * fifo entry. This can happen when learning about new - * peers. - */ - if (p == 0) - { - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (t) = - { - .format = "ack: for seq %d from peer %s no fifo elt", - .format_args = "i4T4", - }; - /* *INDENT-ON* */ - - struct - { - u32 seq; - u32 peer; - } *ed; - ed = ELOG_DATA (mcm->elog_main, t); - ed->seq = mp->local_sequence; - ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64); - } - - return; - } - - r = pool_elt_at_index (s->retry_pool, p[0]); - - /* Make sure that this new peer ACKs our msgs from now on */ - if (peer_created) - { - mc_retry_t *later_retry = next_retry (s, r); - - while (later_retry) - { - later_retry->unacked_by_peer_bitmap = - clib_bitmap_ori (later_retry->unacked_by_peer_bitmap, - peer - s->peers); - later_retry = next_retry (s, later_retry); - } - } - - ASSERT (mp->local_sequence == r->local_sequence); - - /* If we weren't expecting to hear from this peer */ - if (!peer_created && - !clib_bitmap_get (r->unacked_by_peer_bitmap, peer - s->peers)) - { - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (t) = - { - .format = "dup-ack: for seq %d from peer %s", - .format_args = "i4T4", - }; - /* *INDENT-ON* */ - struct - { - u32 seq; - u32 peer; - } *ed; - ed = ELOG_DATA (mcm->elog_main, t); - ed->seq = r->local_sequence; - ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64); - } - if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap)) - return; - } - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (t) = - { - .format = "ack: for seq %d from peer %s", - .format_args = "i4T4", - }; - /* *INDENT-ON* */ - struct - { - u32 seq; - u32 peer; - } *ed; - ed = ELOG_DATA (mcm->elog_main, t); - ed->seq = mp->local_sequence; - ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64); - } - - r->unacked_by_peer_bitmap = - clib_bitmap_andnoti (r->unacked_by_peer_bitmap, peer - s->peers); - - /* Not all clients have ack'ed */ - if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap)) - { - return; - } - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (t) = - { - .format = "ack: retire fifo elt loc seq %d after %d acks", - .format_args = "i4i4", - }; - /* *INDENT-ON* */ - struct - { - u32 seq; - u32 npeers; - } *ed; - ed = ELOG_DATA (mcm->elog_main, t); - ed->seq = r->local_sequence; - ed->npeers = pool_elts (s->peers); - } - - hash_unset (s->retry_index_by_local_sequence, mp->local_sequence); - mc_retry_free (mcm, s, r); - remove_retry_from_pool (s, r); - maybe_send_window_open_event (vm, s); -} - -#define EVENT_MC_SEND_CATCHUP_DATA 0 - -static uword -mc_catchup_process (vlib_main_t * vm, - vlib_node_runtime_t * node, vlib_frame_t * f) -{ - mc_main_t *mcm = mc_node_get_main (node); - uword *event_data = 0; - mc_catchup_process_arg_t *args; - int i; - - while (1) - { - if (event_data) - _vec_len (event_data) = 0; - vlib_process_wait_for_event_with_type (vm, &event_data, - EVENT_MC_SEND_CATCHUP_DATA); - - for (i = 0; i < vec_len (event_data); i++) - { - args = pool_elt_at_index (mcm->catchup_process_args, event_data[i]); - - mcm->transport.catchup_send_fun (mcm->transport.opaque, - args->catchup_opaque, - args->catchup_snapshot); - - /* Send function will free snapshot data vector. */ - pool_put (mcm->catchup_process_args, args); - } - } - - return 0; /* not likely */ -} - -static void -serialize_mc_stream (serialize_main_t * m, va_list * va) -{ - mc_stream_t *s = va_arg (*va, mc_stream_t *); - mc_stream_peer_t *p; - - serialize_integer (m, pool_elts (s->peers), sizeof (u32)); - /* *INDENT-OFF* */ - pool_foreach (p, s->peers, ({ - u8 * x = serialize_get (m, sizeof (p->id)); - clib_memcpy (x, p->id.as_u8, sizeof (p->id)); - serialize_integer (m, p->last_sequence_received, - sizeof (p->last_sequence_received)); - })); -/* *INDENT-ON* */ - serialize_bitmap (m, s->all_peer_bitmap); -} - -void -unserialize_mc_stream (serialize_main_t * m, va_list * va) -{ - mc_stream_t *s = va_arg (*va, mc_stream_t *); - u32 i, n_peers; - mc_stream_peer_t *p; - - unserialize_integer (m, &n_peers, sizeof (u32)); - mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t)); - for (i = 0; i < n_peers; i++) - { - u8 *x; - pool_get (s->peers, p); - x = unserialize_get (m, sizeof (p->id)); - clib_memcpy (p->id.as_u8, x, sizeof (p->id)); - unserialize_integer (m, &p->last_sequence_received, - sizeof (p->last_sequence_received)); - mhash_set (&s->peer_index_by_id, &p->id, p - s->peers, /* old_value */ - 0); - } - s->all_peer_bitmap = unserialize_bitmap (m); - - /* This is really bad. */ - if (!s->all_peer_bitmap) - clib_warning ("BUG: stream %s all_peer_bitmap NULL", s->config.name); -} - -void -mc_msg_catchup_request_handler (mc_main_t * mcm, - mc_msg_catchup_request_t * req, - u32 catchup_opaque) -{ - vlib_main_t *vm = mcm->vlib_main; - mc_stream_t *s; - mc_catchup_process_arg_t *args; - - mc_byte_swap_msg_catchup_request (req); - - s = mc_stream_by_index (mcm, req->stream_index); - if (!s || s->state != MC_STREAM_STATE_ready) - return; - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (t) = - { - .format = "catchup-request: from %s stream %d", - .format_args = "T4i4", - }; - /* *INDENT-ON* */ - struct - { - u32 peer, stream; - } *ed; - ed = ELOG_DATA (mcm->elog_main, t); - ed->peer = elog_id_for_peer_id (mcm, req->peer_id.as_u64); - ed->stream = req->stream_index; - } - - /* - * The application has to snapshoot its data structures right - * here, right now. If we process any messages after - * noting the last global sequence we've processed, the client - * won't be able to accurately reconstruct our data structures. - * - * Once the data structures are e.g. vec_dup()'ed, we - * send the resulting messages from a separate process, to - * make sure that we don't cause a bunch of message retransmissions - */ - pool_get (mcm->catchup_process_args, args); - - args->stream_index = s - mcm->stream_vector; - args->catchup_opaque = catchup_opaque; - args->catchup_snapshot = 0; - - /* Construct catchup reply and snapshot state for stream to send as - catchup reply payload. */ - { - mc_msg_catchup_reply_t *rep; - serialize_main_t m; - - vec_resize (args->catchup_snapshot, sizeof (rep[0])); - - rep = (void *) args->catchup_snapshot; - - rep->peer_id = req->peer_id; - rep->stream_index = req->stream_index; - rep->last_global_sequence_included = s->last_global_sequence_processed; - - /* Setup for serialize to append to catchup snapshot. */ - serialize_open_vector (&m, args->catchup_snapshot); - m.stream.current_buffer_index = vec_len (m.stream.buffer); - - serialize (&m, serialize_mc_stream, s); - - args->catchup_snapshot = serialize_close_vector (&m); - - /* Actually copy internal state */ - args->catchup_snapshot = s->config.catchup_snapshot - (mcm, args->catchup_snapshot, rep->last_global_sequence_included); - - rep = (void *) args->catchup_snapshot; - rep->n_data_bytes = vec_len (args->catchup_snapshot) - sizeof (rep[0]); - - mc_byte_swap_msg_catchup_reply (rep); - } - - /* now go send it... */ - vlib_process_signal_event (vm, mcm->catchup_process, - EVENT_MC_SEND_CATCHUP_DATA, - args - mcm->catchup_process_args); -} - -#define EVENT_MC_UNSERIALIZE_BUFFER 0 -#define EVENT_MC_UNSERIALIZE_CATCHUP 1 - -void -mc_msg_catchup_reply_handler (mc_main_t * mcm, mc_msg_catchup_reply_t * mp, - u32 catchup_opaque) -{ - vlib_process_signal_event (mcm->vlib_main, - mcm->unserialize_process, - EVENT_MC_UNSERIALIZE_CATCHUP, - pointer_to_uword (mp)); -} - -static void -perform_catchup (mc_main_t * mcm, mc_msg_catchup_reply_t * mp) -{ - mc_stream_t *s; - i32 seq_cmp_result; - - mc_byte_swap_msg_catchup_reply (mp); - - s = mc_stream_by_index (mcm, mp->stream_index); - - /* Never heard of this stream or already caught up. */ - if (!s || s->state == MC_STREAM_STATE_ready) - return; - - { - serialize_main_t m; - mc_stream_peer_t *p; - u32 n_stream_bytes; - - /* For offline sim replay: save the entire catchup snapshot... */ - if (s->config.save_snapshot) - s->config.save_snapshot (mcm, /* is_catchup */ 1, mp->data, - mp->n_data_bytes); - - unserialize_open_data (&m, mp->data, mp->n_data_bytes); - unserialize (&m, unserialize_mc_stream, s); - - /* Make sure we start numbering our messages as expected */ - /* *INDENT-OFF* */ - pool_foreach (p, s->peers, ({ - if (p->id.as_u64 == mcm->transport.our_ack_peer_id.as_u64) - s->our_local_sequence = p->last_sequence_received + 1; - })); -/* *INDENT-ON* */ - - n_stream_bytes = m.stream.current_buffer_index; - - /* No need to unserialize close; nothing to free. */ - - /* After serialized stream is user's catchup data. */ - s->config.catchup (mcm, mp->data + n_stream_bytes, - mp->n_data_bytes - n_stream_bytes); - } - - /* Vector could have been moved by catchup. - This can only happen for mc-internal stream. */ - s = mc_stream_by_index (mcm, mp->stream_index); - - s->last_global_sequence_processed = mp->last_global_sequence_included; - - while (clib_fifo_elts (s->catchup_fifo)) - { - mc_msg_user_request_t *gp; - u32 bi; - vlib_buffer_t *b; - - clib_fifo_sub1 (s->catchup_fifo, bi); - - b = vlib_get_buffer (mcm->vlib_main, bi); - gp = vlib_buffer_get_current (b); - - /* Make sure we're replaying "new" news */ - seq_cmp_result = mc_seq_cmp (gp->global_sequence, - mp->last_global_sequence_included); - - if (seq_cmp_result > 0) - { - vlib_buffer_advance (b, sizeof (gp[0])); - s->config.rx_buffer (mcm, s, gp->peer_id, bi); - s->last_global_sequence_processed = gp->global_sequence; - - if (MC_EVENT_LOGGING) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (t) = - { - .format = "catchup replay local sequence 0x%x", - .format_args = "i4", - }; - /* *INDENT-ON* */ - struct - { - u32 local_sequence; - } *ed; - ed = ELOG_DATA (mcm->elog_main, t); - ed->local_sequence = gp->local_sequence; - } - } - else - { - if (MC_EVENT_LOGGING) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (t) = - { - .format = "catchup discard local sequence 0x%x", - .format_args = "i4", - }; - /* *INDENT-ON* */ - struct - { - u32 local_sequence; - } *ed; - ed = ELOG_DATA (mcm->elog_main, t); - ed->local_sequence = gp->local_sequence; - } - - vlib_buffer_free_one (mcm->vlib_main, bi); - } - } - - s->state = MC_STREAM_STATE_ready; - - /* Now that we are caught up wake up joining process. */ - { - vlib_one_time_waiting_process_t *wp; - vec_foreach (wp, s->procs_waiting_for_join_done) - vlib_signal_one_time_waiting_process (mcm->vlib_main, wp); - if (s->procs_waiting_for_join_done) - _vec_len (s->procs_waiting_for_join_done) = 0; - } -} - -static void -this_node_maybe_master (mc_main_t * mcm) -{ - vlib_main_t *vm = mcm->vlib_main; - mc_msg_master_assert_t *mp; - uword event_type; - int timeouts = 0; - int is_master = mcm->relay_state == MC_RELAY_STATE_MASTER; - clib_error_t *error; - f64 now, time_last_master_assert = -1; - u32 bi; - - while (1) - { - if (!mcm->we_can_be_relay_master) - { - mcm->relay_state = MC_RELAY_STATE_SLAVE; - if (MC_EVENT_LOGGING) - { - ELOG_TYPE (e, "become slave (config)"); - ELOG (mcm->elog_main, e, 0); - } - return; - } - - now = vlib_time_now (vm); - if (now >= time_last_master_assert + 1) - { - time_last_master_assert = now; - mp = mc_get_vlib_buffer (mcm->vlib_main, sizeof (mp[0]), &bi); - - mp->peer_id = mcm->transport.our_ack_peer_id; - mp->global_sequence = mcm->relay_global_sequence; - - /* - * these messages clog the event log, set MC_EVENT_LOGGING higher - * if you want them - */ - if (MC_EVENT_LOGGING > 1) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "tx-massert: peer %s global seq %u", - .format_args = "T4i4", - }; - /* *INDENT-ON* */ - struct - { - u32 peer, global_sequence; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64); - ed->global_sequence = mp->global_sequence; - } - - mc_byte_swap_msg_master_assert (mp); - - error = - mcm->transport.tx_buffer (mcm->transport.opaque, - MC_TRANSPORT_MASTERSHIP, bi); - if (error) - clib_error_report (error); - } - - vlib_process_wait_for_event_or_clock (vm, 1.0); - event_type = vlib_process_get_events (vm, /* no event data */ 0); - - switch (event_type) - { - case ~0: - if (!is_master && timeouts++ > 2) - { - mcm->relay_state = MC_RELAY_STATE_MASTER; - mcm->relay_master_peer_id = - mcm->transport.our_ack_peer_id.as_u64; - if (MC_EVENT_LOGGING) - { - ELOG_TYPE (e, "become master (was maybe_master)"); - ELOG (mcm->elog_main, e, 0); - } - return; - } - break; - - case MC_RELAY_STATE_SLAVE: - mcm->relay_state = MC_RELAY_STATE_SLAVE; - if (MC_EVENT_LOGGING && mcm->relay_state != MC_RELAY_STATE_SLAVE) - { - ELOG_TYPE (e, "become slave (was maybe_master)"); - ELOG (mcm->elog_main, e, 0); - } - return; - } - } -} - -static void -this_node_slave (mc_main_t * mcm) -{ - vlib_main_t *vm = mcm->vlib_main; - uword event_type; - int timeouts = 0; - - if (MC_EVENT_LOGGING) - { - ELOG_TYPE (e, "become slave"); - ELOG (mcm->elog_main, e, 0); - } - - while (1) - { - vlib_process_wait_for_event_or_clock (vm, 1.0); - event_type = vlib_process_get_events (vm, /* no event data */ 0); - - switch (event_type) - { - case ~0: - if (timeouts++ > 2) - { - mcm->relay_state = MC_RELAY_STATE_NEGOTIATE; - mcm->relay_master_peer_id = ~0ULL; - if (MC_EVENT_LOGGING) - { - ELOG_TYPE (e, "timeouts; negoitate mastership"); - ELOG (mcm->elog_main, e, 0); - } - return; - } - break; - - case MC_RELAY_STATE_SLAVE: - mcm->relay_state = MC_RELAY_STATE_SLAVE; - timeouts = 0; - break; - } - } -} - -static uword -mc_mastership_process (vlib_main_t * vm, - vlib_node_runtime_t * node, vlib_frame_t * f) -{ - mc_main_t *mcm = mc_node_get_main (node); - - while (1) - { - switch (mcm->relay_state) - { - case MC_RELAY_STATE_NEGOTIATE: - case MC_RELAY_STATE_MASTER: - this_node_maybe_master (mcm); - break; - - case MC_RELAY_STATE_SLAVE: - this_node_slave (mcm); - break; - } - } - return 0; /* not likely */ -} - -void -mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master) -{ - if (we_can_be_master != mcm->we_can_be_relay_master) - { - mcm->we_can_be_relay_master = we_can_be_master; - vlib_process_signal_event (mcm->vlib_main, - mcm->mastership_process, - MC_RELAY_STATE_NEGOTIATE, 0); - } -} - -void -mc_msg_master_assert_handler (mc_main_t * mcm, mc_msg_master_assert_t * mp, - u32 buffer_index) -{ - mc_peer_id_t his_peer_id, our_peer_id; - i32 seq_cmp_result; - u8 signal_slave = 0; - u8 update_global_sequence = 0; - - mc_byte_swap_msg_master_assert (mp); - - his_peer_id = mp->peer_id; - our_peer_id = mcm->transport.our_ack_peer_id; - - /* compare the incoming global sequence with ours */ - seq_cmp_result = mc_seq_cmp (mp->global_sequence, - mcm->relay_global_sequence); - - /* If the sender has a lower peer id and the sender's sequence >= - our global sequence, we become a slave. Otherwise we are master. */ - if (mc_peer_id_compare (his_peer_id, our_peer_id) < 0 - && seq_cmp_result >= 0) - { - vlib_process_signal_event (mcm->vlib_main, - mcm->mastership_process, - MC_RELAY_STATE_SLAVE, 0); - signal_slave = 1; - } - - /* Update our global sequence. */ - if (seq_cmp_result > 0) - { - mcm->relay_global_sequence = mp->global_sequence; - update_global_sequence = 1; - } - - { - uword *q = mhash_get (&mcm->mastership_peer_index_by_id, &his_peer_id); - mc_mastership_peer_t *p; - - if (q) - p = vec_elt_at_index (mcm->mastership_peers, q[0]); - else - { - vec_add2 (mcm->mastership_peers, p, 1); - p->peer_id = his_peer_id; - mhash_set (&mcm->mastership_peer_index_by_id, &p->peer_id, - p - mcm->mastership_peers, - /* old_value */ 0); - } - p->time_last_master_assert_received = vlib_time_now (mcm->vlib_main); - } - - /* - * these messages clog the event log, set MC_EVENT_LOGGING higher - * if you want them. - */ - if (MC_EVENT_LOGGING > 1) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "rx-massert: peer %s global seq %u upd %d slave %d", - .format_args = "T4i4i1i1", - }; - /* *INDENT-ON* */ - - struct - { - u32 peer; - u32 global_sequence; - u8 update_sequence; - u8 slave; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->peer = elog_id_for_peer_id (mcm, his_peer_id.as_u64); - ed->global_sequence = mp->global_sequence; - ed->update_sequence = update_global_sequence; - ed->slave = signal_slave; - } -} - -static void -mc_serialize_init (mc_main_t * mcm) -{ - mc_serialize_msg_t *m; - vlib_main_t *vm = vlib_get_main (); - - mcm->global_msg_index_by_name - = hash_create_string ( /* elts */ 0, sizeof (uword)); - - m = vm->mc_msg_registrations; - - while (m) - { - m->global_index = vec_len (mcm->global_msgs); - hash_set_mem (mcm->global_msg_index_by_name, m->name, m->global_index); - vec_add1 (mcm->global_msgs, m); - m = m->next_registration; - } -} - -clib_error_t * -mc_serialize_va (mc_main_t * mc, - u32 stream_index, - u32 multiple_messages_per_vlib_buffer, - mc_serialize_msg_t * msg, va_list * va) -{ - mc_stream_t *s; - clib_error_t *error; - serialize_main_t *m = &mc->serialize_mains[VLIB_TX]; - vlib_serialize_buffer_main_t *sbm = &mc->serialize_buffer_mains[VLIB_TX]; - u32 bi, n_before, n_after, n_total, n_this_msg; - u32 si, gi; - - if (!sbm->vlib_main) - { - sbm->tx.max_n_data_bytes_per_chain = 4096; - sbm->tx.free_list_index = VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX; - } - - if (sbm->first_buffer == 0) - serialize_open_vlib_buffer (m, mc->vlib_main, sbm); - - n_before = serialize_vlib_buffer_n_bytes (m); - - s = mc_stream_by_index (mc, stream_index); - gi = msg->global_index; - ASSERT (msg == vec_elt (mc->global_msgs, gi)); - - si = ~0; - if (gi < vec_len (s->stream_msg_index_by_global_index)) - si = s->stream_msg_index_by_global_index[gi]; - - serialize_likely_small_unsigned_integer (m, si); - - /* For first time message is sent, use name to identify message. */ - if (si == ~0 || MSG_ID_DEBUG) - serialize_cstring (m, msg->name); - - if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "serialize-msg: %s index %d", - .format_args = "T4i4", - }; - /* *INDENT-ON* */ - struct - { - u32 c[2]; - } *ed; - ed = ELOG_DATA (mc->elog_main, e); - ed->c[0] = elog_id_for_msg_name (mc, msg->name); - ed->c[1] = si; - } - - error = va_serialize (m, va); - - n_after = serialize_vlib_buffer_n_bytes (m); - n_this_msg = n_after - n_before; - n_total = n_after + sizeof (mc_msg_user_request_t); - - /* For max message size ignore first message where string name is sent. */ - if (si != ~0) - msg->max_n_bytes_serialized = - clib_max (msg->max_n_bytes_serialized, n_this_msg); - - if (!multiple_messages_per_vlib_buffer - || si == ~0 - || n_total + msg->max_n_bytes_serialized > - mc->transport.max_packet_size) - { - bi = serialize_close_vlib_buffer (m); - sbm->first_buffer = 0; - if (!error) - mc_stream_send (mc, stream_index, bi); - else if (bi != ~0) - vlib_buffer_free_one (mc->vlib_main, bi); - } - - return error; -} - -clib_error_t * -mc_serialize_internal (mc_main_t * mc, - u32 stream_index, - u32 multiple_messages_per_vlib_buffer, - mc_serialize_msg_t * msg, ...) -{ - vlib_main_t *vm = mc->vlib_main; - va_list va; - clib_error_t *error; - - if (stream_index == ~0) - { - if (vm->mc_main && vm->mc_stream_index == ~0) - vlib_current_process_wait_for_one_time_event_vector - (vm, &vm->procs_waiting_for_mc_stream_join); - stream_index = vm->mc_stream_index; - } - - va_start (va, msg); - error = mc_serialize_va (mc, stream_index, - multiple_messages_per_vlib_buffer, msg, &va); - va_end (va); - return error; -} - -uword -mc_unserialize_message (mc_main_t * mcm, - mc_stream_t * s, serialize_main_t * m) -{ - mc_serialize_stream_msg_t *sm; - u32 gi, si; - - si = unserialize_likely_small_unsigned_integer (m); - - if (!(si == ~0 || MSG_ID_DEBUG)) - { - sm = vec_elt_at_index (s->stream_msgs, si); - gi = sm->global_index; - } - else - { - char *name; - - unserialize_cstring (m, &name); - - if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "unserialize-msg: %s rx index %d", - .format_args = "T4i4", - }; - /* *INDENT-ON* */ - struct - { - u32 c[2]; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->c[0] = elog_id_for_msg_name (mcm, name); - ed->c[1] = si; - } - - { - uword *p = hash_get_mem (mcm->global_msg_index_by_name, name); - gi = p ? p[0] : ~0; - } - - /* Unknown message? */ - if (gi == ~0) - { - vec_free (name); - goto done; - } - - vec_validate_init_empty (s->stream_msg_index_by_global_index, gi, ~0); - si = s->stream_msg_index_by_global_index[gi]; - - /* Stream local index unknown? Create it. */ - if (si == ~0) - { - vec_add2 (s->stream_msgs, sm, 1); - - si = sm - s->stream_msgs; - sm->global_index = gi; - s->stream_msg_index_by_global_index[gi] = si; - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "msg-bind: stream %d %s to index %d", - .format_args = "i4T4i4", - }; - /* *INDENT-ON* */ - struct - { - u32 c[3]; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->c[0] = s->index; - ed->c[1] = elog_id_for_msg_name (mcm, name); - ed->c[2] = si; - } - } - else - { - sm = vec_elt_at_index (s->stream_msgs, si); - if (gi != sm->global_index && MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "msg-id-ERROR: %s index %d expected %d", - .format_args = "T4i4i4", - }; - /* *INDENT-ON* */ - struct - { - u32 c[3]; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->c[0] = elog_id_for_msg_name (mcm, name); - ed->c[1] = si; - ed->c[2] = ~0; - if (sm->global_index < - vec_len (s->stream_msg_index_by_global_index)) - ed->c[2] = - s->stream_msg_index_by_global_index[sm->global_index]; - } - } - - vec_free (name); - } - - if (gi != ~0) - { - mc_serialize_msg_t *msg; - msg = vec_elt (mcm->global_msgs, gi); - unserialize (m, msg->unserialize, mcm); - } - -done: - return gi != ~0; -} - -void -mc_unserialize_internal (mc_main_t * mcm, u32 stream_and_buffer_index) -{ - vlib_main_t *vm = mcm->vlib_main; - serialize_main_t *m = &mcm->serialize_mains[VLIB_RX]; - vlib_serialize_buffer_main_t *sbm = &mcm->serialize_buffer_mains[VLIB_RX]; - mc_stream_and_buffer_t *sb; - mc_stream_t *stream; - u32 buffer_index; - - sb = - pool_elt_at_index (mcm->mc_unserialize_stream_and_buffers, - stream_and_buffer_index); - buffer_index = sb->buffer_index; - stream = vec_elt_at_index (mcm->stream_vector, sb->stream_index); - pool_put (mcm->mc_unserialize_stream_and_buffers, sb); - - if (stream->config.save_snapshot) - { - u32 n_bytes = vlib_buffer_index_length_in_chain (vm, buffer_index); - static u8 *contents; - vec_reset_length (contents); - vec_validate (contents, n_bytes - 1); - vlib_buffer_contents (vm, buffer_index, contents); - stream->config.save_snapshot (mcm, /* is_catchup */ 0, contents, - n_bytes); - } - - ASSERT (vlib_in_process_context (vm)); - - unserialize_open_vlib_buffer (m, vm, sbm); - - clib_fifo_add1 (sbm->rx.buffer_fifo, buffer_index); - - while (unserialize_vlib_buffer_n_bytes (m) > 0) - mc_unserialize_message (mcm, stream, m); - - /* Frees buffer. */ - unserialize_close_vlib_buffer (m); -} - -void -mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index) -{ - vlib_main_t *vm = mcm->vlib_main; - mc_stream_and_buffer_t *sb; - pool_get (mcm->mc_unserialize_stream_and_buffers, sb); - sb->stream_index = s->index; - sb->buffer_index = buffer_index; - vlib_process_signal_event (vm, mcm->unserialize_process, - EVENT_MC_UNSERIALIZE_BUFFER, - sb - mcm->mc_unserialize_stream_and_buffers); -} - -static uword -mc_unserialize_process (vlib_main_t * vm, - vlib_node_runtime_t * node, vlib_frame_t * f) -{ - mc_main_t *mcm = mc_node_get_main (node); - uword event_type, *event_data = 0; - int i; - - while (1) - { - if (event_data) - _vec_len (event_data) = 0; - - vlib_process_wait_for_event (vm); - event_type = vlib_process_get_events (vm, &event_data); - switch (event_type) - { - case EVENT_MC_UNSERIALIZE_BUFFER: - for (i = 0; i < vec_len (event_data); i++) - mc_unserialize_internal (mcm, event_data[i]); - break; - - case EVENT_MC_UNSERIALIZE_CATCHUP: - for (i = 0; i < vec_len (event_data); i++) - { - u8 *mp = uword_to_pointer (event_data[i], u8 *); - perform_catchup (mcm, (void *) mp); - vec_free (mp); - } - break; - - default: - break; - } - } - - return 0; /* not likely */ -} - -void -serialize_mc_main (serialize_main_t * m, va_list * va) -{ - mc_main_t *mcm = va_arg (*va, mc_main_t *); - mc_stream_t *s; - mc_serialize_stream_msg_t *sm; - mc_serialize_msg_t *msg; - - serialize_integer (m, vec_len (mcm->stream_vector), sizeof (u32)); - vec_foreach (s, mcm->stream_vector) - { - /* Stream name. */ - serialize_cstring (m, s->config.name); - - /* Serialize global names for all sent messages. */ - serialize_integer (m, vec_len (s->stream_msgs), sizeof (u32)); - vec_foreach (sm, s->stream_msgs) - { - msg = vec_elt (mcm->global_msgs, sm->global_index); - serialize_cstring (m, msg->name); - } - } -} - -void -unserialize_mc_main (serialize_main_t * m, va_list * va) -{ - mc_main_t *mcm = va_arg (*va, mc_main_t *); - u32 i, n_streams, n_stream_msgs; - char *name; - mc_stream_t *s; - mc_serialize_stream_msg_t *sm; - - unserialize_integer (m, &n_streams, sizeof (u32)); - for (i = 0; i < n_streams; i++) - { - unserialize_cstring (m, &name); - if (i != MC_STREAM_INDEX_INTERNAL && !mc_stream_by_name (mcm, name)) - { - vec_validate (mcm->stream_vector, i); - s = vec_elt_at_index (mcm->stream_vector, i); - mc_stream_init (s); - s->index = s - mcm->stream_vector; - s->config.name = name; - s->state = MC_STREAM_STATE_name_known; - hash_set_mem (mcm->stream_index_by_name, s->config.name, s->index); - } - else - vec_free (name); - - s = vec_elt_at_index (mcm->stream_vector, i); - - vec_free (s->stream_msgs); - vec_free (s->stream_msg_index_by_global_index); - - unserialize_integer (m, &n_stream_msgs, sizeof (u32)); - vec_resize (s->stream_msgs, n_stream_msgs); - vec_foreach (sm, s->stream_msgs) - { - uword *p; - u32 si, gi; - - unserialize_cstring (m, &name); - p = hash_get (mcm->global_msg_index_by_name, name); - gi = p ? p[0] : ~0; - si = sm - s->stream_msgs; - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "catchup-bind: %s to %d global index %d stream %d", - .format_args = "T4i4i4i4", - }; - /* *INDENT-ON* */ - - struct - { - u32 c[4]; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->c[0] = elog_id_for_msg_name (mcm, name); - ed->c[1] = si; - ed->c[2] = gi; - ed->c[3] = s->index; - } - - vec_free (name); - - sm->global_index = gi; - if (gi != ~0) - { - vec_validate_init_empty (s->stream_msg_index_by_global_index, - gi, ~0); - s->stream_msg_index_by_global_index[gi] = si; - } - } - } -} - -void -mc_main_init (mc_main_t * mcm, char *tag) -{ - vlib_main_t *vm = vlib_get_main (); - - mcm->vlib_main = vm; - mcm->elog_main = &vm->elog_main; - - mcm->relay_master_peer_id = ~0ULL; - mcm->relay_state = MC_RELAY_STATE_NEGOTIATE; - - mcm->stream_index_by_name - = hash_create_string ( /* elts */ 0, /* value size */ sizeof (uword)); - - { - vlib_node_registration_t r; - - memset (&r, 0, sizeof (r)); - - r.type = VLIB_NODE_TYPE_PROCESS; - - /* Point runtime data to main instance. */ - r.runtime_data = &mcm; - r.runtime_data_bytes = sizeof (&mcm); - - r.name = (char *) format (0, "mc-mastership-%s", tag); - r.function = mc_mastership_process; - mcm->mastership_process = vlib_register_node (vm, &r); - - r.name = (char *) format (0, "mc-join-ager-%s", tag); - r.function = mc_join_ager_process; - mcm->join_ager_process = vlib_register_node (vm, &r); - - r.name = (char *) format (0, "mc-retry-%s", tag); - r.function = mc_retry_process; - mcm->retry_process = vlib_register_node (vm, &r); - - r.name = (char *) format (0, "mc-catchup-%s", tag); - r.function = mc_catchup_process; - mcm->catchup_process = vlib_register_node (vm, &r); - - r.name = (char *) format (0, "mc-unserialize-%s", tag); - r.function = mc_unserialize_process; - mcm->unserialize_process = vlib_register_node (vm, &r); - } - - if (MC_EVENT_LOGGING > 0) - mhash_init (&mcm->elog_id_by_peer_id, sizeof (uword), - sizeof (mc_peer_id_t)); - - mhash_init (&mcm->mastership_peer_index_by_id, sizeof (uword), - sizeof (mc_peer_id_t)); - mc_serialize_init (mcm); -} - -static u8 * -format_mc_relay_state (u8 * s, va_list * args) -{ - mc_relay_state_t state = va_arg (*args, mc_relay_state_t); - char *t = 0; - switch (state) - { - case MC_RELAY_STATE_NEGOTIATE: - t = "negotiate"; - break; - case MC_RELAY_STATE_MASTER: - t = "master"; - break; - case MC_RELAY_STATE_SLAVE: - t = "slave"; - break; - default: - return format (s, "unknown 0x%x", state); - } - - return format (s, "%s", t); -} - -static u8 * -format_mc_stream_state (u8 * s, va_list * args) -{ - mc_stream_state_t state = va_arg (*args, mc_stream_state_t); - char *t = 0; - switch (state) - { -#define _(f) case MC_STREAM_STATE_##f: t = #f; break; - foreach_mc_stream_state -#undef _ - default: - return format (s, "unknown 0x%x", state); - } - - return format (s, "%s", t); -} - -static int -mc_peer_comp (void *a1, void *a2) -{ - mc_stream_peer_t *p1 = a1; - mc_stream_peer_t *p2 = a2; - - return mc_peer_id_compare (p1->id, p2->id); -} - -u8 * -format_mc_main (u8 * s, va_list * args) -{ - mc_main_t *mcm = va_arg (*args, mc_main_t *); - mc_stream_t *t; - mc_stream_peer_t *p, *ps; - u32 indent = format_get_indent (s); - - s = format (s, "MC state %U, %d streams joined, global sequence 0x%x", - format_mc_relay_state, mcm->relay_state, - vec_len (mcm->stream_vector), mcm->relay_global_sequence); - - { - mc_mastership_peer_t *mp; - f64 now = vlib_time_now (mcm->vlib_main); - s = format (s, "\n%UMost recent mastership peers:", - format_white_space, indent + 2); - vec_foreach (mp, mcm->mastership_peers) - { - s = format (s, "\n%U%-30U%.4e", - format_white_space, indent + 4, - mcm->transport.format_peer_id, mp->peer_id, - now - mp->time_last_master_assert_received); - } - } - - vec_foreach (t, mcm->stream_vector) - { - s = format (s, "\n%Ustream `%s' index %d", - format_white_space, indent + 2, t->config.name, t->index); - - s = format (s, "\n%Ustate %U", - format_white_space, indent + 4, - format_mc_stream_state, t->state); - - s = - format (s, - "\n%Uretries: interval %.0f sec, limit %d, pool elts %d, %Ld sent", - format_white_space, indent + 4, t->config.retry_interval, - t->config.retry_limit, pool_elts (t->retry_pool), - t->stats.n_retries - t->stats_last_clear.n_retries); - - s = format (s, "\n%U%Ld/%Ld user requests sent/received", - format_white_space, indent + 4, - t->user_requests_sent, t->user_requests_received); - - s = format (s, "\n%U%d peers, local/global sequence 0x%x/0x%x", - format_white_space, indent + 4, - pool_elts (t->peers), - t->our_local_sequence, t->last_global_sequence_processed); - - ps = 0; - /* *INDENT-OFF* */ - pool_foreach (p, t->peers, - ({ - if (clib_bitmap_get (t->all_peer_bitmap, p - t->peers)) - vec_add1 (ps, p[0]); - })); - /* *INDENT-ON* */ - vec_sort_with_function (ps, mc_peer_comp); - s = format (s, "\n%U%=30s%10s%16s%16s", - format_white_space, indent + 6, - "Peer", "Last seq", "Retries", "Future"); - - vec_foreach (p, ps) - { - s = format (s, "\n%U%-30U0x%08x%16Ld%16Ld%s", - format_white_space, indent + 6, - mcm->transport.format_peer_id, p->id.as_u64, - p->last_sequence_received, - p->stats.n_msgs_from_past - - p->stats_last_clear.n_msgs_from_past, - p->stats.n_msgs_from_future - - p->stats_last_clear.n_msgs_from_future, - (mcm->transport.our_ack_peer_id.as_u64 == - p->id.as_u64 ? " (self)" : "")); - } - vec_free (ps); - } - - return s; -} - -/* - * fd.io coding-style-patch-verification: ON - * - * Local Variables: - * eval: (c-set-style "gnu") - * End: - */ diff --git a/src/vlib/mc.h b/src/vlib/mc.h deleted file mode 100644 index 28f94350537..00000000000 --- a/src/vlib/mc.h +++ /dev/null @@ -1,695 +0,0 @@ -/* - * mc.h: vlib reliable sequenced multicast distributed applications - * - * Copyright (c) 2010 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef included_vlib_mc_h -#define included_vlib_mc_h - -#include -#include -#include -#include - -#ifndef MC_EVENT_LOGGING -#define MC_EVENT_LOGGING 1 -#endif - -always_inline uword -mc_need_byte_swap (void) -{ - return CLIB_ARCH_IS_LITTLE_ENDIAN; -} - -/* - * Used to uniquely identify hosts. - * For IP4 this would be ip4_address plus tcp/udp port. - */ -typedef union -{ - u8 as_u8[8]; - u64 as_u64; -} mc_peer_id_t; - -always_inline mc_peer_id_t -mc_byte_swap_peer_id (mc_peer_id_t i) -{ - /* Peer id is already in network byte order. */ - return i; -} - -always_inline int -mc_peer_id_compare (mc_peer_id_t a, mc_peer_id_t b) -{ - return memcmp (a.as_u8, b.as_u8, sizeof (a.as_u8)); -} - -/* Assert mastership. Lowest peer_id amount all peers wins mastership. - Only sent/received over mastership channel (MC_TRANSPORT_MASTERSHIP). - So, we don't need a message opcode. */ -typedef CLIB_PACKED (struct - { - /* Peer id asserting mastership. */ - mc_peer_id_t peer_id; - /* Global sequence number asserted. */ - u32 global_sequence;}) mc_msg_master_assert_t; - -always_inline void -mc_byte_swap_msg_master_assert (mc_msg_master_assert_t * r) -{ - if (mc_need_byte_swap ()) - { - r->peer_id = mc_byte_swap_peer_id (r->peer_id); - r->global_sequence = clib_byte_swap_u32 (r->global_sequence); - } -} - -#define foreach_mc_msg_type \ - _ (master_assert) \ - _ (join_or_leave_request) \ - _ (join_reply) \ - _ (user_request) \ - _ (user_ack) \ - _ (catchup_request) \ - _ (catchup_reply) - -typedef enum -{ -#define _(f) MC_MSG_TYPE_##f, - foreach_mc_msg_type -#undef _ -} mc_relay_msg_type_t; - -/* Request to join a given stream. Multicast over MC_TRANSPORT_JOIN. */ -typedef CLIB_PACKED (struct - { -mc_peer_id_t peer_id; mc_relay_msg_type_t type:32; - /* MC_MSG_TYPE_join_or_leave_request */ - /* Stream to join or leave. */ - u32 stream_index; - /* join = 1, leave = 0 */ - u8 is_join;}) mc_msg_join_or_leave_request_t; - -always_inline void -mc_byte_swap_msg_join_or_leave_request (mc_msg_join_or_leave_request_t * r) -{ - if (mc_need_byte_swap ()) - { - r->peer_id = mc_byte_swap_peer_id (r->peer_id); - r->type = clib_byte_swap_u32 (r->type); - r->stream_index = clib_byte_swap_u32 (r->stream_index); - } -} - -/* Join reply. Multicast over MC_TRANSPORT_JOIN. */ -typedef CLIB_PACKED (struct - { -mc_peer_id_t peer_id; mc_relay_msg_type_t type:32; - /* MC_MSG_TYPE_join_reply */ - u32 stream_index; - /* Peer ID to contact to catchup with this stream. */ - mc_peer_id_t catchup_peer_id;}) mc_msg_join_reply_t; - -always_inline void -mc_byte_swap_msg_join_reply (mc_msg_join_reply_t * r) -{ - if (mc_need_byte_swap ()) - { - r->peer_id = mc_byte_swap_peer_id (r->peer_id); - r->type = clib_byte_swap_u32 (r->type); - r->stream_index = clib_byte_swap_u32 (r->stream_index); - r->catchup_peer_id = mc_byte_swap_peer_id (r->catchup_peer_id); - } -} - -/* Generic (application) request. Multicast over MC_TRANSPORT_USER_REQUEST_TO_RELAY and then - relayed by relay master after filling in global sequence number. */ -typedef CLIB_PACKED (struct - { - mc_peer_id_t peer_id; u32 stream_index; - /* Global sequence number as filled in by relay master. */ - u32 global_sequence; - /* Local sequence number as filled in by peer sending message. */ - u32 local_sequence; - /* Size of request data. */ - u32 n_data_bytes; - /* Opaque request data. */ - u8 data[0];}) mc_msg_user_request_t; - -always_inline void -mc_byte_swap_msg_user_request (mc_msg_user_request_t * r) -{ - if (mc_need_byte_swap ()) - { - r->peer_id = mc_byte_swap_peer_id (r->peer_id); - r->stream_index = clib_byte_swap_u32 (r->stream_index); - r->global_sequence = clib_byte_swap_u32 (r->global_sequence); - r->local_sequence = clib_byte_swap_u32 (r->local_sequence); - r->n_data_bytes = clib_byte_swap_u32 (r->n_data_bytes); - } -} - -/* Sent unicast over ACK channel. */ -typedef CLIB_PACKED (struct - { - mc_peer_id_t peer_id; - u32 global_sequence; u32 stream_index; - u32 local_sequence; - i32 seq_cmp_result;}) mc_msg_user_ack_t; - -always_inline void -mc_byte_swap_msg_user_ack (mc_msg_user_ack_t * r) -{ - if (mc_need_byte_swap ()) - { - r->peer_id = mc_byte_swap_peer_id (r->peer_id); - r->stream_index = clib_byte_swap_u32 (r->stream_index); - r->global_sequence = clib_byte_swap_u32 (r->global_sequence); - r->local_sequence = clib_byte_swap_u32 (r->local_sequence); - r->seq_cmp_result = clib_byte_swap_i32 (r->seq_cmp_result); - } -} - -/* Sent/received unicast over catchup channel (e.g. using TCP). */ -typedef CLIB_PACKED (struct - { - mc_peer_id_t peer_id; - u32 stream_index;}) mc_msg_catchup_request_t; - -always_inline void -mc_byte_swap_msg_catchup_request (mc_msg_catchup_request_t * r) -{ - if (mc_need_byte_swap ()) - { - r->peer_id = mc_byte_swap_peer_id (r->peer_id); - r->stream_index = clib_byte_swap_u32 (r->stream_index); - } -} - -/* Sent/received unicast over catchup channel. */ -typedef CLIB_PACKED (struct - { - mc_peer_id_t peer_id; u32 stream_index; - /* Last global sequence number included in catchup data. */ - u32 last_global_sequence_included; - /* Size of catchup data. */ - u32 n_data_bytes; - /* Catchup data. */ - u8 data[0];}) mc_msg_catchup_reply_t; - -always_inline void -mc_byte_swap_msg_catchup_reply (mc_msg_catchup_reply_t * r) -{ - if (mc_need_byte_swap ()) - { - r->peer_id = mc_byte_swap_peer_id (r->peer_id); - r->stream_index = clib_byte_swap_u32 (r->stream_index); - r->last_global_sequence_included = - clib_byte_swap_u32 (r->last_global_sequence_included); - r->n_data_bytes = clib_byte_swap_u32 (r->n_data_bytes); - } -} - -typedef struct _mc_serialize_msg -{ - /* Name for this type. */ - char *name; - - /* Functions to serialize/unserialize data. */ - serialize_function_t *serialize; - serialize_function_t *unserialize; - - /* Maximum message size in bytes when serialized. - If zero then this will be set to the largest sent message. */ - u32 max_n_bytes_serialized; - - /* Opaque to use for first argument to serialize/unserialize function. */ - u32 opaque; - - /* Index in global message vector. */ - u32 global_index; - - /* Registration list */ - struct _mc_serialize_msg *next_registration; -} mc_serialize_msg_t; - -typedef struct -{ - /* Index into global message vector. */ - u32 global_index; -} mc_serialize_stream_msg_t; - -#define MC_SERIALIZE_MSG(x,...) \ - __VA_ARGS__ mc_serialize_msg_t x; \ -static void __mc_serialize_msg_registration_##x (void) \ - __attribute__((__constructor__)) ; \ -static void __mc_serialize_msg_registration_##x (void) \ -{ \ - vlib_main_t * vm = vlib_get_main(); \ - x.next_registration = vm->mc_msg_registrations; \ - vm->mc_msg_registrations = &x; \ -} \ -static void __mc_serialize_msg_unregistration_##x (void) \ - __attribute__((__destructor__)) ; \ -static void __mc_serialize_msg_unregistration_##x (void) \ -{ \ - vlib_main_t * vm = vlib_get_main(); \ - VLIB_REMOVE_FROM_LINKED_LIST (vm->mc_msg_registrations, &x, \ - next_registration); \ -} \ -__VA_ARGS__ mc_serialize_msg_t x - -typedef enum -{ - MC_TRANSPORT_MASTERSHIP, - MC_TRANSPORT_JOIN, - MC_TRANSPORT_USER_REQUEST_TO_RELAY, - MC_TRANSPORT_USER_REQUEST_FROM_RELAY, - MC_N_TRANSPORT_TYPE, -} mc_transport_type_t; - -typedef struct -{ - clib_error_t *(*tx_buffer) (void *opaque, mc_transport_type_t type, - u32 buffer_index); - - clib_error_t *(*tx_ack) (void *opaque, mc_peer_id_t peer_id, - u32 buffer_index); - - /* Returns catchup opaque. */ - uword (*catchup_request_fun) (void *opaque, u32 stream_index, - mc_peer_id_t catchup_peer_id); - - void (*catchup_send_fun) (void *opaque, uword catchup_opaque, - u8 * data_vector); - - /* Opaque passed to callbacks. */ - void *opaque; - - mc_peer_id_t our_ack_peer_id; - mc_peer_id_t our_catchup_peer_id; - - /* Max packet size (MTU) for this transport. - For IP this is interface MTU less IP + UDP header size. */ - u32 max_packet_size; - - format_function_t *format_peer_id; -} mc_transport_t; - -typedef struct -{ - /* Count of messages received from this peer from the past/future - (with seq_cmp != 0). */ - u64 n_msgs_from_past; - u64 n_msgs_from_future; -} mc_stream_peer_stats_t; - -typedef struct -{ - /* ID of this peer. */ - mc_peer_id_t id; - - /* The last sequence we received from this peer. */ - u32 last_sequence_received; - - mc_stream_peer_stats_t stats, stats_last_clear; -} mc_stream_peer_t; - -typedef struct -{ - u32 buffer_index; - - /* Cached copy of local sequence number from buffer. */ - u32 local_sequence; - - /* Number of times this buffer has been sent (retried). */ - u32 n_retries; - - /* Previous/next retries in doubly-linked list. */ - u32 prev_index, next_index; - - /* Bitmap of all peers which have acked this msg */ - uword *unacked_by_peer_bitmap; - - /* Message send or resend time */ - f64 sent_at; -} mc_retry_t; - -typedef struct -{ - /* Number of retries sent for this stream. */ - u64 n_retries; -} mc_stream_stats_t; - -struct mc_main_t; -struct mc_stream_t; - -typedef struct -{ - /* Stream name. */ - char *name; - - /* Number of outstanding messages. */ - u32 window_size; - - /* Retry interval, in seconds */ - f64 retry_interval; - - /* Retry limit */ - u32 retry_limit; - - /* User rx buffer callback */ - void (*rx_buffer) (struct mc_main_t * mc_main, - struct mc_stream_t * stream, - mc_peer_id_t peer_id, u32 buffer_index); - - /* User callback to create a snapshot */ - u8 *(*catchup_snapshot) (struct mc_main_t * mc_main, - u8 * snapshot_vector, - u32 last_global_sequence_included); - - /* User callback to replay a snapshot */ - void (*catchup) (struct mc_main_t * mc_main, - u8 * snapshot_data, u32 n_snapshot_data_bytes); - - /* Callback to save a snapshot for offline replay */ - void (*save_snapshot) (struct mc_main_t * mc_main, - u32 is_catchup, - u8 * snapshot_data, u32 n_snapshot_data_bytes); - - /* Called when a peer dies */ - void (*peer_died) (struct mc_main_t * mc_main, - struct mc_stream_t * stream, mc_peer_id_t peer_id); -} mc_stream_config_t; - -#define foreach_mc_stream_state \ - _ (invalid) \ - _ (name_known) \ - _ (join_in_progress) \ - _ (catchup) \ - _ (ready) - -typedef enum -{ -#define _(f) MC_STREAM_STATE_##f, - foreach_mc_stream_state -#undef _ -} mc_stream_state_t; - -typedef struct mc_stream_t -{ - mc_stream_config_t config; - - mc_stream_state_t state; - - /* Index in stream pool. */ - u32 index; - - /* Stream index 0 is always for MC internal use. */ -#define MC_STREAM_INDEX_INTERNAL 0 - - mc_retry_t *retry_pool; - - /* Head and tail index of retry pool. */ - u32 retry_head_index, retry_tail_index; - - /* - * Country club for recently retired messages - * If the set of peers is expanding and a new peer - * misses a message, we can easily retire the FIFO - * element before we even know about the new peer - */ - mc_retry_t *retired_fifo; - - /* Hash mapping local sequence to retry pool index. */ - uword *retry_index_by_local_sequence; - - /* catch-up fifo of VLIB buffer indices. - start recording when catching up. */ - u32 *catchup_fifo; - - mc_stream_stats_t stats, stats_last_clear; - - /* Peer pool. */ - mc_stream_peer_t *peers; - - /* Bitmap with ones for all peers in peer pool. */ - uword *all_peer_bitmap; - - /* Map of 64 bit id to index in stream pool. */ - mhash_t peer_index_by_id; - - /* Timeout, in case we're alone in the world */ - f64 join_timeout; - - vlib_one_time_waiting_process_t *procs_waiting_for_join_done; - - vlib_one_time_waiting_process_t *procs_waiting_for_open_window; - - /* Next sequence number to use */ - u32 our_local_sequence; - - /* - * Last global sequence we processed. - * When supplying catchup data, we need to tell - * the client precisely where to start replaying - */ - u32 last_global_sequence_processed; - - /* Vector of unique messages we've sent on this stream. */ - mc_serialize_stream_msg_t *stream_msgs; - - /* Vector global message index into per stream message index. */ - u32 *stream_msg_index_by_global_index; - - /* Hashed by message name. */ - uword *stream_msg_index_by_name; - - u64 user_requests_sent; - u64 user_requests_received; -} mc_stream_t; - -always_inline void -mc_stream_free (mc_stream_t * s) -{ - pool_free (s->retry_pool); - hash_free (s->retry_index_by_local_sequence); - clib_fifo_free (s->catchup_fifo); - pool_free (s->peers); - mhash_free (&s->peer_index_by_id); - vec_free (s->procs_waiting_for_join_done); - vec_free (s->procs_waiting_for_open_window); -} - -always_inline void -mc_stream_init (mc_stream_t * s) -{ - memset (s, 0, sizeof (s[0])); - s->retry_head_index = s->retry_tail_index = ~0; -} - -typedef struct -{ - u32 stream_index; - u32 catchup_opaque; - u8 *catchup_snapshot; -} mc_catchup_process_arg_t; - -typedef enum -{ - MC_RELAY_STATE_NEGOTIATE, - MC_RELAY_STATE_MASTER, - MC_RELAY_STATE_SLAVE, -} mc_relay_state_t; - -typedef struct -{ - mc_peer_id_t peer_id; - - f64 time_last_master_assert_received; -} mc_mastership_peer_t; - -typedef struct -{ - u32 stream_index; - u32 buffer_index; -} mc_stream_and_buffer_t; - -typedef struct mc_main_t -{ - mc_relay_state_t relay_state; - - /* Mastership */ - u32 we_can_be_relay_master; - - u64 relay_master_peer_id; - - mc_mastership_peer_t *mastership_peers; - - /* Map of 64 bit id to index in stream pool. */ - mhash_t mastership_peer_index_by_id; - - /* The transport we're using. */ - mc_transport_t transport; - - /* Last-used global sequence number. */ - u32 relay_global_sequence; - - /* Vector of streams. */ - mc_stream_t *stream_vector; - - /* Hash table mapping stream name to pool index. */ - uword *stream_index_by_name; - - uword *procs_waiting_for_stream_name_by_name; - - vlib_one_time_waiting_process_t **procs_waiting_for_stream_name_pool; - - int joins_in_progress; - - mc_catchup_process_arg_t *catchup_process_args; - - /* Node indices for mastership, join ager, - retry and catchup processes. */ - u32 mastership_process; - u32 join_ager_process; - u32 retry_process; - u32 catchup_process; - u32 unserialize_process; - - /* Global vector of messages. */ - mc_serialize_msg_t **global_msgs; - - /* Hash table mapping message name to index. */ - uword *global_msg_index_by_name; - - /* Shared serialize/unserialize main. */ - serialize_main_t serialize_mains[VLIB_N_RX_TX]; - - vlib_serialize_buffer_main_t serialize_buffer_mains[VLIB_N_RX_TX]; - - /* Convenience variables */ - struct vlib_main_t *vlib_main; - elog_main_t *elog_main; - - /* Maps 64 bit peer id to elog string table offset for this formatted peer id. */ - mhash_t elog_id_by_peer_id; - - uword *elog_id_by_msg_name; - - /* For mc_unserialize. */ - mc_stream_and_buffer_t *mc_unserialize_stream_and_buffers; -} mc_main_t; - -always_inline mc_stream_t * -mc_stream_by_name (mc_main_t * m, char *name) -{ - uword *p = hash_get (m->stream_index_by_name, name); - return p ? vec_elt_at_index (m->stream_vector, p[0]) : 0; -} - -always_inline mc_stream_t * -mc_stream_by_index (mc_main_t * m, u32 i) -{ - return i < vec_len (m->stream_vector) ? m->stream_vector + i : 0; -} - -always_inline void -mc_clear_stream_stats (mc_main_t * m) -{ - mc_stream_t *s; - mc_stream_peer_t *p; - vec_foreach (s, m->stream_vector) - { - s->stats_last_clear = s->stats; - /* *INDENT-OFF* */ - pool_foreach (p, s->peers, ({ - p->stats_last_clear = p->stats; - })); - /* *INDENT-ON* */ - } -} - -/* Declare all message handlers. */ -#define _(f) void mc_msg_##f##_handler (mc_main_t * mcm, mc_msg_##f##_t * msg, u32 buffer_index); -foreach_mc_msg_type -#undef _ - u32 mc_stream_join (mc_main_t * mcm, mc_stream_config_t *); - -void mc_stream_leave (mc_main_t * mcm, u32 stream_index); - -void mc_wait_for_stream_ready (mc_main_t * m, char *stream_name); - -u32 mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index); - -void mc_main_init (mc_main_t * mcm, char *tag); - -void mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master); - -void *mc_get_vlib_buffer (struct vlib_main_t *vm, u32 n_bytes, - u32 * bi_return); - -format_function_t format_mc_main; - -clib_error_t *mc_serialize_internal (mc_main_t * mc, - u32 stream_index, - u32 multiple_messages_per_vlib_buffer, - mc_serialize_msg_t * msg, ...); - -clib_error_t *mc_serialize_va (mc_main_t * mc, - u32 stream_index, - u32 multiple_messages_per_vlib_buffer, - mc_serialize_msg_t * msg, va_list * va); - -#define mc_serialize_stream(mc,si,msg,args...) \ - mc_serialize_internal((mc),(si),(0),(msg),(msg)->serialize,args) - -#define mc_serialize(mc,msg,args...) \ - mc_serialize_internal((mc),(~0),(0),(msg),(msg)->serialize,args) - -#define mc_serialize2(mc,add,msg,args...) \ - mc_serialize_internal((mc),(~0),(add),(msg),(msg)->serialize,args) - -void mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index); -uword mc_unserialize_message (mc_main_t * mcm, mc_stream_t * s, - serialize_main_t * m); - -serialize_function_t serialize_mc_main, unserialize_mc_main; - -always_inline uword -mc_max_message_size_in_bytes (mc_main_t * mcm) -{ - return mcm->transport.max_packet_size - sizeof (mc_msg_user_request_t); -} - -always_inline word -mc_serialize_n_bytes_left (mc_main_t * mcm, serialize_main_t * m) -{ - return mc_max_message_size_in_bytes (mcm) - - serialize_vlib_buffer_n_bytes (m); -} - -void unserialize_mc_stream (serialize_main_t * m, va_list * va); -void mc_stream_join_process_hold (void); - -#endif /* included_vlib_mc_h */ - -/* - * fd.io coding-style-patch-verification: ON - * - * Local Variables: - * eval: (c-set-style "gnu") - * End: - */ diff --git a/src/vlib/threads.c b/src/vlib/threads.c index 055998adac8..981209bf2e7 100644 --- a/src/vlib/threads.c +++ b/src/vlib/threads.c @@ -43,8 +43,8 @@ vlib_thread_main_t vlib_thread_main; * imapacts observed timings. */ -static u32 -elog_id_for_msg_name (const char *msg_name) +u32 +elog_global_id_for_msg_name (const char *msg_name) { uword *p, r; static uword *h; @@ -85,7 +85,8 @@ barrier_trace_sync (f64 t_entry, f64 t_open, f64 t_closed) ed = ELOG_DATA (&vlib_global_main.elog_main, e); ed->count = (int) vlib_worker_threads[0].barrier_sync_count; - ed->caller = elog_id_for_msg_name (vlib_worker_threads[0].barrier_caller); + ed->caller = elog_global_id_for_msg_name + (vlib_worker_threads[0].barrier_caller); ed->t_entry = (int) (1000000.0 * t_entry); ed->t_open = (int) (1000000.0 * t_open); ed->t_closed = (int) (1000000.0 * t_closed); @@ -111,7 +112,8 @@ barrier_trace_sync_rec (f64 t_entry) ed = ELOG_DATA (&vlib_global_main.elog_main, e); ed->depth = (int) vlib_worker_threads[0].recursion_level - 1; - ed->caller = elog_id_for_msg_name (vlib_worker_threads[0].barrier_caller); + ed->caller = elog_global_id_for_msg_name + (vlib_worker_threads[0].barrier_caller); } static inline void diff --git a/src/vlib/threads.h b/src/vlib/threads.h index 71b5d0c8261..bb7c164c2e3 100644 --- a/src/vlib/threads.h +++ b/src/vlib/threads.h @@ -597,6 +597,7 @@ vlib_process_signal_event_mt_helper (vlib_process_signal_event_mt_args_t * args); void vlib_rpc_call_main_thread (void *function, u8 * args, u32 size); +u32 elog_global_id_for_msg_name (const char *msg_name); #endif /* included_vlib_threads_h */ /* diff --git a/src/vlib/threads_cli.c b/src/vlib/threads_cli.c index a47d86401bc..86c02479892 100644 --- a/src/vlib/threads_cli.c +++ b/src/vlib/threads_cli.c @@ -532,29 +532,6 @@ VLIB_CLI_COMMAND (cmd_test_frame_queue_threshold,static) = { }; /* *INDENT-ON* */ -static clib_error_t * -test_threads_barrier_elog_command_fn (vlib_main_t * vm, - unformat_input_t * input, - vlib_cli_command_t * cmd) -{ - if (unformat (input, "enable")) - vlib_worker_threads->barrier_elog_enabled = 1; - else if (unformat (input, "disable")) - vlib_worker_threads->barrier_elog_enabled = 0; - else - return clib_error_return (0, "please choose enable or disable"); - return 0; -} - -/* *INDENT-OFF* */ -VLIB_CLI_COMMAND (test_elog_vector_length_trigger, static) = -{ - .path = "test threads barrier-elog", - .short_help = "test threads barrier-elog", - .function = test_threads_barrier_elog_command_fn, -}; -/* *INDENT-ON* */ - /* * fd.io coding-style-patch-verification: ON * diff --git a/src/vlib/vlib.h b/src/vlib/vlib.h index 21c5b2aa814..ba72f46aab2 100644 --- a/src/vlib/vlib.h +++ b/src/vlib/vlib.h @@ -56,7 +56,6 @@ struct vlib_main_t; #include #include #include -#include #include #include #include -- cgit 1.2.3-korg