diff options
author | Damjan Marion <damarion@cisco.com> | 2016-12-19 23:05:39 +0100 |
---|---|---|
committer | Damjan Marion <damarion@cisco.com> | 2016-12-28 12:25:14 +0100 |
commit | 7cd468a3d7dee7d6c92f69a0bb7061ae208ec727 (patch) | |
tree | 5de62f8dbd3a752f5a676ca600e43d2652d1ff1a /src/vlib/mc.c | |
parent | 696f1adec0df3b8f161862566dd9c86174302658 (diff) |
Reorganize source tree to use single autotools instance
Change-Id: I7b51f88292e057c6443b12224486f2d0c9f8ae23
Signed-off-by: Damjan Marion <damarion@cisco.com>
Diffstat (limited to 'src/vlib/mc.c')
-rw-r--r-- | src/vlib/mc.c | 2609 |
1 files changed, 2609 insertions, 0 deletions
diff --git a/src/vlib/mc.c b/src/vlib/mc.c new file mode 100644 index 00000000000..8fde091389e --- /dev/null +++ b/src/vlib/mc.c @@ -0,0 +1,2609 @@ +/* + * mc.c: vlib reliable sequenced multicast distributed applications + * + * Copyright (c) 2010 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <vlib/vlib.h> + +/* + * 1 to enable msg id training wheels, which are useful for tracking + * down catchup and/or partitioned network problems + */ +#define MSG_ID_DEBUG 0 + +static format_function_t format_mc_stream_state; + +static u32 +elog_id_for_peer_id (mc_main_t * m, u64 peer_id) +{ + uword *p, r; + mhash_t *h = &m->elog_id_by_peer_id; + + if (!m->elog_id_by_peer_id.hash) + mhash_init (h, sizeof (uword), sizeof (mc_peer_id_t)); + + p = mhash_get (h, &peer_id); + if (p) + return p[0]; + r = elog_string (m->elog_main, "%U", m->transport.format_peer_id, peer_id); + mhash_set (h, &peer_id, r, /* old_value */ 0); + return r; +} + +static u32 +elog_id_for_msg_name (mc_main_t * m, char *msg_name) +{ + uword *p, r; + uword *h = m->elog_id_by_msg_name; + u8 *name_copy; + + if (!h) + h = m->elog_id_by_msg_name = hash_create_string (0, sizeof (uword)); + + p = hash_get_mem (h, msg_name); + if (p) + return p[0]; + r = elog_string (m->elog_main, "%s", msg_name); + + name_copy = format (0, "%s%c", msg_name, 0); + + hash_set_mem (h, name_copy, r); + m->elog_id_by_msg_name = h; + + return r; +} + +static void +elog_tx_msg (mc_main_t * m, u32 stream_id, u32 local_sequence, + u32 retry_count) +{ + if (MC_EVENT_LOGGING > 0) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "tx-msg: stream %d local seq %d attempt %d", + .format_args = "i4i4i4", + }; + /* *INDENT-ON* */ + struct + { + u32 stream_id, local_sequence, retry_count; + } *ed; + ed = ELOG_DATA (m->elog_main, e); + ed->stream_id = stream_id; + ed->local_sequence = local_sequence; + ed->retry_count = retry_count; + } +} + +/* + * seq_cmp + * correctly compare two unsigned sequence numbers. + * This function works so long as x and y are within 2**(n-1) of each + * other, where n = bits(x, y). + * + * Magic decoder ring: + * seq_cmp == 0 => x and y are equal + * seq_cmp < 0 => x is "in the past" with respect to y + * seq_cmp > 0 => x is "in the future" with respect to y + */ +always_inline i32 +mc_seq_cmp (u32 x, u32 y) +{ + return (i32) x - (i32) y; +} + +void * +mc_get_vlib_buffer (vlib_main_t * vm, u32 n_bytes, u32 * bi_return) +{ + u32 n_alloc, bi; + vlib_buffer_t *b; + + n_alloc = vlib_buffer_alloc (vm, &bi, 1); + ASSERT (n_alloc == 1); + + b = vlib_get_buffer (vm, bi); + b->current_length = n_bytes; + *bi_return = bi; + return (void *) b->data; +} + +static void +delete_peer_with_index (mc_main_t * mcm, mc_stream_t * s, + uword index, int notify_application) +{ + mc_stream_peer_t *p = pool_elt_at_index (s->peers, index); + ASSERT (p != 0); + if (s->config.peer_died && notify_application) + s->config.peer_died (mcm, s, p->id); + + s->all_peer_bitmap = clib_bitmap_andnoti (s->all_peer_bitmap, p - s->peers); + + if (MC_EVENT_LOGGING > 0) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "delete peer %s from all_peer_bitmap", + .format_args = "T4", + }; + /* *INDENT-ON* */ + struct + { + u32 peer; + } *ed = 0; + + ed = ELOG_DATA (mcm->elog_main, e); + ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64); + } + /* Do not delete the pool / hash table entries, or we lose sequence number state */ +} + +static mc_stream_peer_t * +get_or_create_peer_with_id (mc_main_t * mcm, + mc_stream_t * s, mc_peer_id_t id, int *created) +{ + uword *q = mhash_get (&s->peer_index_by_id, &id); + mc_stream_peer_t *p; + + if (q) + { + p = pool_elt_at_index (s->peers, q[0]); + goto done; + } + + pool_get (s->peers, p); + memset (p, 0, sizeof (p[0])); + p->id = id; + p->last_sequence_received = ~0; + mhash_set (&s->peer_index_by_id, &id, p - s->peers, /* old_value */ 0); + if (created) + *created = 1; + +done: + if (MC_EVENT_LOGGING > 0) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "get_or_create %s peer %s stream %d seq %d", + .format_args = "t4T4i4i4", + .n_enum_strings = 2, + .enum_strings = { + "old", "new", + }, + }; + /* *INDENT-ON* */ + struct + { + u32 is_new, peer, stream_index, rx_sequence; + } *ed = 0; + + ed = ELOG_DATA (mcm->elog_main, e); + ed->is_new = q ? 0 : 1; + ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64); + ed->stream_index = s->index; + ed->rx_sequence = p->last_sequence_received; + } + /* $$$$ Enable or reenable this peer */ + s->all_peer_bitmap = clib_bitmap_ori (s->all_peer_bitmap, p - s->peers); + return p; +} + +static void +maybe_send_window_open_event (vlib_main_t * vm, mc_stream_t * stream) +{ + vlib_one_time_waiting_process_t *p; + + if (pool_elts (stream->retry_pool) >= stream->config.window_size) + return; + + vec_foreach (p, stream->procs_waiting_for_open_window) + vlib_signal_one_time_waiting_process (vm, p); + + if (stream->procs_waiting_for_open_window) + _vec_len (stream->procs_waiting_for_open_window) = 0; +} + +static void +mc_retry_free (mc_main_t * mcm, mc_stream_t * s, mc_retry_t * r) +{ + mc_retry_t record, *retp; + + if (r->unacked_by_peer_bitmap) + _vec_len (r->unacked_by_peer_bitmap) = 0; + + if (clib_fifo_elts (s->retired_fifo) >= 2 * s->config.window_size) + { + clib_fifo_sub1 (s->retired_fifo, record); + vlib_buffer_free_one (mcm->vlib_main, record.buffer_index); + } + + clib_fifo_add2 (s->retired_fifo, retp); + + retp->buffer_index = r->buffer_index; + retp->local_sequence = r->local_sequence; + + r->buffer_index = ~0; /* poison buffer index in this retry */ +} + +static void +mc_resend_retired (mc_main_t * mcm, mc_stream_t * s, u32 local_sequence) +{ + mc_retry_t *retry; + + if (MC_EVENT_LOGGING > 0) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "resend-retired: search for local seq %d", + .format_args = "i4", + }; + /* *INDENT-ON* */ + struct + { + u32 local_sequence; + } *ed; + ed = ELOG_DATA (mcm->elog_main, e); + ed->local_sequence = local_sequence; + } + + /* *INDENT-OFF* */ + clib_fifo_foreach (retry, s->retired_fifo, + ({ + if (retry->local_sequence == local_sequence) + { + elog_tx_msg (mcm, s->index, retry-> local_sequence, -13); + mcm->transport.tx_buffer (mcm->transport.opaque, + MC_TRANSPORT_USER_REQUEST_TO_RELAY, + retry->buffer_index); + return; + } + })); + /* *INDENT-ON* */ + + if (MC_EVENT_LOGGING > 0) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "resend-retired: FAILED search for local seq %d", + .format_args = "i4", + }; + /* *INDENT-ON* */ + struct + { + u32 local_sequence; + } *ed; + ed = ELOG_DATA (mcm->elog_main, e); + ed->local_sequence = local_sequence; + } +} + +static uword * +delete_retry_fifo_elt (mc_main_t * mcm, + mc_stream_t * stream, + mc_retry_t * r, uword * dead_peer_bitmap) +{ + mc_stream_peer_t *p; + + /* *INDENT-OFF* */ + pool_foreach (p, stream->peers, ({ + uword pi = p - stream->peers; + uword is_alive = 0 == clib_bitmap_get (r->unacked_by_peer_bitmap, pi); + + if (! is_alive) + dead_peer_bitmap = clib_bitmap_ori (dead_peer_bitmap, pi); + + if (MC_EVENT_LOGGING > 0) + { + ELOG_TYPE_DECLARE (e) = { + .format = "delete_retry_fifo_elt: peer %s is %s", + .format_args = "T4t4", + .n_enum_strings = 2, + .enum_strings = { "alive", "dead", }, + }; + struct { u32 peer, is_alive; } * ed; + ed = ELOG_DATA (mcm->elog_main, e); + ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64); + ed->is_alive = is_alive; + } + })); + /* *INDENT-ON* */ + + hash_unset (stream->retry_index_by_local_sequence, r->local_sequence); + mc_retry_free (mcm, stream, r); + + return dead_peer_bitmap; +} + +always_inline mc_retry_t * +prev_retry (mc_stream_t * s, mc_retry_t * r) +{ + return (r->prev_index != ~0 + ? pool_elt_at_index (s->retry_pool, r->prev_index) : 0); +} + +always_inline mc_retry_t * +next_retry (mc_stream_t * s, mc_retry_t * r) +{ + return (r->next_index != ~0 + ? pool_elt_at_index (s->retry_pool, r->next_index) : 0); +} + +always_inline void +remove_retry_from_pool (mc_stream_t * s, mc_retry_t * r) +{ + mc_retry_t *p = prev_retry (s, r); + mc_retry_t *n = next_retry (s, r); + + if (p) + p->next_index = r->next_index; + else + s->retry_head_index = r->next_index; + if (n) + n->prev_index = r->prev_index; + else + s->retry_tail_index = r->prev_index; + + pool_put_index (s->retry_pool, r - s->retry_pool); +} + +static void +check_retry (mc_main_t * mcm, mc_stream_t * s) +{ + mc_retry_t *r; + vlib_main_t *vm = mcm->vlib_main; + f64 now = vlib_time_now (vm); + uword *dead_peer_bitmap = 0; + u32 ri, ri_next; + + for (ri = s->retry_head_index; ri != ~0; ri = ri_next) + { + r = pool_elt_at_index (s->retry_pool, ri); + ri_next = r->next_index; + + if (now < r->sent_at + s->config.retry_interval) + continue; + + r->n_retries += 1; + if (r->n_retries > s->config.retry_limit) + { + dead_peer_bitmap = + delete_retry_fifo_elt (mcm, s, r, dead_peer_bitmap); + remove_retry_from_pool (s, r); + } + else + { + if (MC_EVENT_LOGGING > 0) + { + mc_stream_peer_t *p; + + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (t) = + { + .format = "resend local seq %d attempt %d", + .format_args = "i4i4", + }; + /* *INDENT-ON* */ + + /* *INDENT-OFF* */ + pool_foreach (p, s->peers, ({ + if (clib_bitmap_get (r->unacked_by_peer_bitmap, p - s->peers)) + { + ELOG_TYPE_DECLARE (ev) = { + .format = "resend: needed by peer %s local seq %d", + .format_args = "T4i4", + }; + struct { u32 peer, rx_sequence; } * ed; + ed = ELOG_DATA (mcm->elog_main, ev); + ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64); + ed->rx_sequence = r->local_sequence; + } + })); + /* *INDENT-ON* */ + + struct + { + u32 sequence; + u32 trail; + } *ed; + ed = ELOG_DATA (mcm->elog_main, t); + ed->sequence = r->local_sequence; + ed->trail = r->n_retries; + } + + r->sent_at = vlib_time_now (vm); + s->stats.n_retries += 1; + + elog_tx_msg (mcm, s->index, r->local_sequence, r->n_retries); + + mcm->transport.tx_buffer + (mcm->transport.opaque, + MC_TRANSPORT_USER_REQUEST_TO_RELAY, r->buffer_index); + } + } + + maybe_send_window_open_event (mcm->vlib_main, s); + + /* Delete any dead peers we've found. */ + if (!clib_bitmap_is_zero (dead_peer_bitmap)) + { + uword i; + + /* *INDENT-OFF* */ + clib_bitmap_foreach (i, dead_peer_bitmap, ({ + delete_peer_with_index (mcm, s, i, /* notify_application */ 1); + + /* Delete any references to just deleted peer in retry pool. */ + pool_foreach (r, s->retry_pool, ({ + r->unacked_by_peer_bitmap = + clib_bitmap_andnoti (r->unacked_by_peer_bitmap, i); + })); + })); +/* *INDENT-ON* */ + clib_bitmap_free (dead_peer_bitmap); + } +} + +always_inline mc_main_t * +mc_node_get_main (vlib_node_runtime_t * node) +{ + mc_main_t **p = (void *) node->runtime_data; + return p[0]; +} + +static uword +mc_retry_process (vlib_main_t * vm, + vlib_node_runtime_t * node, vlib_frame_t * f) +{ + mc_main_t *mcm = mc_node_get_main (node); + mc_stream_t *s; + + while (1) + { + vlib_process_suspend (vm, 1.0); + vec_foreach (s, mcm->stream_vector) + { + if (s->state != MC_STREAM_STATE_invalid) + check_retry (mcm, s); + } + } + return 0; /* not likely */ +} + +static void +send_join_or_leave_request (mc_main_t * mcm, u32 stream_index, u32 is_join) +{ + vlib_main_t *vm = mcm->vlib_main; + mc_msg_join_or_leave_request_t *mp; + u32 bi; + + mp = mc_get_vlib_buffer (vm, sizeof (mp[0]), &bi); + memset (mp, 0, sizeof (*mp)); + mp->type = MC_MSG_TYPE_join_or_leave_request; + mp->peer_id = mcm->transport.our_ack_peer_id; + mp->stream_index = stream_index; + mp->is_join = is_join; + + mc_byte_swap_msg_join_or_leave_request (mp); + + /* + * These msgs are unnumbered, unordered so send on the from-relay + * channel. + */ + mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi); +} + +static uword +mc_join_ager_process (vlib_main_t * vm, + vlib_node_runtime_t * node, vlib_frame_t * f) +{ + mc_main_t *mcm = mc_node_get_main (node); + + while (1) + { + if (mcm->joins_in_progress) + { + mc_stream_t *s; + vlib_one_time_waiting_process_t *p; + f64 now = vlib_time_now (vm); + + vec_foreach (s, mcm->stream_vector) + { + if (s->state != MC_STREAM_STATE_join_in_progress) + continue; + + if (now > s->join_timeout) + { + s->state = MC_STREAM_STATE_ready; + + if (MC_EVENT_LOGGING > 0) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "stream %d join timeout", + }; + /* *INDENT-ON* */ + ELOG (mcm->elog_main, e, s->index); + } + /* Make sure that this app instance exists as a stream peer, + or we may answer a catchup request with a NULL + all_peer_bitmap... */ + (void) get_or_create_peer_with_id + (mcm, s, mcm->transport.our_ack_peer_id, /* created */ 0); + + vec_foreach (p, s->procs_waiting_for_join_done) + vlib_signal_one_time_waiting_process (vm, p); + if (s->procs_waiting_for_join_done) + _vec_len (s->procs_waiting_for_join_done) = 0; + + mcm->joins_in_progress--; + ASSERT (mcm->joins_in_progress >= 0); + } + else + { + /* Resent join request which may have been lost. */ + send_join_or_leave_request (mcm, s->index, 1 /* is_join */ ); + + /* We're *not* alone, retry for as long as it takes */ + if (mcm->relay_state == MC_RELAY_STATE_SLAVE) + s->join_timeout = vlib_time_now (vm) + 2.0; + + + if (MC_EVENT_LOGGING > 0) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "stream %d resend join request", + }; + /* *INDENT-ON* */ + ELOG (mcm->elog_main, e, s->index); + } + } + } + } + + vlib_process_suspend (vm, .5); + } + + return 0; /* not likely */ +} + +static void +serialize_mc_register_stream_name (serialize_main_t * m, va_list * va) +{ + char *name = va_arg (*va, char *); + serialize_cstring (m, name); +} + +static void +elog_stream_name (char *buf, int n_buf_bytes, char *v) +{ + clib_memcpy (buf, v, clib_min (n_buf_bytes - 1, vec_len (v))); + buf[n_buf_bytes - 1] = 0; +} + +static void +unserialize_mc_register_stream_name (serialize_main_t * m, va_list * va) +{ + mc_main_t *mcm = va_arg (*va, mc_main_t *); + char *name; + mc_stream_t *s; + uword *p; + + unserialize_cstring (m, &name); + + if ((p = hash_get_mem (mcm->stream_index_by_name, name))) + { + if (MC_EVENT_LOGGING > 0) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "stream index %d already named %s", + .format_args = "i4s16", + }; + /* *INDENT-ON* */ + struct + { + u32 stream_index; + char name[16]; + } *ed; + ed = ELOG_DATA (mcm->elog_main, e); + ed->stream_index = p[0]; + elog_stream_name (ed->name, sizeof (ed->name), name); + } + + vec_free (name); + return; + } + + vec_add2 (mcm->stream_vector, s, 1); + mc_stream_init (s); + s->state = MC_STREAM_STATE_name_known; + s->index = s - mcm->stream_vector; + s->config.name = name; + + if (MC_EVENT_LOGGING > 0) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "stream index %d named %s", + .format_args = "i4s16", + }; + /* *INDENT-ON* */ + struct + { + u32 stream_index; + char name[16]; + } *ed; + ed = ELOG_DATA (mcm->elog_main, e); + ed->stream_index = s->index; + elog_stream_name (ed->name, sizeof (ed->name), name); + } + + hash_set_mem (mcm->stream_index_by_name, name, s->index); + + p = hash_get (mcm->procs_waiting_for_stream_name_by_name, name); + if (p) + { + vlib_one_time_waiting_process_t *wp, **w; + w = pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool, p[0]); + vec_foreach (wp, w[0]) + vlib_signal_one_time_waiting_process (mcm->vlib_main, wp); + pool_put (mcm->procs_waiting_for_stream_name_pool, w); + hash_unset_mem (mcm->procs_waiting_for_stream_name_by_name, name); + } +} + +/* *INDENT-OFF* */ +MC_SERIALIZE_MSG (mc_register_stream_name_msg, static) = +{ + .name = "mc_register_stream_name", + .serialize = serialize_mc_register_stream_name, + .unserialize = unserialize_mc_register_stream_name, +}; +/* *INDENT-ON* */ + +void +mc_rx_buffer_unserialize (mc_main_t * mcm, + mc_stream_t * stream, + mc_peer_id_t peer_id, u32 buffer_index) +{ + return mc_unserialize (mcm, stream, buffer_index); +} + +static u8 * +mc_internal_catchup_snapshot (mc_main_t * mcm, + u8 * data_vector, + u32 last_global_sequence_processed) +{ + serialize_main_t m; + + /* Append serialized data to data vector. */ + serialize_open_vector (&m, data_vector); + m.stream.current_buffer_index = vec_len (data_vector); + + serialize (&m, serialize_mc_main, mcm); + return serialize_close_vector (&m); +} + +static void +mc_internal_catchup (mc_main_t * mcm, u8 * data, u32 n_data_bytes) +{ + serialize_main_t s; + + unserialize_open_data (&s, data, n_data_bytes); + + unserialize (&s, unserialize_mc_main, mcm); +} + +/* Overridden from the application layer, not actually used here */ +void mc_stream_join_process_hold (void) __attribute__ ((weak)); +void +mc_stream_join_process_hold (void) +{ +} + +static u32 +mc_stream_join_helper (mc_main_t * mcm, + mc_stream_config_t * config, u32 is_internal) +{ + mc_stream_t *s; + vlib_main_t *vm = mcm->vlib_main; + + s = 0; + if (!is_internal) + { + uword *p; + + /* Already have a stream with given name? */ + if ((s = mc_stream_by_name (mcm, config->name))) + { + /* Already joined and ready? */ + if (s->state == MC_STREAM_STATE_ready) + return s->index; + } + + /* First join MC internal stream. */ + if (!mcm->stream_vector + || (mcm->stream_vector[MC_STREAM_INDEX_INTERNAL].state + == MC_STREAM_STATE_invalid)) + { + static mc_stream_config_t c = { + .name = "mc-internal", + .rx_buffer = mc_rx_buffer_unserialize, + .catchup = mc_internal_catchup, + .catchup_snapshot = mc_internal_catchup_snapshot, + }; + + c.save_snapshot = config->save_snapshot; + + mc_stream_join_helper (mcm, &c, /* is_internal */ 1); + } + + /* If stream is still unknown register this name and wait for + sequenced message to name stream. This way all peers agree + on stream name to index mappings. */ + s = mc_stream_by_name (mcm, config->name); + if (!s) + { + vlib_one_time_waiting_process_t *wp, **w; + u8 *name_copy = format (0, "%s", config->name); + + mc_serialize_stream (mcm, + MC_STREAM_INDEX_INTERNAL, + &mc_register_stream_name_msg, config->name); + + /* Wait for this stream to be named. */ + p = + hash_get_mem (mcm->procs_waiting_for_stream_name_by_name, + name_copy); + if (p) + w = + pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool, + p[0]); + else + { + pool_get (mcm->procs_waiting_for_stream_name_pool, w); + if (!mcm->procs_waiting_for_stream_name_by_name) + mcm->procs_waiting_for_stream_name_by_name = hash_create_string ( /* elts */ 0, /* value size */ + sizeof + (uword)); + hash_set_mem (mcm->procs_waiting_for_stream_name_by_name, + name_copy, + w - mcm->procs_waiting_for_stream_name_pool); + w[0] = 0; + } + + vec_add2 (w[0], wp, 1); + vlib_current_process_wait_for_one_time_event (vm, wp); + vec_free (name_copy); + } + + /* Name should be known now. */ + s = mc_stream_by_name (mcm, config->name); + ASSERT (s != 0); + ASSERT (s->state == MC_STREAM_STATE_name_known); + } + + if (!s) + { + vec_add2 (mcm->stream_vector, s, 1); + mc_stream_init (s); + s->index = s - mcm->stream_vector; + } + + { + /* Save name since we could have already used it as hash key. */ + char *name_save = s->config.name; + + s->config = config[0]; + + if (name_save) + s->config.name = name_save; + } + + if (s->config.window_size == 0) + s->config.window_size = 8; + + if (s->config.retry_interval == 0.0) + s->config.retry_interval = 1.0; + + /* Sanity. */ + ASSERT (s->config.retry_interval < 30); + + if (s->config.retry_limit == 0) + s->config.retry_limit = 7; + + s->state = MC_STREAM_STATE_join_in_progress; + if (!s->peer_index_by_id.hash) + mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t)); + + /* If we don't hear from someone in 5 seconds, we're alone */ + s->join_timeout = vlib_time_now (vm) + 5.0; + mcm->joins_in_progress++; + + if (MC_EVENT_LOGGING > 0) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "stream index %d join request %s", + .format_args = "i4s16", + }; + /* *INDENT-ON* */ + struct + { + u32 stream_index; + char name[16]; + } *ed; + ed = ELOG_DATA (mcm->elog_main, e); + ed->stream_index = s->index; + elog_stream_name (ed->name, sizeof (ed->name), s->config.name); + } + + send_join_or_leave_request (mcm, s->index, 1 /* join */ ); + + vlib_current_process_wait_for_one_time_event_vector + (vm, &s->procs_waiting_for_join_done); + + if (MC_EVENT_LOGGING) + { + ELOG_TYPE (e, "join complete stream %d"); + ELOG (mcm->elog_main, e, s->index); + } + + return s->index; +} + +u32 +mc_stream_join (mc_main_t * mcm, mc_stream_config_t * config) +{ + return mc_stream_join_helper (mcm, config, /* is_internal */ 0); +} + +void +mc_stream_leave (mc_main_t * mcm, u32 stream_index) +{ + mc_stream_t *s = mc_stream_by_index (mcm, stream_index); + + if (!s) + return; + + if (MC_EVENT_LOGGING) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (t) = + { + .format = "leave-stream: %d",.format_args = "i4", + }; + /* *INDENT-ON* */ + struct + { + u32 index; + } *ed; + ed = ELOG_DATA (mcm->elog_main, t); + ed->index = stream_index; + } + + send_join_or_leave_request (mcm, stream_index, 0 /* is_join */ ); + mc_stream_free (s); + s->state = MC_STREAM_STATE_name_known; +} + +void +mc_msg_join_or_leave_request_handler (mc_main_t * mcm, + mc_msg_join_or_leave_request_t * req, + u32 buffer_index) +{ + mc_stream_t *s; + mc_msg_join_reply_t *rep; + u32 bi; + + mc_byte_swap_msg_join_or_leave_request (req); + + s = mc_stream_by_index (mcm, req->stream_index); + if (!s || s->state != MC_STREAM_STATE_ready) + return; + + /* If the peer is joining, create it */ + if (req->is_join) + { + mc_stream_t *this_s; + + /* We're not in a position to catch up a peer until all + stream joins are complete. */ + if (0) + { + /* XXX This is hard to test so we've. */ + vec_foreach (this_s, mcm->stream_vector) + { + if (this_s->state != MC_STREAM_STATE_ready + && this_s->state != MC_STREAM_STATE_name_known) + return; + } + } + else if (mcm->joins_in_progress > 0) + return; + + (void) get_or_create_peer_with_id (mcm, s, req->peer_id, + /* created */ 0); + + rep = mc_get_vlib_buffer (mcm->vlib_main, sizeof (rep[0]), &bi); + memset (rep, 0, sizeof (rep[0])); + rep->type = MC_MSG_TYPE_join_reply; + rep->stream_index = req->stream_index; + + mc_byte_swap_msg_join_reply (rep); + /* These two are already in network byte order... */ + rep->peer_id = mcm->transport.our_ack_peer_id; + rep->catchup_peer_id = mcm->transport.our_catchup_peer_id; + + mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi); + } + else + { + if (s->config.peer_died) + s->config.peer_died (mcm, s, req->peer_id); + } +} + +void +mc_msg_join_reply_handler (mc_main_t * mcm, + mc_msg_join_reply_t * mp, u32 buffer_index) +{ + mc_stream_t *s; + + mc_byte_swap_msg_join_reply (mp); + + s = mc_stream_by_index (mcm, mp->stream_index); + + if (!s || s->state != MC_STREAM_STATE_join_in_progress) + return; + + /* Switch to catchup state; next join reply + for this stream will be ignored. */ + s->state = MC_STREAM_STATE_catchup; + + mcm->joins_in_progress--; + mcm->transport.catchup_request_fun (mcm->transport.opaque, + mp->stream_index, mp->catchup_peer_id); +} + +void +mc_wait_for_stream_ready (mc_main_t * m, char *stream_name) +{ + mc_stream_t *s; + + while (1) + { + s = mc_stream_by_name (m, stream_name); + if (s) + break; + vlib_process_suspend (m->vlib_main, .1); + } + + /* It's OK to send a message in catchup and ready states. */ + if (s->state == MC_STREAM_STATE_catchup + || s->state == MC_STREAM_STATE_ready) + return; + + /* Otherwise we are waiting for a join to finish. */ + vlib_current_process_wait_for_one_time_event_vector + (m->vlib_main, &s->procs_waiting_for_join_done); +} + +u32 +mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index) +{ + mc_stream_t *s = mc_stream_by_index (mcm, stream_index); + vlib_main_t *vm = mcm->vlib_main; + mc_retry_t *r; + mc_msg_user_request_t *mp; + vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index); + u32 ri; + + if (!s) + return 0; + + if (s->state != MC_STREAM_STATE_ready) + vlib_current_process_wait_for_one_time_event_vector + (vm, &s->procs_waiting_for_join_done); + + while (pool_elts (s->retry_pool) >= s->config.window_size) + { + vlib_current_process_wait_for_one_time_event_vector + (vm, &s->procs_waiting_for_open_window); + } + + pool_get (s->retry_pool, r); + ri = r - s->retry_pool; + + r->prev_index = s->retry_tail_index; + r->next_index = ~0; + s->retry_tail_index = ri; + + if (r->prev_index == ~0) + s->retry_head_index = ri; + else + { + mc_retry_t *p = pool_elt_at_index (s->retry_pool, r->prev_index); + p->next_index = ri; + } + + vlib_buffer_advance (b, -sizeof (mp[0])); + mp = vlib_buffer_get_current (b); + + mp->peer_id = mcm->transport.our_ack_peer_id; + /* mp->transport.global_sequence set by relay agent. */ + mp->global_sequence = 0xdeadbeef; + mp->stream_index = s->index; + mp->local_sequence = s->our_local_sequence++; + mp->n_data_bytes = + vlib_buffer_index_length_in_chain (vm, buffer_index) - sizeof (mp[0]); + + r->buffer_index = buffer_index; + r->local_sequence = mp->local_sequence; + r->sent_at = vlib_time_now (vm); + r->n_retries = 0; + + /* Retry will be freed when all currently known peers have acked. */ + vec_validate (r->unacked_by_peer_bitmap, vec_len (s->all_peer_bitmap) - 1); + vec_copy (r->unacked_by_peer_bitmap, s->all_peer_bitmap); + + hash_set (s->retry_index_by_local_sequence, r->local_sequence, + r - s->retry_pool); + + elog_tx_msg (mcm, s->index, mp->local_sequence, r->n_retries); + + mc_byte_swap_msg_user_request (mp); + + mcm->transport.tx_buffer (mcm->transport.opaque, + MC_TRANSPORT_USER_REQUEST_TO_RELAY, buffer_index); + + s->user_requests_sent++; + + /* return amount of window remaining */ + return s->config.window_size - pool_elts (s->retry_pool); +} + +void +mc_msg_user_request_handler (mc_main_t * mcm, mc_msg_user_request_t * mp, + u32 buffer_index) +{ + vlib_main_t *vm = mcm->vlib_main; + mc_stream_t *s; + mc_stream_peer_t *peer; + i32 seq_cmp_result; + static int once = 0; + + mc_byte_swap_msg_user_request (mp); + + s = mc_stream_by_index (mcm, mp->stream_index); + + /* Not signed up for this stream? Turf-o-matic */ + if (!s || s->state != MC_STREAM_STATE_ready) + { + vlib_buffer_free_one (vm, buffer_index); + return; + } + + /* Find peer, including ourselves. */ + peer = get_or_create_peer_with_id (mcm, s, mp->peer_id, + /* created */ 0); + + seq_cmp_result = mc_seq_cmp (mp->local_sequence, + peer->last_sequence_received + 1); + + if (MC_EVENT_LOGGING > 0) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "rx-msg: peer %s stream %d rx seq %d seq_cmp %d", + .format_args = "T4i4i4i4", + }; + /* *INDENT-ON* */ + struct + { + u32 peer, stream_index, rx_sequence; + i32 seq_cmp_result; + } *ed; + ed = ELOG_DATA (mcm->elog_main, e); + ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64); + ed->stream_index = mp->stream_index; + ed->rx_sequence = mp->local_sequence; + ed->seq_cmp_result = seq_cmp_result; + } + + if (0 && mp->stream_index == 1 && once == 0) + { + once = 1; + ELOG_TYPE (e, "FAKE lost msg on stream 1"); + ELOG (mcm->elog_main, e, 0); + return; + } + + peer->last_sequence_received += seq_cmp_result == 0; + s->user_requests_received++; + + if (seq_cmp_result > 0) + peer->stats.n_msgs_from_future += 1; + + /* Send ack even if msg from future */ + if (1) + { + mc_msg_user_ack_t *rp; + u32 bi; + + rp = mc_get_vlib_buffer (vm, sizeof (rp[0]), &bi); + rp->peer_id = mcm->transport.our_ack_peer_id; + rp->stream_index = s->index; + rp->local_sequence = mp->local_sequence; + rp->seq_cmp_result = seq_cmp_result; + + if (MC_EVENT_LOGGING > 0) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "tx-ack: stream %d local seq %d", + .format_args = "i4i4", + }; + /* *INDENT-ON* */ + struct + { + u32 stream_index; + u32 local_sequence; + } *ed; + ed = ELOG_DATA (mcm->elog_main, e); + ed->stream_index = rp->stream_index; + ed->local_sequence = rp->local_sequence; + } + + mc_byte_swap_msg_user_ack (rp); + + mcm->transport.tx_ack (mcm->transport.opaque, mp->peer_id, bi); + /* Msg from past? If so, free the buffer... */ + if (seq_cmp_result < 0) + { + vlib_buffer_free_one (vm, buffer_index); + peer->stats.n_msgs_from_past += 1; + } + } + + if (seq_cmp_result == 0) + { + vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index); + switch (s->state) + { + case MC_STREAM_STATE_ready: + vlib_buffer_advance (b, sizeof (mp[0])); + s->config.rx_buffer (mcm, s, mp->peer_id, buffer_index); + + /* Stream vector can change address via rx callback for mc-internal + stream. */ + s = mc_stream_by_index (mcm, mp->stream_index); + ASSERT (s != 0); + s->last_global_sequence_processed = mp->global_sequence; + break; + + case MC_STREAM_STATE_catchup: + clib_fifo_add1 (s->catchup_fifo, buffer_index); + break; + + default: + clib_warning ("stream in unknown state %U", + format_mc_stream_state, s->state); + break; + } + } +} + +void +mc_msg_user_ack_handler (mc_main_t * mcm, mc_msg_user_ack_t * mp, + u32 buffer_index) +{ + vlib_main_t *vm = mcm->vlib_main; + uword *p; + mc_stream_t *s; + mc_stream_peer_t *peer; + mc_retry_t *r; + int peer_created = 0; + + mc_byte_swap_msg_user_ack (mp); + + s = mc_stream_by_index (mcm, mp->stream_index); + + if (MC_EVENT_LOGGING > 0) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (t) = + { + .format = "rx-ack: local seq %d peer %s seq_cmp_result %d", + .format_args = "i4T4i4", + }; + /* *INDENT-ON* */ + + struct + { + u32 local_sequence; + u32 peer; + i32 seq_cmp_result; + } *ed; + ed = ELOG_DATA (mcm->elog_main, t); + ed->local_sequence = mp->local_sequence; + ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64); + ed->seq_cmp_result = mp->seq_cmp_result; + } + + /* Unknown stream? */ + if (!s) + return; + + /* Find the peer which just ack'ed. */ + peer = get_or_create_peer_with_id (mcm, s, mp->peer_id, + /* created */ &peer_created); + + /* + * Peer reports message from the future. If it's not in the retry + * fifo, look for a retired message. + */ + if (mp->seq_cmp_result > 0) + { + p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence - + mp->seq_cmp_result); + if (p == 0) + mc_resend_retired (mcm, s, mp->local_sequence - mp->seq_cmp_result); + + /* Normal retry should fix it... */ + return; + } + + /* + * Pointer to the indicated retry fifo entry. + * Worth hashing because we could use a window size of 100 or 1000. + */ + p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence); + + /* + * Is this a duplicate ACK, received after we've retired the + * fifo entry. This can happen when learning about new + * peers. + */ + if (p == 0) + { + if (MC_EVENT_LOGGING > 0) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (t) = + { + .format = "ack: for seq %d from peer %s no fifo elt", + .format_args = "i4T4", + }; + /* *INDENT-ON* */ + + struct + { + u32 seq; + u32 peer; + } *ed; + ed = ELOG_DATA (mcm->elog_main, t); + ed->seq = mp->local_sequence; + ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64); + } + + return; + } + + r = pool_elt_at_index (s->retry_pool, p[0]); + + /* Make sure that this new peer ACKs our msgs from now on */ + if (peer_created) + { + mc_retry_t *later_retry = next_retry (s, r); + + while (later_retry) + { + later_retry->unacked_by_peer_bitmap = + clib_bitmap_ori (later_retry->unacked_by_peer_bitmap, + peer - s->peers); + later_retry = next_retry (s, later_retry); + } + } + + ASSERT (mp->local_sequence == r->local_sequence); + + /* If we weren't expecting to hear from this peer */ + if (!peer_created && + !clib_bitmap_get (r->unacked_by_peer_bitmap, peer - s->peers)) + { + if (MC_EVENT_LOGGING > 0) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (t) = + { + .format = "dup-ack: for seq %d from peer %s", + .format_args = "i4T4", + }; + /* *INDENT-ON* */ + struct + { + u32 seq; + u32 peer; + } *ed; + ed = ELOG_DATA (mcm->elog_main, t); + ed->seq = r->local_sequence; + ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64); + } + if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap)) + return; + } + + if (MC_EVENT_LOGGING > 0) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (t) = + { + .format = "ack: for seq %d from peer %s", + .format_args = "i4T4", + }; + /* *INDENT-ON* */ + struct + { + u32 seq; + u32 peer; + } *ed; + ed = ELOG_DATA (mcm->elog_main, t); + ed->seq = mp->local_sequence; + ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64); + } + + r->unacked_by_peer_bitmap = + clib_bitmap_andnoti (r->unacked_by_peer_bitmap, peer - s->peers); + + /* Not all clients have ack'ed */ + if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap)) + { + return; + } + if (MC_EVENT_LOGGING > 0) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (t) = + { + .format = "ack: retire fifo elt loc seq %d after %d acks", + .format_args = "i4i4", + }; + /* *INDENT-ON* */ + struct + { + u32 seq; + u32 npeers; + } *ed; + ed = ELOG_DATA (mcm->elog_main, t); + ed->seq = r->local_sequence; + ed->npeers = pool_elts (s->peers); + } + + hash_unset (s->retry_index_by_local_sequence, mp->local_sequence); + mc_retry_free (mcm, s, r); + remove_retry_from_pool (s, r); + maybe_send_window_open_event (vm, s); +} + +#define EVENT_MC_SEND_CATCHUP_DATA 0 + +static uword +mc_catchup_process (vlib_main_t * vm, + vlib_node_runtime_t * node, vlib_frame_t * f) +{ + mc_main_t *mcm = mc_node_get_main (node); + uword *event_data = 0; + mc_catchup_process_arg_t *args; + int i; + + while (1) + { + if (event_data) + _vec_len (event_data) = 0; + vlib_process_wait_for_event_with_type (vm, &event_data, + EVENT_MC_SEND_CATCHUP_DATA); + + for (i = 0; i < vec_len (event_data); i++) + { + args = pool_elt_at_index (mcm->catchup_process_args, event_data[i]); + + mcm->transport.catchup_send_fun (mcm->transport.opaque, + args->catchup_opaque, + args->catchup_snapshot); + + /* Send function will free snapshot data vector. */ + pool_put (mcm->catchup_process_args, args); + } + } + + return 0; /* not likely */ +} + +static void +serialize_mc_stream (serialize_main_t * m, va_list * va) +{ + mc_stream_t *s = va_arg (*va, mc_stream_t *); + mc_stream_peer_t *p; + + serialize_integer (m, pool_elts (s->peers), sizeof (u32)); + /* *INDENT-OFF* */ + pool_foreach (p, s->peers, ({ + u8 * x = serialize_get (m, sizeof (p->id)); + clib_memcpy (x, p->id.as_u8, sizeof (p->id)); + serialize_integer (m, p->last_sequence_received, + sizeof (p->last_sequence_received)); + })); +/* *INDENT-ON* */ + serialize_bitmap (m, s->all_peer_bitmap); +} + +void +unserialize_mc_stream (serialize_main_t * m, va_list * va) +{ + mc_stream_t *s = va_arg (*va, mc_stream_t *); + u32 i, n_peers; + mc_stream_peer_t *p; + + unserialize_integer (m, &n_peers, sizeof (u32)); + mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t)); + for (i = 0; i < n_peers; i++) + { + u8 *x; + pool_get (s->peers, p); + x = unserialize_get (m, sizeof (p->id)); + clib_memcpy (p->id.as_u8, x, sizeof (p->id)); + unserialize_integer (m, &p->last_sequence_received, + sizeof (p->last_sequence_received)); + mhash_set (&s->peer_index_by_id, &p->id, p - s->peers, /* old_value */ + 0); + } + s->all_peer_bitmap = unserialize_bitmap (m); + + /* This is really bad. */ + if (!s->all_peer_bitmap) + clib_warning ("BUG: stream %s all_peer_bitmap NULL", s->config.name); +} + +void +mc_msg_catchup_request_handler (mc_main_t * mcm, + mc_msg_catchup_request_t * req, + u32 catchup_opaque) +{ + vlib_main_t *vm = mcm->vlib_main; + mc_stream_t *s; + mc_catchup_process_arg_t *args; + + mc_byte_swap_msg_catchup_request (req); + + s = mc_stream_by_index (mcm, req->stream_index); + if (!s || s->state != MC_STREAM_STATE_ready) + return; + + if (MC_EVENT_LOGGING > 0) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (t) = + { + .format = "catchup-request: from %s stream %d", + .format_args = "T4i4", + }; + /* *INDENT-ON* */ + struct + { + u32 peer, stream; + } *ed; + ed = ELOG_DATA (mcm->elog_main, t); + ed->peer = elog_id_for_peer_id (mcm, req->peer_id.as_u64); + ed->stream = req->stream_index; + } + + /* + * The application has to snapshoot its data structures right + * here, right now. If we process any messages after + * noting the last global sequence we've processed, the client + * won't be able to accurately reconstruct our data structures. + * + * Once the data structures are e.g. vec_dup()'ed, we + * send the resulting messages from a separate process, to + * make sure that we don't cause a bunch of message retransmissions + */ + pool_get (mcm->catchup_process_args, args); + + args->stream_index = s - mcm->stream_vector; + args->catchup_opaque = catchup_opaque; + args->catchup_snapshot = 0; + + /* Construct catchup reply and snapshot state for stream to send as + catchup reply payload. */ + { + mc_msg_catchup_reply_t *rep; + serialize_main_t m; + + vec_resize (args->catchup_snapshot, sizeof (rep[0])); + + rep = (void *) args->catchup_snapshot; + + rep->peer_id = req->peer_id; + rep->stream_index = req->stream_index; + rep->last_global_sequence_included = s->last_global_sequence_processed; + + /* Setup for serialize to append to catchup snapshot. */ + serialize_open_vector (&m, args->catchup_snapshot); + m.stream.current_buffer_index = vec_len (m.stream.buffer); + + serialize (&m, serialize_mc_stream, s); + + args->catchup_snapshot = serialize_close_vector (&m); + + /* Actually copy internal state */ + args->catchup_snapshot = s->config.catchup_snapshot + (mcm, args->catchup_snapshot, rep->last_global_sequence_included); + + rep = (void *) args->catchup_snapshot; + rep->n_data_bytes = vec_len (args->catchup_snapshot) - sizeof (rep[0]); + + mc_byte_swap_msg_catchup_reply (rep); + } + + /* now go send it... */ + vlib_process_signal_event (vm, mcm->catchup_process, + EVENT_MC_SEND_CATCHUP_DATA, + args - mcm->catchup_process_args); +} + +#define EVENT_MC_UNSERIALIZE_BUFFER 0 +#define EVENT_MC_UNSERIALIZE_CATCHUP 1 + +void +mc_msg_catchup_reply_handler (mc_main_t * mcm, mc_msg_catchup_reply_t * mp, + u32 catchup_opaque) +{ + vlib_process_signal_event (mcm->vlib_main, + mcm->unserialize_process, + EVENT_MC_UNSERIALIZE_CATCHUP, + pointer_to_uword (mp)); +} + +static void +perform_catchup (mc_main_t * mcm, mc_msg_catchup_reply_t * mp) +{ + mc_stream_t *s; + i32 seq_cmp_result; + + mc_byte_swap_msg_catchup_reply (mp); + + s = mc_stream_by_index (mcm, mp->stream_index); + + /* Never heard of this stream or already caught up. */ + if (!s || s->state == MC_STREAM_STATE_ready) + return; + + { + serialize_main_t m; + mc_stream_peer_t *p; + u32 n_stream_bytes; + + /* For offline sim replay: save the entire catchup snapshot... */ + if (s->config.save_snapshot) + s->config.save_snapshot (mcm, /* is_catchup */ 1, mp->data, + mp->n_data_bytes); + + unserialize_open_data (&m, mp->data, mp->n_data_bytes); + unserialize (&m, unserialize_mc_stream, s); + + /* Make sure we start numbering our messages as expected */ + /* *INDENT-OFF* */ + pool_foreach (p, s->peers, ({ + if (p->id.as_u64 == mcm->transport.our_ack_peer_id.as_u64) + s->our_local_sequence = p->last_sequence_received + 1; + })); +/* *INDENT-ON* */ + + n_stream_bytes = m.stream.current_buffer_index; + + /* No need to unserialize close; nothing to free. */ + + /* After serialized stream is user's catchup data. */ + s->config.catchup (mcm, mp->data + n_stream_bytes, + mp->n_data_bytes - n_stream_bytes); + } + + /* Vector could have been moved by catchup. + This can only happen for mc-internal stream. */ + s = mc_stream_by_index (mcm, mp->stream_index); + + s->last_global_sequence_processed = mp->last_global_sequence_included; + + while (clib_fifo_elts (s->catchup_fifo)) + { + mc_msg_user_request_t *gp; + u32 bi; + vlib_buffer_t *b; + + clib_fifo_sub1 (s->catchup_fifo, bi); + + b = vlib_get_buffer (mcm->vlib_main, bi); + gp = vlib_buffer_get_current (b); + + /* Make sure we're replaying "new" news */ + seq_cmp_result = mc_seq_cmp (gp->global_sequence, + mp->last_global_sequence_included); + + if (seq_cmp_result > 0) + { + vlib_buffer_advance (b, sizeof (gp[0])); + s->config.rx_buffer (mcm, s, gp->peer_id, bi); + s->last_global_sequence_processed = gp->global_sequence; + + if (MC_EVENT_LOGGING) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (t) = + { + .format = "catchup replay local sequence 0x%x", + .format_args = "i4", + }; + /* *INDENT-ON* */ + struct + { + u32 local_sequence; + } *ed; + ed = ELOG_DATA (mcm->elog_main, t); + ed->local_sequence = gp->local_sequence; + } + } + else + { + if (MC_EVENT_LOGGING) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (t) = + { + .format = "catchup discard local sequence 0x%x", + .format_args = "i4", + }; + /* *INDENT-ON* */ + struct + { + u32 local_sequence; + } *ed; + ed = ELOG_DATA (mcm->elog_main, t); + ed->local_sequence = gp->local_sequence; + } + + vlib_buffer_free_one (mcm->vlib_main, bi); + } + } + + s->state = MC_STREAM_STATE_ready; + + /* Now that we are caught up wake up joining process. */ + { + vlib_one_time_waiting_process_t *wp; + vec_foreach (wp, s->procs_waiting_for_join_done) + vlib_signal_one_time_waiting_process (mcm->vlib_main, wp); + if (s->procs_waiting_for_join_done) + _vec_len (s->procs_waiting_for_join_done) = 0; + } +} + +static void +this_node_maybe_master (mc_main_t * mcm) +{ + vlib_main_t *vm = mcm->vlib_main; + mc_msg_master_assert_t *mp; + uword event_type; + int timeouts = 0; + int is_master = mcm->relay_state == MC_RELAY_STATE_MASTER; + clib_error_t *error; + f64 now, time_last_master_assert = -1; + u32 bi; + + while (1) + { + if (!mcm->we_can_be_relay_master) + { + mcm->relay_state = MC_RELAY_STATE_SLAVE; + if (MC_EVENT_LOGGING) + { + ELOG_TYPE (e, "become slave (config)"); + ELOG (mcm->elog_main, e, 0); + } + return; + } + + now = vlib_time_now (vm); + if (now >= time_last_master_assert + 1) + { + time_last_master_assert = now; + mp = mc_get_vlib_buffer (mcm->vlib_main, sizeof (mp[0]), &bi); + + mp->peer_id = mcm->transport.our_ack_peer_id; + mp->global_sequence = mcm->relay_global_sequence; + + /* + * these messages clog the event log, set MC_EVENT_LOGGING higher + * if you want them + */ + if (MC_EVENT_LOGGING > 1) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "tx-massert: peer %s global seq %u", + .format_args = "T4i4", + }; + /* *INDENT-ON* */ + struct + { + u32 peer, global_sequence; + } *ed; + ed = ELOG_DATA (mcm->elog_main, e); + ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64); + ed->global_sequence = mp->global_sequence; + } + + mc_byte_swap_msg_master_assert (mp); + + error = + mcm->transport.tx_buffer (mcm->transport.opaque, + MC_TRANSPORT_MASTERSHIP, bi); + if (error) + clib_error_report (error); + } + + vlib_process_wait_for_event_or_clock (vm, 1.0); + event_type = vlib_process_get_events (vm, /* no event data */ 0); + + switch (event_type) + { + case ~0: + if (!is_master && timeouts++ > 2) + { + mcm->relay_state = MC_RELAY_STATE_MASTER; + mcm->relay_master_peer_id = + mcm->transport.our_ack_peer_id.as_u64; + if (MC_EVENT_LOGGING) + { + ELOG_TYPE (e, "become master (was maybe_master)"); + ELOG (mcm->elog_main, e, 0); + } + return; + } + break; + + case MC_RELAY_STATE_SLAVE: + mcm->relay_state = MC_RELAY_STATE_SLAVE; + if (MC_EVENT_LOGGING && mcm->relay_state != MC_RELAY_STATE_SLAVE) + { + ELOG_TYPE (e, "become slave (was maybe_master)"); + ELOG (mcm->elog_main, e, 0); + } + return; + } + } +} + +static void +this_node_slave (mc_main_t * mcm) +{ + vlib_main_t *vm = mcm->vlib_main; + uword event_type; + int timeouts = 0; + + if (MC_EVENT_LOGGING) + { + ELOG_TYPE (e, "become slave"); + ELOG (mcm->elog_main, e, 0); + } + + while (1) + { + vlib_process_wait_for_event_or_clock (vm, 1.0); + event_type = vlib_process_get_events (vm, /* no event data */ 0); + + switch (event_type) + { + case ~0: + if (timeouts++ > 2) + { + mcm->relay_state = MC_RELAY_STATE_NEGOTIATE; + mcm->relay_master_peer_id = ~0ULL; + if (MC_EVENT_LOGGING) + { + ELOG_TYPE (e, "timeouts; negoitate mastership"); + ELOG (mcm->elog_main, e, 0); + } + return; + } + break; + + case MC_RELAY_STATE_SLAVE: + mcm->relay_state = MC_RELAY_STATE_SLAVE; + timeouts = 0; + break; + } + } +} + +static uword +mc_mastership_process (vlib_main_t * vm, + vlib_node_runtime_t * node, vlib_frame_t * f) +{ + mc_main_t *mcm = mc_node_get_main (node); + + while (1) + { + switch (mcm->relay_state) + { + case MC_RELAY_STATE_NEGOTIATE: + case MC_RELAY_STATE_MASTER: + this_node_maybe_master (mcm); + break; + + case MC_RELAY_STATE_SLAVE: + this_node_slave (mcm); + break; + } + } + return 0; /* not likely */ +} + +void +mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master) +{ + if (we_can_be_master != mcm->we_can_be_relay_master) + { + mcm->we_can_be_relay_master = we_can_be_master; + vlib_process_signal_event (mcm->vlib_main, + mcm->mastership_process, + MC_RELAY_STATE_NEGOTIATE, 0); + } +} + +void +mc_msg_master_assert_handler (mc_main_t * mcm, mc_msg_master_assert_t * mp, + u32 buffer_index) +{ + mc_peer_id_t his_peer_id, our_peer_id; + i32 seq_cmp_result; + u8 signal_slave = 0; + u8 update_global_sequence = 0; + + mc_byte_swap_msg_master_assert (mp); + + his_peer_id = mp->peer_id; + our_peer_id = mcm->transport.our_ack_peer_id; + + /* compare the incoming global sequence with ours */ + seq_cmp_result = mc_seq_cmp (mp->global_sequence, + mcm->relay_global_sequence); + + /* If the sender has a lower peer id and the sender's sequence >= + our global sequence, we become a slave. Otherwise we are master. */ + if (mc_peer_id_compare (his_peer_id, our_peer_id) < 0 + && seq_cmp_result >= 0) + { + vlib_process_signal_event (mcm->vlib_main, + mcm->mastership_process, + MC_RELAY_STATE_SLAVE, 0); + signal_slave = 1; + } + + /* Update our global sequence. */ + if (seq_cmp_result > 0) + { + mcm->relay_global_sequence = mp->global_sequence; + update_global_sequence = 1; + } + + { + uword *q = mhash_get (&mcm->mastership_peer_index_by_id, &his_peer_id); + mc_mastership_peer_t *p; + + if (q) + p = vec_elt_at_index (mcm->mastership_peers, q[0]); + else + { + vec_add2 (mcm->mastership_peers, p, 1); + p->peer_id = his_peer_id; + mhash_set (&mcm->mastership_peer_index_by_id, &p->peer_id, + p - mcm->mastership_peers, + /* old_value */ 0); + } + p->time_last_master_assert_received = vlib_time_now (mcm->vlib_main); + } + + /* + * these messages clog the event log, set MC_EVENT_LOGGING higher + * if you want them. + */ + if (MC_EVENT_LOGGING > 1) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "rx-massert: peer %s global seq %u upd %d slave %d", + .format_args = "T4i4i1i1", + }; + /* *INDENT-ON* */ + + struct + { + u32 peer; + u32 global_sequence; + u8 update_sequence; + u8 slave; + } *ed; + ed = ELOG_DATA (mcm->elog_main, e); + ed->peer = elog_id_for_peer_id (mcm, his_peer_id.as_u64); + ed->global_sequence = mp->global_sequence; + ed->update_sequence = update_global_sequence; + ed->slave = signal_slave; + } +} + +static void +mc_serialize_init (mc_main_t * mcm) +{ + mc_serialize_msg_t *m; + vlib_main_t *vm = vlib_get_main (); + + mcm->global_msg_index_by_name + = hash_create_string ( /* elts */ 0, sizeof (uword)); + + m = vm->mc_msg_registrations; + + while (m) + { + m->global_index = vec_len (mcm->global_msgs); + hash_set_mem (mcm->global_msg_index_by_name, m->name, m->global_index); + vec_add1 (mcm->global_msgs, m); + m = m->next_registration; + } +} + +clib_error_t * +mc_serialize_va (mc_main_t * mc, + u32 stream_index, + u32 multiple_messages_per_vlib_buffer, + mc_serialize_msg_t * msg, va_list * va) +{ + mc_stream_t *s; + clib_error_t *error; + serialize_main_t *m = &mc->serialize_mains[VLIB_TX]; + vlib_serialize_buffer_main_t *sbm = &mc->serialize_buffer_mains[VLIB_TX]; + u32 bi, n_before, n_after, n_total, n_this_msg; + u32 si, gi; + + if (!sbm->vlib_main) + { + sbm->tx.max_n_data_bytes_per_chain = 4096; + sbm->tx.free_list_index = VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX; + } + + if (sbm->first_buffer == 0) + serialize_open_vlib_buffer (m, mc->vlib_main, sbm); + + n_before = serialize_vlib_buffer_n_bytes (m); + + s = mc_stream_by_index (mc, stream_index); + gi = msg->global_index; + ASSERT (msg == vec_elt (mc->global_msgs, gi)); + + si = ~0; + if (gi < vec_len (s->stream_msg_index_by_global_index)) + si = s->stream_msg_index_by_global_index[gi]; + + serialize_likely_small_unsigned_integer (m, si); + + /* For first time message is sent, use name to identify message. */ + if (si == ~0 || MSG_ID_DEBUG) + serialize_cstring (m, msg->name); + + if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "serialize-msg: %s index %d", + .format_args = "T4i4", + }; + /* *INDENT-ON* */ + struct + { + u32 c[2]; + } *ed; + ed = ELOG_DATA (mc->elog_main, e); + ed->c[0] = elog_id_for_msg_name (mc, msg->name); + ed->c[1] = si; + } + + error = va_serialize (m, va); + + n_after = serialize_vlib_buffer_n_bytes (m); + n_this_msg = n_after - n_before; + n_total = n_after + sizeof (mc_msg_user_request_t); + + /* For max message size ignore first message where string name is sent. */ + if (si != ~0) + msg->max_n_bytes_serialized = + clib_max (msg->max_n_bytes_serialized, n_this_msg); + + if (!multiple_messages_per_vlib_buffer + || si == ~0 + || n_total + msg->max_n_bytes_serialized > + mc->transport.max_packet_size) + { + bi = serialize_close_vlib_buffer (m); + sbm->first_buffer = 0; + if (!error) + mc_stream_send (mc, stream_index, bi); + else if (bi != ~0) + vlib_buffer_free_one (mc->vlib_main, bi); + } + + return error; +} + +clib_error_t * +mc_serialize_internal (mc_main_t * mc, + u32 stream_index, + u32 multiple_messages_per_vlib_buffer, + mc_serialize_msg_t * msg, ...) +{ + vlib_main_t *vm = mc->vlib_main; + va_list va; + clib_error_t *error; + + if (stream_index == ~0) + { + if (vm->mc_main && vm->mc_stream_index == ~0) + vlib_current_process_wait_for_one_time_event_vector + (vm, &vm->procs_waiting_for_mc_stream_join); + stream_index = vm->mc_stream_index; + } + + va_start (va, msg); + error = mc_serialize_va (mc, stream_index, + multiple_messages_per_vlib_buffer, msg, &va); + va_end (va); + return error; +} + +uword +mc_unserialize_message (mc_main_t * mcm, + mc_stream_t * s, serialize_main_t * m) +{ + mc_serialize_stream_msg_t *sm; + u32 gi, si; + + si = unserialize_likely_small_unsigned_integer (m); + + if (!(si == ~0 || MSG_ID_DEBUG)) + { + sm = vec_elt_at_index (s->stream_msgs, si); + gi = sm->global_index; + } + else + { + char *name; + + unserialize_cstring (m, &name); + + if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "unserialize-msg: %s rx index %d", + .format_args = "T4i4", + }; + /* *INDENT-ON* */ + struct + { + u32 c[2]; + } *ed; + ed = ELOG_DATA (mcm->elog_main, e); + ed->c[0] = elog_id_for_msg_name (mcm, name); + ed->c[1] = si; + } + + { + uword *p = hash_get_mem (mcm->global_msg_index_by_name, name); + gi = p ? p[0] : ~0; + } + + /* Unknown message? */ + if (gi == ~0) + { + vec_free (name); + goto done; + } + + vec_validate_init_empty (s->stream_msg_index_by_global_index, gi, ~0); + si = s->stream_msg_index_by_global_index[gi]; + + /* Stream local index unknown? Create it. */ + if (si == ~0) + { + vec_add2 (s->stream_msgs, sm, 1); + + si = sm - s->stream_msgs; + sm->global_index = gi; + s->stream_msg_index_by_global_index[gi] = si; + + if (MC_EVENT_LOGGING > 0) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "msg-bind: stream %d %s to index %d", + .format_args = "i4T4i4", + }; + /* *INDENT-ON* */ + struct + { + u32 c[3]; + } *ed; + ed = ELOG_DATA (mcm->elog_main, e); + ed->c[0] = s->index; + ed->c[1] = elog_id_for_msg_name (mcm, name); + ed->c[2] = si; + } + } + else + { + sm = vec_elt_at_index (s->stream_msgs, si); + if (gi != sm->global_index && MC_EVENT_LOGGING > 0) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "msg-id-ERROR: %s index %d expected %d", + .format_args = "T4i4i4", + }; + /* *INDENT-ON* */ + struct + { + u32 c[3]; + } *ed; + ed = ELOG_DATA (mcm->elog_main, e); + ed->c[0] = elog_id_for_msg_name (mcm, name); + ed->c[1] = si; + ed->c[2] = ~0; + if (sm->global_index < + vec_len (s->stream_msg_index_by_global_index)) + ed->c[2] = + s->stream_msg_index_by_global_index[sm->global_index]; + } + } + + vec_free (name); + } + + if (gi != ~0) + { + mc_serialize_msg_t *msg; + msg = vec_elt (mcm->global_msgs, gi); + unserialize (m, msg->unserialize, mcm); + } + +done: + return gi != ~0; +} + +void +mc_unserialize_internal (mc_main_t * mcm, u32 stream_and_buffer_index) +{ + vlib_main_t *vm = mcm->vlib_main; + serialize_main_t *m = &mcm->serialize_mains[VLIB_RX]; + vlib_serialize_buffer_main_t *sbm = &mcm->serialize_buffer_mains[VLIB_RX]; + mc_stream_and_buffer_t *sb; + mc_stream_t *stream; + u32 buffer_index; + + sb = + pool_elt_at_index (mcm->mc_unserialize_stream_and_buffers, + stream_and_buffer_index); + buffer_index = sb->buffer_index; + stream = vec_elt_at_index (mcm->stream_vector, sb->stream_index); + pool_put (mcm->mc_unserialize_stream_and_buffers, sb); + + if (stream->config.save_snapshot) + { + u32 n_bytes = vlib_buffer_index_length_in_chain (vm, buffer_index); + static u8 *contents; + vec_reset_length (contents); + vec_validate (contents, n_bytes - 1); + vlib_buffer_contents (vm, buffer_index, contents); + stream->config.save_snapshot (mcm, /* is_catchup */ 0, contents, + n_bytes); + } + + ASSERT (vlib_in_process_context (vm)); + + unserialize_open_vlib_buffer (m, vm, sbm); + + clib_fifo_add1 (sbm->rx.buffer_fifo, buffer_index); + + while (unserialize_vlib_buffer_n_bytes (m) > 0) + mc_unserialize_message (mcm, stream, m); + + /* Frees buffer. */ + unserialize_close_vlib_buffer (m); +} + +void +mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index) +{ + vlib_main_t *vm = mcm->vlib_main; + mc_stream_and_buffer_t *sb; + pool_get (mcm->mc_unserialize_stream_and_buffers, sb); + sb->stream_index = s->index; + sb->buffer_index = buffer_index; + vlib_process_signal_event (vm, mcm->unserialize_process, + EVENT_MC_UNSERIALIZE_BUFFER, + sb - mcm->mc_unserialize_stream_and_buffers); +} + +static uword +mc_unserialize_process (vlib_main_t * vm, + vlib_node_runtime_t * node, vlib_frame_t * f) +{ + mc_main_t *mcm = mc_node_get_main (node); + uword event_type, *event_data = 0; + int i; + + while (1) + { + if (event_data) + _vec_len (event_data) = 0; + + vlib_process_wait_for_event (vm); + event_type = vlib_process_get_events (vm, &event_data); + switch (event_type) + { + case EVENT_MC_UNSERIALIZE_BUFFER: + for (i = 0; i < vec_len (event_data); i++) + mc_unserialize_internal (mcm, event_data[i]); + break; + + case EVENT_MC_UNSERIALIZE_CATCHUP: + for (i = 0; i < vec_len (event_data); i++) + { + u8 *mp = uword_to_pointer (event_data[i], u8 *); + perform_catchup (mcm, (void *) mp); + vec_free (mp); + } + break; + + default: + break; + } + } + + return 0; /* not likely */ +} + +void +serialize_mc_main (serialize_main_t * m, va_list * va) +{ + mc_main_t *mcm = va_arg (*va, mc_main_t *); + mc_stream_t *s; + mc_serialize_stream_msg_t *sm; + mc_serialize_msg_t *msg; + + serialize_integer (m, vec_len (mcm->stream_vector), sizeof (u32)); + vec_foreach (s, mcm->stream_vector) + { + /* Stream name. */ + serialize_cstring (m, s->config.name); + + /* Serialize global names for all sent messages. */ + serialize_integer (m, vec_len (s->stream_msgs), sizeof (u32)); + vec_foreach (sm, s->stream_msgs) + { + msg = vec_elt (mcm->global_msgs, sm->global_index); + serialize_cstring (m, msg->name); + } + } +} + +void +unserialize_mc_main (serialize_main_t * m, va_list * va) +{ + mc_main_t *mcm = va_arg (*va, mc_main_t *); + u32 i, n_streams, n_stream_msgs; + char *name; + mc_stream_t *s; + mc_serialize_stream_msg_t *sm; + + unserialize_integer (m, &n_streams, sizeof (u32)); + for (i = 0; i < n_streams; i++) + { + unserialize_cstring (m, &name); + if (i != MC_STREAM_INDEX_INTERNAL && !mc_stream_by_name (mcm, name)) + { + vec_validate (mcm->stream_vector, i); + s = vec_elt_at_index (mcm->stream_vector, i); + mc_stream_init (s); + s->index = s - mcm->stream_vector; + s->config.name = name; + s->state = MC_STREAM_STATE_name_known; + hash_set_mem (mcm->stream_index_by_name, s->config.name, s->index); + } + else + vec_free (name); + + s = vec_elt_at_index (mcm->stream_vector, i); + + vec_free (s->stream_msgs); + vec_free (s->stream_msg_index_by_global_index); + + unserialize_integer (m, &n_stream_msgs, sizeof (u32)); + vec_resize (s->stream_msgs, n_stream_msgs); + vec_foreach (sm, s->stream_msgs) + { + uword *p; + u32 si, gi; + + unserialize_cstring (m, &name); + p = hash_get (mcm->global_msg_index_by_name, name); + gi = p ? p[0] : ~0; + si = sm - s->stream_msgs; + + if (MC_EVENT_LOGGING > 0) + { + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "catchup-bind: %s to %d global index %d stream %d", + .format_args = "T4i4i4i4", + }; + /* *INDENT-ON* */ + + struct + { + u32 c[4]; + } *ed; + ed = ELOG_DATA (mcm->elog_main, e); + ed->c[0] = elog_id_for_msg_name (mcm, name); + ed->c[1] = si; + ed->c[2] = gi; + ed->c[3] = s->index; + } + + vec_free (name); + + sm->global_index = gi; + if (gi != ~0) + { + vec_validate_init_empty (s->stream_msg_index_by_global_index, + gi, ~0); + s->stream_msg_index_by_global_index[gi] = si; + } + } + } +} + +void +mc_main_init (mc_main_t * mcm, char *tag) +{ + vlib_main_t *vm = vlib_get_main (); + + mcm->vlib_main = vm; + mcm->elog_main = &vm->elog_main; + + mcm->relay_master_peer_id = ~0ULL; + mcm->relay_state = MC_RELAY_STATE_NEGOTIATE; + + mcm->stream_index_by_name + = hash_create_string ( /* elts */ 0, /* value size */ sizeof (uword)); + + { + vlib_node_registration_t r; + + memset (&r, 0, sizeof (r)); + + r.type = VLIB_NODE_TYPE_PROCESS; + + /* Point runtime data to main instance. */ + r.runtime_data = &mcm; + r.runtime_data_bytes = sizeof (&mcm); + + r.name = (char *) format (0, "mc-mastership-%s", tag); + r.function = mc_mastership_process; + mcm->mastership_process = vlib_register_node (vm, &r); + + r.name = (char *) format (0, "mc-join-ager-%s", tag); + r.function = mc_join_ager_process; + mcm->join_ager_process = vlib_register_node (vm, &r); + + r.name = (char *) format (0, "mc-retry-%s", tag); + r.function = mc_retry_process; + mcm->retry_process = vlib_register_node (vm, &r); + + r.name = (char *) format (0, "mc-catchup-%s", tag); + r.function = mc_catchup_process; + mcm->catchup_process = vlib_register_node (vm, &r); + + r.name = (char *) format (0, "mc-unserialize-%s", tag); + r.function = mc_unserialize_process; + mcm->unserialize_process = vlib_register_node (vm, &r); + } + + if (MC_EVENT_LOGGING > 0) + mhash_init (&mcm->elog_id_by_peer_id, sizeof (uword), + sizeof (mc_peer_id_t)); + + mhash_init (&mcm->mastership_peer_index_by_id, sizeof (uword), + sizeof (mc_peer_id_t)); + mc_serialize_init (mcm); +} + +static u8 * +format_mc_relay_state (u8 * s, va_list * args) +{ + mc_relay_state_t state = va_arg (*args, mc_relay_state_t); + char *t = 0; + switch (state) + { + case MC_RELAY_STATE_NEGOTIATE: + t = "negotiate"; + break; + case MC_RELAY_STATE_MASTER: + t = "master"; + break; + case MC_RELAY_STATE_SLAVE: + t = "slave"; + break; + default: + return format (s, "unknown 0x%x", state); + } + + return format (s, "%s", t); +} + +static u8 * +format_mc_stream_state (u8 * s, va_list * args) +{ + mc_stream_state_t state = va_arg (*args, mc_stream_state_t); + char *t = 0; + switch (state) + { +#define _(f) case MC_STREAM_STATE_##f: t = #f; break; + foreach_mc_stream_state +#undef _ + default: + return format (s, "unknown 0x%x", state); + } + + return format (s, "%s", t); +} + +static int +mc_peer_comp (void *a1, void *a2) +{ + mc_stream_peer_t *p1 = a1; + mc_stream_peer_t *p2 = a2; + + return mc_peer_id_compare (p1->id, p2->id); +} + +u8 * +format_mc_main (u8 * s, va_list * args) +{ + mc_main_t *mcm = va_arg (*args, mc_main_t *); + mc_stream_t *t; + mc_stream_peer_t *p, *ps; + uword indent = format_get_indent (s); + + s = format (s, "MC state %U, %d streams joined, global sequence 0x%x", + format_mc_relay_state, mcm->relay_state, + vec_len (mcm->stream_vector), mcm->relay_global_sequence); + + { + mc_mastership_peer_t *mp; + f64 now = vlib_time_now (mcm->vlib_main); + s = format (s, "\n%UMost recent mastership peers:", + format_white_space, indent + 2); + vec_foreach (mp, mcm->mastership_peers) + { + s = format (s, "\n%U%-30U%.4e", + format_white_space, indent + 4, + mcm->transport.format_peer_id, mp->peer_id, + now - mp->time_last_master_assert_received); + } + } + + vec_foreach (t, mcm->stream_vector) + { + s = format (s, "\n%Ustream `%s' index %d", + format_white_space, indent + 2, t->config.name, t->index); + + s = format (s, "\n%Ustate %U", + format_white_space, indent + 4, + format_mc_stream_state, t->state); + + s = + format (s, + "\n%Uretries: interval %.0f sec, limit %d, pool elts %d, %Ld sent", + format_white_space, indent + 4, t->config.retry_interval, + t->config.retry_limit, pool_elts (t->retry_pool), + t->stats.n_retries - t->stats_last_clear.n_retries); + + s = format (s, "\n%U%Ld/%Ld user requests sent/received", + format_white_space, indent + 4, + t->user_requests_sent, t->user_requests_received); + + s = format (s, "\n%U%d peers, local/global sequence 0x%x/0x%x", + format_white_space, indent + 4, + pool_elts (t->peers), + t->our_local_sequence, t->last_global_sequence_processed); + + ps = 0; + /* *INDENT-OFF* */ + pool_foreach (p, t->peers, + ({ + if (clib_bitmap_get (t->all_peer_bitmap, p - t->peers)) + vec_add1 (ps, p[0]); + })); + /* *INDENT-ON* */ + vec_sort_with_function (ps, mc_peer_comp); + s = format (s, "\n%U%=30s%10s%16s%16s", + format_white_space, indent + 6, + "Peer", "Last seq", "Retries", "Future"); + + vec_foreach (p, ps) + { + s = format (s, "\n%U%-30U0x%08x%16Ld%16Ld%s", + format_white_space, indent + 6, + mcm->transport.format_peer_id, p->id.as_u64, + p->last_sequence_received, + p->stats.n_msgs_from_past - + p->stats_last_clear.n_msgs_from_past, + p->stats.n_msgs_from_future - + p->stats_last_clear.n_msgs_from_future, + (mcm->transport.our_ack_peer_id.as_u64 == + p->id.as_u64 ? " (self)" : "")); + } + vec_free (ps); + } + + return s; +} + +/* + * fd.io coding-style-patch-verification: ON + * + * Local Variables: + * eval: (c-set-style "gnu") + * End: + */ |