summaryrefslogtreecommitdiffstats
path: root/src/vlib/mc.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/vlib/mc.c')
-rw-r--r--src/vlib/mc.c2609
1 files changed, 0 insertions, 2609 deletions
diff --git a/src/vlib/mc.c b/src/vlib/mc.c
deleted file mode 100644
index a289871f570..00000000000
--- a/src/vlib/mc.c
+++ /dev/null
@@ -1,2609 +0,0 @@
-/*
- * mc.c: vlib reliable sequenced multicast distributed applications
- *
- * Copyright (c) 2010 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <vlib/vlib.h>
-
-/*
- * 1 to enable msg id training wheels, which are useful for tracking
- * down catchup and/or partitioned network problems
- */
-#define MSG_ID_DEBUG 0
-
-static format_function_t format_mc_stream_state;
-
-static u32
-elog_id_for_peer_id (mc_main_t * m, u64 peer_id)
-{
- uword *p, r;
- mhash_t *h = &m->elog_id_by_peer_id;
-
- if (!m->elog_id_by_peer_id.hash)
- mhash_init (h, sizeof (uword), sizeof (mc_peer_id_t));
-
- p = mhash_get (h, &peer_id);
- if (p)
- return p[0];
- r = elog_string (m->elog_main, "%U", m->transport.format_peer_id, peer_id);
- mhash_set (h, &peer_id, r, /* old_value */ 0);
- return r;
-}
-
-static u32
-elog_id_for_msg_name (mc_main_t * m, char *msg_name)
-{
- uword *p, r;
- uword *h = m->elog_id_by_msg_name;
- u8 *name_copy;
-
- if (!h)
- h = m->elog_id_by_msg_name = hash_create_string (0, sizeof (uword));
-
- p = hash_get_mem (h, msg_name);
- if (p)
- return p[0];
- r = elog_string (m->elog_main, "%s", msg_name);
-
- name_copy = format (0, "%s%c", msg_name, 0);
-
- hash_set_mem (h, name_copy, r);
- m->elog_id_by_msg_name = h;
-
- return r;
-}
-
-static void
-elog_tx_msg (mc_main_t * m, u32 stream_id, u32 local_sequence,
- u32 retry_count)
-{
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "tx-msg: stream %d local seq %d attempt %d",
- .format_args = "i4i4i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 stream_id, local_sequence, retry_count;
- } *ed;
- ed = ELOG_DATA (m->elog_main, e);
- ed->stream_id = stream_id;
- ed->local_sequence = local_sequence;
- ed->retry_count = retry_count;
- }
-}
-
-/*
- * seq_cmp
- * correctly compare two unsigned sequence numbers.
- * This function works so long as x and y are within 2**(n-1) of each
- * other, where n = bits(x, y).
- *
- * Magic decoder ring:
- * seq_cmp == 0 => x and y are equal
- * seq_cmp < 0 => x is "in the past" with respect to y
- * seq_cmp > 0 => x is "in the future" with respect to y
- */
-always_inline i32
-mc_seq_cmp (u32 x, u32 y)
-{
- return (i32) x - (i32) y;
-}
-
-void *
-mc_get_vlib_buffer (vlib_main_t * vm, u32 n_bytes, u32 * bi_return)
-{
- u32 n_alloc, bi = 0;
- vlib_buffer_t *b;
-
- n_alloc = vlib_buffer_alloc (vm, &bi, 1);
- ASSERT (n_alloc == 1);
-
- b = vlib_get_buffer (vm, bi);
- b->current_length = n_bytes;
- *bi_return = bi;
- return (void *) b->data;
-}
-
-static void
-delete_peer_with_index (mc_main_t * mcm, mc_stream_t * s,
- uword index, int notify_application)
-{
- mc_stream_peer_t *p = pool_elt_at_index (s->peers, index);
- ASSERT (p != 0);
- if (s->config.peer_died && notify_application)
- s->config.peer_died (mcm, s, p->id);
-
- s->all_peer_bitmap = clib_bitmap_andnoti (s->all_peer_bitmap, p - s->peers);
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "delete peer %s from all_peer_bitmap",
- .format_args = "T4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 peer;
- } *ed = 0;
-
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
- }
- /* Do not delete the pool / hash table entries, or we lose sequence number state */
-}
-
-static mc_stream_peer_t *
-get_or_create_peer_with_id (mc_main_t * mcm,
- mc_stream_t * s, mc_peer_id_t id, int *created)
-{
- uword *q = mhash_get (&s->peer_index_by_id, &id);
- mc_stream_peer_t *p;
-
- if (q)
- {
- p = pool_elt_at_index (s->peers, q[0]);
- goto done;
- }
-
- pool_get (s->peers, p);
- memset (p, 0, sizeof (p[0]));
- p->id = id;
- p->last_sequence_received = ~0;
- mhash_set (&s->peer_index_by_id, &id, p - s->peers, /* old_value */ 0);
- if (created)
- *created = 1;
-
-done:
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "get_or_create %s peer %s stream %d seq %d",
- .format_args = "t4T4i4i4",
- .n_enum_strings = 2,
- .enum_strings = {
- "old", "new",
- },
- };
- /* *INDENT-ON* */
- struct
- {
- u32 is_new, peer, stream_index, rx_sequence;
- } *ed = 0;
-
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->is_new = q ? 0 : 1;
- ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
- ed->stream_index = s->index;
- ed->rx_sequence = p->last_sequence_received;
- }
- /* $$$$ Enable or reenable this peer */
- s->all_peer_bitmap = clib_bitmap_ori (s->all_peer_bitmap, p - s->peers);
- return p;
-}
-
-static void
-maybe_send_window_open_event (vlib_main_t * vm, mc_stream_t * stream)
-{
- vlib_one_time_waiting_process_t *p;
-
- if (pool_elts (stream->retry_pool) >= stream->config.window_size)
- return;
-
- vec_foreach (p, stream->procs_waiting_for_open_window)
- vlib_signal_one_time_waiting_process (vm, p);
-
- if (stream->procs_waiting_for_open_window)
- _vec_len (stream->procs_waiting_for_open_window) = 0;
-}
-
-static void
-mc_retry_free (mc_main_t * mcm, mc_stream_t * s, mc_retry_t * r)
-{
- mc_retry_t record, *retp;
-
- if (r->unacked_by_peer_bitmap)
- _vec_len (r->unacked_by_peer_bitmap) = 0;
-
- if (clib_fifo_elts (s->retired_fifo) >= 2 * s->config.window_size)
- {
- clib_fifo_sub1 (s->retired_fifo, record);
- vlib_buffer_free_one (mcm->vlib_main, record.buffer_index);
- }
-
- clib_fifo_add2 (s->retired_fifo, retp);
-
- retp->buffer_index = r->buffer_index;
- retp->local_sequence = r->local_sequence;
-
- r->buffer_index = ~0; /* poison buffer index in this retry */
-}
-
-static void
-mc_resend_retired (mc_main_t * mcm, mc_stream_t * s, u32 local_sequence)
-{
- mc_retry_t *retry;
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "resend-retired: search for local seq %d",
- .format_args = "i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 local_sequence;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->local_sequence = local_sequence;
- }
-
- /* *INDENT-OFF* */
- clib_fifo_foreach (retry, s->retired_fifo,
- ({
- if (retry->local_sequence == local_sequence)
- {
- elog_tx_msg (mcm, s->index, retry-> local_sequence, -13);
- mcm->transport.tx_buffer (mcm->transport.opaque,
- MC_TRANSPORT_USER_REQUEST_TO_RELAY,
- retry->buffer_index);
- return;
- }
- }));
- /* *INDENT-ON* */
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "resend-retired: FAILED search for local seq %d",
- .format_args = "i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 local_sequence;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->local_sequence = local_sequence;
- }
-}
-
-static uword *
-delete_retry_fifo_elt (mc_main_t * mcm,
- mc_stream_t * stream,
- mc_retry_t * r, uword * dead_peer_bitmap)
-{
- mc_stream_peer_t *p;
-
- /* *INDENT-OFF* */
- pool_foreach (p, stream->peers, ({
- uword pi = p - stream->peers;
- uword is_alive = 0 == clib_bitmap_get (r->unacked_by_peer_bitmap, pi);
-
- if (! is_alive)
- dead_peer_bitmap = clib_bitmap_ori (dead_peer_bitmap, pi);
-
- if (MC_EVENT_LOGGING > 0)
- {
- ELOG_TYPE_DECLARE (e) = {
- .format = "delete_retry_fifo_elt: peer %s is %s",
- .format_args = "T4t4",
- .n_enum_strings = 2,
- .enum_strings = { "alive", "dead", },
- };
- struct { u32 peer, is_alive; } * ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
- ed->is_alive = is_alive;
- }
- }));
- /* *INDENT-ON* */
-
- hash_unset (stream->retry_index_by_local_sequence, r->local_sequence);
- mc_retry_free (mcm, stream, r);
-
- return dead_peer_bitmap;
-}
-
-always_inline mc_retry_t *
-prev_retry (mc_stream_t * s, mc_retry_t * r)
-{
- return (r->prev_index != ~0
- ? pool_elt_at_index (s->retry_pool, r->prev_index) : 0);
-}
-
-always_inline mc_retry_t *
-next_retry (mc_stream_t * s, mc_retry_t * r)
-{
- return (r->next_index != ~0
- ? pool_elt_at_index (s->retry_pool, r->next_index) : 0);
-}
-
-always_inline void
-remove_retry_from_pool (mc_stream_t * s, mc_retry_t * r)
-{
- mc_retry_t *p = prev_retry (s, r);
- mc_retry_t *n = next_retry (s, r);
-
- if (p)
- p->next_index = r->next_index;
- else
- s->retry_head_index = r->next_index;
- if (n)
- n->prev_index = r->prev_index;
- else
- s->retry_tail_index = r->prev_index;
-
- pool_put_index (s->retry_pool, r - s->retry_pool);
-}
-
-static void
-check_retry (mc_main_t * mcm, mc_stream_t * s)
-{
- mc_retry_t *r;
- vlib_main_t *vm = mcm->vlib_main;
- f64 now = vlib_time_now (vm);
- uword *dead_peer_bitmap = 0;
- u32 ri, ri_next;
-
- for (ri = s->retry_head_index; ri != ~0; ri = ri_next)
- {
- r = pool_elt_at_index (s->retry_pool, ri);
- ri_next = r->next_index;
-
- if (now < r->sent_at + s->config.retry_interval)
- continue;
-
- r->n_retries += 1;
- if (r->n_retries > s->config.retry_limit)
- {
- dead_peer_bitmap =
- delete_retry_fifo_elt (mcm, s, r, dead_peer_bitmap);
- remove_retry_from_pool (s, r);
- }
- else
- {
- if (MC_EVENT_LOGGING > 0)
- {
- mc_stream_peer_t *p;
-
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (t) =
- {
- .format = "resend local seq %d attempt %d",
- .format_args = "i4i4",
- };
- /* *INDENT-ON* */
-
- /* *INDENT-OFF* */
- pool_foreach (p, s->peers, ({
- if (clib_bitmap_get (r->unacked_by_peer_bitmap, p - s->peers))
- {
- ELOG_TYPE_DECLARE (ev) = {
- .format = "resend: needed by peer %s local seq %d",
- .format_args = "T4i4",
- };
- struct { u32 peer, rx_sequence; } * ed;
- ed = ELOG_DATA (mcm->elog_main, ev);
- ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
- ed->rx_sequence = r->local_sequence;
- }
- }));
- /* *INDENT-ON* */
-
- struct
- {
- u32 sequence;
- u32 trail;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, t);
- ed->sequence = r->local_sequence;
- ed->trail = r->n_retries;
- }
-
- r->sent_at = vlib_time_now (vm);
- s->stats.n_retries += 1;
-
- elog_tx_msg (mcm, s->index, r->local_sequence, r->n_retries);
-
- mcm->transport.tx_buffer
- (mcm->transport.opaque,
- MC_TRANSPORT_USER_REQUEST_TO_RELAY, r->buffer_index);
- }
- }
-
- maybe_send_window_open_event (mcm->vlib_main, s);
-
- /* Delete any dead peers we've found. */
- if (!clib_bitmap_is_zero (dead_peer_bitmap))
- {
- uword i;
-
- /* *INDENT-OFF* */
- clib_bitmap_foreach (i, dead_peer_bitmap, ({
- delete_peer_with_index (mcm, s, i, /* notify_application */ 1);
-
- /* Delete any references to just deleted peer in retry pool. */
- pool_foreach (r, s->retry_pool, ({
- r->unacked_by_peer_bitmap =
- clib_bitmap_andnoti (r->unacked_by_peer_bitmap, i);
- }));
- }));
-/* *INDENT-ON* */
- clib_bitmap_free (dead_peer_bitmap);
- }
-}
-
-always_inline mc_main_t *
-mc_node_get_main (vlib_node_runtime_t * node)
-{
- mc_main_t **p = (void *) node->runtime_data;
- return p[0];
-}
-
-static uword
-mc_retry_process (vlib_main_t * vm,
- vlib_node_runtime_t * node, vlib_frame_t * f)
-{
- mc_main_t *mcm = mc_node_get_main (node);
- mc_stream_t *s;
-
- while (1)
- {
- vlib_process_suspend (vm, 1.0);
- vec_foreach (s, mcm->stream_vector)
- {
- if (s->state != MC_STREAM_STATE_invalid)
- check_retry (mcm, s);
- }
- }
- return 0; /* not likely */
-}
-
-static void
-send_join_or_leave_request (mc_main_t * mcm, u32 stream_index, u32 is_join)
-{
- vlib_main_t *vm = mcm->vlib_main;
- mc_msg_join_or_leave_request_t *mp;
- u32 bi;
-
- mp = mc_get_vlib_buffer (vm, sizeof (mp[0]), &bi);
- memset (mp, 0, sizeof (*mp));
- mp->type = MC_MSG_TYPE_join_or_leave_request;
- mp->peer_id = mcm->transport.our_ack_peer_id;
- mp->stream_index = stream_index;
- mp->is_join = is_join;
-
- mc_byte_swap_msg_join_or_leave_request (mp);
-
- /*
- * These msgs are unnumbered, unordered so send on the from-relay
- * channel.
- */
- mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi);
-}
-
-static uword
-mc_join_ager_process (vlib_main_t * vm,
- vlib_node_runtime_t * node, vlib_frame_t * f)
-{
- mc_main_t *mcm = mc_node_get_main (node);
-
- while (1)
- {
- if (mcm->joins_in_progress)
- {
- mc_stream_t *s;
- vlib_one_time_waiting_process_t *p;
- f64 now = vlib_time_now (vm);
-
- vec_foreach (s, mcm->stream_vector)
- {
- if (s->state != MC_STREAM_STATE_join_in_progress)
- continue;
-
- if (now > s->join_timeout)
- {
- s->state = MC_STREAM_STATE_ready;
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "stream %d join timeout",
- };
- /* *INDENT-ON* */
- ELOG (mcm->elog_main, e, s->index);
- }
- /* Make sure that this app instance exists as a stream peer,
- or we may answer a catchup request with a NULL
- all_peer_bitmap... */
- (void) get_or_create_peer_with_id
- (mcm, s, mcm->transport.our_ack_peer_id, /* created */ 0);
-
- vec_foreach (p, s->procs_waiting_for_join_done)
- vlib_signal_one_time_waiting_process (vm, p);
- if (s->procs_waiting_for_join_done)
- _vec_len (s->procs_waiting_for_join_done) = 0;
-
- mcm->joins_in_progress--;
- ASSERT (mcm->joins_in_progress >= 0);
- }
- else
- {
- /* Resent join request which may have been lost. */
- send_join_or_leave_request (mcm, s->index, 1 /* is_join */ );
-
- /* We're *not* alone, retry for as long as it takes */
- if (mcm->relay_state == MC_RELAY_STATE_SLAVE)
- s->join_timeout = vlib_time_now (vm) + 2.0;
-
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "stream %d resend join request",
- };
- /* *INDENT-ON* */
- ELOG (mcm->elog_main, e, s->index);
- }
- }
- }
- }
-
- vlib_process_suspend (vm, .5);
- }
-
- return 0; /* not likely */
-}
-
-static void
-serialize_mc_register_stream_name (serialize_main_t * m, va_list * va)
-{
- char *name = va_arg (*va, char *);
- serialize_cstring (m, name);
-}
-
-static void
-elog_stream_name (char *buf, int n_buf_bytes, char *v)
-{
- clib_memcpy (buf, v, clib_min (n_buf_bytes - 1, vec_len (v)));
- buf[n_buf_bytes - 1] = 0;
-}
-
-static void
-unserialize_mc_register_stream_name (serialize_main_t * m, va_list * va)
-{
- mc_main_t *mcm = va_arg (*va, mc_main_t *);
- char *name;
- mc_stream_t *s;
- uword *p;
-
- unserialize_cstring (m, &name);
-
- if ((p = hash_get_mem (mcm->stream_index_by_name, name)))
- {
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "stream index %d already named %s",
- .format_args = "i4s16",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 stream_index;
- char name[16];
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->stream_index = p[0];
- elog_stream_name (ed->name, sizeof (ed->name), name);
- }
-
- vec_free (name);
- return;
- }
-
- vec_add2 (mcm->stream_vector, s, 1);
- mc_stream_init (s);
- s->state = MC_STREAM_STATE_name_known;
- s->index = s - mcm->stream_vector;
- s->config.name = name;
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "stream index %d named %s",
- .format_args = "i4s16",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 stream_index;
- char name[16];
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->stream_index = s->index;
- elog_stream_name (ed->name, sizeof (ed->name), name);
- }
-
- hash_set_mem (mcm->stream_index_by_name, name, s->index);
-
- p = hash_get (mcm->procs_waiting_for_stream_name_by_name, name);
- if (p)
- {
- vlib_one_time_waiting_process_t *wp, **w;
- w = pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool, p[0]);
- vec_foreach (wp, w[0])
- vlib_signal_one_time_waiting_process (mcm->vlib_main, wp);
- pool_put (mcm->procs_waiting_for_stream_name_pool, w);
- hash_unset_mem (mcm->procs_waiting_for_stream_name_by_name, name);
- }
-}
-
-/* *INDENT-OFF* */
-MC_SERIALIZE_MSG (mc_register_stream_name_msg, static) =
-{
- .name = "mc_register_stream_name",
- .serialize = serialize_mc_register_stream_name,
- .unserialize = unserialize_mc_register_stream_name,
-};
-/* *INDENT-ON* */
-
-void
-mc_rx_buffer_unserialize (mc_main_t * mcm,
- mc_stream_t * stream,
- mc_peer_id_t peer_id, u32 buffer_index)
-{
- return mc_unserialize (mcm, stream, buffer_index);
-}
-
-static u8 *
-mc_internal_catchup_snapshot (mc_main_t * mcm,
- u8 * data_vector,
- u32 last_global_sequence_processed)
-{
- serialize_main_t m;
-
- /* Append serialized data to data vector. */
- serialize_open_vector (&m, data_vector);
- m.stream.current_buffer_index = vec_len (data_vector);
-
- serialize (&m, serialize_mc_main, mcm);
- return serialize_close_vector (&m);
-}
-
-static void
-mc_internal_catchup (mc_main_t * mcm, u8 * data, u32 n_data_bytes)
-{
- serialize_main_t s;
-
- unserialize_open_data (&s, data, n_data_bytes);
-
- unserialize (&s, unserialize_mc_main, mcm);
-}
-
-/* Overridden from the application layer, not actually used here */
-void mc_stream_join_process_hold (void) __attribute__ ((weak));
-void
-mc_stream_join_process_hold (void)
-{
-}
-
-static u32
-mc_stream_join_helper (mc_main_t * mcm,
- mc_stream_config_t * config, u32 is_internal)
-{
- mc_stream_t *s;
- vlib_main_t *vm = mcm->vlib_main;
-
- s = 0;
- if (!is_internal)
- {
- uword *p;
-
- /* Already have a stream with given name? */
- if ((s = mc_stream_by_name (mcm, config->name)))
- {
- /* Already joined and ready? */
- if (s->state == MC_STREAM_STATE_ready)
- return s->index;
- }
-
- /* First join MC internal stream. */
- if (!mcm->stream_vector
- || (mcm->stream_vector[MC_STREAM_INDEX_INTERNAL].state
- == MC_STREAM_STATE_invalid))
- {
- static mc_stream_config_t c = {
- .name = "mc-internal",
- .rx_buffer = mc_rx_buffer_unserialize,
- .catchup = mc_internal_catchup,
- .catchup_snapshot = mc_internal_catchup_snapshot,
- };
-
- c.save_snapshot = config->save_snapshot;
-
- mc_stream_join_helper (mcm, &c, /* is_internal */ 1);
- }
-
- /* If stream is still unknown register this name and wait for
- sequenced message to name stream. This way all peers agree
- on stream name to index mappings. */
- s = mc_stream_by_name (mcm, config->name);
- if (!s)
- {
- vlib_one_time_waiting_process_t *wp, **w;
- u8 *name_copy = format (0, "%s", config->name);
-
- mc_serialize_stream (mcm,
- MC_STREAM_INDEX_INTERNAL,
- &mc_register_stream_name_msg, config->name);
-
- /* Wait for this stream to be named. */
- p =
- hash_get_mem (mcm->procs_waiting_for_stream_name_by_name,
- name_copy);
- if (p)
- w =
- pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool,
- p[0]);
- else
- {
- pool_get (mcm->procs_waiting_for_stream_name_pool, w);
- if (!mcm->procs_waiting_for_stream_name_by_name)
- mcm->procs_waiting_for_stream_name_by_name = hash_create_string ( /* elts */ 0, /* value size */
- sizeof
- (uword));
- hash_set_mem (mcm->procs_waiting_for_stream_name_by_name,
- name_copy,
- w - mcm->procs_waiting_for_stream_name_pool);
- w[0] = 0;
- }
-
- vec_add2 (w[0], wp, 1);
- vlib_current_process_wait_for_one_time_event (vm, wp);
- vec_free (name_copy);
- }
-
- /* Name should be known now. */
- s = mc_stream_by_name (mcm, config->name);
- ASSERT (s != 0);
- ASSERT (s->state == MC_STREAM_STATE_name_known);
- }
-
- if (!s)
- {
- vec_add2 (mcm->stream_vector, s, 1);
- mc_stream_init (s);
- s->index = s - mcm->stream_vector;
- }
-
- {
- /* Save name since we could have already used it as hash key. */
- char *name_save = s->config.name;
-
- s->config = config[0];
-
- if (name_save)
- s->config.name = name_save;
- }
-
- if (s->config.window_size == 0)
- s->config.window_size = 8;
-
- if (s->config.retry_interval == 0.0)
- s->config.retry_interval = 1.0;
-
- /* Sanity. */
- ASSERT (s->config.retry_interval < 30);
-
- if (s->config.retry_limit == 0)
- s->config.retry_limit = 7;
-
- s->state = MC_STREAM_STATE_join_in_progress;
- if (!s->peer_index_by_id.hash)
- mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
-
- /* If we don't hear from someone in 5 seconds, we're alone */
- s->join_timeout = vlib_time_now (vm) + 5.0;
- mcm->joins_in_progress++;
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "stream index %d join request %s",
- .format_args = "i4s16",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 stream_index;
- char name[16];
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->stream_index = s->index;
- elog_stream_name (ed->name, sizeof (ed->name), s->config.name);
- }
-
- send_join_or_leave_request (mcm, s->index, 1 /* join */ );
-
- vlib_current_process_wait_for_one_time_event_vector
- (vm, &s->procs_waiting_for_join_done);
-
- if (MC_EVENT_LOGGING)
- {
- ELOG_TYPE (e, "join complete stream %d");
- ELOG (mcm->elog_main, e, s->index);
- }
-
- return s->index;
-}
-
-u32
-mc_stream_join (mc_main_t * mcm, mc_stream_config_t * config)
-{
- return mc_stream_join_helper (mcm, config, /* is_internal */ 0);
-}
-
-void
-mc_stream_leave (mc_main_t * mcm, u32 stream_index)
-{
- mc_stream_t *s = mc_stream_by_index (mcm, stream_index);
-
- if (!s)
- return;
-
- if (MC_EVENT_LOGGING)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (t) =
- {
- .format = "leave-stream: %d",.format_args = "i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 index;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, t);
- ed->index = stream_index;
- }
-
- send_join_or_leave_request (mcm, stream_index, 0 /* is_join */ );
- mc_stream_free (s);
- s->state = MC_STREAM_STATE_name_known;
-}
-
-void
-mc_msg_join_or_leave_request_handler (mc_main_t * mcm,
- mc_msg_join_or_leave_request_t * req,
- u32 buffer_index)
-{
- mc_stream_t *s;
- mc_msg_join_reply_t *rep;
- u32 bi;
-
- mc_byte_swap_msg_join_or_leave_request (req);
-
- s = mc_stream_by_index (mcm, req->stream_index);
- if (!s || s->state != MC_STREAM_STATE_ready)
- return;
-
- /* If the peer is joining, create it */
- if (req->is_join)
- {
- mc_stream_t *this_s;
-
- /* We're not in a position to catch up a peer until all
- stream joins are complete. */
- if (0)
- {
- /* XXX This is hard to test so we've. */
- vec_foreach (this_s, mcm->stream_vector)
- {
- if (this_s->state != MC_STREAM_STATE_ready
- && this_s->state != MC_STREAM_STATE_name_known)
- return;
- }
- }
- else if (mcm->joins_in_progress > 0)
- return;
-
- (void) get_or_create_peer_with_id (mcm, s, req->peer_id,
- /* created */ 0);
-
- rep = mc_get_vlib_buffer (mcm->vlib_main, sizeof (rep[0]), &bi);
- memset (rep, 0, sizeof (rep[0]));
- rep->type = MC_MSG_TYPE_join_reply;
- rep->stream_index = req->stream_index;
-
- mc_byte_swap_msg_join_reply (rep);
- /* These two are already in network byte order... */
- rep->peer_id = mcm->transport.our_ack_peer_id;
- rep->catchup_peer_id = mcm->transport.our_catchup_peer_id;
-
- mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi);
- }
- else
- {
- if (s->config.peer_died)
- s->config.peer_died (mcm, s, req->peer_id);
- }
-}
-
-void
-mc_msg_join_reply_handler (mc_main_t * mcm,
- mc_msg_join_reply_t * mp, u32 buffer_index)
-{
- mc_stream_t *s;
-
- mc_byte_swap_msg_join_reply (mp);
-
- s = mc_stream_by_index (mcm, mp->stream_index);
-
- if (!s || s->state != MC_STREAM_STATE_join_in_progress)
- return;
-
- /* Switch to catchup state; next join reply
- for this stream will be ignored. */
- s->state = MC_STREAM_STATE_catchup;
-
- mcm->joins_in_progress--;
- mcm->transport.catchup_request_fun (mcm->transport.opaque,
- mp->stream_index, mp->catchup_peer_id);
-}
-
-void
-mc_wait_for_stream_ready (mc_main_t * m, char *stream_name)
-{
- mc_stream_t *s;
-
- while (1)
- {
- s = mc_stream_by_name (m, stream_name);
- if (s)
- break;
- vlib_process_suspend (m->vlib_main, .1);
- }
-
- /* It's OK to send a message in catchup and ready states. */
- if (s->state == MC_STREAM_STATE_catchup
- || s->state == MC_STREAM_STATE_ready)
- return;
-
- /* Otherwise we are waiting for a join to finish. */
- vlib_current_process_wait_for_one_time_event_vector
- (m->vlib_main, &s->procs_waiting_for_join_done);
-}
-
-u32
-mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index)
-{
- mc_stream_t *s = mc_stream_by_index (mcm, stream_index);
- vlib_main_t *vm = mcm->vlib_main;
- mc_retry_t *r;
- mc_msg_user_request_t *mp;
- vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index);
- u32 ri;
-
- if (!s)
- return 0;
-
- if (s->state != MC_STREAM_STATE_ready)
- vlib_current_process_wait_for_one_time_event_vector
- (vm, &s->procs_waiting_for_join_done);
-
- while (pool_elts (s->retry_pool) >= s->config.window_size)
- {
- vlib_current_process_wait_for_one_time_event_vector
- (vm, &s->procs_waiting_for_open_window);
- }
-
- pool_get (s->retry_pool, r);
- ri = r - s->retry_pool;
-
- r->prev_index = s->retry_tail_index;
- r->next_index = ~0;
- s->retry_tail_index = ri;
-
- if (r->prev_index == ~0)
- s->retry_head_index = ri;
- else
- {
- mc_retry_t *p = pool_elt_at_index (s->retry_pool, r->prev_index);
- p->next_index = ri;
- }
-
- vlib_buffer_advance (b, -sizeof (mp[0]));
- mp = vlib_buffer_get_current (b);
-
- mp->peer_id = mcm->transport.our_ack_peer_id;
- /* mp->transport.global_sequence set by relay agent. */
- mp->global_sequence = 0xdeadbeef;
- mp->stream_index = s->index;
- mp->local_sequence = s->our_local_sequence++;
- mp->n_data_bytes =
- vlib_buffer_index_length_in_chain (vm, buffer_index) - sizeof (mp[0]);
-
- r->buffer_index = buffer_index;
- r->local_sequence = mp->local_sequence;
- r->sent_at = vlib_time_now (vm);
- r->n_retries = 0;
-
- /* Retry will be freed when all currently known peers have acked. */
- vec_validate (r->unacked_by_peer_bitmap, vec_len (s->all_peer_bitmap) - 1);
- vec_copy (r->unacked_by_peer_bitmap, s->all_peer_bitmap);
-
- hash_set (s->retry_index_by_local_sequence, r->local_sequence,
- r - s->retry_pool);
-
- elog_tx_msg (mcm, s->index, mp->local_sequence, r->n_retries);
-
- mc_byte_swap_msg_user_request (mp);
-
- mcm->transport.tx_buffer (mcm->transport.opaque,
- MC_TRANSPORT_USER_REQUEST_TO_RELAY, buffer_index);
-
- s->user_requests_sent++;
-
- /* return amount of window remaining */
- return s->config.window_size - pool_elts (s->retry_pool);
-}
-
-void
-mc_msg_user_request_handler (mc_main_t * mcm, mc_msg_user_request_t * mp,
- u32 buffer_index)
-{
- vlib_main_t *vm = mcm->vlib_main;
- mc_stream_t *s;
- mc_stream_peer_t *peer;
- i32 seq_cmp_result;
- static int once = 0;
-
- mc_byte_swap_msg_user_request (mp);
-
- s = mc_stream_by_index (mcm, mp->stream_index);
-
- /* Not signed up for this stream? Turf-o-matic */
- if (!s || s->state != MC_STREAM_STATE_ready)
- {
- vlib_buffer_free_one (vm, buffer_index);
- return;
- }
-
- /* Find peer, including ourselves. */
- peer = get_or_create_peer_with_id (mcm, s, mp->peer_id,
- /* created */ 0);
-
- seq_cmp_result = mc_seq_cmp (mp->local_sequence,
- peer->last_sequence_received + 1);
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "rx-msg: peer %s stream %d rx seq %d seq_cmp %d",
- .format_args = "T4i4i4i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 peer, stream_index, rx_sequence;
- i32 seq_cmp_result;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
- ed->stream_index = mp->stream_index;
- ed->rx_sequence = mp->local_sequence;
- ed->seq_cmp_result = seq_cmp_result;
- }
-
- if (0 && mp->stream_index == 1 && once == 0)
- {
- once = 1;
- ELOG_TYPE (e, "FAKE lost msg on stream 1");
- ELOG (mcm->elog_main, e, 0);
- return;
- }
-
- peer->last_sequence_received += seq_cmp_result == 0;
- s->user_requests_received++;
-
- if (seq_cmp_result > 0)
- peer->stats.n_msgs_from_future += 1;
-
- /* Send ack even if msg from future */
- if (1)
- {
- mc_msg_user_ack_t *rp;
- u32 bi;
-
- rp = mc_get_vlib_buffer (vm, sizeof (rp[0]), &bi);
- rp->peer_id = mcm->transport.our_ack_peer_id;
- rp->stream_index = s->index;
- rp->local_sequence = mp->local_sequence;
- rp->seq_cmp_result = seq_cmp_result;
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "tx-ack: stream %d local seq %d",
- .format_args = "i4i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 stream_index;
- u32 local_sequence;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->stream_index = rp->stream_index;
- ed->local_sequence = rp->local_sequence;
- }
-
- mc_byte_swap_msg_user_ack (rp);
-
- mcm->transport.tx_ack (mcm->transport.opaque, mp->peer_id, bi);
- /* Msg from past? If so, free the buffer... */
- if (seq_cmp_result < 0)
- {
- vlib_buffer_free_one (vm, buffer_index);
- peer->stats.n_msgs_from_past += 1;
- }
- }
-
- if (seq_cmp_result == 0)
- {
- vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index);
- switch (s->state)
- {
- case MC_STREAM_STATE_ready:
- vlib_buffer_advance (b, sizeof (mp[0]));
- s->config.rx_buffer (mcm, s, mp->peer_id, buffer_index);
-
- /* Stream vector can change address via rx callback for mc-internal
- stream. */
- s = mc_stream_by_index (mcm, mp->stream_index);
- ASSERT (s != 0);
- s->last_global_sequence_processed = mp->global_sequence;
- break;
-
- case MC_STREAM_STATE_catchup:
- clib_fifo_add1 (s->catchup_fifo, buffer_index);
- break;
-
- default:
- clib_warning ("stream in unknown state %U",
- format_mc_stream_state, s->state);
- break;
- }
- }
-}
-
-void
-mc_msg_user_ack_handler (mc_main_t * mcm, mc_msg_user_ack_t * mp,
- u32 buffer_index)
-{
- vlib_main_t *vm = mcm->vlib_main;
- uword *p;
- mc_stream_t *s;
- mc_stream_peer_t *peer;
- mc_retry_t *r;
- int peer_created = 0;
-
- mc_byte_swap_msg_user_ack (mp);
-
- s = mc_stream_by_index (mcm, mp->stream_index);
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (t) =
- {
- .format = "rx-ack: local seq %d peer %s seq_cmp_result %d",
- .format_args = "i4T4i4",
- };
- /* *INDENT-ON* */
-
- struct
- {
- u32 local_sequence;
- u32 peer;
- i32 seq_cmp_result;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, t);
- ed->local_sequence = mp->local_sequence;
- ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
- ed->seq_cmp_result = mp->seq_cmp_result;
- }
-
- /* Unknown stream? */
- if (!s)
- return;
-
- /* Find the peer which just ack'ed. */
- peer = get_or_create_peer_with_id (mcm, s, mp->peer_id,
- /* created */ &peer_created);
-
- /*
- * Peer reports message from the future. If it's not in the retry
- * fifo, look for a retired message.
- */
- if (mp->seq_cmp_result > 0)
- {
- p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence -
- mp->seq_cmp_result);
- if (p == 0)
- mc_resend_retired (mcm, s, mp->local_sequence - mp->seq_cmp_result);
-
- /* Normal retry should fix it... */
- return;
- }
-
- /*
- * Pointer to the indicated retry fifo entry.
- * Worth hashing because we could use a window size of 100 or 1000.
- */
- p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence);
-
- /*
- * Is this a duplicate ACK, received after we've retired the
- * fifo entry. This can happen when learning about new
- * peers.
- */
- if (p == 0)
- {
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (t) =
- {
- .format = "ack: for seq %d from peer %s no fifo elt",
- .format_args = "i4T4",
- };
- /* *INDENT-ON* */
-
- struct
- {
- u32 seq;
- u32 peer;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, t);
- ed->seq = mp->local_sequence;
- ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
- }
-
- return;
- }
-
- r = pool_elt_at_index (s->retry_pool, p[0]);
-
- /* Make sure that this new peer ACKs our msgs from now on */
- if (peer_created)
- {
- mc_retry_t *later_retry = next_retry (s, r);
-
- while (later_retry)
- {
- later_retry->unacked_by_peer_bitmap =
- clib_bitmap_ori (later_retry->unacked_by_peer_bitmap,
- peer - s->peers);
- later_retry = next_retry (s, later_retry);
- }
- }
-
- ASSERT (mp->local_sequence == r->local_sequence);
-
- /* If we weren't expecting to hear from this peer */
- if (!peer_created &&
- !clib_bitmap_get (r->unacked_by_peer_bitmap, peer - s->peers))
- {
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (t) =
- {
- .format = "dup-ack: for seq %d from peer %s",
- .format_args = "i4T4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 seq;
- u32 peer;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, t);
- ed->seq = r->local_sequence;
- ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
- }
- if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap))
- return;
- }
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (t) =
- {
- .format = "ack: for seq %d from peer %s",
- .format_args = "i4T4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 seq;
- u32 peer;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, t);
- ed->seq = mp->local_sequence;
- ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
- }
-
- r->unacked_by_peer_bitmap =
- clib_bitmap_andnoti (r->unacked_by_peer_bitmap, peer - s->peers);
-
- /* Not all clients have ack'ed */
- if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap))
- {
- return;
- }
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (t) =
- {
- .format = "ack: retire fifo elt loc seq %d after %d acks",
- .format_args = "i4i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 seq;
- u32 npeers;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, t);
- ed->seq = r->local_sequence;
- ed->npeers = pool_elts (s->peers);
- }
-
- hash_unset (s->retry_index_by_local_sequence, mp->local_sequence);
- mc_retry_free (mcm, s, r);
- remove_retry_from_pool (s, r);
- maybe_send_window_open_event (vm, s);
-}
-
-#define EVENT_MC_SEND_CATCHUP_DATA 0
-
-static uword
-mc_catchup_process (vlib_main_t * vm,
- vlib_node_runtime_t * node, vlib_frame_t * f)
-{
- mc_main_t *mcm = mc_node_get_main (node);
- uword *event_data = 0;
- mc_catchup_process_arg_t *args;
- int i;
-
- while (1)
- {
- if (event_data)
- _vec_len (event_data) = 0;
- vlib_process_wait_for_event_with_type (vm, &event_data,
- EVENT_MC_SEND_CATCHUP_DATA);
-
- for (i = 0; i < vec_len (event_data); i++)
- {
- args = pool_elt_at_index (mcm->catchup_process_args, event_data[i]);
-
- mcm->transport.catchup_send_fun (mcm->transport.opaque,
- args->catchup_opaque,
- args->catchup_snapshot);
-
- /* Send function will free snapshot data vector. */
- pool_put (mcm->catchup_process_args, args);
- }
- }
-
- return 0; /* not likely */
-}
-
-static void
-serialize_mc_stream (serialize_main_t * m, va_list * va)
-{
- mc_stream_t *s = va_arg (*va, mc_stream_t *);
- mc_stream_peer_t *p;
-
- serialize_integer (m, pool_elts (s->peers), sizeof (u32));
- /* *INDENT-OFF* */
- pool_foreach (p, s->peers, ({
- u8 * x = serialize_get (m, sizeof (p->id));
- clib_memcpy (x, p->id.as_u8, sizeof (p->id));
- serialize_integer (m, p->last_sequence_received,
- sizeof (p->last_sequence_received));
- }));
-/* *INDENT-ON* */
- serialize_bitmap (m, s->all_peer_bitmap);
-}
-
-void
-unserialize_mc_stream (serialize_main_t * m, va_list * va)
-{
- mc_stream_t *s = va_arg (*va, mc_stream_t *);
- u32 i, n_peers;
- mc_stream_peer_t *p;
-
- unserialize_integer (m, &n_peers, sizeof (u32));
- mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
- for (i = 0; i < n_peers; i++)
- {
- u8 *x;
- pool_get (s->peers, p);
- x = unserialize_get (m, sizeof (p->id));
- clib_memcpy (p->id.as_u8, x, sizeof (p->id));
- unserialize_integer (m, &p->last_sequence_received,
- sizeof (p->last_sequence_received));
- mhash_set (&s->peer_index_by_id, &p->id, p - s->peers, /* old_value */
- 0);
- }
- s->all_peer_bitmap = unserialize_bitmap (m);
-
- /* This is really bad. */
- if (!s->all_peer_bitmap)
- clib_warning ("BUG: stream %s all_peer_bitmap NULL", s->config.name);
-}
-
-void
-mc_msg_catchup_request_handler (mc_main_t * mcm,
- mc_msg_catchup_request_t * req,
- u32 catchup_opaque)
-{
- vlib_main_t *vm = mcm->vlib_main;
- mc_stream_t *s;
- mc_catchup_process_arg_t *args;
-
- mc_byte_swap_msg_catchup_request (req);
-
- s = mc_stream_by_index (mcm, req->stream_index);
- if (!s || s->state != MC_STREAM_STATE_ready)
- return;
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (t) =
- {
- .format = "catchup-request: from %s stream %d",
- .format_args = "T4i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 peer, stream;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, t);
- ed->peer = elog_id_for_peer_id (mcm, req->peer_id.as_u64);
- ed->stream = req->stream_index;
- }
-
- /*
- * The application has to snapshoot its data structures right
- * here, right now. If we process any messages after
- * noting the last global sequence we've processed, the client
- * won't be able to accurately reconstruct our data structures.
- *
- * Once the data structures are e.g. vec_dup()'ed, we
- * send the resulting messages from a separate process, to
- * make sure that we don't cause a bunch of message retransmissions
- */
- pool_get (mcm->catchup_process_args, args);
-
- args->stream_index = s - mcm->stream_vector;
- args->catchup_opaque = catchup_opaque;
- args->catchup_snapshot = 0;
-
- /* Construct catchup reply and snapshot state for stream to send as
- catchup reply payload. */
- {
- mc_msg_catchup_reply_t *rep;
- serialize_main_t m;
-
- vec_resize (args->catchup_snapshot, sizeof (rep[0]));
-
- rep = (void *) args->catchup_snapshot;
-
- rep->peer_id = req->peer_id;
- rep->stream_index = req->stream_index;
- rep->last_global_sequence_included = s->last_global_sequence_processed;
-
- /* Setup for serialize to append to catchup snapshot. */
- serialize_open_vector (&m, args->catchup_snapshot);
- m.stream.current_buffer_index = vec_len (m.stream.buffer);
-
- serialize (&m, serialize_mc_stream, s);
-
- args->catchup_snapshot = serialize_close_vector (&m);
-
- /* Actually copy internal state */
- args->catchup_snapshot = s->config.catchup_snapshot
- (mcm, args->catchup_snapshot, rep->last_global_sequence_included);
-
- rep = (void *) args->catchup_snapshot;
- rep->n_data_bytes = vec_len (args->catchup_snapshot) - sizeof (rep[0]);
-
- mc_byte_swap_msg_catchup_reply (rep);
- }
-
- /* now go send it... */
- vlib_process_signal_event (vm, mcm->catchup_process,
- EVENT_MC_SEND_CATCHUP_DATA,
- args - mcm->catchup_process_args);
-}
-
-#define EVENT_MC_UNSERIALIZE_BUFFER 0
-#define EVENT_MC_UNSERIALIZE_CATCHUP 1
-
-void
-mc_msg_catchup_reply_handler (mc_main_t * mcm, mc_msg_catchup_reply_t * mp,
- u32 catchup_opaque)
-{
- vlib_process_signal_event (mcm->vlib_main,
- mcm->unserialize_process,
- EVENT_MC_UNSERIALIZE_CATCHUP,
- pointer_to_uword (mp));
-}
-
-static void
-perform_catchup (mc_main_t * mcm, mc_msg_catchup_reply_t * mp)
-{
- mc_stream_t *s;
- i32 seq_cmp_result;
-
- mc_byte_swap_msg_catchup_reply (mp);
-
- s = mc_stream_by_index (mcm, mp->stream_index);
-
- /* Never heard of this stream or already caught up. */
- if (!s || s->state == MC_STREAM_STATE_ready)
- return;
-
- {
- serialize_main_t m;
- mc_stream_peer_t *p;
- u32 n_stream_bytes;
-
- /* For offline sim replay: save the entire catchup snapshot... */
- if (s->config.save_snapshot)
- s->config.save_snapshot (mcm, /* is_catchup */ 1, mp->data,
- mp->n_data_bytes);
-
- unserialize_open_data (&m, mp->data, mp->n_data_bytes);
- unserialize (&m, unserialize_mc_stream, s);
-
- /* Make sure we start numbering our messages as expected */
- /* *INDENT-OFF* */
- pool_foreach (p, s->peers, ({
- if (p->id.as_u64 == mcm->transport.our_ack_peer_id.as_u64)
- s->our_local_sequence = p->last_sequence_received + 1;
- }));
-/* *INDENT-ON* */
-
- n_stream_bytes = m.stream.current_buffer_index;
-
- /* No need to unserialize close; nothing to free. */
-
- /* After serialized stream is user's catchup data. */
- s->config.catchup (mcm, mp->data + n_stream_bytes,
- mp->n_data_bytes - n_stream_bytes);
- }
-
- /* Vector could have been moved by catchup.
- This can only happen for mc-internal stream. */
- s = mc_stream_by_index (mcm, mp->stream_index);
-
- s->last_global_sequence_processed = mp->last_global_sequence_included;
-
- while (clib_fifo_elts (s->catchup_fifo))
- {
- mc_msg_user_request_t *gp;
- u32 bi;
- vlib_buffer_t *b;
-
- clib_fifo_sub1 (s->catchup_fifo, bi);
-
- b = vlib_get_buffer (mcm->vlib_main, bi);
- gp = vlib_buffer_get_current (b);
-
- /* Make sure we're replaying "new" news */
- seq_cmp_result = mc_seq_cmp (gp->global_sequence,
- mp->last_global_sequence_included);
-
- if (seq_cmp_result > 0)
- {
- vlib_buffer_advance (b, sizeof (gp[0]));
- s->config.rx_buffer (mcm, s, gp->peer_id, bi);
- s->last_global_sequence_processed = gp->global_sequence;
-
- if (MC_EVENT_LOGGING)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (t) =
- {
- .format = "catchup replay local sequence 0x%x",
- .format_args = "i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 local_sequence;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, t);
- ed->local_sequence = gp->local_sequence;
- }
- }
- else
- {
- if (MC_EVENT_LOGGING)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (t) =
- {
- .format = "catchup discard local sequence 0x%x",
- .format_args = "i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 local_sequence;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, t);
- ed->local_sequence = gp->local_sequence;
- }
-
- vlib_buffer_free_one (mcm->vlib_main, bi);
- }
- }
-
- s->state = MC_STREAM_STATE_ready;
-
- /* Now that we are caught up wake up joining process. */
- {
- vlib_one_time_waiting_process_t *wp;
- vec_foreach (wp, s->procs_waiting_for_join_done)
- vlib_signal_one_time_waiting_process (mcm->vlib_main, wp);
- if (s->procs_waiting_for_join_done)
- _vec_len (s->procs_waiting_for_join_done) = 0;
- }
-}
-
-static void
-this_node_maybe_master (mc_main_t * mcm)
-{
- vlib_main_t *vm = mcm->vlib_main;
- mc_msg_master_assert_t *mp;
- uword event_type;
- int timeouts = 0;
- int is_master = mcm->relay_state == MC_RELAY_STATE_MASTER;
- clib_error_t *error;
- f64 now, time_last_master_assert = -1;
- u32 bi;
-
- while (1)
- {
- if (!mcm->we_can_be_relay_master)
- {
- mcm->relay_state = MC_RELAY_STATE_SLAVE;
- if (MC_EVENT_LOGGING)
- {
- ELOG_TYPE (e, "become slave (config)");
- ELOG (mcm->elog_main, e, 0);
- }
- return;
- }
-
- now = vlib_time_now (vm);
- if (now >= time_last_master_assert + 1)
- {
- time_last_master_assert = now;
- mp = mc_get_vlib_buffer (mcm->vlib_main, sizeof (mp[0]), &bi);
-
- mp->peer_id = mcm->transport.our_ack_peer_id;
- mp->global_sequence = mcm->relay_global_sequence;
-
- /*
- * these messages clog the event log, set MC_EVENT_LOGGING higher
- * if you want them
- */
- if (MC_EVENT_LOGGING > 1)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "tx-massert: peer %s global seq %u",
- .format_args = "T4i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 peer, global_sequence;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
- ed->global_sequence = mp->global_sequence;
- }
-
- mc_byte_swap_msg_master_assert (mp);
-
- error =
- mcm->transport.tx_buffer (mcm->transport.opaque,
- MC_TRANSPORT_MASTERSHIP, bi);
- if (error)
- clib_error_report (error);
- }
-
- vlib_process_wait_for_event_or_clock (vm, 1.0);
- event_type = vlib_process_get_events (vm, /* no event data */ 0);
-
- switch (event_type)
- {
- case ~0:
- if (!is_master && timeouts++ > 2)
- {
- mcm->relay_state = MC_RELAY_STATE_MASTER;
- mcm->relay_master_peer_id =
- mcm->transport.our_ack_peer_id.as_u64;
- if (MC_EVENT_LOGGING)
- {
- ELOG_TYPE (e, "become master (was maybe_master)");
- ELOG (mcm->elog_main, e, 0);
- }
- return;
- }
- break;
-
- case MC_RELAY_STATE_SLAVE:
- mcm->relay_state = MC_RELAY_STATE_SLAVE;
- if (MC_EVENT_LOGGING && mcm->relay_state != MC_RELAY_STATE_SLAVE)
- {
- ELOG_TYPE (e, "become slave (was maybe_master)");
- ELOG (mcm->elog_main, e, 0);
- }
- return;
- }
- }
-}
-
-static void
-this_node_slave (mc_main_t * mcm)
-{
- vlib_main_t *vm = mcm->vlib_main;
- uword event_type;
- int timeouts = 0;
-
- if (MC_EVENT_LOGGING)
- {
- ELOG_TYPE (e, "become slave");
- ELOG (mcm->elog_main, e, 0);
- }
-
- while (1)
- {
- vlib_process_wait_for_event_or_clock (vm, 1.0);
- event_type = vlib_process_get_events (vm, /* no event data */ 0);
-
- switch (event_type)
- {
- case ~0:
- if (timeouts++ > 2)
- {
- mcm->relay_state = MC_RELAY_STATE_NEGOTIATE;
- mcm->relay_master_peer_id = ~0ULL;
- if (MC_EVENT_LOGGING)
- {
- ELOG_TYPE (e, "timeouts; negoitate mastership");
- ELOG (mcm->elog_main, e, 0);
- }
- return;
- }
- break;
-
- case MC_RELAY_STATE_SLAVE:
- mcm->relay_state = MC_RELAY_STATE_SLAVE;
- timeouts = 0;
- break;
- }
- }
-}
-
-static uword
-mc_mastership_process (vlib_main_t * vm,
- vlib_node_runtime_t * node, vlib_frame_t * f)
-{
- mc_main_t *mcm = mc_node_get_main (node);
-
- while (1)
- {
- switch (mcm->relay_state)
- {
- case MC_RELAY_STATE_NEGOTIATE:
- case MC_RELAY_STATE_MASTER:
- this_node_maybe_master (mcm);
- break;
-
- case MC_RELAY_STATE_SLAVE:
- this_node_slave (mcm);
- break;
- }
- }
- return 0; /* not likely */
-}
-
-void
-mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master)
-{
- if (we_can_be_master != mcm->we_can_be_relay_master)
- {
- mcm->we_can_be_relay_master = we_can_be_master;
- vlib_process_signal_event (mcm->vlib_main,
- mcm->mastership_process,
- MC_RELAY_STATE_NEGOTIATE, 0);
- }
-}
-
-void
-mc_msg_master_assert_handler (mc_main_t * mcm, mc_msg_master_assert_t * mp,
- u32 buffer_index)
-{
- mc_peer_id_t his_peer_id, our_peer_id;
- i32 seq_cmp_result;
- u8 signal_slave = 0;
- u8 update_global_sequence = 0;
-
- mc_byte_swap_msg_master_assert (mp);
-
- his_peer_id = mp->peer_id;
- our_peer_id = mcm->transport.our_ack_peer_id;
-
- /* compare the incoming global sequence with ours */
- seq_cmp_result = mc_seq_cmp (mp->global_sequence,
- mcm->relay_global_sequence);
-
- /* If the sender has a lower peer id and the sender's sequence >=
- our global sequence, we become a slave. Otherwise we are master. */
- if (mc_peer_id_compare (his_peer_id, our_peer_id) < 0
- && seq_cmp_result >= 0)
- {
- vlib_process_signal_event (mcm->vlib_main,
- mcm->mastership_process,
- MC_RELAY_STATE_SLAVE, 0);
- signal_slave = 1;
- }
-
- /* Update our global sequence. */
- if (seq_cmp_result > 0)
- {
- mcm->relay_global_sequence = mp->global_sequence;
- update_global_sequence = 1;
- }
-
- {
- uword *q = mhash_get (&mcm->mastership_peer_index_by_id, &his_peer_id);
- mc_mastership_peer_t *p;
-
- if (q)
- p = vec_elt_at_index (mcm->mastership_peers, q[0]);
- else
- {
- vec_add2 (mcm->mastership_peers, p, 1);
- p->peer_id = his_peer_id;
- mhash_set (&mcm->mastership_peer_index_by_id, &p->peer_id,
- p - mcm->mastership_peers,
- /* old_value */ 0);
- }
- p->time_last_master_assert_received = vlib_time_now (mcm->vlib_main);
- }
-
- /*
- * these messages clog the event log, set MC_EVENT_LOGGING higher
- * if you want them.
- */
- if (MC_EVENT_LOGGING > 1)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "rx-massert: peer %s global seq %u upd %d slave %d",
- .format_args = "T4i4i1i1",
- };
- /* *INDENT-ON* */
-
- struct
- {
- u32 peer;
- u32 global_sequence;
- u8 update_sequence;
- u8 slave;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->peer = elog_id_for_peer_id (mcm, his_peer_id.as_u64);
- ed->global_sequence = mp->global_sequence;
- ed->update_sequence = update_global_sequence;
- ed->slave = signal_slave;
- }
-}
-
-static void
-mc_serialize_init (mc_main_t * mcm)
-{
- mc_serialize_msg_t *m;
- vlib_main_t *vm = vlib_get_main ();
-
- mcm->global_msg_index_by_name
- = hash_create_string ( /* elts */ 0, sizeof (uword));
-
- m = vm->mc_msg_registrations;
-
- while (m)
- {
- m->global_index = vec_len (mcm->global_msgs);
- hash_set_mem (mcm->global_msg_index_by_name, m->name, m->global_index);
- vec_add1 (mcm->global_msgs, m);
- m = m->next_registration;
- }
-}
-
-clib_error_t *
-mc_serialize_va (mc_main_t * mc,
- u32 stream_index,
- u32 multiple_messages_per_vlib_buffer,
- mc_serialize_msg_t * msg, va_list * va)
-{
- mc_stream_t *s;
- clib_error_t *error;
- serialize_main_t *m = &mc->serialize_mains[VLIB_TX];
- vlib_serialize_buffer_main_t *sbm = &mc->serialize_buffer_mains[VLIB_TX];
- u32 bi, n_before, n_after, n_total, n_this_msg;
- u32 si, gi;
-
- if (!sbm->vlib_main)
- {
- sbm->tx.max_n_data_bytes_per_chain = 4096;
- sbm->tx.free_list_index = VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX;
- }
-
- if (sbm->first_buffer == 0)
- serialize_open_vlib_buffer (m, mc->vlib_main, sbm);
-
- n_before = serialize_vlib_buffer_n_bytes (m);
-
- s = mc_stream_by_index (mc, stream_index);
- gi = msg->global_index;
- ASSERT (msg == vec_elt (mc->global_msgs, gi));
-
- si = ~0;
- if (gi < vec_len (s->stream_msg_index_by_global_index))
- si = s->stream_msg_index_by_global_index[gi];
-
- serialize_likely_small_unsigned_integer (m, si);
-
- /* For first time message is sent, use name to identify message. */
- if (si == ~0 || MSG_ID_DEBUG)
- serialize_cstring (m, msg->name);
-
- if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "serialize-msg: %s index %d",
- .format_args = "T4i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 c[2];
- } *ed;
- ed = ELOG_DATA (mc->elog_main, e);
- ed->c[0] = elog_id_for_msg_name (mc, msg->name);
- ed->c[1] = si;
- }
-
- error = va_serialize (m, va);
-
- n_after = serialize_vlib_buffer_n_bytes (m);
- n_this_msg = n_after - n_before;
- n_total = n_after + sizeof (mc_msg_user_request_t);
-
- /* For max message size ignore first message where string name is sent. */
- if (si != ~0)
- msg->max_n_bytes_serialized =
- clib_max (msg->max_n_bytes_serialized, n_this_msg);
-
- if (!multiple_messages_per_vlib_buffer
- || si == ~0
- || n_total + msg->max_n_bytes_serialized >
- mc->transport.max_packet_size)
- {
- bi = serialize_close_vlib_buffer (m);
- sbm->first_buffer = 0;
- if (!error)
- mc_stream_send (mc, stream_index, bi);
- else if (bi != ~0)
- vlib_buffer_free_one (mc->vlib_main, bi);
- }
-
- return error;
-}
-
-clib_error_t *
-mc_serialize_internal (mc_main_t * mc,
- u32 stream_index,
- u32 multiple_messages_per_vlib_buffer,
- mc_serialize_msg_t * msg, ...)
-{
- vlib_main_t *vm = mc->vlib_main;
- va_list va;
- clib_error_t *error;
-
- if (stream_index == ~0)
- {
- if (vm->mc_main && vm->mc_stream_index == ~0)
- vlib_current_process_wait_for_one_time_event_vector
- (vm, &vm->procs_waiting_for_mc_stream_join);
- stream_index = vm->mc_stream_index;
- }
-
- va_start (va, msg);
- error = mc_serialize_va (mc, stream_index,
- multiple_messages_per_vlib_buffer, msg, &va);
- va_end (va);
- return error;
-}
-
-uword
-mc_unserialize_message (mc_main_t * mcm,
- mc_stream_t * s, serialize_main_t * m)
-{
- mc_serialize_stream_msg_t *sm;
- u32 gi, si;
-
- si = unserialize_likely_small_unsigned_integer (m);
-
- if (!(si == ~0 || MSG_ID_DEBUG))
- {
- sm = vec_elt_at_index (s->stream_msgs, si);
- gi = sm->global_index;
- }
- else
- {
- char *name;
-
- unserialize_cstring (m, &name);
-
- if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "unserialize-msg: %s rx index %d",
- .format_args = "T4i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 c[2];
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->c[0] = elog_id_for_msg_name (mcm, name);
- ed->c[1] = si;
- }
-
- {
- uword *p = hash_get_mem (mcm->global_msg_index_by_name, name);
- gi = p ? p[0] : ~0;
- }
-
- /* Unknown message? */
- if (gi == ~0)
- {
- vec_free (name);
- goto done;
- }
-
- vec_validate_init_empty (s->stream_msg_index_by_global_index, gi, ~0);
- si = s->stream_msg_index_by_global_index[gi];
-
- /* Stream local index unknown? Create it. */
- if (si == ~0)
- {
- vec_add2 (s->stream_msgs, sm, 1);
-
- si = sm - s->stream_msgs;
- sm->global_index = gi;
- s->stream_msg_index_by_global_index[gi] = si;
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "msg-bind: stream %d %s to index %d",
- .format_args = "i4T4i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 c[3];
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->c[0] = s->index;
- ed->c[1] = elog_id_for_msg_name (mcm, name);
- ed->c[2] = si;
- }
- }
- else
- {
- sm = vec_elt_at_index (s->stream_msgs, si);
- if (gi != sm->global_index && MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "msg-id-ERROR: %s index %d expected %d",
- .format_args = "T4i4i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 c[3];
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->c[0] = elog_id_for_msg_name (mcm, name);
- ed->c[1] = si;
- ed->c[2] = ~0;
- if (sm->global_index <
- vec_len (s->stream_msg_index_by_global_index))
- ed->c[2] =
- s->stream_msg_index_by_global_index[sm->global_index];
- }
- }
-
- vec_free (name);
- }
-
- if (gi != ~0)
- {
- mc_serialize_msg_t *msg;
- msg = vec_elt (mcm->global_msgs, gi);
- unserialize (m, msg->unserialize, mcm);
- }
-
-done:
- return gi != ~0;
-}
-
-void
-mc_unserialize_internal (mc_main_t * mcm, u32 stream_and_buffer_index)
-{
- vlib_main_t *vm = mcm->vlib_main;
- serialize_main_t *m = &mcm->serialize_mains[VLIB_RX];
- vlib_serialize_buffer_main_t *sbm = &mcm->serialize_buffer_mains[VLIB_RX];
- mc_stream_and_buffer_t *sb;
- mc_stream_t *stream;
- u32 buffer_index;
-
- sb =
- pool_elt_at_index (mcm->mc_unserialize_stream_and_buffers,
- stream_and_buffer_index);
- buffer_index = sb->buffer_index;
- stream = vec_elt_at_index (mcm->stream_vector, sb->stream_index);
- pool_put (mcm->mc_unserialize_stream_and_buffers, sb);
-
- if (stream->config.save_snapshot)
- {
- u32 n_bytes = vlib_buffer_index_length_in_chain (vm, buffer_index);
- static u8 *contents;
- vec_reset_length (contents);
- vec_validate (contents, n_bytes - 1);
- vlib_buffer_contents (vm, buffer_index, contents);
- stream->config.save_snapshot (mcm, /* is_catchup */ 0, contents,
- n_bytes);
- }
-
- ASSERT (vlib_in_process_context (vm));
-
- unserialize_open_vlib_buffer (m, vm, sbm);
-
- clib_fifo_add1 (sbm->rx.buffer_fifo, buffer_index);
-
- while (unserialize_vlib_buffer_n_bytes (m) > 0)
- mc_unserialize_message (mcm, stream, m);
-
- /* Frees buffer. */
- unserialize_close_vlib_buffer (m);
-}
-
-void
-mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index)
-{
- vlib_main_t *vm = mcm->vlib_main;
- mc_stream_and_buffer_t *sb;
- pool_get (mcm->mc_unserialize_stream_and_buffers, sb);
- sb->stream_index = s->index;
- sb->buffer_index = buffer_index;
- vlib_process_signal_event (vm, mcm->unserialize_process,
- EVENT_MC_UNSERIALIZE_BUFFER,
- sb - mcm->mc_unserialize_stream_and_buffers);
-}
-
-static uword
-mc_unserialize_process (vlib_main_t * vm,
- vlib_node_runtime_t * node, vlib_frame_t * f)
-{
- mc_main_t *mcm = mc_node_get_main (node);
- uword event_type, *event_data = 0;
- int i;
-
- while (1)
- {
- if (event_data)
- _vec_len (event_data) = 0;
-
- vlib_process_wait_for_event (vm);
- event_type = vlib_process_get_events (vm, &event_data);
- switch (event_type)
- {
- case EVENT_MC_UNSERIALIZE_BUFFER:
- for (i = 0; i < vec_len (event_data); i++)
- mc_unserialize_internal (mcm, event_data[i]);
- break;
-
- case EVENT_MC_UNSERIALIZE_CATCHUP:
- for (i = 0; i < vec_len (event_data); i++)
- {
- u8 *mp = uword_to_pointer (event_data[i], u8 *);
- perform_catchup (mcm, (void *) mp);
- vec_free (mp);
- }
- break;
-
- default:
- break;
- }
- }
-
- return 0; /* not likely */
-}
-
-void
-serialize_mc_main (serialize_main_t * m, va_list * va)
-{
- mc_main_t *mcm = va_arg (*va, mc_main_t *);
- mc_stream_t *s;
- mc_serialize_stream_msg_t *sm;
- mc_serialize_msg_t *msg;
-
- serialize_integer (m, vec_len (mcm->stream_vector), sizeof (u32));
- vec_foreach (s, mcm->stream_vector)
- {
- /* Stream name. */
- serialize_cstring (m, s->config.name);
-
- /* Serialize global names for all sent messages. */
- serialize_integer (m, vec_len (s->stream_msgs), sizeof (u32));
- vec_foreach (sm, s->stream_msgs)
- {
- msg = vec_elt (mcm->global_msgs, sm->global_index);
- serialize_cstring (m, msg->name);
- }
- }
-}
-
-void
-unserialize_mc_main (serialize_main_t * m, va_list * va)
-{
- mc_main_t *mcm = va_arg (*va, mc_main_t *);
- u32 i, n_streams, n_stream_msgs;
- char *name;
- mc_stream_t *s;
- mc_serialize_stream_msg_t *sm;
-
- unserialize_integer (m, &n_streams, sizeof (u32));
- for (i = 0; i < n_streams; i++)
- {
- unserialize_cstring (m, &name);
- if (i != MC_STREAM_INDEX_INTERNAL && !mc_stream_by_name (mcm, name))
- {
- vec_validate (mcm->stream_vector, i);
- s = vec_elt_at_index (mcm->stream_vector, i);
- mc_stream_init (s);
- s->index = s - mcm->stream_vector;
- s->config.name = name;
- s->state = MC_STREAM_STATE_name_known;
- hash_set_mem (mcm->stream_index_by_name, s->config.name, s->index);
- }
- else
- vec_free (name);
-
- s = vec_elt_at_index (mcm->stream_vector, i);
-
- vec_free (s->stream_msgs);
- vec_free (s->stream_msg_index_by_global_index);
-
- unserialize_integer (m, &n_stream_msgs, sizeof (u32));
- vec_resize (s->stream_msgs, n_stream_msgs);
- vec_foreach (sm, s->stream_msgs)
- {
- uword *p;
- u32 si, gi;
-
- unserialize_cstring (m, &name);
- p = hash_get (mcm->global_msg_index_by_name, name);
- gi = p ? p[0] : ~0;
- si = sm - s->stream_msgs;
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "catchup-bind: %s to %d global index %d stream %d",
- .format_args = "T4i4i4i4",
- };
- /* *INDENT-ON* */
-
- struct
- {
- u32 c[4];
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->c[0] = elog_id_for_msg_name (mcm, name);
- ed->c[1] = si;
- ed->c[2] = gi;
- ed->c[3] = s->index;
- }
-
- vec_free (name);
-
- sm->global_index = gi;
- if (gi != ~0)
- {
- vec_validate_init_empty (s->stream_msg_index_by_global_index,
- gi, ~0);
- s->stream_msg_index_by_global_index[gi] = si;
- }
- }
- }
-}
-
-void
-mc_main_init (mc_main_t * mcm, char *tag)
-{
- vlib_main_t *vm = vlib_get_main ();
-
- mcm->vlib_main = vm;
- mcm->elog_main = &vm->elog_main;
-
- mcm->relay_master_peer_id = ~0ULL;
- mcm->relay_state = MC_RELAY_STATE_NEGOTIATE;
-
- mcm->stream_index_by_name
- = hash_create_string ( /* elts */ 0, /* value size */ sizeof (uword));
-
- {
- vlib_node_registration_t r;
-
- memset (&r, 0, sizeof (r));
-
- r.type = VLIB_NODE_TYPE_PROCESS;
-
- /* Point runtime data to main instance. */
- r.runtime_data = &mcm;
- r.runtime_data_bytes = sizeof (&mcm);
-
- r.name = (char *) format (0, "mc-mastership-%s", tag);
- r.function = mc_mastership_process;
- mcm->mastership_process = vlib_register_node (vm, &r);
-
- r.name = (char *) format (0, "mc-join-ager-%s", tag);
- r.function = mc_join_ager_process;
- mcm->join_ager_process = vlib_register_node (vm, &r);
-
- r.name = (char *) format (0, "mc-retry-%s", tag);
- r.function = mc_retry_process;
- mcm->retry_process = vlib_register_node (vm, &r);
-
- r.name = (char *) format (0, "mc-catchup-%s", tag);
- r.function = mc_catchup_process;
- mcm->catchup_process = vlib_register_node (vm, &r);
-
- r.name = (char *) format (0, "mc-unserialize-%s", tag);
- r.function = mc_unserialize_process;
- mcm->unserialize_process = vlib_register_node (vm, &r);
- }
-
- if (MC_EVENT_LOGGING > 0)
- mhash_init (&mcm->elog_id_by_peer_id, sizeof (uword),
- sizeof (mc_peer_id_t));
-
- mhash_init (&mcm->mastership_peer_index_by_id, sizeof (uword),
- sizeof (mc_peer_id_t));
- mc_serialize_init (mcm);
-}
-
-static u8 *
-format_mc_relay_state (u8 * s, va_list * args)
-{
- mc_relay_state_t state = va_arg (*args, mc_relay_state_t);
- char *t = 0;
- switch (state)
- {
- case MC_RELAY_STATE_NEGOTIATE:
- t = "negotiate";
- break;
- case MC_RELAY_STATE_MASTER:
- t = "master";
- break;
- case MC_RELAY_STATE_SLAVE:
- t = "slave";
- break;
- default:
- return format (s, "unknown 0x%x", state);
- }
-
- return format (s, "%s", t);
-}
-
-static u8 *
-format_mc_stream_state (u8 * s, va_list * args)
-{
- mc_stream_state_t state = va_arg (*args, mc_stream_state_t);
- char *t = 0;
- switch (state)
- {
-#define _(f) case MC_STREAM_STATE_##f: t = #f; break;
- foreach_mc_stream_state
-#undef _
- default:
- return format (s, "unknown 0x%x", state);
- }
-
- return format (s, "%s", t);
-}
-
-static int
-mc_peer_comp (void *a1, void *a2)
-{
- mc_stream_peer_t *p1 = a1;
- mc_stream_peer_t *p2 = a2;
-
- return mc_peer_id_compare (p1->id, p2->id);
-}
-
-u8 *
-format_mc_main (u8 * s, va_list * args)
-{
- mc_main_t *mcm = va_arg (*args, mc_main_t *);
- mc_stream_t *t;
- mc_stream_peer_t *p, *ps;
- u32 indent = format_get_indent (s);
-
- s = format (s, "MC state %U, %d streams joined, global sequence 0x%x",
- format_mc_relay_state, mcm->relay_state,
- vec_len (mcm->stream_vector), mcm->relay_global_sequence);
-
- {
- mc_mastership_peer_t *mp;
- f64 now = vlib_time_now (mcm->vlib_main);
- s = format (s, "\n%UMost recent mastership peers:",
- format_white_space, indent + 2);
- vec_foreach (mp, mcm->mastership_peers)
- {
- s = format (s, "\n%U%-30U%.4e",
- format_white_space, indent + 4,
- mcm->transport.format_peer_id, mp->peer_id,
- now - mp->time_last_master_assert_received);
- }
- }
-
- vec_foreach (t, mcm->stream_vector)
- {
- s = format (s, "\n%Ustream `%s' index %d",
- format_white_space, indent + 2, t->config.name, t->index);
-
- s = format (s, "\n%Ustate %U",
- format_white_space, indent + 4,
- format_mc_stream_state, t->state);
-
- s =
- format (s,
- "\n%Uretries: interval %.0f sec, limit %d, pool elts %d, %Ld sent",
- format_white_space, indent + 4, t->config.retry_interval,
- t->config.retry_limit, pool_elts (t->retry_pool),
- t->stats.n_retries - t->stats_last_clear.n_retries);
-
- s = format (s, "\n%U%Ld/%Ld user requests sent/received",
- format_white_space, indent + 4,
- t->user_requests_sent, t->user_requests_received);
-
- s = format (s, "\n%U%d peers, local/global sequence 0x%x/0x%x",
- format_white_space, indent + 4,
- pool_elts (t->peers),
- t->our_local_sequence, t->last_global_sequence_processed);
-
- ps = 0;
- /* *INDENT-OFF* */
- pool_foreach (p, t->peers,
- ({
- if (clib_bitmap_get (t->all_peer_bitmap, p - t->peers))
- vec_add1 (ps, p[0]);
- }));
- /* *INDENT-ON* */
- vec_sort_with_function (ps, mc_peer_comp);
- s = format (s, "\n%U%=30s%10s%16s%16s",
- format_white_space, indent + 6,
- "Peer", "Last seq", "Retries", "Future");
-
- vec_foreach (p, ps)
- {
- s = format (s, "\n%U%-30U0x%08x%16Ld%16Ld%s",
- format_white_space, indent + 6,
- mcm->transport.format_peer_id, p->id.as_u64,
- p->last_sequence_received,
- p->stats.n_msgs_from_past -
- p->stats_last_clear.n_msgs_from_past,
- p->stats.n_msgs_from_future -
- p->stats_last_clear.n_msgs_from_future,
- (mcm->transport.our_ack_peer_id.as_u64 ==
- p->id.as_u64 ? " (self)" : ""));
- }
- vec_free (ps);
- }
-
- return s;
-}
-
-/*
- * fd.io coding-style-patch-verification: ON
- *
- * Local Variables:
- * eval: (c-set-style "gnu")
- * End:
- */