/* * mc.c: vlib reliable sequenced multicast distributed applications * * Copyright (c) 2010 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include /* * 1 to enable msg id training wheels, which are useful for tracking * down catchup and/or partitioned network problems */ #define MSG_ID_DEBUG 0 static format_function_t format_mc_stream_state; static u32 elog_id_for_peer_id (mc_main_t * m, u64 peer_id) { uword *p, r; mhash_t *h = &m->elog_id_by_peer_id; if (!m->elog_id_by_peer_id.hash) mhash_init (h, sizeof (uword), sizeof (mc_peer_id_t)); p = mhash_get (h, &peer_id); if (p) return p[0]; r = elog_string (m->elog_main, "%U", m->transport.format_peer_id, peer_id); mhash_set (h, &peer_id, r, /* old_value */ 0); return r; } static u32 elog_id_for_msg_name (mc_main_t * m, char *msg_name) { uword *p, r; uword *h = m->elog_id_by_msg_name; u8 *name_copy; if (!h) h = m->elog_id_by_msg_name = hash_create_string (0, sizeof (uword)); p = hash_get_mem (h, msg_name); if (p) return p[0]; r = elog_string (m->elog_main, "%s", msg_name); name_copy = format (0, "%s%c", msg_name, 0); hash_set_mem (h, name_copy, r); m->elog_id_by_msg_name = h; return r; } static void elog_tx_msg (mc_main_t * m, u32 stream_id, u32 local_sequence, u32 retry_count) { if (MC_EVENT_LOGGING > 0) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (e) = { .format = "tx-msg: stream %d local seq %d attempt %d", .format_args = "i4i4i4", }; /* *INDENT-ON* */ struct { u32 stream_id, local_sequence, retry_count; } *ed; ed = ELOG_DATA (m->elog_main, e); ed->stream_id = stream_id; ed->local_sequence = local_sequence; ed->retry_count = retry_count; } } /* * seq_cmp * correctly compare two unsigned sequence numbers. * This function works so long as x and y are within 2**(n-1) of each * other, where n = bits(x, y). * * Magic decoder ring: * seq_cmp == 0 => x and y are equal * seq_cmp < 0 => x is "in the past" with respect to y * seq_cmp > 0 => x is "in the future" with respect to y */ always_inline i32 mc_seq_cmp (u32 x, u32 y) { return (i32) x - (i32) y; } void * mc_get_vlib_buffer (vlib_main_t * vm, u32 n_bytes, u32 * bi_return) { u32 n_alloc, bi = 0; vlib_buffer_t *b; n_alloc = vlib_buffer_alloc (vm, &bi, 1); ASSERT (n_alloc == 1); b = vlib_get_buffer (vm, bi); b->current_length = n_bytes; *bi_return = bi; return (void *) b->data; } static void delete_peer_with_index (mc_main_t * mcm, mc_stream_t * s, uword index, int notify_application) { mc_stream_peer_t *p = pool_elt_at_index (s->peers, index); ASSERT (p != 0); if (s->config.peer_died && notify_application) s->config.peer_died (mcm, s, p->id); s->all_peer_bitmap = clib_bitmap_andnoti (s->all_peer_bitmap, p - s->peers); if (MC_EVENT_LOGGING > 0) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (e) = { .format = "delete peer %s from all_peer_bitmap", .format_args = "T4", }; /* *INDENT-ON* */ struct { u32 peer; } *ed = 0; ed = ELOG_DATA (mcm->elog_main, e); ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64); } /* Do not delete the pool / hash table entries, or we lose sequence number state */ } static mc_stream_peer_t * get_or_create_peer_with_id (mc_main_t * mcm, mc_stream_t * s, mc_peer_id_t id, int *created) { uword *q = mhash_get (&s->peer_index_by_id, &id); mc_stream_peer_t *p; if (q) { p = pool_elt_at_index (s->peers, q[0]); goto done; } pool_get (s->peers, p); memset (p, 0, sizeof (p[0])); p->id = id; p->last_sequence_received = ~0; mhash_set (&s->peer_index_by_id, &id, p - s->peers, /* old_value */ 0); if (created) *created = 1; done: if (MC_EVENT_LOGGING > 0) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (e) = { .format = "get_or_create %s peer %s stream %d seq %d", .format_args = "t4T4i4i4", .n_enum_strings = 2, .enum_strings = { "old", "new", }, }; /* *INDENT-ON* */ struct { u32 is_new, peer, stream_index, rx_sequence; } *ed = 0; ed = ELOG_DATA (mcm->elog_main, e); ed->is_new = q ? 0 : 1; ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64); ed->stream_index = s->index; ed->rx_sequence = p->last_sequence_received; } /* $$$$ Enable or reenable this peer */ s->all_peer_bitmap = clib_bitmap_ori (s->all_peer_bitmap, p - s->peers); return p; } static void maybe_send_window_open_event (vlib_main_t * vm, mc_stream_t * stream) { vlib_one_time_waiting_process_t *p; if (pool_elts (stream->retry_pool) >= stream->config.window_size) return; vec_foreach (p, stream->procs_waiting_for_open_window) vlib_signal_one_time_waiting_process (vm, p); if (stream->procs_waiting_for_open_window) _vec_len (stream->procs_waiting_for_open_window) = 0; } static void mc_retry_free (mc_main_t * mcm, mc_stream_t * s, mc_retry_t * r) { mc_retry_t record, *retp; if (r->unacked_by_peer_bitmap) _vec_len (r->unacked_by_peer_bitmap) = 0; if (clib_fifo_elts (s->retired_fifo) >= 2 * s->config.window_size) { clib_fifo_sub1 (s->retired_fifo, record); vlib_buffer_free_one (mcm->vlib_main, record.buffer_index); } clib_fifo_add2 (s->retired_fifo, retp); retp->buffer_index = r->buffer_index; retp->local_sequence = r->local_sequence; r->buffer_index = ~0; /* poison buffer index in this retry */ } static void mc_resend_retired (mc_main_t * mcm, mc_stream_t * s, u32 local_sequence) { mc_retry_t *retry; if (MC_EVENT_LOGGING > 0) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (e) = { .format = "resend-retired: search for local seq %d", .format_args = "i4", }; /* *INDENT-ON* */ struct { u32 local_sequence; } *ed; ed = ELOG_DATA (mcm->elog_main, e); ed->local_sequence = local_sequence; } /* *INDENT-OFF* */ clib_fifo_foreach (retry, s->retired_fifo, ({ if (retry->local_sequence == local_sequence) { elog_tx_msg (mcm, s->index, retry-> local_sequence, -13); mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_USER_REQUEST_TO_RELAY, retry->buffer_index); return; } })); /* *INDENT-ON* */ if (MC_EVENT_LOGGING > 0) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (e) = { .format = "resend-retired: FAILED search for local seq %d", .format_args = "i4", }; /* *INDENT-ON* */ struct { u32 local_sequence; } *ed; ed = ELOG_DATA (mcm->elog_main, e); ed->local_sequence = local_sequence; } } static uword * delete_retry_fifo_elt (mc_main_t * mcm, mc_stream_t * stream, mc_retry_t * r, uword * dead_peer_bitmap) { mc_stream_peer_t *p; /* *INDENT-OFF* */ pool_foreach (p, stream->peers, ({ uword pi = p - stream->peers; uword is_alive = 0 == clib_bitmap_get (r->unacked_by_peer_bitmap, pi); if (! is_alive) dead_peer_bitmap = clib_bitmap_ori (dead_peer_bitmap, pi); if (MC_EVENT_LOGGING > 0) { ELOG_TYPE_DECLARE (e) = { .format = "delete_retry_fifo_elt: peer %s is %s", .format_args = "T4t4", .n_enum_strings = 2, .enum_strings = { "alive", "dead", }, }; struct { u32 peer, is_alive; } * ed; ed = ELOG_DATA (mcm->elog_main, e); ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64); ed->is_alive = is_alive; } })); /* *INDENT-ON* */ hash_unset (stream->retry_index_by_local_sequence, r->local_sequence); mc_retry_free (mcm, stream, r); return dead_peer_bitmap; } always_inline mc_retry_t * prev_retry (mc_stream_t * s, mc_retry_t * r) { return (r->prev_index != ~0 ? pool_elt_at_index (s->retry_pool, r->prev_index) : 0); } always_inline mc_retry_t * next_retry (mc_stream_t * s, mc_retry_t * r) { return (r->next_index != ~0 ? pool_elt_at_index (s->retry_pool, r->next_index) : 0); } always_inline void remove_retry_from_pool (mc_stream_t * s, mc_retry_t * r) { mc_retry_t *p = prev_retry (s, r); mc_retry_t *n = next_retry (s, r); if (p) p->next_index = r->next_index; else s->retry_head_index = r->next_index; if (n) n->prev_index = r->prev_index; else s->retry_tail_index = r->prev_index; pool_put_index (s->retry_pool, r - s->retry_pool); } static void check_retry (mc_main_t * mcm, mc_stream_t * s) { mc_retry_t *r; vlib_main_t *vm = mcm->vlib_main; f64 now = vlib_time_now (vm); uword *dead_peer_bitmap = 0; u32 ri, ri_next; for (ri = s->retry_head_index; ri != ~0; ri = ri_next) { r = pool_elt_at_index (s->retry_pool, ri); ri_next = r->next_index; if (now < r->sent_at + s->config.retry_interval) continue; r->n_retries += 1; if (r->n_retries > s->config.retry_limit) { dead_peer_bitmap = delete_retry_fifo_elt (mcm, s, r, dead_peer_bitmap); remove_retry_from_pool (s, r); } else { if (MC_EVENT_LOGGING > 0) { mc_stream_peer_t *p; /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (t) = { .format = "resend local seq %d attempt %d", .format_args = "i4i4", }; /* *INDENT-ON* */ /* *INDENT-OFF* */ pool_foreach (p, s->peers, ({ if (clib_bitmap_get (r->unacked_by_peer_bitmap, p - s->peers)) { ELOG_TYPE_DECLARE (ev) = { .format = "resend: needed by peer %s local seq %d", .format_args = "T4i4", }; struct { u32 peer, rx_sequence; } * ed; ed = ELOG_DATA (mcm->elog_main, ev); ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64); ed->rx_sequence = r->local_sequence; } })); /* *INDENT-ON* */ struct { u32 sequence; u32 trail; } *ed; ed = ELOG_DATA (mcm->elog_main, t); ed->sequence = r->local_sequence; ed->trail = r->n_retries; } r->sent_at = vlib_time_now (vm); s->stats.n_retries += 1; elog_tx_msg (mcm, s->index, r->local_sequence, r->n_retries); mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_USER_REQUEST_TO_RELAY, r->buffer_index); } } maybe_send_window_open_event (mcm->vlib_main, s); /* Delete any dead peers we've found. */ if (!clib_bitmap_is_zero (dead_peer_bitmap)) { uword i; /* *INDENT-OFF* */ clib_bitmap_foreach (i, dead_peer_bitmap, ({ delete_peer_with_index (mcm, s, i, /* notify_application */ 1); /* Delete any references to just deleted peer in retry pool. */ pool_foreach (r, s->retry_pool, ({ r->unacked_by_peer_bitmap = clib_bitmap_andnoti (r->unacked_by_peer_bitmap, i); })); })); /* *INDENT-ON* */ clib_bitmap_free (dead_peer_bitmap); } } always_inline mc_main_t * mc_node_get_main (vlib_node_runtime_t * node) { mc_main_t **p = (void *) node->runtime_data; return p[0]; } static uword mc_retry_process (vlib_main_t * vm, vlib_node_runtime_t * node, vlib_frame_t * f) { mc_main_t *mcm = mc_node_get_main (node); mc_stream_t *s; while (1) { vlib_process_suspend (vm, 1.0); vec_foreach (s, mcm->stream_vector) { if (s->state != MC_STREAM_STATE_invalid) check_retry (mcm, s); } } return 0; /* not likely */ } static void send_join_or_leave_request (mc_main_t * mcm, u32 stream_index, u32 is_join) { vlib_main_t *vm = mcm->vlib_main; mc_msg_join_or_leave_request_t *mp; u32 bi; mp = mc_get_vlib_buffer (vm, sizeof (mp[0]), &bi); memset (mp, 0, sizeof (*mp)); mp->type = MC_MSG_TYPE_join_or_leave_request; mp->peer_id = mcm->transport.our_ack_peer_id; mp->stream_index = stream_index; mp->is_join = is_join; mc_byte_swap_msg_join_or_leave_request (mp); /* * These msgs are unnumbered, unordered so send on the from-relay * channel. */ mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi); } static uword mc_join_ager_process (vlib_main_t * vm, vlib_node_runtime_t * node, vlib_frame_t * f) { mc_main_t *mcm = mc_node_get_main (node); while (1) { if (mcm->joins_in_progress) { mc_stream_t *s; vlib_one_time_waiting_process_t *p; f64 now = vlib_time_now (vm); vec_foreach (s, mcm->stream_vector) { if (s->state != MC_STREAM_STATE_join_in_progress) continue; if (now > s->join_timeout) { s->state = MC_STREAM_STATE_ready; if (MC_EVENT_LOGGING > 0) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (e) = { .format = "stream %d join timeout", }; /* *INDENT-ON* */ ELOG (mcm->elog_main, e, s->index); } /* Make sure that this app instance exists as a stream peer, or we may answer a catchup request with a NULL all_peer_bitmap... */ (void) get_or_create_peer_with_id (mcm, s, mcm->transport.our_ack_peer_id, /* created */ 0); vec_foreach (p, s->procs_waiting_for_join_done) vlib_signal_one_time_waiting_process (vm, p); if (s->procs_waiting_for_join_done) _vec_len (s->procs_waiting_for_join_done) = 0; mcm->joins_in_progress--; ASSERT (mcm->joins_in_progress >= 0); } else { /* Resent join request which may have been lost. */ send_join_or_leave_request (mcm, s->index, 1 /* is_join */ ); /* We're *not* alone, retry for as long as it takes */ if (mcm->relay_state == MC_RELAY_STATE_SLAVE) s->join_timeout = vlib_time_now (vm) + 2.0; if (MC_EVENT_LOGGING > 0) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (e) = { .format = "stream %d resend join request", }; /* *INDENT-ON* */ ELOG (mcm->elog_main, e, s->index); } } } } vlib_process_suspend (vm, .5); } return 0; /* not likely */ } static void serialize_mc_register_stream_name (serialize_main_t * m, va_list * va) { char *name = va_arg (*va, char *); serialize_cstring (m, name); } static void elog_stream_name (char *buf, int n_buf_bytes, char *v) { clib_memcpy (buf, v, clib_min (n_buf_bytes - 1, vec_len (v))); buf[n_buf_bytes - 1] = 0; } static void unserialize_mc_register_stream_name (serialize_main_t * m, va_list * va) { mc_main_t *mcm = va_arg (*va, mc_main_t *); char *name; mc_stream_t *s; uword *p; unserialize_cstring (m, &name); if ((p = hash_get_mem (mcm->stream_index_by_name, name))) { if (MC_EVENT_LOGGING > 0) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (e) = { .format = "stream index %d already named %s", .format_args = "i4s16", }; /* *INDENT-ON* */ struct { u32 stream_index; char name[16]; } *ed; ed = ELOG_DATA (mcm->elog_main, e); ed->stream_index = p[0]; elog_stream_name (ed->name, sizeof (ed->name), name); } vec_free (name); return; } vec_add2 (mcm->stream_vector, s, 1); mc_stream_init (s); s->state = MC_STREAM_STATE_name_known; s->index = s - mcm->stream_vector; s->config.name = name; if (MC_EVENT_LOGGING > 0) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (e) = { .format = "stream index %d named %s", .format_args = "i4s16", }; /* *INDENT-ON* */ struct { u32 stream_index; char name[16]; } *ed; ed = ELOG_DATA (mcm->elog_main, e); ed->stream_index = s->index; elog_stream_name (ed->name, sizeof (ed->name), name); } hash_set_mem (mcm->stream_index_by_name, name, s->index); p = hash_get (mcm->procs_waiting_for_stream_name_by_name, name); if (p) { vlib_one_time_waiting_process_t *wp, **w; w = pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool, p[0]); vec_foreach (wp, w[0]) vlib_signal_one_time_waiting_process (mcm->vlib_main, wp); pool_put (mcm->procs_waiting_for_stream_name_pool, w); hash_unset_mem (mcm->procs_waiting_for_stream_name_by_name, name); } } /* *INDENT-OFF* */ MC_SERIALIZE_MSG (mc_register_stream_name_msg, static) = { .name = "mc_register_stream_name", .serialize = serialize_mc_register_stream_name, .unserialize = unserialize_mc_register_stream_name, }; /* *INDENT-ON* */ void mc_rx_buffer_unserialize (mc_main_t * mcm, mc_stream_t * stream, mc_peer_id_t peer_id, u32 buffer_index) { return mc_unserialize (mcm, stream, buffer_index); } static u8 * mc_internal_catchup_snapshot (mc_main_t * mcm, u8 * data_vector, u32 last_global_sequence_processed) { serialize_main_t m; /* Append serialized data to data vector. */ serialize_open_vector (&m, data_vector); m.stream.current_buffer_index = vec_len (data_vector); serialize (&m, serialize_mc_main, mcm); return serialize_close_vector (&m); } static void mc_internal_catchup (mc_main_t * mcm, u8 * data, u32 n_data_bytes) { serialize_main_t s; unserialize_open_data (&s, data, n_data_bytes); unserialize (&s, unserialize_mc_main, mcm); } /* Overridden from the application layer, not actually used here */ void mc_stream_join_process_hold (void) __attribute__ ((weak)); void mc_stream_join_process_hold (void) { } static u32 mc_stream_join_helper (mc_main_t * mcm, mc_stream_config_t * config, u32 is_internal) { mc_stream_t *s; vlib_main_t *vm = mcm->vlib_main; s = 0; if (!is_internal) { uword *p; /* Already have a stream with given name? */ if ((s = mc_stream_by_name (mcm, config->name))) { /* Already joined and ready? */ if (s->state == MC_STREAM_STATE_ready) return s->index; } /* First join MC internal stream. */ if (!mcm->stream_vector || (mcm->stream_vector[MC_STREAM_INDEX_INTERNAL].state == MC_STREAM_STATE_invalid)) { static mc_stream_config_t c = { .name = "mc-internal", .rx_buffer = mc_rx_buffer_unserialize, .catchup = mc_internal_catchup, .catchup_snapshot = mc_internal_catchup_snapshot, }; c.save_snapshot = config->save_snapshot; mc_stream_join_helper (mcm, &c, /* is_internal */ 1); } /* If stream is still unknown register this name and wait for sequenced message to name stream. This way all peers agree on stream name to index mappings. */ s = mc_stream_by_name (mcm, config->name); if (!s) { vlib_one_time_waiting_process_t *wp, **w; u8 *name_copy = format (0, "%s", config->name); mc_serialize_stream (mcm, MC_STREAM_INDEX_INTERNAL, &mc_register_stream_name_msg, config->name); /* Wait for this stream to be named. */ p = hash_get_mem (mcm->procs_waiting_for_stream_name_by_name, name_copy); if (p) w = pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool, p[0]); else { pool_get (mcm->procs_waiting_for_stream_name_pool, w); if (!mcm->procs_waiting_for_stream_name_by_name) mcm->procs_waiting_for_stream_name_by_name = hash_create_string ( /* elts */ 0, /* value size */ sizeof (uword)); hash_set_mem (mcm->procs_waiting_for_stream_name_by_name, name_copy, w - mcm->procs_waiting_for_stream_name_pool); w[0] = 0; } vec_add2 (w[0], wp, 1); vlib_current_process_wait_for_one_time_event (vm, wp); vec_free (name_copy); } /* Name should be known now. */ s = mc_stream_by_name (mcm, config->name); ASSERT (s != 0); ASSERT (s->state == MC_STREAM_STATE_name_known); } if (!s) { vec_add2 (mcm->stream_vector, s, 1); mc_stream_init (s); s->index = s - mcm->stream_vector; } { /* Save name since we could have already used it as hash key. */ char *name_save = s->config.name; s->config = config[0]; if (name_save) s->config.name = name_save; } if (s->config.window_size == 0) s->config.window_size = 8; if (s->config.retry_interval == 0.0) s->config.retry_interval = 1.0; /* Sanity. */ ASSERT (s->config.retry_interval < 30); if (s->config.retry_limit == 0) s->config.retry_limit = 7; s->state = MC_STREAM_STATE_join_in_progress; if (!s->peer_index_by_id.hash) mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t)); /* If we don't hear from someone in 5 seconds, we're alone */ s->join_timeout = vlib_time_now (vm) + 5.0; mcm->joins_in_progress++; if (MC_EVENT_LOGGING > 0) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (e) = { .format = "stream index %d join request %s", .format_args = "i4s16", }; /* *INDENT-ON* */ struct { u32 stream_index; char name[16]; } *ed; ed = ELOG_DATA (mcm->elog_main, e); ed->stream_index = s->index; elog_stream_name (ed->name, sizeof (ed->name), s->config.name); } send_join_or_leave_request (mcm, s->index, 1 /* join */ ); vlib_current_process_wait_for_one_time_event_vector (vm, &s->procs_waiting_for_join_done); if (MC_EVENT_LOGGING) { ELOG_TYPE (e, "join complete stream %d"); ELOG (mcm->elog_main, e, s->index); } return s->index; } u32 mc_stream_join (mc_main_t * mcm, mc_stream_config_t * config) { return mc_stream_join_helper (mcm, config, /* is_internal */ 0); } void mc_stream_leave (mc_main_t * mcm, u32 stream_index) { mc_stream_t *s = mc_stream_by_index (mcm, stream_index); if (!s) return; if (MC_EVENT_LOGGING) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (t) = { .format = "leave-stream: %d",.format_args = "i4", }; /* *INDENT-ON* */ struct { u32 index; } *ed; ed = ELOG_DATA (mcm->elog_main, t); ed->index = stream_index; } send_join_or_leave_request (mcm, stream_index, 0 /* is_join */ ); mc_stream_free (s); s->state = MC_STREAM_STATE_name_known; } void mc_msg_join_or_leave_request_handler (mc_main_t * mcm, mc_msg_join_or_leave_request_t * req, u32 buffer_index) { mc_stream_t *s; mc_msg_join_reply_t *rep; u32 bi; mc_byte_swap_msg_join_or_leave_request (req); s = mc_stream_by_index (mcm, req->stream_index); if (!s || s->state != MC_STREAM_STATE_ready) return; /* If the peer is joining, create it */ if (req->is_join) { mc_stream_t *this_s; /* We're not in a position to catch up a peer until all stream joins are complete. */ if (0) { /* XXX This is hard to test so we've. */ vec_foreach (this_s, mcm->stream_vector) { if (this_s->state != MC_STREAM_STATE_ready && this_s->state != MC_STREAM_STATE_name_known) return; } } else if (mcm->joins_in_progress > 0) return; (void) get_or_create_peer_with_id (mcm, s, req->peer_id, /* created */ 0); rep = mc_get_vlib_buffer (mcm->vlib_main, sizeof (rep[0]), &bi); memset (rep, 0, sizeof (rep[0])); rep->type = MC_MSG_TYPE_join_reply; rep->stream_index = req->stream_index; mc_byte_swap_msg_join_reply (rep); /* These two are already in network byte order... */ rep->peer_id = mcm->transport.our_ack_peer_id; rep->catchup_peer_id = mcm->transport.our_catchup_peer_id; mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi); } else { if (s->config.peer_died) s->config.peer_died (mcm, s, req->peer_id); } } void mc_msg_join_reply_handler (mc_main_t * mcm, mc_msg_join_reply_t * mp, u32 buffer_index) { mc_stream_t *s; mc_byte_swap_msg_join_reply (mp); s = mc_stream_by_index (mcm, mp->stream_index); if (!s || s->state != MC_STREAM_STATE_join_in_progress) return; /* Switch to catchup state; next join reply for this stream will be ignored. */ s->state = MC_STREAM_STATE_catchup; mcm->joins_in_progress--; mcm->transport.catchup_request_fun (mcm->transport.opaque, mp->stream_index, mp->catchup_peer_id); } void mc_wait_for_stream_ready (mc_main_t * m, char *stream_name) { mc_stream_t *s; while (1) { s = mc_stream_by_name (m, stream_name); if (s) break; vlib_process_suspend (m->vlib_main, .1); } /* It's OK to send a message in catchup and ready states. */ if (s->state == MC_STREAM_STATE_catchup || s->state == MC_STREAM_STATE_ready) return; /* Otherwise we are waiting for a join to finish. */ vlib_current_process_wait_for_one_time_event_vector (m->vlib_main, &s->procs_waiting_for_join_done); } u32 mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index) { mc_stream_t *s = mc_stream_by_index (mcm, stream_index); vlib_main_t *vm = mcm->vlib_main; mc_retry_t *r; mc_msg_user_request_t *mp; vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index); u32 ri; if (!s) return 0; if (s->state != MC_STREAM_STATE_ready) vlib_current_process_wait_for_one_time_event_vector (vm, &s->procs_waiting_for_join_done); while (pool_elts (s->retry_pool) >= s->config.window_size) { vlib_current_process_wait_for_one_time_event_vector (vm, &s->procs_waiting_for_open_window); } pool_get (s->retry_pool, r); ri = r - s->retry_pool; r->prev_index = s->retry_tail_index; r->next_index = ~0; s->retry_tail_index = ri; if (r->prev_index == ~0) s->retry_head_index = ri; else { mc_retry_t *p = pool_elt_at_index (s->retry_pool, r->prev_index); p->next_index = ri; } vlib_buffer_advance (b, -sizeof (mp[0])); mp = vlib_buffer_get_current (b); mp->peer_id = mcm->transport.our_ack_peer_id; /* mp->transport.global_sequence set by relay agent. */ mp->global_sequence = 0xdeadbeef; mp->stream_index = s->index; mp->local_sequence = s->our_local_sequence++; mp->n_data_bytes = vlib_buffer_index_length_in_chain (vm, buffer_index) - sizeof (mp[0]); r->buffer_index = buffer_index; r->local_sequence = mp->local_sequence; r->sent_at = vlib_time_now (vm); r->n_retries = 0; /* Retry will be freed when all currently known peers have acked. */ vec_validate (r->unacked_by_peer_bitmap, vec_len (s->all_peer_bitmap) - 1); vec_copy (r->unacked_by_peer_bitmap, s->all_peer_bitmap); hash_set (s->retry_index_by_local_sequence, r->local_sequence, r - s->retry_pool); elog_tx_msg (mcm, s->index, mp->local_sequence, r->n_retries); mc_byte_swap_msg_user_request (mp); mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_USER_REQUEST_TO_RELAY, buffer_index); s->user_requests_sent++; /* return amount of window remaining */ return s->config.window_size - pool_elts (s->retry_pool); } void mc_msg_user_request_handler (mc_main_t * mcm, mc_msg_user_request_t * mp, u32 buffer_index) { vlib_main_t *vm = mcm->vlib_main; mc_stream_t *s; mc_stream_peer_t *peer; i32 seq_cmp_result; static int once = 0; mc_byte_swap_msg_user_request (mp); s = mc_stream_by_index (mcm, mp->stream_index); /* Not signed up for this stream? Turf-o-matic */ if (!s || s->state != MC_STREAM_STATE_ready) { vlib_buffer_free_one (vm, buffer_index); return; } /* Find peer, including ourselves. */ peer = get_or_create_peer_with_id (mcm, s, mp->peer_id, /* created */ 0); seq_cmp_result = mc_seq_cmp (mp->local_sequence, peer->last_sequence_received + 1); if (MC_EVENT_LOGGING > 0) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (e) = { .format = "rx-msg: peer %s stream %d rx seq %d seq_cmp %d", .format_args = "T4i4i4i4", }; /* *INDENT-ON* */ struct { u32 peer, stream_index, rx_sequence; i32 seq_cmp_result; } *ed; ed = ELOG_DATA (mcm->elog_main, e); ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64); ed->stream_index = mp->stream_index; ed->rx_sequence = mp->local_sequence; ed->seq_cmp_result = seq_cmp_result; } if (0 && mp->stream_index == 1 && once == 0) { once = 1; ELOG_TYPE (e, "FAKE lost msg on stream 1"); ELOG (mcm->elog_main, e, 0); return; } peer->last_sequence_received += seq_cmp_result == 0; s->user_requests_received++; if (seq_cmp_result > 0) peer->stats.n_msgs_from_future += 1; /* Send ack even if msg from future */ if (1) { mc_msg_user_ack_t *rp; u32 bi; rp = mc_get_vlib_buffer (vm, sizeof (rp[0]), &bi); rp->peer_id = mcm->transport.our_ack_peer_id; rp->stream_index = s->index; rp->local_sequence = mp->local_sequence; rp->seq_cmp_result = seq_cmp_result; if (MC_EVENT_LOGGING > 0) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (e) = { .format = "tx-ack: stream %d local seq %d", .format_args = "i4i4", }; /* *INDENT-ON* */ struct { u32 stream_index; u32 local_sequence; } *ed; ed = ELOG_DATA (mcm->elog_main, e); ed->stream_index = rp->stream_index; ed->local_sequence = rp->local_sequence; } mc_byte_swap_msg_user_ack (rp); mcm->transport.tx_ack (mcm->transport.opaque, mp->peer_id, bi); /* Msg from past? If so, free the buffer... */ if (seq_cmp_result < 0) { vlib_buffer_free_one (vm, buffer_index); peer->stats.n_msgs_from_past += 1; } } if (seq_cmp_result == 0) { vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index); switch (s->state) { case MC_STREAM_STATE_ready: vlib_buffer_advance (b, sizeof (mp[0])); s->config.rx_buffer (mcm, s, mp->peer_id, buffer_index); /* Stream vector can change address via rx callback for mc-internal stream. */ s = mc_stream_by_index (mcm, mp->stream_index); ASSERT (s != 0); s->last_global_sequence_processed = mp->global_sequence; break; case MC_STREAM_STATE_catchup: clib_fifo_add1 (s->catchup_fifo, buffer_index); break; default: clib_warning ("stream in unknown state %U", format_mc_stream_state, s->state); break; } } } void mc_msg_user_ack_handler (mc_main_t * mcm, mc_msg_user_ack_t * mp, u32 buffer_index) { vlib_main_t *vm = mcm->vlib_main; uword *p; mc_stream_t *s; mc_stream_peer_t *peer; mc_retry_t *r; int peer_created = 0; mc_byte_swap_msg_user_ack (mp); s = mc_stream_by_index (mcm, mp->stream_index); if (MC_EVENT_LOGGING > 0) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (t) = { .format = "rx-ack: local seq %d peer %s seq_cmp_result %d", .format_args = "i4T4i4", }; /* *INDENT-ON* */ struct { u32 local_sequence; u32 peer; i32 seq_cmp_result; } *ed; ed = ELOG_DATA (mcm->elog_main, t); ed->local_sequence = mp->local_sequence; ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64); ed->seq_cmp_result = mp->seq_cmp_result; } /* Unknown stream? */ if (!s) return; /* Find the peer which just ack'ed. */ peer = get_or_create_peer_with_id (mcm, s, mp->peer_id, /* created */ &peer_created); /* * Peer reports message from the future. If it's not in the retry * fifo, look for a retired message. */ if (mp->seq_cmp_result > 0) { p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence - mp->seq_cmp_result); if (p == 0) mc_resend_retired (mcm, s, mp->local_sequence - mp->seq_cmp_result); /* Normal retry should fix it... */ return; } /* * Pointer to the indicated retry fifo entry. * Worth hashing because we could use a window size of 100 or 1000. */ p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence); /* * Is this a duplicate ACK, received after we've retired the * fifo entry. This can happen when learning about new * peers. */ if (p == 0) { if (MC_EVENT_LOGGING > 0) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (t) = { .format = "ack: for seq %d from peer %s no fifo elt", .format_args = "i4T4", }; /* *INDENT-ON* */ struct { u32 seq; u32 peer; } *ed; ed = ELOG_DATA (mcm->elog_main, t); ed->seq = mp->local_sequence; ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64); } return; } r = pool_elt_at_index (s->retry_pool, p[0]); /* Make sure that this new peer ACKs our msgs from now on */ if (peer_created) { mc_retry_t *later_retry = next_retry (s, r); while (later_retry) { later_retry->unacked_by_peer_bitmap = clib_bitmap_ori (later_retry->unacked_by_peer_bitmap, peer - s->peers); later_retry = next_retry (s, later_retry); } } ASSERT (mp->local_sequence == r->local_sequence); /* If we weren't expecting to hear from this peer */ if (!peer_created && !clib_bitmap_get (r->unacked_by_peer_bitmap, peer - s->peers)) { if (MC_EVENT_LOGGING > 0) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (t) = { .format = "dup-ack: for seq %d from peer %s", .format_args = "i4T4", }; /* *INDENT-ON* */ struct { u32 seq; u32 peer; } *ed; ed = ELOG_DATA (mcm->elog_main, t); ed->seq = r->local_sequence; ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64); } if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap)) return; } if (MC_EVENT_LOGGING > 0) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (t) = { .format = "ack: for seq %d from peer %s", .format_args = "i4T4", }; /* *INDENT-ON* */ struct { u32 seq; u32 peer; } *ed; ed = ELOG_DATA (mcm->elog_main, t); ed->seq = mp->local_sequence; ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64); } r->unacked_by_peer_bitmap = clib_bitmap_andnoti (r->unacked_by_peer_bitmap, peer - s->peers); /* Not all clients have ack'ed */ if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap)) { return; } if (MC_EVENT_LOGGING > 0) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (t) = { .format = "ack: retire fifo elt loc seq %d after %d acks", .format_args = "i4i4", }; /* *INDENT-ON* */ struct { u32 seq; u32 npeers; } *ed; ed = ELOG_DATA (mcm->elog_main, t); ed->seq = r->local_sequence; ed->npeers = pool_elts (s->peers); } hash_unset (s->retry_index_by_local_sequence, mp->local_sequence); mc_retry_free (mcm, s, r); remove_retry_from_pool (s, r); maybe_send_window_open_event (vm, s); } #define EVENT_MC_SEND_CATCHUP_DATA 0 static uword mc_catchup_process (vlib_main_t * vm, vlib_node_runtime_t * node, vlib_frame_t * f) { mc_main_t *mcm = mc_node_get_main (node); uword *event_data = 0; mc_catchup_process_arg_t *args; int i; while (1) { if (event_data) _vec_len (event_data) = 0; vlib_process_wait_for_event_with_type (vm, &event_data, EVENT_MC_SEND_CATCHUP_DATA); for (i = 0; i < vec_len (event_data); i++) { args = pool_elt_at_index (mcm->catchup_process_args, event_data[i]); mcm->transport.catchup_send_fun (mcm->transport.opaque, args->catchup_opaque, args->catchup_snapshot); /* Send function will free snapshot data vector. */ pool_put (mcm->catchup_process_args, args); } } return 0; /* not likely */ } static void serialize_mc_stream (serialize_main_t * m, va_list * va) { mc_stream_t *s = va_arg (*va, mc_stream_t *); mc_stream_peer_t *p; serialize_integer (m, pool_elts (s->peers), sizeof (u32)); /* *INDENT-OFF* */ pool_foreach (p, s->peers, ({ u8 * x = serialize_get (m, sizeof (p->id)); clib_memcpy (x, p->id.as_u8, sizeof (p->id)); serialize_integer (m, p->last_sequence_received, sizeof (p->last_sequence_received)); })); /* *INDENT-ON* */ serialize_bitmap (m, s->all_peer_bitmap); } void unserialize_mc_stream (serialize_main_t * m, va_list * va) { mc_stream_t *s = va_arg (*va, mc_stream_t *); u32 i, n_peers; mc_stream_peer_t *p; unserialize_integer (m, &n_peers, sizeof (u32)); mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t)); for (i = 0; i < n_peers; i++) { u8 *x; pool_get (s->peers, p); x = unserialize_get (m, sizeof (p->id)); clib_memcpy (p->id.as_u8, x, sizeof (p->id)); unserialize_integer (m, &p->last_sequence_received, sizeof (p->last_sequence_received)); mhash_set (&s->peer_index_by_id, &p->id, p - s->peers, /* old_value */ 0); } s->all_peer_bitmap = unserialize_bitmap (m); /* This is really bad. */ if (!s->all_peer_bitmap) clib_warning ("BUG: stream %s all_peer_bitmap NULL", s->config.name); } void mc_msg_catchup_request_handler (mc_main_t * mcm, mc_msg_catchup_request_t * req, u32 catchup_opaque) { vlib_main_t *vm = mcm->vlib_main; mc_stream_t *s; mc_catchup_process_arg_t *args; mc_byte_swap_msg_catchup_request (req); s = mc_stream_by_index (mcm, req->stream_index); if (!s || s->state != MC_STREAM_STATE_ready) return; if (MC_EVENT_LOGGING > 0) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (t) = { .format = "catchup-request: from %s stream %d", .format_args = "T4i4", }; /* *INDENT-ON* */ struct { u32 peer, stream; } *ed; ed = ELOG_DATA (mcm->elog_main, t); ed->peer = elog_id_for_peer_id (mcm, req->peer_id.as_u64); ed->stream = req->stream_index; } /* * The application has to snapshoot its data structures right * here, right now. If we process any messages after * noting the last global sequence we've processed, the client * won't be able to accurately reconstruct our data structures. * * Once the data structures are e.g. vec_dup()'ed, we * send the resulting messages from a separate process, to * make sure that we don't cause a bunch of message retransmissions */ pool_get (mcm->catchup_process_args, args); args->stream_index = s - mcm->stream_vector; args->catchup_opaque = catchup_opaque; args->catchup_snapshot = 0; /* Construct catchup reply and snapshot state for stream to send as catchup reply payload. */ { mc_msg_catchup_reply_t *rep; serialize_main_t m; vec_resize (args->catchup_snapshot, sizeof (rep[0])); rep = (void *) args->catchup_snapshot; rep->peer_id = req->peer_id; rep->stream_index = req->stream_index; rep->last_global_sequence_included = s->last_global_sequence_processed; /* Setup for serialize to append to catchup snapshot. */ serialize_open_vector (&m, args->catchup_snapshot); m.stream.current_buffer_index = vec_len (m.stream.buffer); serialize (&m, serialize_mc_stream, s); args->catchup_snapshot = serialize_close_vector (&m); /* Actually copy internal state */ args->catchup_snapshot = s->config.catchup_snapshot (mcm, args->catchup_snapshot, rep->last_global_sequence_included); rep = (void *) args->catchup_snapshot; rep->n_data_bytes = vec_len (args->catchup_snapshot) - sizeof (rep[0]); mc_byte_swap_msg_catchup_reply (rep); } /* now go send it... */ vlib_process_signal_event (vm, mcm->catchup_process, EVENT_MC_SEND_CATCHUP_DATA, args - mcm->catchup_process_args); } #define EVENT_MC_UNSERIALIZE_BUFFER 0 #define EVENT_MC_UNSERIALIZE_CATCHUP 1 void mc_msg_catchup_reply_handler (mc_main_t * mcm, mc_msg_catchup_reply_t * mp, u32 catchup_opaque) { vlib_process_signal_event (mcm->vlib_main, mcm->unserialize_process, EVENT_MC_UNSERIALIZE_CATCHUP, pointer_to_uword (mp)); } static void perform_catchup (mc_main_t * mcm, mc_msg_catchup_reply_t * mp) { mc_stream_t *s; i32 seq_cmp_result; mc_byte_swap_msg_catchup_reply (mp); s = mc_stream_by_index (mcm, mp->stream_index); /* Never heard of this stream or already caught up. */ if (!s || s->state == MC_STREAM_STATE_ready) return; { serialize_main_t m; mc_stream_peer_t *p; u32 n_stream_bytes; /* For offline sim replay: save the entire catchup snapshot... */ if (s->config.save_snapshot) s->config.save_snapshot (mcm, /* is_catchup */ 1, mp->data, mp->n_data_bytes); unserialize_open_data (&m, mp->data, mp->n_data_bytes); unserialize (&m, unserialize_mc_stream, s); /* Make sure we start numbering our messages as expected */ /* *INDENT-OFF* */ pool_foreach (p, s->peers, ({ if (p->id.as_u64 == mcm->transport.our_ack_peer_id.as_u64) s->our_local_sequence = p->last_sequence_received + 1; })); /* *INDENT-ON* */ n_stream_bytes = m.stream.current_buffer_index; /* No need to unserialize close; nothing to free. */ /* After serialized stream is user's catchup data. */ s->config.catchup (mcm, mp->data + n_stream_bytes, mp->n_data_bytes - n_stream_bytes); } /* Vector could have been moved by catchup. This can only happen for mc-internal stream. */ s = mc_stream_by_index (mcm, mp->stream_index); s->last_global_sequence_processed = mp->last_global_sequence_included; while (clib_fifo_elts (s->catchup_fifo)) { mc_msg_user_request_t *gp; u32 bi; vlib_buffer_t *b; clib_fifo_sub1 (s->catchup_fifo, bi); b = vlib_get_buffer (mcm->vlib_main, bi); gp = vlib_buffer_get_current (b); /* Make sure we're replaying "new" news */ seq_cmp_result = mc_seq_cmp (gp->global_sequence, mp->last_global_sequence_included); if (seq_cmp_result > 0) { vlib_buffer_advance (b, sizeof (gp[0])); s->config.rx_buffer (mcm, s, gp->peer_id, bi); s->last_global_sequence_processed = gp->global_sequence; if (MC_EVENT_LOGGING) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (t) = { .format = "catchup replay local sequence 0x%x", .format_args = "i4", }; /* *INDENT-ON* */ struct { u32 local_sequence; } *ed; ed = ELOG_DATA (mcm->elog_main, t); ed->local_sequence = gp->local_sequence; } } else { if (MC_EVENT_LOGGING) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (t) = { .format = "catchup discard local sequence 0x%x", .format_args = "i4", }; /* *INDENT-ON* */ struct { u32 local_sequence; } *ed; ed = ELOG_DATA (mcm->elog_main, t); ed->local_sequence = gp->local_sequence; } vlib_buffer_free_one (mcm->vlib_main, bi); } } s->state = MC_STREAM_STATE_ready; /* Now that we are caught up wake up joining process. */ { vlib_one_time_waiting_process_t *wp; vec_foreach (wp, s->procs_waiting_for_join_done) vlib_signal_one_time_waiting_process (mcm->vlib_main, wp); if (s->procs_waiting_for_join_done) _vec_len (s->procs_waiting_for_join_done) = 0; } } static void this_node_maybe_master (mc_main_t * mcm) { vlib_main_t *vm = mcm->vlib_main; mc_msg_master_assert_t *mp; uword event_type; int timeouts = 0; int is_master = mcm->relay_state == MC_RELAY_STATE_MASTER; clib_error_t *error; f64 now, time_last_master_assert = -1; u32 bi; while (1) { if (!mcm->we_can_be_relay_master) { mcm->relay_state = MC_RELAY_STATE_SLAVE; if (MC_EVENT_LOGGING) { ELOG_TYPE (e, "become slave (config)"); ELOG (mcm->elog_main, e, 0); } return; } now = vlib_time_now (vm); if (now >= time_last_master_assert + 1) { time_last_master_assert = now; mp = mc_get_vlib_buffer (mcm->vlib_main, sizeof (mp[0]), &bi); mp->peer_id = mcm->transport.our_ack_peer_id; mp->global_sequence = mcm->relay_global_sequence; /* * these messages clog the event log, set MC_EVENT_LOGGING higher * if you want them */ if (MC_EVENT_LOGGING > 1) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (e) = { .format = "tx-massert: peer %s global seq %u", .format_args = "T4i4", }; /* *INDENT-ON* */ struct { u32 peer, global_sequence; } *ed; ed = ELOG_DATA (mcm->elog_main, e); ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64); ed->global_sequence = mp->global_sequence; } mc_byte_swap_msg_master_assert (mp); error = mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_MASTERSHIP, bi); if (error) clib_error_report (error); } vlib_process_wait_for_event_or_clock (vm, 1.0); event_type = vlib_process_get_events (vm, /* no event data */ 0); switch (event_type) { case ~0: if (!is_master && timeouts++ > 2) { mcm->relay_state = MC_RELAY_STATE_MASTER; mcm->relay_master_peer_id = mcm->transport.our_ack_peer_id.as_u64; if (MC_EVENT_LOGGING) { ELOG_TYPE (e, "become master (was maybe_master)"); ELOG (mcm->elog_main, e, 0); } return; } break; case MC_RELAY_STATE_SLAVE: mcm->relay_state = MC_RELAY_STATE_SLAVE; if (MC_EVENT_LOGGING && mcm->relay_state != MC_RELAY_STATE_SLAVE) { ELOG_TYPE (e, "become slave (was maybe_master)"); ELOG (mcm->elog_main, e, 0); } return; } } } static void this_node_slave (mc_main_t * mcm) { vlib_main_t *vm = mcm->vlib_main; uword event_type; int timeouts = 0; if (MC_EVENT_LOGGING) { ELOG_TYPE (e, "become slave"); ELOG (mcm->elog_main, e, 0); } while (1) { vlib_process_wait_for_event_or_clock (vm, 1.0); event_type = vlib_process_get_events (vm, /* no event data */ 0); switch (event_type) { case ~0: if (timeouts++ > 2) { mcm->relay_state = MC_RELAY_STATE_NEGOTIATE; mcm->relay_master_peer_id = ~0ULL; if (MC_EVENT_LOGGING) { ELOG_TYPE (e, "timeouts; negoitate mastership"); ELOG (mcm->elog_main, e, 0); } return; } break; case MC_RELAY_STATE_SLAVE: mcm->relay_state = MC_RELAY_STATE_SLAVE; timeouts = 0; break; } } } static uword mc_mastership_process (vlib_main_t * vm, vlib_node_runtime_t * node, vlib_frame_t * f) { mc_main_t *mcm = mc_node_get_main (node); while (1) { switch (mcm->relay_state) { case MC_RELAY_STATE_NEGOTIATE: case MC_RELAY_STATE_MASTER: this_node_maybe_master (mcm); break; case MC_RELAY_STATE_SLAVE: this_node_slave (mcm); break; } } return 0; /* not likely */ } void mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master) { if (we_can_be_master != mcm->we_can_be_relay_master) { mcm->we_can_be_relay_master = we_can_be_master; vlib_process_signal_event (mcm->vlib_main, mcm->mastership_process, MC_RELAY_STATE_NEGOTIATE, 0); } } void mc_msg_master_assert_handler (mc_main_t * mcm, mc_msg_master_assert_t * mp, u32 buffer_index) { mc_peer_id_t his_peer_id, our_peer_id; i32 seq_cmp_result; u8 signal_slave = 0; u8 update_global_sequence = 0; mc_byte_swap_msg_master_assert (mp); his_peer_id = mp->peer_id; our_peer_id = mcm->transport.our_ack_peer_id; /* compare the incoming global sequence with ours */ seq_cmp_result = mc_seq_cmp (mp->global_sequence, mcm->relay_global_sequence); /* If the sender has a lower peer id and the sender's sequence >= our global sequence, we become a slave. Otherwise we are master. */ if (mc_peer_id_compare (his_peer_id, our_peer_id) < 0 && seq_cmp_result >= 0) { vlib_process_signal_event (mcm->vlib_main, mcm->mastership_process, MC_RELAY_STATE_SLAVE, 0); signal_slave = 1; } /* Update our global sequence. */ if (seq_cmp_result > 0) { mcm->relay_global_sequence = mp->global_sequence; update_global_sequence = 1; } { uword *q = mhash_get (&mcm->mastership_peer_index_by_id, &his_peer_id); mc_mastership_peer_t *p; if (q) p = vec_elt_at_index (mcm->mastership_peers, q[0]); else { vec_add2 (mcm->mastership_peers, p, 1); p->peer_id = his_peer_id; mhash_set (&mcm->mastership_peer_index_by_id, &p->peer_id, p - mcm->mastership_peers, /* old_value */ 0); } p->time_last_master_assert_received = vlib_time_now (mcm->vlib_main); } /* * these messages clog the event log, set MC_EVENT_LOGGING higher * if you want them. */ if (MC_EVENT_LOGGING > 1) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (e) = { .format = "rx-massert: peer %s global seq %u upd %d slave %d", .format_args = "T4i4i1i1", }; /* *INDENT-ON* */ struct { u32 peer; u32 global_sequence; u8 update_sequence; u8 slave; } *ed; ed = ELOG_DATA (mcm->elog_main, e); ed->peer = elog_id_for_peer_id (mcm, his_peer_id.as_u64); ed->global_sequence = mp->global_sequence; ed->update_sequence = update_global_sequence; ed->slave = signal_slave; } } static void mc_serialize_init (mc_main_t * mcm) { mc_serialize_msg_t *m; vlib_main_t *vm = vlib_get_main (); mcm->global_msg_index_by_name = hash_create_string ( /* elts */ 0, sizeof (uword)); m = vm->mc_msg_registrations; while (m) { m->global_index = vec_len (mcm->global_msgs); hash_set_mem (mcm->global_msg_index_by_name, m->name, m->global_index); vec_add1 (mcm->global_msgs, m); m = m->next_registration; } } clib_error_t * mc_serialize_va (mc_main_t * mc, u32 stream_index, u32 multiple_messages_per_vlib_buffer, mc_serialize_msg_t * msg, va_list * va) { mc_stream_t *s; clib_error_t *error; serialize_main_t *m = &mc->serialize_mains[VLIB_TX]; vlib_serialize_buffer_main_t *sbm = &mc->serialize_buffer_mains[VLIB_TX]; u32 bi, n_before, n_after, n_total, n_this_msg; u32 si, gi; if (!sbm->vlib_main) { sbm->tx.max_n_data_bytes_per_chain = 4096; sbm->tx.free_list_index = VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX; } if (sbm->first_buffer == 0) serialize_open_vlib_buffer (m, mc->vlib_main, sbm); n_before = serialize_vlib_buffer_n_bytes (m); s = mc_stream_by_index (mc, stream_index); gi = msg->global_index; ASSERT (msg == vec_elt (mc->global_msgs, gi)); si = ~0; if (gi < vec_len (s->stream_msg_index_by_global_index)) si = s->stream_msg_index_by_global_index[gi]; serialize_likely_small_unsigned_integer (m, si); /* For first time message is sent, use name to identify message. */ if (si == ~0 || MSG_ID_DEBUG) serialize_cstring (m, msg->name); if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (e) = { .format = "serialize-msg: %s index %d", .format_args = "T4i4", }; /* *INDENT-ON* */ struct { u32 c[2]; } *ed; ed = ELOG_DATA (mc->elog_main, e); ed->c[0] = elog_id_for_msg_name (mc, msg->name); ed->c[1] = si; } error = va_serialize (m, va); n_after = serialize_vlib_buffer_n_bytes (m); n_this_msg = n_after - n_before; n_total = n_after + sizeof (mc_msg_user_request_t); /* For max message size ignore first message where string name is sent. */ if (si != ~0) msg->max_n_bytes_serialized = clib_max (msg->max_n_bytes_serialized, n_this_msg); if (!multiple_messages_per_vlib_buffer || si == ~0 || n_total + msg->max_n_bytes_serialized > mc->transport.max_packet_size) { bi = serialize_close_vlib_buffer (m); sbm->first_buffer = 0; if (!error) mc_stream_send (mc, stream_index, bi); else if (bi != ~0) vlib_buffer_free_one (mc->vlib_main, bi); } return error; } clib_error_t * mc_serialize_internal (mc_main_t * mc, u32 stream_index, u32 multiple_messages_per_vlib_buffer, mc_serialize_msg_t * msg, ...) { vlib_main_t *vm = mc->vlib_main; va_list va; clib_error_t *error; if (stream_index == ~0) { if (vm->mc_main && vm->mc_stream_index == ~0) vlib_current_process_wait_for_one_time_event_vector (vm, &vm->procs_waiting_for_mc_stream_join); stream_index = vm->mc_stream_index; } va_start (va, msg); error = mc_serialize_va (mc, stream_index, multiple_messages_per_vlib_buffer, msg, &va); va_end (va); return error; } uword mc_unserialize_message (mc_main_t * mcm, mc_stream_t * s, serialize_main_t * m) { mc_serialize_stream_msg_t *sm; u32 gi, si; si = unserialize_likely_small_unsigned_integer (m); if (!(si == ~0 || MSG_ID_DEBUG)) { sm = vec_elt_at_index (s->stream_msgs, si); gi = sm->global_index; } else { char *name; unserialize_cstring (m, &name); if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (e) = { .format = "unserialize-msg: %s rx index %d", .format_args = "T4i4", }; /* *INDENT-ON* */ struct { u32 c[2]; } *ed; ed = ELOG_DATA (mcm->elog_main, e); ed->c[0] = elog_id_for_msg_name (mcm, name); ed->c[1] = si; } { uword *p = hash_get_mem (mcm->global_msg_index_by_name, name); gi = p ? p[0] : ~0; } /* Unknown message? */ if (gi == ~0) { vec_free (name); goto done; } vec_validate_init_empty (s->stream_msg_index_by_global_index, gi, ~0); si = s->stream_msg_index_by_global_index[gi]; /* Stream local index unknown? Create it. */ if (si == ~0) { vec_add2 (s->stream_msgs, sm, 1); si = sm - s->stream_msgs; sm->global_index = gi; s->stream_msg_index_by_global_index[gi] = si; if (MC_EVENT_LOGGING > 0) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (e) = { .format = "msg-bind: stream %d %s to index %d", .format_args = "i4T4i4", }; /* *INDENT-ON* */ struct { u32 c[3]; } *ed; ed = ELOG_DATA (mcm->elog_main, e); ed->c[0] = s->index; ed->c[1] = elog_id_for_msg_name (mcm, name); ed->c[2] = si; } } else { sm = vec_elt_at_index (s->stream_msgs, si); if (gi != sm->global_index && MC_EVENT_LOGGING > 0) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (e) = { .format = "msg-id-ERROR: %s index %d expected %d", .format_args = "T4i4i4", }; /* *INDENT-ON* */ struct { u32 c[3]; } *ed; ed = ELOG_DATA (mcm->elog_main, e); ed->c[0] = elog_id_for_msg_name (mcm, name); ed->c[1] = si; ed->c[2] = ~0; if (sm->global_index < vec_len (s->stream_msg_index_by_global_index)) ed->c[2] = s->stream_msg_index_by_global_index[sm->global_index]; } } vec_free (name); } if (gi != ~0) { mc_serialize_msg_t *msg; msg = vec_elt (mcm->global_msgs, gi); unserialize (m, msg->unserialize, mcm); } done: return gi != ~0; } void mc_unserialize_internal (mc_main_t * mcm, u32 stream_and_buffer_index) { vlib_main_t *vm = mcm->vlib_main; serialize_main_t *m = &mcm->serialize_mains[VLIB_RX]; vlib_serialize_buffer_main_t *sbm = &mcm->serialize_buffer_mains[VLIB_RX]; mc_stream_and_buffer_t *sb; mc_stream_t *stream; u32 buffer_index; sb = pool_elt_at_index (mcm->mc_unserialize_stream_and_buffers, stream_and_buffer_index); buffer_index = sb->buffer_index; stream = vec_elt_at_index (mcm->stream_vector, sb->stream_index); pool_put (mcm->mc_unserialize_stream_and_buffers, sb); if (stream->config.save_snapshot) { u32 n_bytes = vlib_buffer_index_length_in_chain (vm, buffer_index); static u8 *contents; vec_reset_length (contents); vec_validate (contents, n_bytes - 1); vlib_buffer_contents (vm, buffer_index, contents); stream->config.save_snapshot (mcm, /* is_catchup */ 0, contents, n_bytes); } ASSERT (vlib_in_process_context (vm)); unserialize_open_vlib_buffer (m, vm, sbm); clib_fifo_add1 (sbm->rx.buffer_fifo, buffer_index); while (unserialize_vlib_buffer_n_bytes (m) > 0) mc_unserialize_message (mcm, stream, m); /* Frees buffer. */ unserialize_close_vlib_buffer (m); } void mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index) { vlib_main_t *vm = mcm->vlib_main; mc_stream_and_buffer_t *sb; pool_get (mcm->mc_unserialize_stream_and_buffers, sb); sb->stream_index = s->index; sb->buffer_index = buffer_index; vlib_process_signal_event (vm, mcm->unserialize_process, EVENT_MC_UNSERIALIZE_BUFFER, sb - mcm->mc_unserialize_stream_and_buffers); } static uword mc_unserialize_process (vlib_main_t * vm, vlib_node_runtime_t * node, vlib_frame_t * f) { mc_main_t *mcm = mc_node_get_main (node); uword event_type, *event_data = 0; int i; while (1) { if (event_data) _vec_len (event_data) = 0; vlib_process_wait_for_event (vm); event_type = vlib_process_get_events (vm, &event_data); switch (event_type) { case EVENT_MC_UNSERIALIZE_BUFFER: for (i = 0; i < vec_len (event_data); i++) mc_unserialize_internal (mcm, event_data[i]); break; case EVENT_MC_UNSERIALIZE_CATCHUP: for (i = 0; i < vec_len (event_data); i++) { u8 *mp = uword_to_pointer (event_data[i], u8 *); perform_catchup (mcm, (void *) mp); vec_free (mp); } break; default: break; } } return 0; /* not likely */ } void serialize_mc_main (serialize_main_t * m, va_list * va) { mc_main_t *mcm = va_arg (*va, mc_main_t *); mc_stream_t *s; mc_serialize_stream_msg_t *sm; mc_serialize_msg_t *msg; serialize_integer (m, vec_len (mcm->stream_vector), sizeof (u32)); vec_foreach (s, mcm->stream_vector) { /* Stream name. */ serialize_cstring (m, s->config.name); /* Serialize global names for all sent messages. */ serialize_integer (m, vec_len (s->stream_msgs), sizeof (u32)); vec_foreach (sm, s->stream_msgs) { msg = vec_elt (mcm->global_msgs, sm->global_index); serialize_cstring (m, msg->name); } } } void unserialize_mc_main (serialize_main_t * m, va_list * va) { mc_main_t *mcm = va_arg (*va, mc_main_t *); u32 i, n_streams, n_stream_msgs; char *name; mc_stream_t *s; mc_serialize_stream_msg_t *sm; unserialize_integer (m, &n_streams, sizeof (u32)); for (i = 0; i < n_streams; i++) { unserialize_cstring (m, &name); if (i != MC_STREAM_INDEX_INTERNAL && !mc_stream_by_name (mcm, name)) { vec_validate (mcm->stream_vector, i); s = vec_elt_at_index (mcm->stream_vector, i); mc_stream_init (s); s->index = s - mcm->stream_vector; s->config.name = name; s->state = MC_STREAM_STATE_name_known; hash_set_mem (mcm->stream_index_by_name, s->config.name, s->index); } else vec_free (name); s = vec_elt_at_index (mcm->stream_vector, i); vec_free (s->stream_msgs); vec_free (s->stream_msg_index_by_global_index); unserialize_integer (m, &n_stream_msgs, sizeof (u32)); vec_resize (s->stream_msgs, n_stream_msgs); vec_foreach (sm, s->stream_msgs) { uword *p; u32 si, gi; unserialize_cstring (m, &name); p = hash_get (mcm->global_msg_index_by_name, name); gi = p ? p[0] : ~0; si = sm - s->stream_msgs; if (MC_EVENT_LOGGING > 0) { /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (e) = { .format = "catchup-bind: %s to %d global index %d stream %d", .format_args = "T4i4i4i4", }; /* *INDENT-ON* */ struct { u32 c[4]; } *ed; ed = ELOG_DATA (mcm->elog_main, e); ed->c[0] = elog_id_for_msg_name (mcm, name); ed->c[1] = si; ed->c[2] = gi; ed->c[3] = s->index; } vec_free (name); sm->global_index = gi; if (gi != ~0) { vec_validate_init_empty (s->stream_msg_index_by_global_index, gi, ~0); s->stream_msg_index_by_global_index[gi] = si; } } } } void mc_main_init (mc_main_t * mcm, char *tag) { vlib_main_t *vm = vlib_get_main (); mcm->vlib_main = vm; mcm->elog_main = &vm->elog_main; mcm->relay_master_peer_id = ~0ULL; mcm->relay_state = MC_RELAY_STATE_NEGOTIATE; mcm->stream_index_by_name = hash_create_string ( /* elts */ 0, /* value size */ sizeof (uword)); { vlib_node_registration_t r; memset (&r, 0, sizeof (r)); r.type = VLIB_NODE_TYPE_PROCESS; /* Point runtime data to main instance. */ r.runtime_data = &mcm; r.runtime_data_bytes = sizeof (&mcm); r.name = (char *) format (0, "mc-mastership-%s", tag); r.function = mc_mastership_process; mcm->mastership_process = vlib_register_node (vm, &r); r.name = (char *) format (0, "mc-join-ager-%s", tag); r.function = mc_join_ager_process; mcm->join_ager_process = vlib_register_node (vm, &r); r.name = (char *) format (0, "mc-retry-%s", tag); r.function = mc_retry_process; mcm->retry_process = vlib_register_node (vm, &r); r.name = (char *) format (0, "mc-catchup-%s", tag); r.function = mc_catchup_process; mcm->catchup_process = vlib_register_node (vm, &r); r.name = (char *) format (0, "mc-unserialize-%s", tag); r.function = mc_unserialize_process; mcm->unserialize_process = vlib_register_node (vm, &r); } if (MC_EVENT_LOGGING > 0) mhash_init (&mcm->elog_id_by_peer_id, sizeof (uword), sizeof (mc_peer_id_t)); mhash_init (&mcm->mastership_peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t)); mc_serialize_init (mcm); } static u8 * format_mc_relay_state (u8 * s, va_list * args) { mc_relay_state_t state = va_arg (*args, mc_relay_state_t); char *t = 0; switch (state) { case MC_RELAY_STATE_NEGOTIATE: t = "negotiate"; break; case MC_RELAY_STATE_MASTER: t = "master"; break; case MC_RELAY_STATE_SLAVE: t = "slave"; break; default: return format (s, "unknown 0x%x", state); } return format (s, "%s", t); } static u8 * format_mc_stream_state (u8 * s, va_list * args) { mc_stream_state_t state = va_arg (*args, mc_stream_state_t); char *t = 0; switch (state) { #define _(f) case MC_STREAM_STATE_##f: t = #f; break; foreach_mc_stream_state #undef _ default: return format (s, "unknown 0x%x", state); } return format (s, "%s", t); } static int mc_peer_comp (void *a1, void *a2) { mc_stream_peer_t *p1 = a1; mc_stream_peer_t *p2 = a2; return mc_peer_id_compare (p1->id, p2->id); } u8 * format_mc_main (u8 * s, va_list * args) { mc_main_t *mcm = va_arg (*args, mc_main_t *); mc_stream_t *t; mc_stream_peer_t *p, *ps; u32 indent = format_get_indent (s); s = format (s, "MC state %U, %d streams joined, global sequence 0x%x", format_mc_relay_state, mcm->relay_state, vec_len (mcm->stream_vector), mcm->relay_global_sequence); { mc_mastership_peer_t *mp; f64 now = vlib_time_now (mcm->vlib_main); s = format (s, "\n%UMost recent mastership peers:", format_white_space, indent + 2); vec_foreach (mp, mcm->mastership_peers) { s = format (s, "\n%U%-30U%.4e", format_white_space, indent + 4, mcm->transport.format_peer_id, mp->peer_id, now - mp->time_last_master_assert_received); } } vec_foreach (t, mcm->stream_vector) { s = format (s, "\n%Ustream `%s' index %d", format_white_space, indent + 2, t->config.name, t->index); s = format (s, "\n%Ustate %U", format_white_space, indent + 4, format_mc_stream_state, t->state); s = format (s, "\n%Uretries: interval %.0f sec, limit %d, pool elts %d, %Ld sent", format_white_space, indent + 4, t->config.retry_interval, t->config.retry_limit, pool_elts (t->retry_pool), t->stats.n_retries - t->stats_last_clear.n_retries); s = format (s, "\n%U%Ld/%Ld user requests sent/received", format_white_space, indent + 4, t->user_requests_sent, t->user_requests_received); s = format (s, "\n%U%d peers, local/global sequence 0x%x/0x%x", format_white_space, indent + 4, pool_elts (t->peers), t->our_local_sequence, t->last_global_sequence_processed); ps = 0; /* *INDENT-OFF* */ pool_foreach (p, t->peers, ({ if (clib_bitmap_get (t->all_peer_bitmap, p - t->peers)) vec_add1 (ps, p[0]); })); /* *INDENT-ON* */ vec_sort_with_function (ps, mc_peer_comp); s = format (s, "\n%U%=30s%10s%16s%16s", format_white_space, indent + 6, "Peer", "Last seq", "Retries", "Future"); vec_foreach (p, ps) { s = format (s, "\n%U%-30U0x%08x%16Ld%16Ld%s", format_white_space, indent + 6, mcm->transport.format_peer_id, p->id.as_u64, p->last_sequence_received, p->stats.n_msgs_from_past - p->stats_last_clear.n_msgs_from_past, p->stats.n_msgs_from_future - p->stats_last_clear.n_msgs_from_future, (mcm->transport.our_ack_peer_id.as_u64 == p->id.as_u64 ? " (self)" : "")); } vec_free (ps); } return s; } /* * fd.io coding-style-patch-verification: ON * * Local Variables: * eval: (c-set-style "gnu") * End: */