diff options
author | Dave Barach <dave@barachs.net> | 2018-10-01 09:25:32 -0400 |
---|---|---|
committer | Damjan Marion <dmarion@me.com> | 2018-10-01 15:59:35 +0000 |
commit | c3a06556d1a4a63646d4cc7aa76274177a56c13f (patch) | |
tree | 76eee7fa1da3b882109c2fa9dbd8f94dfc18110b /src/vlib/mc.c | |
parent | 904a850899db0a40bf885d2ee2e839a5f8dfbeb3 (diff) |
API / CLI event-log tracing
Add an "elog trace [api][cli][barrier]" debug CLI command. Removed the
barrier elog test command. Remove unused reliable multicast code.
Change-Id: Ib3ecde901b7c49fe92b313d0087cd7e776adcdce
Signed-off-by: Dave Barach <dave@barachs.net>
Diffstat (limited to 'src/vlib/mc.c')
-rw-r--r-- | src/vlib/mc.c | 2609 |
1 files changed, 0 insertions, 2609 deletions
diff --git a/src/vlib/mc.c b/src/vlib/mc.c deleted file mode 100644 index a289871f570..00000000000 --- a/src/vlib/mc.c +++ /dev/null @@ -1,2609 +0,0 @@ -/* - * mc.c: vlib reliable sequenced multicast distributed applications - * - * Copyright (c) 2010 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <vlib/vlib.h> - -/* - * 1 to enable msg id training wheels, which are useful for tracking - * down catchup and/or partitioned network problems - */ -#define MSG_ID_DEBUG 0 - -static format_function_t format_mc_stream_state; - -static u32 -elog_id_for_peer_id (mc_main_t * m, u64 peer_id) -{ - uword *p, r; - mhash_t *h = &m->elog_id_by_peer_id; - - if (!m->elog_id_by_peer_id.hash) - mhash_init (h, sizeof (uword), sizeof (mc_peer_id_t)); - - p = mhash_get (h, &peer_id); - if (p) - return p[0]; - r = elog_string (m->elog_main, "%U", m->transport.format_peer_id, peer_id); - mhash_set (h, &peer_id, r, /* old_value */ 0); - return r; -} - -static u32 -elog_id_for_msg_name (mc_main_t * m, char *msg_name) -{ - uword *p, r; - uword *h = m->elog_id_by_msg_name; - u8 *name_copy; - - if (!h) - h = m->elog_id_by_msg_name = hash_create_string (0, sizeof (uword)); - - p = hash_get_mem (h, msg_name); - if (p) - return p[0]; - r = elog_string (m->elog_main, "%s", msg_name); - - name_copy = format (0, "%s%c", msg_name, 0); - - hash_set_mem (h, name_copy, r); - m->elog_id_by_msg_name = h; - - return r; -} - -static void -elog_tx_msg (mc_main_t * m, u32 stream_id, u32 local_sequence, - u32 retry_count) -{ - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "tx-msg: stream %d local seq %d attempt %d", - .format_args = "i4i4i4", - }; - /* *INDENT-ON* */ - struct - { - u32 stream_id, local_sequence, retry_count; - } *ed; - ed = ELOG_DATA (m->elog_main, e); - ed->stream_id = stream_id; - ed->local_sequence = local_sequence; - ed->retry_count = retry_count; - } -} - -/* - * seq_cmp - * correctly compare two unsigned sequence numbers. - * This function works so long as x and y are within 2**(n-1) of each - * other, where n = bits(x, y). - * - * Magic decoder ring: - * seq_cmp == 0 => x and y are equal - * seq_cmp < 0 => x is "in the past" with respect to y - * seq_cmp > 0 => x is "in the future" with respect to y - */ -always_inline i32 -mc_seq_cmp (u32 x, u32 y) -{ - return (i32) x - (i32) y; -} - -void * -mc_get_vlib_buffer (vlib_main_t * vm, u32 n_bytes, u32 * bi_return) -{ - u32 n_alloc, bi = 0; - vlib_buffer_t *b; - - n_alloc = vlib_buffer_alloc (vm, &bi, 1); - ASSERT (n_alloc == 1); - - b = vlib_get_buffer (vm, bi); - b->current_length = n_bytes; - *bi_return = bi; - return (void *) b->data; -} - -static void -delete_peer_with_index (mc_main_t * mcm, mc_stream_t * s, - uword index, int notify_application) -{ - mc_stream_peer_t *p = pool_elt_at_index (s->peers, index); - ASSERT (p != 0); - if (s->config.peer_died && notify_application) - s->config.peer_died (mcm, s, p->id); - - s->all_peer_bitmap = clib_bitmap_andnoti (s->all_peer_bitmap, p - s->peers); - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "delete peer %s from all_peer_bitmap", - .format_args = "T4", - }; - /* *INDENT-ON* */ - struct - { - u32 peer; - } *ed = 0; - - ed = ELOG_DATA (mcm->elog_main, e); - ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64); - } - /* Do not delete the pool / hash table entries, or we lose sequence number state */ -} - -static mc_stream_peer_t * -get_or_create_peer_with_id (mc_main_t * mcm, - mc_stream_t * s, mc_peer_id_t id, int *created) -{ - uword *q = mhash_get (&s->peer_index_by_id, &id); - mc_stream_peer_t *p; - - if (q) - { - p = pool_elt_at_index (s->peers, q[0]); - goto done; - } - - pool_get (s->peers, p); - memset (p, 0, sizeof (p[0])); - p->id = id; - p->last_sequence_received = ~0; - mhash_set (&s->peer_index_by_id, &id, p - s->peers, /* old_value */ 0); - if (created) - *created = 1; - -done: - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "get_or_create %s peer %s stream %d seq %d", - .format_args = "t4T4i4i4", - .n_enum_strings = 2, - .enum_strings = { - "old", "new", - }, - }; - /* *INDENT-ON* */ - struct - { - u32 is_new, peer, stream_index, rx_sequence; - } *ed = 0; - - ed = ELOG_DATA (mcm->elog_main, e); - ed->is_new = q ? 0 : 1; - ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64); - ed->stream_index = s->index; - ed->rx_sequence = p->last_sequence_received; - } - /* $$$$ Enable or reenable this peer */ - s->all_peer_bitmap = clib_bitmap_ori (s->all_peer_bitmap, p - s->peers); - return p; -} - -static void -maybe_send_window_open_event (vlib_main_t * vm, mc_stream_t * stream) -{ - vlib_one_time_waiting_process_t *p; - - if (pool_elts (stream->retry_pool) >= stream->config.window_size) - return; - - vec_foreach (p, stream->procs_waiting_for_open_window) - vlib_signal_one_time_waiting_process (vm, p); - - if (stream->procs_waiting_for_open_window) - _vec_len (stream->procs_waiting_for_open_window) = 0; -} - -static void -mc_retry_free (mc_main_t * mcm, mc_stream_t * s, mc_retry_t * r) -{ - mc_retry_t record, *retp; - - if (r->unacked_by_peer_bitmap) - _vec_len (r->unacked_by_peer_bitmap) = 0; - - if (clib_fifo_elts (s->retired_fifo) >= 2 * s->config.window_size) - { - clib_fifo_sub1 (s->retired_fifo, record); - vlib_buffer_free_one (mcm->vlib_main, record.buffer_index); - } - - clib_fifo_add2 (s->retired_fifo, retp); - - retp->buffer_index = r->buffer_index; - retp->local_sequence = r->local_sequence; - - r->buffer_index = ~0; /* poison buffer index in this retry */ -} - -static void -mc_resend_retired (mc_main_t * mcm, mc_stream_t * s, u32 local_sequence) -{ - mc_retry_t *retry; - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "resend-retired: search for local seq %d", - .format_args = "i4", - }; - /* *INDENT-ON* */ - struct - { - u32 local_sequence; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->local_sequence = local_sequence; - } - - /* *INDENT-OFF* */ - clib_fifo_foreach (retry, s->retired_fifo, - ({ - if (retry->local_sequence == local_sequence) - { - elog_tx_msg (mcm, s->index, retry-> local_sequence, -13); - mcm->transport.tx_buffer (mcm->transport.opaque, - MC_TRANSPORT_USER_REQUEST_TO_RELAY, - retry->buffer_index); - return; - } - })); - /* *INDENT-ON* */ - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "resend-retired: FAILED search for local seq %d", - .format_args = "i4", - }; - /* *INDENT-ON* */ - struct - { - u32 local_sequence; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->local_sequence = local_sequence; - } -} - -static uword * -delete_retry_fifo_elt (mc_main_t * mcm, - mc_stream_t * stream, - mc_retry_t * r, uword * dead_peer_bitmap) -{ - mc_stream_peer_t *p; - - /* *INDENT-OFF* */ - pool_foreach (p, stream->peers, ({ - uword pi = p - stream->peers; - uword is_alive = 0 == clib_bitmap_get (r->unacked_by_peer_bitmap, pi); - - if (! is_alive) - dead_peer_bitmap = clib_bitmap_ori (dead_peer_bitmap, pi); - - if (MC_EVENT_LOGGING > 0) - { - ELOG_TYPE_DECLARE (e) = { - .format = "delete_retry_fifo_elt: peer %s is %s", - .format_args = "T4t4", - .n_enum_strings = 2, - .enum_strings = { "alive", "dead", }, - }; - struct { u32 peer, is_alive; } * ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64); - ed->is_alive = is_alive; - } - })); - /* *INDENT-ON* */ - - hash_unset (stream->retry_index_by_local_sequence, r->local_sequence); - mc_retry_free (mcm, stream, r); - - return dead_peer_bitmap; -} - -always_inline mc_retry_t * -prev_retry (mc_stream_t * s, mc_retry_t * r) -{ - return (r->prev_index != ~0 - ? pool_elt_at_index (s->retry_pool, r->prev_index) : 0); -} - -always_inline mc_retry_t * -next_retry (mc_stream_t * s, mc_retry_t * r) -{ - return (r->next_index != ~0 - ? pool_elt_at_index (s->retry_pool, r->next_index) : 0); -} - -always_inline void -remove_retry_from_pool (mc_stream_t * s, mc_retry_t * r) -{ - mc_retry_t *p = prev_retry (s, r); - mc_retry_t *n = next_retry (s, r); - - if (p) - p->next_index = r->next_index; - else - s->retry_head_index = r->next_index; - if (n) - n->prev_index = r->prev_index; - else - s->retry_tail_index = r->prev_index; - - pool_put_index (s->retry_pool, r - s->retry_pool); -} - -static void -check_retry (mc_main_t * mcm, mc_stream_t * s) -{ - mc_retry_t *r; - vlib_main_t *vm = mcm->vlib_main; - f64 now = vlib_time_now (vm); - uword *dead_peer_bitmap = 0; - u32 ri, ri_next; - - for (ri = s->retry_head_index; ri != ~0; ri = ri_next) - { - r = pool_elt_at_index (s->retry_pool, ri); - ri_next = r->next_index; - - if (now < r->sent_at + s->config.retry_interval) - continue; - - r->n_retries += 1; - if (r->n_retries > s->config.retry_limit) - { - dead_peer_bitmap = - delete_retry_fifo_elt (mcm, s, r, dead_peer_bitmap); - remove_retry_from_pool (s, r); - } - else - { - if (MC_EVENT_LOGGING > 0) - { - mc_stream_peer_t *p; - - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (t) = - { - .format = "resend local seq %d attempt %d", - .format_args = "i4i4", - }; - /* *INDENT-ON* */ - - /* *INDENT-OFF* */ - pool_foreach (p, s->peers, ({ - if (clib_bitmap_get (r->unacked_by_peer_bitmap, p - s->peers)) - { - ELOG_TYPE_DECLARE (ev) = { - .format = "resend: needed by peer %s local seq %d", - .format_args = "T4i4", - }; - struct { u32 peer, rx_sequence; } * ed; - ed = ELOG_DATA (mcm->elog_main, ev); - ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64); - ed->rx_sequence = r->local_sequence; - } - })); - /* *INDENT-ON* */ - - struct - { - u32 sequence; - u32 trail; - } *ed; - ed = ELOG_DATA (mcm->elog_main, t); - ed->sequence = r->local_sequence; - ed->trail = r->n_retries; - } - - r->sent_at = vlib_time_now (vm); - s->stats.n_retries += 1; - - elog_tx_msg (mcm, s->index, r->local_sequence, r->n_retries); - - mcm->transport.tx_buffer - (mcm->transport.opaque, - MC_TRANSPORT_USER_REQUEST_TO_RELAY, r->buffer_index); - } - } - - maybe_send_window_open_event (mcm->vlib_main, s); - - /* Delete any dead peers we've found. */ - if (!clib_bitmap_is_zero (dead_peer_bitmap)) - { - uword i; - - /* *INDENT-OFF* */ - clib_bitmap_foreach (i, dead_peer_bitmap, ({ - delete_peer_with_index (mcm, s, i, /* notify_application */ 1); - - /* Delete any references to just deleted peer in retry pool. */ - pool_foreach (r, s->retry_pool, ({ - r->unacked_by_peer_bitmap = - clib_bitmap_andnoti (r->unacked_by_peer_bitmap, i); - })); - })); -/* *INDENT-ON* */ - clib_bitmap_free (dead_peer_bitmap); - } -} - -always_inline mc_main_t * -mc_node_get_main (vlib_node_runtime_t * node) -{ - mc_main_t **p = (void *) node->runtime_data; - return p[0]; -} - -static uword -mc_retry_process (vlib_main_t * vm, - vlib_node_runtime_t * node, vlib_frame_t * f) -{ - mc_main_t *mcm = mc_node_get_main (node); - mc_stream_t *s; - - while (1) - { - vlib_process_suspend (vm, 1.0); - vec_foreach (s, mcm->stream_vector) - { - if (s->state != MC_STREAM_STATE_invalid) - check_retry (mcm, s); - } - } - return 0; /* not likely */ -} - -static void -send_join_or_leave_request (mc_main_t * mcm, u32 stream_index, u32 is_join) -{ - vlib_main_t *vm = mcm->vlib_main; - mc_msg_join_or_leave_request_t *mp; - u32 bi; - - mp = mc_get_vlib_buffer (vm, sizeof (mp[0]), &bi); - memset (mp, 0, sizeof (*mp)); - mp->type = MC_MSG_TYPE_join_or_leave_request; - mp->peer_id = mcm->transport.our_ack_peer_id; - mp->stream_index = stream_index; - mp->is_join = is_join; - - mc_byte_swap_msg_join_or_leave_request (mp); - - /* - * These msgs are unnumbered, unordered so send on the from-relay - * channel. - */ - mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi); -} - -static uword -mc_join_ager_process (vlib_main_t * vm, - vlib_node_runtime_t * node, vlib_frame_t * f) -{ - mc_main_t *mcm = mc_node_get_main (node); - - while (1) - { - if (mcm->joins_in_progress) - { - mc_stream_t *s; - vlib_one_time_waiting_process_t *p; - f64 now = vlib_time_now (vm); - - vec_foreach (s, mcm->stream_vector) - { - if (s->state != MC_STREAM_STATE_join_in_progress) - continue; - - if (now > s->join_timeout) - { - s->state = MC_STREAM_STATE_ready; - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "stream %d join timeout", - }; - /* *INDENT-ON* */ - ELOG (mcm->elog_main, e, s->index); - } - /* Make sure that this app instance exists as a stream peer, - or we may answer a catchup request with a NULL - all_peer_bitmap... */ - (void) get_or_create_peer_with_id - (mcm, s, mcm->transport.our_ack_peer_id, /* created */ 0); - - vec_foreach (p, s->procs_waiting_for_join_done) - vlib_signal_one_time_waiting_process (vm, p); - if (s->procs_waiting_for_join_done) - _vec_len (s->procs_waiting_for_join_done) = 0; - - mcm->joins_in_progress--; - ASSERT (mcm->joins_in_progress >= 0); - } - else - { - /* Resent join request which may have been lost. */ - send_join_or_leave_request (mcm, s->index, 1 /* is_join */ ); - - /* We're *not* alone, retry for as long as it takes */ - if (mcm->relay_state == MC_RELAY_STATE_SLAVE) - s->join_timeout = vlib_time_now (vm) + 2.0; - - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "stream %d resend join request", - }; - /* *INDENT-ON* */ - ELOG (mcm->elog_main, e, s->index); - } - } - } - } - - vlib_process_suspend (vm, .5); - } - - return 0; /* not likely */ -} - -static void -serialize_mc_register_stream_name (serialize_main_t * m, va_list * va) -{ - char *name = va_arg (*va, char *); - serialize_cstring (m, name); -} - -static void -elog_stream_name (char *buf, int n_buf_bytes, char *v) -{ - clib_memcpy (buf, v, clib_min (n_buf_bytes - 1, vec_len (v))); - buf[n_buf_bytes - 1] = 0; -} - -static void -unserialize_mc_register_stream_name (serialize_main_t * m, va_list * va) -{ - mc_main_t *mcm = va_arg (*va, mc_main_t *); - char *name; - mc_stream_t *s; - uword *p; - - unserialize_cstring (m, &name); - - if ((p = hash_get_mem (mcm->stream_index_by_name, name))) - { - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "stream index %d already named %s", - .format_args = "i4s16", - }; - /* *INDENT-ON* */ - struct - { - u32 stream_index; - char name[16]; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->stream_index = p[0]; - elog_stream_name (ed->name, sizeof (ed->name), name); - } - - vec_free (name); - return; - } - - vec_add2 (mcm->stream_vector, s, 1); - mc_stream_init (s); - s->state = MC_STREAM_STATE_name_known; - s->index = s - mcm->stream_vector; - s->config.name = name; - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "stream index %d named %s", - .format_args = "i4s16", - }; - /* *INDENT-ON* */ - struct - { - u32 stream_index; - char name[16]; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->stream_index = s->index; - elog_stream_name (ed->name, sizeof (ed->name), name); - } - - hash_set_mem (mcm->stream_index_by_name, name, s->index); - - p = hash_get (mcm->procs_waiting_for_stream_name_by_name, name); - if (p) - { - vlib_one_time_waiting_process_t *wp, **w; - w = pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool, p[0]); - vec_foreach (wp, w[0]) - vlib_signal_one_time_waiting_process (mcm->vlib_main, wp); - pool_put (mcm->procs_waiting_for_stream_name_pool, w); - hash_unset_mem (mcm->procs_waiting_for_stream_name_by_name, name); - } -} - -/* *INDENT-OFF* */ -MC_SERIALIZE_MSG (mc_register_stream_name_msg, static) = -{ - .name = "mc_register_stream_name", - .serialize = serialize_mc_register_stream_name, - .unserialize = unserialize_mc_register_stream_name, -}; -/* *INDENT-ON* */ - -void -mc_rx_buffer_unserialize (mc_main_t * mcm, - mc_stream_t * stream, - mc_peer_id_t peer_id, u32 buffer_index) -{ - return mc_unserialize (mcm, stream, buffer_index); -} - -static u8 * -mc_internal_catchup_snapshot (mc_main_t * mcm, - u8 * data_vector, - u32 last_global_sequence_processed) -{ - serialize_main_t m; - - /* Append serialized data to data vector. */ - serialize_open_vector (&m, data_vector); - m.stream.current_buffer_index = vec_len (data_vector); - - serialize (&m, serialize_mc_main, mcm); - return serialize_close_vector (&m); -} - -static void -mc_internal_catchup (mc_main_t * mcm, u8 * data, u32 n_data_bytes) -{ - serialize_main_t s; - - unserialize_open_data (&s, data, n_data_bytes); - - unserialize (&s, unserialize_mc_main, mcm); -} - -/* Overridden from the application layer, not actually used here */ -void mc_stream_join_process_hold (void) __attribute__ ((weak)); -void -mc_stream_join_process_hold (void) -{ -} - -static u32 -mc_stream_join_helper (mc_main_t * mcm, - mc_stream_config_t * config, u32 is_internal) -{ - mc_stream_t *s; - vlib_main_t *vm = mcm->vlib_main; - - s = 0; - if (!is_internal) - { - uword *p; - - /* Already have a stream with given name? */ - if ((s = mc_stream_by_name (mcm, config->name))) - { - /* Already joined and ready? */ - if (s->state == MC_STREAM_STATE_ready) - return s->index; - } - - /* First join MC internal stream. */ - if (!mcm->stream_vector - || (mcm->stream_vector[MC_STREAM_INDEX_INTERNAL].state - == MC_STREAM_STATE_invalid)) - { - static mc_stream_config_t c = { - .name = "mc-internal", - .rx_buffer = mc_rx_buffer_unserialize, - .catchup = mc_internal_catchup, - .catchup_snapshot = mc_internal_catchup_snapshot, - }; - - c.save_snapshot = config->save_snapshot; - - mc_stream_join_helper (mcm, &c, /* is_internal */ 1); - } - - /* If stream is still unknown register this name and wait for - sequenced message to name stream. This way all peers agree - on stream name to index mappings. */ - s = mc_stream_by_name (mcm, config->name); - if (!s) - { - vlib_one_time_waiting_process_t *wp, **w; - u8 *name_copy = format (0, "%s", config->name); - - mc_serialize_stream (mcm, - MC_STREAM_INDEX_INTERNAL, - &mc_register_stream_name_msg, config->name); - - /* Wait for this stream to be named. */ - p = - hash_get_mem (mcm->procs_waiting_for_stream_name_by_name, - name_copy); - if (p) - w = - pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool, - p[0]); - else - { - pool_get (mcm->procs_waiting_for_stream_name_pool, w); - if (!mcm->procs_waiting_for_stream_name_by_name) - mcm->procs_waiting_for_stream_name_by_name = hash_create_string ( /* elts */ 0, /* value size */ - sizeof - (uword)); - hash_set_mem (mcm->procs_waiting_for_stream_name_by_name, - name_copy, - w - mcm->procs_waiting_for_stream_name_pool); - w[0] = 0; - } - - vec_add2 (w[0], wp, 1); - vlib_current_process_wait_for_one_time_event (vm, wp); - vec_free (name_copy); - } - - /* Name should be known now. */ - s = mc_stream_by_name (mcm, config->name); - ASSERT (s != 0); - ASSERT (s->state == MC_STREAM_STATE_name_known); - } - - if (!s) - { - vec_add2 (mcm->stream_vector, s, 1); - mc_stream_init (s); - s->index = s - mcm->stream_vector; - } - - { - /* Save name since we could have already used it as hash key. */ - char *name_save = s->config.name; - - s->config = config[0]; - - if (name_save) - s->config.name = name_save; - } - - if (s->config.window_size == 0) - s->config.window_size = 8; - - if (s->config.retry_interval == 0.0) - s->config.retry_interval = 1.0; - - /* Sanity. */ - ASSERT (s->config.retry_interval < 30); - - if (s->config.retry_limit == 0) - s->config.retry_limit = 7; - - s->state = MC_STREAM_STATE_join_in_progress; - if (!s->peer_index_by_id.hash) - mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t)); - - /* If we don't hear from someone in 5 seconds, we're alone */ - s->join_timeout = vlib_time_now (vm) + 5.0; - mcm->joins_in_progress++; - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "stream index %d join request %s", - .format_args = "i4s16", - }; - /* *INDENT-ON* */ - struct - { - u32 stream_index; - char name[16]; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->stream_index = s->index; - elog_stream_name (ed->name, sizeof (ed->name), s->config.name); - } - - send_join_or_leave_request (mcm, s->index, 1 /* join */ ); - - vlib_current_process_wait_for_one_time_event_vector - (vm, &s->procs_waiting_for_join_done); - - if (MC_EVENT_LOGGING) - { - ELOG_TYPE (e, "join complete stream %d"); - ELOG (mcm->elog_main, e, s->index); - } - - return s->index; -} - -u32 -mc_stream_join (mc_main_t * mcm, mc_stream_config_t * config) -{ - return mc_stream_join_helper (mcm, config, /* is_internal */ 0); -} - -void -mc_stream_leave (mc_main_t * mcm, u32 stream_index) -{ - mc_stream_t *s = mc_stream_by_index (mcm, stream_index); - - if (!s) - return; - - if (MC_EVENT_LOGGING) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (t) = - { - .format = "leave-stream: %d",.format_args = "i4", - }; - /* *INDENT-ON* */ - struct - { - u32 index; - } *ed; - ed = ELOG_DATA (mcm->elog_main, t); - ed->index = stream_index; - } - - send_join_or_leave_request (mcm, stream_index, 0 /* is_join */ ); - mc_stream_free (s); - s->state = MC_STREAM_STATE_name_known; -} - -void -mc_msg_join_or_leave_request_handler (mc_main_t * mcm, - mc_msg_join_or_leave_request_t * req, - u32 buffer_index) -{ - mc_stream_t *s; - mc_msg_join_reply_t *rep; - u32 bi; - - mc_byte_swap_msg_join_or_leave_request (req); - - s = mc_stream_by_index (mcm, req->stream_index); - if (!s || s->state != MC_STREAM_STATE_ready) - return; - - /* If the peer is joining, create it */ - if (req->is_join) - { - mc_stream_t *this_s; - - /* We're not in a position to catch up a peer until all - stream joins are complete. */ - if (0) - { - /* XXX This is hard to test so we've. */ - vec_foreach (this_s, mcm->stream_vector) - { - if (this_s->state != MC_STREAM_STATE_ready - && this_s->state != MC_STREAM_STATE_name_known) - return; - } - } - else if (mcm->joins_in_progress > 0) - return; - - (void) get_or_create_peer_with_id (mcm, s, req->peer_id, - /* created */ 0); - - rep = mc_get_vlib_buffer (mcm->vlib_main, sizeof (rep[0]), &bi); - memset (rep, 0, sizeof (rep[0])); - rep->type = MC_MSG_TYPE_join_reply; - rep->stream_index = req->stream_index; - - mc_byte_swap_msg_join_reply (rep); - /* These two are already in network byte order... */ - rep->peer_id = mcm->transport.our_ack_peer_id; - rep->catchup_peer_id = mcm->transport.our_catchup_peer_id; - - mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi); - } - else - { - if (s->config.peer_died) - s->config.peer_died (mcm, s, req->peer_id); - } -} - -void -mc_msg_join_reply_handler (mc_main_t * mcm, - mc_msg_join_reply_t * mp, u32 buffer_index) -{ - mc_stream_t *s; - - mc_byte_swap_msg_join_reply (mp); - - s = mc_stream_by_index (mcm, mp->stream_index); - - if (!s || s->state != MC_STREAM_STATE_join_in_progress) - return; - - /* Switch to catchup state; next join reply - for this stream will be ignored. */ - s->state = MC_STREAM_STATE_catchup; - - mcm->joins_in_progress--; - mcm->transport.catchup_request_fun (mcm->transport.opaque, - mp->stream_index, mp->catchup_peer_id); -} - -void -mc_wait_for_stream_ready (mc_main_t * m, char *stream_name) -{ - mc_stream_t *s; - - while (1) - { - s = mc_stream_by_name (m, stream_name); - if (s) - break; - vlib_process_suspend (m->vlib_main, .1); - } - - /* It's OK to send a message in catchup and ready states. */ - if (s->state == MC_STREAM_STATE_catchup - || s->state == MC_STREAM_STATE_ready) - return; - - /* Otherwise we are waiting for a join to finish. */ - vlib_current_process_wait_for_one_time_event_vector - (m->vlib_main, &s->procs_waiting_for_join_done); -} - -u32 -mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index) -{ - mc_stream_t *s = mc_stream_by_index (mcm, stream_index); - vlib_main_t *vm = mcm->vlib_main; - mc_retry_t *r; - mc_msg_user_request_t *mp; - vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index); - u32 ri; - - if (!s) - return 0; - - if (s->state != MC_STREAM_STATE_ready) - vlib_current_process_wait_for_one_time_event_vector - (vm, &s->procs_waiting_for_join_done); - - while (pool_elts (s->retry_pool) >= s->config.window_size) - { - vlib_current_process_wait_for_one_time_event_vector - (vm, &s->procs_waiting_for_open_window); - } - - pool_get (s->retry_pool, r); - ri = r - s->retry_pool; - - r->prev_index = s->retry_tail_index; - r->next_index = ~0; - s->retry_tail_index = ri; - - if (r->prev_index == ~0) - s->retry_head_index = ri; - else - { - mc_retry_t *p = pool_elt_at_index (s->retry_pool, r->prev_index); - p->next_index = ri; - } - - vlib_buffer_advance (b, -sizeof (mp[0])); - mp = vlib_buffer_get_current (b); - - mp->peer_id = mcm->transport.our_ack_peer_id; - /* mp->transport.global_sequence set by relay agent. */ - mp->global_sequence = 0xdeadbeef; - mp->stream_index = s->index; - mp->local_sequence = s->our_local_sequence++; - mp->n_data_bytes = - vlib_buffer_index_length_in_chain (vm, buffer_index) - sizeof (mp[0]); - - r->buffer_index = buffer_index; - r->local_sequence = mp->local_sequence; - r->sent_at = vlib_time_now (vm); - r->n_retries = 0; - - /* Retry will be freed when all currently known peers have acked. */ - vec_validate (r->unacked_by_peer_bitmap, vec_len (s->all_peer_bitmap) - 1); - vec_copy (r->unacked_by_peer_bitmap, s->all_peer_bitmap); - - hash_set (s->retry_index_by_local_sequence, r->local_sequence, - r - s->retry_pool); - - elog_tx_msg (mcm, s->index, mp->local_sequence, r->n_retries); - - mc_byte_swap_msg_user_request (mp); - - mcm->transport.tx_buffer (mcm->transport.opaque, - MC_TRANSPORT_USER_REQUEST_TO_RELAY, buffer_index); - - s->user_requests_sent++; - - /* return amount of window remaining */ - return s->config.window_size - pool_elts (s->retry_pool); -} - -void -mc_msg_user_request_handler (mc_main_t * mcm, mc_msg_user_request_t * mp, - u32 buffer_index) -{ - vlib_main_t *vm = mcm->vlib_main; - mc_stream_t *s; - mc_stream_peer_t *peer; - i32 seq_cmp_result; - static int once = 0; - - mc_byte_swap_msg_user_request (mp); - - s = mc_stream_by_index (mcm, mp->stream_index); - - /* Not signed up for this stream? Turf-o-matic */ - if (!s || s->state != MC_STREAM_STATE_ready) - { - vlib_buffer_free_one (vm, buffer_index); - return; - } - - /* Find peer, including ourselves. */ - peer = get_or_create_peer_with_id (mcm, s, mp->peer_id, - /* created */ 0); - - seq_cmp_result = mc_seq_cmp (mp->local_sequence, - peer->last_sequence_received + 1); - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "rx-msg: peer %s stream %d rx seq %d seq_cmp %d", - .format_args = "T4i4i4i4", - }; - /* *INDENT-ON* */ - struct - { - u32 peer, stream_index, rx_sequence; - i32 seq_cmp_result; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64); - ed->stream_index = mp->stream_index; - ed->rx_sequence = mp->local_sequence; - ed->seq_cmp_result = seq_cmp_result; - } - - if (0 && mp->stream_index == 1 && once == 0) - { - once = 1; - ELOG_TYPE (e, "FAKE lost msg on stream 1"); - ELOG (mcm->elog_main, e, 0); - return; - } - - peer->last_sequence_received += seq_cmp_result == 0; - s->user_requests_received++; - - if (seq_cmp_result > 0) - peer->stats.n_msgs_from_future += 1; - - /* Send ack even if msg from future */ - if (1) - { - mc_msg_user_ack_t *rp; - u32 bi; - - rp = mc_get_vlib_buffer (vm, sizeof (rp[0]), &bi); - rp->peer_id = mcm->transport.our_ack_peer_id; - rp->stream_index = s->index; - rp->local_sequence = mp->local_sequence; - rp->seq_cmp_result = seq_cmp_result; - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "tx-ack: stream %d local seq %d", - .format_args = "i4i4", - }; - /* *INDENT-ON* */ - struct - { - u32 stream_index; - u32 local_sequence; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->stream_index = rp->stream_index; - ed->local_sequence = rp->local_sequence; - } - - mc_byte_swap_msg_user_ack (rp); - - mcm->transport.tx_ack (mcm->transport.opaque, mp->peer_id, bi); - /* Msg from past? If so, free the buffer... */ - if (seq_cmp_result < 0) - { - vlib_buffer_free_one (vm, buffer_index); - peer->stats.n_msgs_from_past += 1; - } - } - - if (seq_cmp_result == 0) - { - vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index); - switch (s->state) - { - case MC_STREAM_STATE_ready: - vlib_buffer_advance (b, sizeof (mp[0])); - s->config.rx_buffer (mcm, s, mp->peer_id, buffer_index); - - /* Stream vector can change address via rx callback for mc-internal - stream. */ - s = mc_stream_by_index (mcm, mp->stream_index); - ASSERT (s != 0); - s->last_global_sequence_processed = mp->global_sequence; - break; - - case MC_STREAM_STATE_catchup: - clib_fifo_add1 (s->catchup_fifo, buffer_index); - break; - - default: - clib_warning ("stream in unknown state %U", - format_mc_stream_state, s->state); - break; - } - } -} - -void -mc_msg_user_ack_handler (mc_main_t * mcm, mc_msg_user_ack_t * mp, - u32 buffer_index) -{ - vlib_main_t *vm = mcm->vlib_main; - uword *p; - mc_stream_t *s; - mc_stream_peer_t *peer; - mc_retry_t *r; - int peer_created = 0; - - mc_byte_swap_msg_user_ack (mp); - - s = mc_stream_by_index (mcm, mp->stream_index); - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (t) = - { - .format = "rx-ack: local seq %d peer %s seq_cmp_result %d", - .format_args = "i4T4i4", - }; - /* *INDENT-ON* */ - - struct - { - u32 local_sequence; - u32 peer; - i32 seq_cmp_result; - } *ed; - ed = ELOG_DATA (mcm->elog_main, t); - ed->local_sequence = mp->local_sequence; - ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64); - ed->seq_cmp_result = mp->seq_cmp_result; - } - - /* Unknown stream? */ - if (!s) - return; - - /* Find the peer which just ack'ed. */ - peer = get_or_create_peer_with_id (mcm, s, mp->peer_id, - /* created */ &peer_created); - - /* - * Peer reports message from the future. If it's not in the retry - * fifo, look for a retired message. - */ - if (mp->seq_cmp_result > 0) - { - p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence - - mp->seq_cmp_result); - if (p == 0) - mc_resend_retired (mcm, s, mp->local_sequence - mp->seq_cmp_result); - - /* Normal retry should fix it... */ - return; - } - - /* - * Pointer to the indicated retry fifo entry. - * Worth hashing because we could use a window size of 100 or 1000. - */ - p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence); - - /* - * Is this a duplicate ACK, received after we've retired the - * fifo entry. This can happen when learning about new - * peers. - */ - if (p == 0) - { - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (t) = - { - .format = "ack: for seq %d from peer %s no fifo elt", - .format_args = "i4T4", - }; - /* *INDENT-ON* */ - - struct - { - u32 seq; - u32 peer; - } *ed; - ed = ELOG_DATA (mcm->elog_main, t); - ed->seq = mp->local_sequence; - ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64); - } - - return; - } - - r = pool_elt_at_index (s->retry_pool, p[0]); - - /* Make sure that this new peer ACKs our msgs from now on */ - if (peer_created) - { - mc_retry_t *later_retry = next_retry (s, r); - - while (later_retry) - { - later_retry->unacked_by_peer_bitmap = - clib_bitmap_ori (later_retry->unacked_by_peer_bitmap, - peer - s->peers); - later_retry = next_retry (s, later_retry); - } - } - - ASSERT (mp->local_sequence == r->local_sequence); - - /* If we weren't expecting to hear from this peer */ - if (!peer_created && - !clib_bitmap_get (r->unacked_by_peer_bitmap, peer - s->peers)) - { - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (t) = - { - .format = "dup-ack: for seq %d from peer %s", - .format_args = "i4T4", - }; - /* *INDENT-ON* */ - struct - { - u32 seq; - u32 peer; - } *ed; - ed = ELOG_DATA (mcm->elog_main, t); - ed->seq = r->local_sequence; - ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64); - } - if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap)) - return; - } - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (t) = - { - .format = "ack: for seq %d from peer %s", - .format_args = "i4T4", - }; - /* *INDENT-ON* */ - struct - { - u32 seq; - u32 peer; - } *ed; - ed = ELOG_DATA (mcm->elog_main, t); - ed->seq = mp->local_sequence; - ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64); - } - - r->unacked_by_peer_bitmap = - clib_bitmap_andnoti (r->unacked_by_peer_bitmap, peer - s->peers); - - /* Not all clients have ack'ed */ - if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap)) - { - return; - } - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (t) = - { - .format = "ack: retire fifo elt loc seq %d after %d acks", - .format_args = "i4i4", - }; - /* *INDENT-ON* */ - struct - { - u32 seq; - u32 npeers; - } *ed; - ed = ELOG_DATA (mcm->elog_main, t); - ed->seq = r->local_sequence; - ed->npeers = pool_elts (s->peers); - } - - hash_unset (s->retry_index_by_local_sequence, mp->local_sequence); - mc_retry_free (mcm, s, r); - remove_retry_from_pool (s, r); - maybe_send_window_open_event (vm, s); -} - -#define EVENT_MC_SEND_CATCHUP_DATA 0 - -static uword -mc_catchup_process (vlib_main_t * vm, - vlib_node_runtime_t * node, vlib_frame_t * f) -{ - mc_main_t *mcm = mc_node_get_main (node); - uword *event_data = 0; - mc_catchup_process_arg_t *args; - int i; - - while (1) - { - if (event_data) - _vec_len (event_data) = 0; - vlib_process_wait_for_event_with_type (vm, &event_data, - EVENT_MC_SEND_CATCHUP_DATA); - - for (i = 0; i < vec_len (event_data); i++) - { - args = pool_elt_at_index (mcm->catchup_process_args, event_data[i]); - - mcm->transport.catchup_send_fun (mcm->transport.opaque, - args->catchup_opaque, - args->catchup_snapshot); - - /* Send function will free snapshot data vector. */ - pool_put (mcm->catchup_process_args, args); - } - } - - return 0; /* not likely */ -} - -static void -serialize_mc_stream (serialize_main_t * m, va_list * va) -{ - mc_stream_t *s = va_arg (*va, mc_stream_t *); - mc_stream_peer_t *p; - - serialize_integer (m, pool_elts (s->peers), sizeof (u32)); - /* *INDENT-OFF* */ - pool_foreach (p, s->peers, ({ - u8 * x = serialize_get (m, sizeof (p->id)); - clib_memcpy (x, p->id.as_u8, sizeof (p->id)); - serialize_integer (m, p->last_sequence_received, - sizeof (p->last_sequence_received)); - })); -/* *INDENT-ON* */ - serialize_bitmap (m, s->all_peer_bitmap); -} - -void -unserialize_mc_stream (serialize_main_t * m, va_list * va) -{ - mc_stream_t *s = va_arg (*va, mc_stream_t *); - u32 i, n_peers; - mc_stream_peer_t *p; - - unserialize_integer (m, &n_peers, sizeof (u32)); - mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t)); - for (i = 0; i < n_peers; i++) - { - u8 *x; - pool_get (s->peers, p); - x = unserialize_get (m, sizeof (p->id)); - clib_memcpy (p->id.as_u8, x, sizeof (p->id)); - unserialize_integer (m, &p->last_sequence_received, - sizeof (p->last_sequence_received)); - mhash_set (&s->peer_index_by_id, &p->id, p - s->peers, /* old_value */ - 0); - } - s->all_peer_bitmap = unserialize_bitmap (m); - - /* This is really bad. */ - if (!s->all_peer_bitmap) - clib_warning ("BUG: stream %s all_peer_bitmap NULL", s->config.name); -} - -void -mc_msg_catchup_request_handler (mc_main_t * mcm, - mc_msg_catchup_request_t * req, - u32 catchup_opaque) -{ - vlib_main_t *vm = mcm->vlib_main; - mc_stream_t *s; - mc_catchup_process_arg_t *args; - - mc_byte_swap_msg_catchup_request (req); - - s = mc_stream_by_index (mcm, req->stream_index); - if (!s || s->state != MC_STREAM_STATE_ready) - return; - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (t) = - { - .format = "catchup-request: from %s stream %d", - .format_args = "T4i4", - }; - /* *INDENT-ON* */ - struct - { - u32 peer, stream; - } *ed; - ed = ELOG_DATA (mcm->elog_main, t); - ed->peer = elog_id_for_peer_id (mcm, req->peer_id.as_u64); - ed->stream = req->stream_index; - } - - /* - * The application has to snapshoot its data structures right - * here, right now. If we process any messages after - * noting the last global sequence we've processed, the client - * won't be able to accurately reconstruct our data structures. - * - * Once the data structures are e.g. vec_dup()'ed, we - * send the resulting messages from a separate process, to - * make sure that we don't cause a bunch of message retransmissions - */ - pool_get (mcm->catchup_process_args, args); - - args->stream_index = s - mcm->stream_vector; - args->catchup_opaque = catchup_opaque; - args->catchup_snapshot = 0; - - /* Construct catchup reply and snapshot state for stream to send as - catchup reply payload. */ - { - mc_msg_catchup_reply_t *rep; - serialize_main_t m; - - vec_resize (args->catchup_snapshot, sizeof (rep[0])); - - rep = (void *) args->catchup_snapshot; - - rep->peer_id = req->peer_id; - rep->stream_index = req->stream_index; - rep->last_global_sequence_included = s->last_global_sequence_processed; - - /* Setup for serialize to append to catchup snapshot. */ - serialize_open_vector (&m, args->catchup_snapshot); - m.stream.current_buffer_index = vec_len (m.stream.buffer); - - serialize (&m, serialize_mc_stream, s); - - args->catchup_snapshot = serialize_close_vector (&m); - - /* Actually copy internal state */ - args->catchup_snapshot = s->config.catchup_snapshot - (mcm, args->catchup_snapshot, rep->last_global_sequence_included); - - rep = (void *) args->catchup_snapshot; - rep->n_data_bytes = vec_len (args->catchup_snapshot) - sizeof (rep[0]); - - mc_byte_swap_msg_catchup_reply (rep); - } - - /* now go send it... */ - vlib_process_signal_event (vm, mcm->catchup_process, - EVENT_MC_SEND_CATCHUP_DATA, - args - mcm->catchup_process_args); -} - -#define EVENT_MC_UNSERIALIZE_BUFFER 0 -#define EVENT_MC_UNSERIALIZE_CATCHUP 1 - -void -mc_msg_catchup_reply_handler (mc_main_t * mcm, mc_msg_catchup_reply_t * mp, - u32 catchup_opaque) -{ - vlib_process_signal_event (mcm->vlib_main, - mcm->unserialize_process, - EVENT_MC_UNSERIALIZE_CATCHUP, - pointer_to_uword (mp)); -} - -static void -perform_catchup (mc_main_t * mcm, mc_msg_catchup_reply_t * mp) -{ - mc_stream_t *s; - i32 seq_cmp_result; - - mc_byte_swap_msg_catchup_reply (mp); - - s = mc_stream_by_index (mcm, mp->stream_index); - - /* Never heard of this stream or already caught up. */ - if (!s || s->state == MC_STREAM_STATE_ready) - return; - - { - serialize_main_t m; - mc_stream_peer_t *p; - u32 n_stream_bytes; - - /* For offline sim replay: save the entire catchup snapshot... */ - if (s->config.save_snapshot) - s->config.save_snapshot (mcm, /* is_catchup */ 1, mp->data, - mp->n_data_bytes); - - unserialize_open_data (&m, mp->data, mp->n_data_bytes); - unserialize (&m, unserialize_mc_stream, s); - - /* Make sure we start numbering our messages as expected */ - /* *INDENT-OFF* */ - pool_foreach (p, s->peers, ({ - if (p->id.as_u64 == mcm->transport.our_ack_peer_id.as_u64) - s->our_local_sequence = p->last_sequence_received + 1; - })); -/* *INDENT-ON* */ - - n_stream_bytes = m.stream.current_buffer_index; - - /* No need to unserialize close; nothing to free. */ - - /* After serialized stream is user's catchup data. */ - s->config.catchup (mcm, mp->data + n_stream_bytes, - mp->n_data_bytes - n_stream_bytes); - } - - /* Vector could have been moved by catchup. - This can only happen for mc-internal stream. */ - s = mc_stream_by_index (mcm, mp->stream_index); - - s->last_global_sequence_processed = mp->last_global_sequence_included; - - while (clib_fifo_elts (s->catchup_fifo)) - { - mc_msg_user_request_t *gp; - u32 bi; - vlib_buffer_t *b; - - clib_fifo_sub1 (s->catchup_fifo, bi); - - b = vlib_get_buffer (mcm->vlib_main, bi); - gp = vlib_buffer_get_current (b); - - /* Make sure we're replaying "new" news */ - seq_cmp_result = mc_seq_cmp (gp->global_sequence, - mp->last_global_sequence_included); - - if (seq_cmp_result > 0) - { - vlib_buffer_advance (b, sizeof (gp[0])); - s->config.rx_buffer (mcm, s, gp->peer_id, bi); - s->last_global_sequence_processed = gp->global_sequence; - - if (MC_EVENT_LOGGING) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (t) = - { - .format = "catchup replay local sequence 0x%x", - .format_args = "i4", - }; - /* *INDENT-ON* */ - struct - { - u32 local_sequence; - } *ed; - ed = ELOG_DATA (mcm->elog_main, t); - ed->local_sequence = gp->local_sequence; - } - } - else - { - if (MC_EVENT_LOGGING) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (t) = - { - .format = "catchup discard local sequence 0x%x", - .format_args = "i4", - }; - /* *INDENT-ON* */ - struct - { - u32 local_sequence; - } *ed; - ed = ELOG_DATA (mcm->elog_main, t); - ed->local_sequence = gp->local_sequence; - } - - vlib_buffer_free_one (mcm->vlib_main, bi); - } - } - - s->state = MC_STREAM_STATE_ready; - - /* Now that we are caught up wake up joining process. */ - { - vlib_one_time_waiting_process_t *wp; - vec_foreach (wp, s->procs_waiting_for_join_done) - vlib_signal_one_time_waiting_process (mcm->vlib_main, wp); - if (s->procs_waiting_for_join_done) - _vec_len (s->procs_waiting_for_join_done) = 0; - } -} - -static void -this_node_maybe_master (mc_main_t * mcm) -{ - vlib_main_t *vm = mcm->vlib_main; - mc_msg_master_assert_t *mp; - uword event_type; - int timeouts = 0; - int is_master = mcm->relay_state == MC_RELAY_STATE_MASTER; - clib_error_t *error; - f64 now, time_last_master_assert = -1; - u32 bi; - - while (1) - { - if (!mcm->we_can_be_relay_master) - { - mcm->relay_state = MC_RELAY_STATE_SLAVE; - if (MC_EVENT_LOGGING) - { - ELOG_TYPE (e, "become slave (config)"); - ELOG (mcm->elog_main, e, 0); - } - return; - } - - now = vlib_time_now (vm); - if (now >= time_last_master_assert + 1) - { - time_last_master_assert = now; - mp = mc_get_vlib_buffer (mcm->vlib_main, sizeof (mp[0]), &bi); - - mp->peer_id = mcm->transport.our_ack_peer_id; - mp->global_sequence = mcm->relay_global_sequence; - - /* - * these messages clog the event log, set MC_EVENT_LOGGING higher - * if you want them - */ - if (MC_EVENT_LOGGING > 1) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "tx-massert: peer %s global seq %u", - .format_args = "T4i4", - }; - /* *INDENT-ON* */ - struct - { - u32 peer, global_sequence; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64); - ed->global_sequence = mp->global_sequence; - } - - mc_byte_swap_msg_master_assert (mp); - - error = - mcm->transport.tx_buffer (mcm->transport.opaque, - MC_TRANSPORT_MASTERSHIP, bi); - if (error) - clib_error_report (error); - } - - vlib_process_wait_for_event_or_clock (vm, 1.0); - event_type = vlib_process_get_events (vm, /* no event data */ 0); - - switch (event_type) - { - case ~0: - if (!is_master && timeouts++ > 2) - { - mcm->relay_state = MC_RELAY_STATE_MASTER; - mcm->relay_master_peer_id = - mcm->transport.our_ack_peer_id.as_u64; - if (MC_EVENT_LOGGING) - { - ELOG_TYPE (e, "become master (was maybe_master)"); - ELOG (mcm->elog_main, e, 0); - } - return; - } - break; - - case MC_RELAY_STATE_SLAVE: - mcm->relay_state = MC_RELAY_STATE_SLAVE; - if (MC_EVENT_LOGGING && mcm->relay_state != MC_RELAY_STATE_SLAVE) - { - ELOG_TYPE (e, "become slave (was maybe_master)"); - ELOG (mcm->elog_main, e, 0); - } - return; - } - } -} - -static void -this_node_slave (mc_main_t * mcm) -{ - vlib_main_t *vm = mcm->vlib_main; - uword event_type; - int timeouts = 0; - - if (MC_EVENT_LOGGING) - { - ELOG_TYPE (e, "become slave"); - ELOG (mcm->elog_main, e, 0); - } - - while (1) - { - vlib_process_wait_for_event_or_clock (vm, 1.0); - event_type = vlib_process_get_events (vm, /* no event data */ 0); - - switch (event_type) - { - case ~0: - if (timeouts++ > 2) - { - mcm->relay_state = MC_RELAY_STATE_NEGOTIATE; - mcm->relay_master_peer_id = ~0ULL; - if (MC_EVENT_LOGGING) - { - ELOG_TYPE (e, "timeouts; negoitate mastership"); - ELOG (mcm->elog_main, e, 0); - } - return; - } - break; - - case MC_RELAY_STATE_SLAVE: - mcm->relay_state = MC_RELAY_STATE_SLAVE; - timeouts = 0; - break; - } - } -} - -static uword -mc_mastership_process (vlib_main_t * vm, - vlib_node_runtime_t * node, vlib_frame_t * f) -{ - mc_main_t *mcm = mc_node_get_main (node); - - while (1) - { - switch (mcm->relay_state) - { - case MC_RELAY_STATE_NEGOTIATE: - case MC_RELAY_STATE_MASTER: - this_node_maybe_master (mcm); - break; - - case MC_RELAY_STATE_SLAVE: - this_node_slave (mcm); - break; - } - } - return 0; /* not likely */ -} - -void -mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master) -{ - if (we_can_be_master != mcm->we_can_be_relay_master) - { - mcm->we_can_be_relay_master = we_can_be_master; - vlib_process_signal_event (mcm->vlib_main, - mcm->mastership_process, - MC_RELAY_STATE_NEGOTIATE, 0); - } -} - -void -mc_msg_master_assert_handler (mc_main_t * mcm, mc_msg_master_assert_t * mp, - u32 buffer_index) -{ - mc_peer_id_t his_peer_id, our_peer_id; - i32 seq_cmp_result; - u8 signal_slave = 0; - u8 update_global_sequence = 0; - - mc_byte_swap_msg_master_assert (mp); - - his_peer_id = mp->peer_id; - our_peer_id = mcm->transport.our_ack_peer_id; - - /* compare the incoming global sequence with ours */ - seq_cmp_result = mc_seq_cmp (mp->global_sequence, - mcm->relay_global_sequence); - - /* If the sender has a lower peer id and the sender's sequence >= - our global sequence, we become a slave. Otherwise we are master. */ - if (mc_peer_id_compare (his_peer_id, our_peer_id) < 0 - && seq_cmp_result >= 0) - { - vlib_process_signal_event (mcm->vlib_main, - mcm->mastership_process, - MC_RELAY_STATE_SLAVE, 0); - signal_slave = 1; - } - - /* Update our global sequence. */ - if (seq_cmp_result > 0) - { - mcm->relay_global_sequence = mp->global_sequence; - update_global_sequence = 1; - } - - { - uword *q = mhash_get (&mcm->mastership_peer_index_by_id, &his_peer_id); - mc_mastership_peer_t *p; - - if (q) - p = vec_elt_at_index (mcm->mastership_peers, q[0]); - else - { - vec_add2 (mcm->mastership_peers, p, 1); - p->peer_id = his_peer_id; - mhash_set (&mcm->mastership_peer_index_by_id, &p->peer_id, - p - mcm->mastership_peers, - /* old_value */ 0); - } - p->time_last_master_assert_received = vlib_time_now (mcm->vlib_main); - } - - /* - * these messages clog the event log, set MC_EVENT_LOGGING higher - * if you want them. - */ - if (MC_EVENT_LOGGING > 1) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "rx-massert: peer %s global seq %u upd %d slave %d", - .format_args = "T4i4i1i1", - }; - /* *INDENT-ON* */ - - struct - { - u32 peer; - u32 global_sequence; - u8 update_sequence; - u8 slave; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->peer = elog_id_for_peer_id (mcm, his_peer_id.as_u64); - ed->global_sequence = mp->global_sequence; - ed->update_sequence = update_global_sequence; - ed->slave = signal_slave; - } -} - -static void -mc_serialize_init (mc_main_t * mcm) -{ - mc_serialize_msg_t *m; - vlib_main_t *vm = vlib_get_main (); - - mcm->global_msg_index_by_name - = hash_create_string ( /* elts */ 0, sizeof (uword)); - - m = vm->mc_msg_registrations; - - while (m) - { - m->global_index = vec_len (mcm->global_msgs); - hash_set_mem (mcm->global_msg_index_by_name, m->name, m->global_index); - vec_add1 (mcm->global_msgs, m); - m = m->next_registration; - } -} - -clib_error_t * -mc_serialize_va (mc_main_t * mc, - u32 stream_index, - u32 multiple_messages_per_vlib_buffer, - mc_serialize_msg_t * msg, va_list * va) -{ - mc_stream_t *s; - clib_error_t *error; - serialize_main_t *m = &mc->serialize_mains[VLIB_TX]; - vlib_serialize_buffer_main_t *sbm = &mc->serialize_buffer_mains[VLIB_TX]; - u32 bi, n_before, n_after, n_total, n_this_msg; - u32 si, gi; - - if (!sbm->vlib_main) - { - sbm->tx.max_n_data_bytes_per_chain = 4096; - sbm->tx.free_list_index = VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX; - } - - if (sbm->first_buffer == 0) - serialize_open_vlib_buffer (m, mc->vlib_main, sbm); - - n_before = serialize_vlib_buffer_n_bytes (m); - - s = mc_stream_by_index (mc, stream_index); - gi = msg->global_index; - ASSERT (msg == vec_elt (mc->global_msgs, gi)); - - si = ~0; - if (gi < vec_len (s->stream_msg_index_by_global_index)) - si = s->stream_msg_index_by_global_index[gi]; - - serialize_likely_small_unsigned_integer (m, si); - - /* For first time message is sent, use name to identify message. */ - if (si == ~0 || MSG_ID_DEBUG) - serialize_cstring (m, msg->name); - - if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "serialize-msg: %s index %d", - .format_args = "T4i4", - }; - /* *INDENT-ON* */ - struct - { - u32 c[2]; - } *ed; - ed = ELOG_DATA (mc->elog_main, e); - ed->c[0] = elog_id_for_msg_name (mc, msg->name); - ed->c[1] = si; - } - - error = va_serialize (m, va); - - n_after = serialize_vlib_buffer_n_bytes (m); - n_this_msg = n_after - n_before; - n_total = n_after + sizeof (mc_msg_user_request_t); - - /* For max message size ignore first message where string name is sent. */ - if (si != ~0) - msg->max_n_bytes_serialized = - clib_max (msg->max_n_bytes_serialized, n_this_msg); - - if (!multiple_messages_per_vlib_buffer - || si == ~0 - || n_total + msg->max_n_bytes_serialized > - mc->transport.max_packet_size) - { - bi = serialize_close_vlib_buffer (m); - sbm->first_buffer = 0; - if (!error) - mc_stream_send (mc, stream_index, bi); - else if (bi != ~0) - vlib_buffer_free_one (mc->vlib_main, bi); - } - - return error; -} - -clib_error_t * -mc_serialize_internal (mc_main_t * mc, - u32 stream_index, - u32 multiple_messages_per_vlib_buffer, - mc_serialize_msg_t * msg, ...) -{ - vlib_main_t *vm = mc->vlib_main; - va_list va; - clib_error_t *error; - - if (stream_index == ~0) - { - if (vm->mc_main && vm->mc_stream_index == ~0) - vlib_current_process_wait_for_one_time_event_vector - (vm, &vm->procs_waiting_for_mc_stream_join); - stream_index = vm->mc_stream_index; - } - - va_start (va, msg); - error = mc_serialize_va (mc, stream_index, - multiple_messages_per_vlib_buffer, msg, &va); - va_end (va); - return error; -} - -uword -mc_unserialize_message (mc_main_t * mcm, - mc_stream_t * s, serialize_main_t * m) -{ - mc_serialize_stream_msg_t *sm; - u32 gi, si; - - si = unserialize_likely_small_unsigned_integer (m); - - if (!(si == ~0 || MSG_ID_DEBUG)) - { - sm = vec_elt_at_index (s->stream_msgs, si); - gi = sm->global_index; - } - else - { - char *name; - - unserialize_cstring (m, &name); - - if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "unserialize-msg: %s rx index %d", - .format_args = "T4i4", - }; - /* *INDENT-ON* */ - struct - { - u32 c[2]; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->c[0] = elog_id_for_msg_name (mcm, name); - ed->c[1] = si; - } - - { - uword *p = hash_get_mem (mcm->global_msg_index_by_name, name); - gi = p ? p[0] : ~0; - } - - /* Unknown message? */ - if (gi == ~0) - { - vec_free (name); - goto done; - } - - vec_validate_init_empty (s->stream_msg_index_by_global_index, gi, ~0); - si = s->stream_msg_index_by_global_index[gi]; - - /* Stream local index unknown? Create it. */ - if (si == ~0) - { - vec_add2 (s->stream_msgs, sm, 1); - - si = sm - s->stream_msgs; - sm->global_index = gi; - s->stream_msg_index_by_global_index[gi] = si; - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "msg-bind: stream %d %s to index %d", - .format_args = "i4T4i4", - }; - /* *INDENT-ON* */ - struct - { - u32 c[3]; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->c[0] = s->index; - ed->c[1] = elog_id_for_msg_name (mcm, name); - ed->c[2] = si; - } - } - else - { - sm = vec_elt_at_index (s->stream_msgs, si); - if (gi != sm->global_index && MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "msg-id-ERROR: %s index %d expected %d", - .format_args = "T4i4i4", - }; - /* *INDENT-ON* */ - struct - { - u32 c[3]; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->c[0] = elog_id_for_msg_name (mcm, name); - ed->c[1] = si; - ed->c[2] = ~0; - if (sm->global_index < - vec_len (s->stream_msg_index_by_global_index)) - ed->c[2] = - s->stream_msg_index_by_global_index[sm->global_index]; - } - } - - vec_free (name); - } - - if (gi != ~0) - { - mc_serialize_msg_t *msg; - msg = vec_elt (mcm->global_msgs, gi); - unserialize (m, msg->unserialize, mcm); - } - -done: - return gi != ~0; -} - -void -mc_unserialize_internal (mc_main_t * mcm, u32 stream_and_buffer_index) -{ - vlib_main_t *vm = mcm->vlib_main; - serialize_main_t *m = &mcm->serialize_mains[VLIB_RX]; - vlib_serialize_buffer_main_t *sbm = &mcm->serialize_buffer_mains[VLIB_RX]; - mc_stream_and_buffer_t *sb; - mc_stream_t *stream; - u32 buffer_index; - - sb = - pool_elt_at_index (mcm->mc_unserialize_stream_and_buffers, - stream_and_buffer_index); - buffer_index = sb->buffer_index; - stream = vec_elt_at_index (mcm->stream_vector, sb->stream_index); - pool_put (mcm->mc_unserialize_stream_and_buffers, sb); - - if (stream->config.save_snapshot) - { - u32 n_bytes = vlib_buffer_index_length_in_chain (vm, buffer_index); - static u8 *contents; - vec_reset_length (contents); - vec_validate (contents, n_bytes - 1); - vlib_buffer_contents (vm, buffer_index, contents); - stream->config.save_snapshot (mcm, /* is_catchup */ 0, contents, - n_bytes); - } - - ASSERT (vlib_in_process_context (vm)); - - unserialize_open_vlib_buffer (m, vm, sbm); - - clib_fifo_add1 (sbm->rx.buffer_fifo, buffer_index); - - while (unserialize_vlib_buffer_n_bytes (m) > 0) - mc_unserialize_message (mcm, stream, m); - - /* Frees buffer. */ - unserialize_close_vlib_buffer (m); -} - -void -mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index) -{ - vlib_main_t *vm = mcm->vlib_main; - mc_stream_and_buffer_t *sb; - pool_get (mcm->mc_unserialize_stream_and_buffers, sb); - sb->stream_index = s->index; - sb->buffer_index = buffer_index; - vlib_process_signal_event (vm, mcm->unserialize_process, - EVENT_MC_UNSERIALIZE_BUFFER, - sb - mcm->mc_unserialize_stream_and_buffers); -} - -static uword -mc_unserialize_process (vlib_main_t * vm, - vlib_node_runtime_t * node, vlib_frame_t * f) -{ - mc_main_t *mcm = mc_node_get_main (node); - uword event_type, *event_data = 0; - int i; - - while (1) - { - if (event_data) - _vec_len (event_data) = 0; - - vlib_process_wait_for_event (vm); - event_type = vlib_process_get_events (vm, &event_data); - switch (event_type) - { - case EVENT_MC_UNSERIALIZE_BUFFER: - for (i = 0; i < vec_len (event_data); i++) - mc_unserialize_internal (mcm, event_data[i]); - break; - - case EVENT_MC_UNSERIALIZE_CATCHUP: - for (i = 0; i < vec_len (event_data); i++) - { - u8 *mp = uword_to_pointer (event_data[i], u8 *); - perform_catchup (mcm, (void *) mp); - vec_free (mp); - } - break; - - default: - break; - } - } - - return 0; /* not likely */ -} - -void -serialize_mc_main (serialize_main_t * m, va_list * va) -{ - mc_main_t *mcm = va_arg (*va, mc_main_t *); - mc_stream_t *s; - mc_serialize_stream_msg_t *sm; - mc_serialize_msg_t *msg; - - serialize_integer (m, vec_len (mcm->stream_vector), sizeof (u32)); - vec_foreach (s, mcm->stream_vector) - { - /* Stream name. */ - serialize_cstring (m, s->config.name); - - /* Serialize global names for all sent messages. */ - serialize_integer (m, vec_len (s->stream_msgs), sizeof (u32)); - vec_foreach (sm, s->stream_msgs) - { - msg = vec_elt (mcm->global_msgs, sm->global_index); - serialize_cstring (m, msg->name); - } - } -} - -void -unserialize_mc_main (serialize_main_t * m, va_list * va) -{ - mc_main_t *mcm = va_arg (*va, mc_main_t *); - u32 i, n_streams, n_stream_msgs; - char *name; - mc_stream_t *s; - mc_serialize_stream_msg_t *sm; - - unserialize_integer (m, &n_streams, sizeof (u32)); - for (i = 0; i < n_streams; i++) - { - unserialize_cstring (m, &name); - if (i != MC_STREAM_INDEX_INTERNAL && !mc_stream_by_name (mcm, name)) - { - vec_validate (mcm->stream_vector, i); - s = vec_elt_at_index (mcm->stream_vector, i); - mc_stream_init (s); - s->index = s - mcm->stream_vector; - s->config.name = name; - s->state = MC_STREAM_STATE_name_known; - hash_set_mem (mcm->stream_index_by_name, s->config.name, s->index); - } - else - vec_free (name); - - s = vec_elt_at_index (mcm->stream_vector, i); - - vec_free (s->stream_msgs); - vec_free (s->stream_msg_index_by_global_index); - - unserialize_integer (m, &n_stream_msgs, sizeof (u32)); - vec_resize (s->stream_msgs, n_stream_msgs); - vec_foreach (sm, s->stream_msgs) - { - uword *p; - u32 si, gi; - - unserialize_cstring (m, &name); - p = hash_get (mcm->global_msg_index_by_name, name); - gi = p ? p[0] : ~0; - si = sm - s->stream_msgs; - - if (MC_EVENT_LOGGING > 0) - { - /* *INDENT-OFF* */ - ELOG_TYPE_DECLARE (e) = - { - .format = "catchup-bind: %s to %d global index %d stream %d", - .format_args = "T4i4i4i4", - }; - /* *INDENT-ON* */ - - struct - { - u32 c[4]; - } *ed; - ed = ELOG_DATA (mcm->elog_main, e); - ed->c[0] = elog_id_for_msg_name (mcm, name); - ed->c[1] = si; - ed->c[2] = gi; - ed->c[3] = s->index; - } - - vec_free (name); - - sm->global_index = gi; - if (gi != ~0) - { - vec_validate_init_empty (s->stream_msg_index_by_global_index, - gi, ~0); - s->stream_msg_index_by_global_index[gi] = si; - } - } - } -} - -void -mc_main_init (mc_main_t * mcm, char *tag) -{ - vlib_main_t *vm = vlib_get_main (); - - mcm->vlib_main = vm; - mcm->elog_main = &vm->elog_main; - - mcm->relay_master_peer_id = ~0ULL; - mcm->relay_state = MC_RELAY_STATE_NEGOTIATE; - - mcm->stream_index_by_name - = hash_create_string ( /* elts */ 0, /* value size */ sizeof (uword)); - - { - vlib_node_registration_t r; - - memset (&r, 0, sizeof (r)); - - r.type = VLIB_NODE_TYPE_PROCESS; - - /* Point runtime data to main instance. */ - r.runtime_data = &mcm; - r.runtime_data_bytes = sizeof (&mcm); - - r.name = (char *) format (0, "mc-mastership-%s", tag); - r.function = mc_mastership_process; - mcm->mastership_process = vlib_register_node (vm, &r); - - r.name = (char *) format (0, "mc-join-ager-%s", tag); - r.function = mc_join_ager_process; - mcm->join_ager_process = vlib_register_node (vm, &r); - - r.name = (char *) format (0, "mc-retry-%s", tag); - r.function = mc_retry_process; - mcm->retry_process = vlib_register_node (vm, &r); - - r.name = (char *) format (0, "mc-catchup-%s", tag); - r.function = mc_catchup_process; - mcm->catchup_process = vlib_register_node (vm, &r); - - r.name = (char *) format (0, "mc-unserialize-%s", tag); - r.function = mc_unserialize_process; - mcm->unserialize_process = vlib_register_node (vm, &r); - } - - if (MC_EVENT_LOGGING > 0) - mhash_init (&mcm->elog_id_by_peer_id, sizeof (uword), - sizeof (mc_peer_id_t)); - - mhash_init (&mcm->mastership_peer_index_by_id, sizeof (uword), - sizeof (mc_peer_id_t)); - mc_serialize_init (mcm); -} - -static u8 * -format_mc_relay_state (u8 * s, va_list * args) -{ - mc_relay_state_t state = va_arg (*args, mc_relay_state_t); - char *t = 0; - switch (state) - { - case MC_RELAY_STATE_NEGOTIATE: - t = "negotiate"; - break; - case MC_RELAY_STATE_MASTER: - t = "master"; - break; - case MC_RELAY_STATE_SLAVE: - t = "slave"; - break; - default: - return format (s, "unknown 0x%x", state); - } - - return format (s, "%s", t); -} - -static u8 * -format_mc_stream_state (u8 * s, va_list * args) -{ - mc_stream_state_t state = va_arg (*args, mc_stream_state_t); - char *t = 0; - switch (state) - { -#define _(f) case MC_STREAM_STATE_##f: t = #f; break; - foreach_mc_stream_state -#undef _ - default: - return format (s, "unknown 0x%x", state); - } - - return format (s, "%s", t); -} - -static int -mc_peer_comp (void *a1, void *a2) -{ - mc_stream_peer_t *p1 = a1; - mc_stream_peer_t *p2 = a2; - - return mc_peer_id_compare (p1->id, p2->id); -} - -u8 * -format_mc_main (u8 * s, va_list * args) -{ - mc_main_t *mcm = va_arg (*args, mc_main_t *); - mc_stream_t *t; - mc_stream_peer_t *p, *ps; - u32 indent = format_get_indent (s); - - s = format (s, "MC state %U, %d streams joined, global sequence 0x%x", - format_mc_relay_state, mcm->relay_state, - vec_len (mcm->stream_vector), mcm->relay_global_sequence); - - { - mc_mastership_peer_t *mp; - f64 now = vlib_time_now (mcm->vlib_main); - s = format (s, "\n%UMost recent mastership peers:", - format_white_space, indent + 2); - vec_foreach (mp, mcm->mastership_peers) - { - s = format (s, "\n%U%-30U%.4e", - format_white_space, indent + 4, - mcm->transport.format_peer_id, mp->peer_id, - now - mp->time_last_master_assert_received); - } - } - - vec_foreach (t, mcm->stream_vector) - { - s = format (s, "\n%Ustream `%s' index %d", - format_white_space, indent + 2, t->config.name, t->index); - - s = format (s, "\n%Ustate %U", - format_white_space, indent + 4, - format_mc_stream_state, t->state); - - s = - format (s, - "\n%Uretries: interval %.0f sec, limit %d, pool elts %d, %Ld sent", - format_white_space, indent + 4, t->config.retry_interval, - t->config.retry_limit, pool_elts (t->retry_pool), - t->stats.n_retries - t->stats_last_clear.n_retries); - - s = format (s, "\n%U%Ld/%Ld user requests sent/received", - format_white_space, indent + 4, - t->user_requests_sent, t->user_requests_received); - - s = format (s, "\n%U%d peers, local/global sequence 0x%x/0x%x", - format_white_space, indent + 4, - pool_elts (t->peers), - t->our_local_sequence, t->last_global_sequence_processed); - - ps = 0; - /* *INDENT-OFF* */ - pool_foreach (p, t->peers, - ({ - if (clib_bitmap_get (t->all_peer_bitmap, p - t->peers)) - vec_add1 (ps, p[0]); - })); - /* *INDENT-ON* */ - vec_sort_with_function (ps, mc_peer_comp); - s = format (s, "\n%U%=30s%10s%16s%16s", - format_white_space, indent + 6, - "Peer", "Last seq", "Retries", "Future"); - - vec_foreach (p, ps) - { - s = format (s, "\n%U%-30U0x%08x%16Ld%16Ld%s", - format_white_space, indent + 6, - mcm->transport.format_peer_id, p->id.as_u64, - p->last_sequence_received, - p->stats.n_msgs_from_past - - p->stats_last_clear.n_msgs_from_past, - p->stats.n_msgs_from_future - - p->stats_last_clear.n_msgs_from_future, - (mcm->transport.our_ack_peer_id.as_u64 == - p->id.as_u64 ? " (self)" : "")); - } - vec_free (ps); - } - - return s; -} - -/* - * fd.io coding-style-patch-verification: ON - * - * Local Variables: - * eval: (c-set-style "gnu") - * End: - */ |