aboutsummaryrefslogtreecommitdiffstats
path: root/src/vlib
diff options
context:
space:
mode:
Diffstat (limited to 'src/vlib')
-rw-r--r--src/vlib/CMakeLists.txt3
-rw-r--r--src/vlib/buffer_funcs.h1
-rw-r--r--src/vlib/cli.c113
-rw-r--r--src/vlib/main.h8
-rw-r--r--src/vlib/mc.c2609
-rw-r--r--src/vlib/mc.h695
-rw-r--r--src/vlib/threads.c10
-rw-r--r--src/vlib/threads.h1
-rw-r--r--src/vlib/threads_cli.c23
-rw-r--r--src/vlib/vlib.h1
10 files changed, 125 insertions, 3339 deletions
diff --git a/src/vlib/CMakeLists.txt b/src/vlib/CMakeLists.txt
index b187f980401..72c73f3c2d8 100644
--- a/src/vlib/CMakeLists.txt
+++ b/src/vlib/CMakeLists.txt
@@ -43,7 +43,6 @@ add_vpp_library(vlib
linux/vfio.c
log.c
main.c
- mc.c
node.c
node_cli.c
node_format.c
@@ -55,7 +54,6 @@ add_vpp_library(vlib
unix/cli.c
unix/input.c
unix/main.c
- unix/mc_socket.c
unix/plugin.c
unix/util.c
@@ -77,7 +75,6 @@ add_vpp_library(vlib
linux/vfio.h
log.h
main.h
- mc.h
node_funcs.h
node.h
pci/pci_config.h
diff --git a/src/vlib/buffer_funcs.h b/src/vlib/buffer_funcs.h
index 5306af6e218..d8abdf31d79 100644
--- a/src/vlib/buffer_funcs.h
+++ b/src/vlib/buffer_funcs.h
@@ -41,6 +41,7 @@
#define included_vlib_buffer_funcs_h
#include <vppinfra/hash.h>
+#include <vppinfra/fifo.h>
/** \file
vlib buffer access methods.
diff --git a/src/vlib/cli.c b/src/vlib/cli.c
index ef02d27d288..d7b2a4686d1 100644
--- a/src/vlib/cli.c
+++ b/src/vlib/cli.c
@@ -40,6 +40,7 @@
#include <vlib/vlib.h>
#include <vlib/unix/unix.h>
#include <vppinfra/cpu.h>
+#include <vppinfra/elog.h>
#include <unistd.h>
#include <ctype.h>
@@ -583,6 +584,23 @@ vlib_cli_dispatch_sub_commands (vlib_main_t * vm,
}
else
{
+ if (PREDICT_FALSE (vm->elog_trace_cli_commands))
+ {
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (e) =
+ {
+ .format = "cli-cmd: %s",
+ .format_args = "T4",
+ };
+ /* *INDENT-ON* */
+ struct
+ {
+ u32 c;
+ } *ed;
+ ed = ELOG_DATA (&vm->elog_main, e);
+ ed->c = elog_global_id_for_msg_name (c->path);
+ }
+
if (!c->is_mp_safe)
vlib_worker_thread_barrier_sync (vm);
@@ -591,6 +609,32 @@ vlib_cli_dispatch_sub_commands (vlib_main_t * vm,
if (!c->is_mp_safe)
vlib_worker_thread_barrier_release (vm);
+ if (PREDICT_FALSE (vm->elog_trace_cli_commands))
+ {
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (e) =
+ {
+ .format = "cli-cmd: %s %s",
+ .format_args = "T4T4",
+ };
+ /* *INDENT-ON* */
+ struct
+ {
+ u32 c, err;
+ } *ed;
+ ed = ELOG_DATA (&vm->elog_main, e);
+ ed->c = elog_global_id_for_msg_name (c->path);
+ if (c_error)
+ {
+ vec_add1 (c_error->what, 0);
+ ed->err = elog_global_id_for_msg_name
+ ((const char *) c_error->what);
+ _vec_len (c_error->what) -= 1;
+ }
+ else
+ ed->err = elog_global_id_for_msg_name ("OK");
+ }
+
if (c_error)
{
error =
@@ -1416,6 +1460,75 @@ VLIB_CLI_COMMAND (show_cli_command, static) = {
/* *INDENT-ON* */
static clib_error_t *
+elog_trace_command_fn (vlib_main_t * vm,
+ unformat_input_t * input, vlib_cli_command_t * cmd)
+{
+ unformat_input_t _line_input, *line_input = &_line_input;
+ int enable = 1;
+ int api = 0, cli = 0, barrier = 0;
+
+ if (!unformat_user (input, unformat_line_input, line_input))
+ goto print_status;
+
+ while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
+ {
+ if (unformat (line_input, "api"))
+ api = 1;
+ else if (unformat (line_input, "cli"))
+ cli = 1;
+ else if (unformat (line_input, "barrier"))
+ barrier = 1;
+ else if (unformat (line_input, "disable"))
+ enable = 0;
+ else if (unformat (line_input, "enable"))
+ enable = 1;
+ else
+ break;
+ }
+ unformat_free (line_input);
+
+ vm->elog_trace_api_messages = api ? enable : vm->elog_trace_api_messages;
+ vm->elog_trace_cli_commands = cli ? enable : vm->elog_trace_cli_commands;
+ vlib_worker_threads->barrier_elog_enabled =
+ barrier ? enable : vlib_worker_threads->barrier_elog_enabled;
+
+print_status:
+ vlib_cli_output (vm, "Current status:");
+
+ vlib_cli_output
+ (vm, " Event log API message trace: %s\n CLI command trace: %s",
+ vm->elog_trace_api_messages ? "on" : "off",
+ vm->elog_trace_cli_commands ? "on" : "off");
+ vlib_cli_output
+ (vm, " Barrier sync trace: %s",
+ vlib_worker_threads->barrier_elog_enabled ? "on" : "off");
+
+ return 0;
+}
+
+/*?
+ * Control event logging of api, cli, and thread barrier events.
+ * With no arguments, displays the current trace status.
+ * Name the event groups you wish to trace or stop tracing.
+ *
+ * @cliexpar
+ * @clistart
+ * elog trace api cli barrier
+ * elog trace api cli barrier disable
+ * elog trace
+ * @cliend
+ * @cliexcmd{elog trace [api][cli][barrier][enable|disable]}
+?*/
+/* *INDENT-OFF* */
+VLIB_CLI_COMMAND (elog_trace_command, static) =
+{
+ .path = "elog trace",
+ .short_help = "elog trace [api][cli][barrier][disable]",
+ .function = elog_trace_command_fn,
+};
+/* *INDENT-ON* */
+
+static clib_error_t *
vlib_cli_init (vlib_main_t * vm)
{
vlib_cli_main_t *cm = &vm->cli_main;
diff --git a/src/vlib/main.h b/src/vlib/main.h
index d21227a5fdf..ce42b6ea442 100644
--- a/src/vlib/main.h
+++ b/src/vlib/main.h
@@ -148,9 +148,6 @@ typedef struct vlib_main_t
struct vlib_node_runtime_t * node,
vlib_frame_t * frame);
- /* Multicast distribution. Set to zero for MC disabled. */
- mc_main_t *mc_main;
-
/* Stream index to use for distribution when MC is enabled. */
u32 mc_stream_index;
@@ -159,6 +156,10 @@ typedef struct vlib_main_t
/* Event logger. */
elog_main_t elog_main;
+ /* Event logger trace flags */
+ int elog_trace_api_messages;
+ int elog_trace_cli_commands;
+
/* Node call and return event types. */
elog_event_type_t *node_call_elog_event_types;
elog_event_type_t *node_return_elog_event_types;
@@ -184,7 +185,6 @@ typedef struct vlib_main_t
_vlib_init_function_list_elt_t *main_loop_exit_function_registrations;
_vlib_init_function_list_elt_t *api_init_function_registrations;
vlib_config_function_runtime_t *config_function_registrations;
- mc_serialize_msg_t *mc_msg_registrations; /* mc_main is a pointer... */
/* control-plane API queue signal pending, length indication */
volatile u32 queue_signal_pending;
diff --git a/src/vlib/mc.c b/src/vlib/mc.c
deleted file mode 100644
index a289871f570..00000000000
--- a/src/vlib/mc.c
+++ /dev/null
@@ -1,2609 +0,0 @@
-/*
- * mc.c: vlib reliable sequenced multicast distributed applications
- *
- * Copyright (c) 2010 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <vlib/vlib.h>
-
-/*
- * 1 to enable msg id training wheels, which are useful for tracking
- * down catchup and/or partitioned network problems
- */
-#define MSG_ID_DEBUG 0
-
-static format_function_t format_mc_stream_state;
-
-static u32
-elog_id_for_peer_id (mc_main_t * m, u64 peer_id)
-{
- uword *p, r;
- mhash_t *h = &m->elog_id_by_peer_id;
-
- if (!m->elog_id_by_peer_id.hash)
- mhash_init (h, sizeof (uword), sizeof (mc_peer_id_t));
-
- p = mhash_get (h, &peer_id);
- if (p)
- return p[0];
- r = elog_string (m->elog_main, "%U", m->transport.format_peer_id, peer_id);
- mhash_set (h, &peer_id, r, /* old_value */ 0);
- return r;
-}
-
-static u32
-elog_id_for_msg_name (mc_main_t * m, char *msg_name)
-{
- uword *p, r;
- uword *h = m->elog_id_by_msg_name;
- u8 *name_copy;
-
- if (!h)
- h = m->elog_id_by_msg_name = hash_create_string (0, sizeof (uword));
-
- p = hash_get_mem (h, msg_name);
- if (p)
- return p[0];
- r = elog_string (m->elog_main, "%s", msg_name);
-
- name_copy = format (0, "%s%c", msg_name, 0);
-
- hash_set_mem (h, name_copy, r);
- m->elog_id_by_msg_name = h;
-
- return r;
-}
-
-static void
-elog_tx_msg (mc_main_t * m, u32 stream_id, u32 local_sequence,
- u32 retry_count)
-{
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "tx-msg: stream %d local seq %d attempt %d",
- .format_args = "i4i4i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 stream_id, local_sequence, retry_count;
- } *ed;
- ed = ELOG_DATA (m->elog_main, e);
- ed->stream_id = stream_id;
- ed->local_sequence = local_sequence;
- ed->retry_count = retry_count;
- }
-}
-
-/*
- * seq_cmp
- * correctly compare two unsigned sequence numbers.
- * This function works so long as x and y are within 2**(n-1) of each
- * other, where n = bits(x, y).
- *
- * Magic decoder ring:
- * seq_cmp == 0 => x and y are equal
- * seq_cmp < 0 => x is "in the past" with respect to y
- * seq_cmp > 0 => x is "in the future" with respect to y
- */
-always_inline i32
-mc_seq_cmp (u32 x, u32 y)
-{
- return (i32) x - (i32) y;
-}
-
-void *
-mc_get_vlib_buffer (vlib_main_t * vm, u32 n_bytes, u32 * bi_return)
-{
- u32 n_alloc, bi = 0;
- vlib_buffer_t *b;
-
- n_alloc = vlib_buffer_alloc (vm, &bi, 1);
- ASSERT (n_alloc == 1);
-
- b = vlib_get_buffer (vm, bi);
- b->current_length = n_bytes;
- *bi_return = bi;
- return (void *) b->data;
-}
-
-static void
-delete_peer_with_index (mc_main_t * mcm, mc_stream_t * s,
- uword index, int notify_application)
-{
- mc_stream_peer_t *p = pool_elt_at_index (s->peers, index);
- ASSERT (p != 0);
- if (s->config.peer_died && notify_application)
- s->config.peer_died (mcm, s, p->id);
-
- s->all_peer_bitmap = clib_bitmap_andnoti (s->all_peer_bitmap, p - s->peers);
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "delete peer %s from all_peer_bitmap",
- .format_args = "T4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 peer;
- } *ed = 0;
-
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
- }
- /* Do not delete the pool / hash table entries, or we lose sequence number state */
-}
-
-static mc_stream_peer_t *
-get_or_create_peer_with_id (mc_main_t * mcm,
- mc_stream_t * s, mc_peer_id_t id, int *created)
-{
- uword *q = mhash_get (&s->peer_index_by_id, &id);
- mc_stream_peer_t *p;
-
- if (q)
- {
- p = pool_elt_at_index (s->peers, q[0]);
- goto done;
- }
-
- pool_get (s->peers, p);
- memset (p, 0, sizeof (p[0]));
- p->id = id;
- p->last_sequence_received = ~0;
- mhash_set (&s->peer_index_by_id, &id, p - s->peers, /* old_value */ 0);
- if (created)
- *created = 1;
-
-done:
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "get_or_create %s peer %s stream %d seq %d",
- .format_args = "t4T4i4i4",
- .n_enum_strings = 2,
- .enum_strings = {
- "old", "new",
- },
- };
- /* *INDENT-ON* */
- struct
- {
- u32 is_new, peer, stream_index, rx_sequence;
- } *ed = 0;
-
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->is_new = q ? 0 : 1;
- ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
- ed->stream_index = s->index;
- ed->rx_sequence = p->last_sequence_received;
- }
- /* $$$$ Enable or reenable this peer */
- s->all_peer_bitmap = clib_bitmap_ori (s->all_peer_bitmap, p - s->peers);
- return p;
-}
-
-static void
-maybe_send_window_open_event (vlib_main_t * vm, mc_stream_t * stream)
-{
- vlib_one_time_waiting_process_t *p;
-
- if (pool_elts (stream->retry_pool) >= stream->config.window_size)
- return;
-
- vec_foreach (p, stream->procs_waiting_for_open_window)
- vlib_signal_one_time_waiting_process (vm, p);
-
- if (stream->procs_waiting_for_open_window)
- _vec_len (stream->procs_waiting_for_open_window) = 0;
-}
-
-static void
-mc_retry_free (mc_main_t * mcm, mc_stream_t * s, mc_retry_t * r)
-{
- mc_retry_t record, *retp;
-
- if (r->unacked_by_peer_bitmap)
- _vec_len (r->unacked_by_peer_bitmap) = 0;
-
- if (clib_fifo_elts (s->retired_fifo) >= 2 * s->config.window_size)
- {
- clib_fifo_sub1 (s->retired_fifo, record);
- vlib_buffer_free_one (mcm->vlib_main, record.buffer_index);
- }
-
- clib_fifo_add2 (s->retired_fifo, retp);
-
- retp->buffer_index = r->buffer_index;
- retp->local_sequence = r->local_sequence;
-
- r->buffer_index = ~0; /* poison buffer index in this retry */
-}
-
-static void
-mc_resend_retired (mc_main_t * mcm, mc_stream_t * s, u32 local_sequence)
-{
- mc_retry_t *retry;
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "resend-retired: search for local seq %d",
- .format_args = "i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 local_sequence;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->local_sequence = local_sequence;
- }
-
- /* *INDENT-OFF* */
- clib_fifo_foreach (retry, s->retired_fifo,
- ({
- if (retry->local_sequence == local_sequence)
- {
- elog_tx_msg (mcm, s->index, retry-> local_sequence, -13);
- mcm->transport.tx_buffer (mcm->transport.opaque,
- MC_TRANSPORT_USER_REQUEST_TO_RELAY,
- retry->buffer_index);
- return;
- }
- }));
- /* *INDENT-ON* */
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "resend-retired: FAILED search for local seq %d",
- .format_args = "i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 local_sequence;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->local_sequence = local_sequence;
- }
-}
-
-static uword *
-delete_retry_fifo_elt (mc_main_t * mcm,
- mc_stream_t * stream,
- mc_retry_t * r, uword * dead_peer_bitmap)
-{
- mc_stream_peer_t *p;
-
- /* *INDENT-OFF* */
- pool_foreach (p, stream->peers, ({
- uword pi = p - stream->peers;
- uword is_alive = 0 == clib_bitmap_get (r->unacked_by_peer_bitmap, pi);
-
- if (! is_alive)
- dead_peer_bitmap = clib_bitmap_ori (dead_peer_bitmap, pi);
-
- if (MC_EVENT_LOGGING > 0)
- {
- ELOG_TYPE_DECLARE (e) = {
- .format = "delete_retry_fifo_elt: peer %s is %s",
- .format_args = "T4t4",
- .n_enum_strings = 2,
- .enum_strings = { "alive", "dead", },
- };
- struct { u32 peer, is_alive; } * ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
- ed->is_alive = is_alive;
- }
- }));
- /* *INDENT-ON* */
-
- hash_unset (stream->retry_index_by_local_sequence, r->local_sequence);
- mc_retry_free (mcm, stream, r);
-
- return dead_peer_bitmap;
-}
-
-always_inline mc_retry_t *
-prev_retry (mc_stream_t * s, mc_retry_t * r)
-{
- return (r->prev_index != ~0
- ? pool_elt_at_index (s->retry_pool, r->prev_index) : 0);
-}
-
-always_inline mc_retry_t *
-next_retry (mc_stream_t * s, mc_retry_t * r)
-{
- return (r->next_index != ~0
- ? pool_elt_at_index (s->retry_pool, r->next_index) : 0);
-}
-
-always_inline void
-remove_retry_from_pool (mc_stream_t * s, mc_retry_t * r)
-{
- mc_retry_t *p = prev_retry (s, r);
- mc_retry_t *n = next_retry (s, r);
-
- if (p)
- p->next_index = r->next_index;
- else
- s->retry_head_index = r->next_index;
- if (n)
- n->prev_index = r->prev_index;
- else
- s->retry_tail_index = r->prev_index;
-
- pool_put_index (s->retry_pool, r - s->retry_pool);
-}
-
-static void
-check_retry (mc_main_t * mcm, mc_stream_t * s)
-{
- mc_retry_t *r;
- vlib_main_t *vm = mcm->vlib_main;
- f64 now = vlib_time_now (vm);
- uword *dead_peer_bitmap = 0;
- u32 ri, ri_next;
-
- for (ri = s->retry_head_index; ri != ~0; ri = ri_next)
- {
- r = pool_elt_at_index (s->retry_pool, ri);
- ri_next = r->next_index;
-
- if (now < r->sent_at + s->config.retry_interval)
- continue;
-
- r->n_retries += 1;
- if (r->n_retries > s->config.retry_limit)
- {
- dead_peer_bitmap =
- delete_retry_fifo_elt (mcm, s, r, dead_peer_bitmap);
- remove_retry_from_pool (s, r);
- }
- else
- {
- if (MC_EVENT_LOGGING > 0)
- {
- mc_stream_peer_t *p;
-
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (t) =
- {
- .format = "resend local seq %d attempt %d",
- .format_args = "i4i4",
- };
- /* *INDENT-ON* */
-
- /* *INDENT-OFF* */
- pool_foreach (p, s->peers, ({
- if (clib_bitmap_get (r->unacked_by_peer_bitmap, p - s->peers))
- {
- ELOG_TYPE_DECLARE (ev) = {
- .format = "resend: needed by peer %s local seq %d",
- .format_args = "T4i4",
- };
- struct { u32 peer, rx_sequence; } * ed;
- ed = ELOG_DATA (mcm->elog_main, ev);
- ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
- ed->rx_sequence = r->local_sequence;
- }
- }));
- /* *INDENT-ON* */
-
- struct
- {
- u32 sequence;
- u32 trail;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, t);
- ed->sequence = r->local_sequence;
- ed->trail = r->n_retries;
- }
-
- r->sent_at = vlib_time_now (vm);
- s->stats.n_retries += 1;
-
- elog_tx_msg (mcm, s->index, r->local_sequence, r->n_retries);
-
- mcm->transport.tx_buffer
- (mcm->transport.opaque,
- MC_TRANSPORT_USER_REQUEST_TO_RELAY, r->buffer_index);
- }
- }
-
- maybe_send_window_open_event (mcm->vlib_main, s);
-
- /* Delete any dead peers we've found. */
- if (!clib_bitmap_is_zero (dead_peer_bitmap))
- {
- uword i;
-
- /* *INDENT-OFF* */
- clib_bitmap_foreach (i, dead_peer_bitmap, ({
- delete_peer_with_index (mcm, s, i, /* notify_application */ 1);
-
- /* Delete any references to just deleted peer in retry pool. */
- pool_foreach (r, s->retry_pool, ({
- r->unacked_by_peer_bitmap =
- clib_bitmap_andnoti (r->unacked_by_peer_bitmap, i);
- }));
- }));
-/* *INDENT-ON* */
- clib_bitmap_free (dead_peer_bitmap);
- }
-}
-
-always_inline mc_main_t *
-mc_node_get_main (vlib_node_runtime_t * node)
-{
- mc_main_t **p = (void *) node->runtime_data;
- return p[0];
-}
-
-static uword
-mc_retry_process (vlib_main_t * vm,
- vlib_node_runtime_t * node, vlib_frame_t * f)
-{
- mc_main_t *mcm = mc_node_get_main (node);
- mc_stream_t *s;
-
- while (1)
- {
- vlib_process_suspend (vm, 1.0);
- vec_foreach (s, mcm->stream_vector)
- {
- if (s->state != MC_STREAM_STATE_invalid)
- check_retry (mcm, s);
- }
- }
- return 0; /* not likely */
-}
-
-static void
-send_join_or_leave_request (mc_main_t * mcm, u32 stream_index, u32 is_join)
-{
- vlib_main_t *vm = mcm->vlib_main;
- mc_msg_join_or_leave_request_t *mp;
- u32 bi;
-
- mp = mc_get_vlib_buffer (vm, sizeof (mp[0]), &bi);
- memset (mp, 0, sizeof (*mp));
- mp->type = MC_MSG_TYPE_join_or_leave_request;
- mp->peer_id = mcm->transport.our_ack_peer_id;
- mp->stream_index = stream_index;
- mp->is_join = is_join;
-
- mc_byte_swap_msg_join_or_leave_request (mp);
-
- /*
- * These msgs are unnumbered, unordered so send on the from-relay
- * channel.
- */
- mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi);
-}
-
-static uword
-mc_join_ager_process (vlib_main_t * vm,
- vlib_node_runtime_t * node, vlib_frame_t * f)
-{
- mc_main_t *mcm = mc_node_get_main (node);
-
- while (1)
- {
- if (mcm->joins_in_progress)
- {
- mc_stream_t *s;
- vlib_one_time_waiting_process_t *p;
- f64 now = vlib_time_now (vm);
-
- vec_foreach (s, mcm->stream_vector)
- {
- if (s->state != MC_STREAM_STATE_join_in_progress)
- continue;
-
- if (now > s->join_timeout)
- {
- s->state = MC_STREAM_STATE_ready;
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "stream %d join timeout",
- };
- /* *INDENT-ON* */
- ELOG (mcm->elog_main, e, s->index);
- }
- /* Make sure that this app instance exists as a stream peer,
- or we may answer a catchup request with a NULL
- all_peer_bitmap... */
- (void) get_or_create_peer_with_id
- (mcm, s, mcm->transport.our_ack_peer_id, /* created */ 0);
-
- vec_foreach (p, s->procs_waiting_for_join_done)
- vlib_signal_one_time_waiting_process (vm, p);
- if (s->procs_waiting_for_join_done)
- _vec_len (s->procs_waiting_for_join_done) = 0;
-
- mcm->joins_in_progress--;
- ASSERT (mcm->joins_in_progress >= 0);
- }
- else
- {
- /* Resent join request which may have been lost. */
- send_join_or_leave_request (mcm, s->index, 1 /* is_join */ );
-
- /* We're *not* alone, retry for as long as it takes */
- if (mcm->relay_state == MC_RELAY_STATE_SLAVE)
- s->join_timeout = vlib_time_now (vm) + 2.0;
-
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "stream %d resend join request",
- };
- /* *INDENT-ON* */
- ELOG (mcm->elog_main, e, s->index);
- }
- }
- }
- }
-
- vlib_process_suspend (vm, .5);
- }
-
- return 0; /* not likely */
-}
-
-static void
-serialize_mc_register_stream_name (serialize_main_t * m, va_list * va)
-{
- char *name = va_arg (*va, char *);
- serialize_cstring (m, name);
-}
-
-static void
-elog_stream_name (char *buf, int n_buf_bytes, char *v)
-{
- clib_memcpy (buf, v, clib_min (n_buf_bytes - 1, vec_len (v)));
- buf[n_buf_bytes - 1] = 0;
-}
-
-static void
-unserialize_mc_register_stream_name (serialize_main_t * m, va_list * va)
-{
- mc_main_t *mcm = va_arg (*va, mc_main_t *);
- char *name;
- mc_stream_t *s;
- uword *p;
-
- unserialize_cstring (m, &name);
-
- if ((p = hash_get_mem (mcm->stream_index_by_name, name)))
- {
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "stream index %d already named %s",
- .format_args = "i4s16",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 stream_index;
- char name[16];
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->stream_index = p[0];
- elog_stream_name (ed->name, sizeof (ed->name), name);
- }
-
- vec_free (name);
- return;
- }
-
- vec_add2 (mcm->stream_vector, s, 1);
- mc_stream_init (s);
- s->state = MC_STREAM_STATE_name_known;
- s->index = s - mcm->stream_vector;
- s->config.name = name;
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "stream index %d named %s",
- .format_args = "i4s16",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 stream_index;
- char name[16];
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->stream_index = s->index;
- elog_stream_name (ed->name, sizeof (ed->name), name);
- }
-
- hash_set_mem (mcm->stream_index_by_name, name, s->index);
-
- p = hash_get (mcm->procs_waiting_for_stream_name_by_name, name);
- if (p)
- {
- vlib_one_time_waiting_process_t *wp, **w;
- w = pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool, p[0]);
- vec_foreach (wp, w[0])
- vlib_signal_one_time_waiting_process (mcm->vlib_main, wp);
- pool_put (mcm->procs_waiting_for_stream_name_pool, w);
- hash_unset_mem (mcm->procs_waiting_for_stream_name_by_name, name);
- }
-}
-
-/* *INDENT-OFF* */
-MC_SERIALIZE_MSG (mc_register_stream_name_msg, static) =
-{
- .name = "mc_register_stream_name",
- .serialize = serialize_mc_register_stream_name,
- .unserialize = unserialize_mc_register_stream_name,
-};
-/* *INDENT-ON* */
-
-void
-mc_rx_buffer_unserialize (mc_main_t * mcm,
- mc_stream_t * stream,
- mc_peer_id_t peer_id, u32 buffer_index)
-{
- return mc_unserialize (mcm, stream, buffer_index);
-}
-
-static u8 *
-mc_internal_catchup_snapshot (mc_main_t * mcm,
- u8 * data_vector,
- u32 last_global_sequence_processed)
-{
- serialize_main_t m;
-
- /* Append serialized data to data vector. */
- serialize_open_vector (&m, data_vector);
- m.stream.current_buffer_index = vec_len (data_vector);
-
- serialize (&m, serialize_mc_main, mcm);
- return serialize_close_vector (&m);
-}
-
-static void
-mc_internal_catchup (mc_main_t * mcm, u8 * data, u32 n_data_bytes)
-{
- serialize_main_t s;
-
- unserialize_open_data (&s, data, n_data_bytes);
-
- unserialize (&s, unserialize_mc_main, mcm);
-}
-
-/* Overridden from the application layer, not actually used here */
-void mc_stream_join_process_hold (void) __attribute__ ((weak));
-void
-mc_stream_join_process_hold (void)
-{
-}
-
-static u32
-mc_stream_join_helper (mc_main_t * mcm,
- mc_stream_config_t * config, u32 is_internal)
-{
- mc_stream_t *s;
- vlib_main_t *vm = mcm->vlib_main;
-
- s = 0;
- if (!is_internal)
- {
- uword *p;
-
- /* Already have a stream with given name? */
- if ((s = mc_stream_by_name (mcm, config->name)))
- {
- /* Already joined and ready? */
- if (s->state == MC_STREAM_STATE_ready)
- return s->index;
- }
-
- /* First join MC internal stream. */
- if (!mcm->stream_vector
- || (mcm->stream_vector[MC_STREAM_INDEX_INTERNAL].state
- == MC_STREAM_STATE_invalid))
- {
- static mc_stream_config_t c = {
- .name = "mc-internal",
- .rx_buffer = mc_rx_buffer_unserialize,
- .catchup = mc_internal_catchup,
- .catchup_snapshot = mc_internal_catchup_snapshot,
- };
-
- c.save_snapshot = config->save_snapshot;
-
- mc_stream_join_helper (mcm, &c, /* is_internal */ 1);
- }
-
- /* If stream is still unknown register this name and wait for
- sequenced message to name stream. This way all peers agree
- on stream name to index mappings. */
- s = mc_stream_by_name (mcm, config->name);
- if (!s)
- {
- vlib_one_time_waiting_process_t *wp, **w;
- u8 *name_copy = format (0, "%s", config->name);
-
- mc_serialize_stream (mcm,
- MC_STREAM_INDEX_INTERNAL,
- &mc_register_stream_name_msg, config->name);
-
- /* Wait for this stream to be named. */
- p =
- hash_get_mem (mcm->procs_waiting_for_stream_name_by_name,
- name_copy);
- if (p)
- w =
- pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool,
- p[0]);
- else
- {
- pool_get (mcm->procs_waiting_for_stream_name_pool, w);
- if (!mcm->procs_waiting_for_stream_name_by_name)
- mcm->procs_waiting_for_stream_name_by_name = hash_create_string ( /* elts */ 0, /* value size */
- sizeof
- (uword));
- hash_set_mem (mcm->procs_waiting_for_stream_name_by_name,
- name_copy,
- w - mcm->procs_waiting_for_stream_name_pool);
- w[0] = 0;
- }
-
- vec_add2 (w[0], wp, 1);
- vlib_current_process_wait_for_one_time_event (vm, wp);
- vec_free (name_copy);
- }
-
- /* Name should be known now. */
- s = mc_stream_by_name (mcm, config->name);
- ASSERT (s != 0);
- ASSERT (s->state == MC_STREAM_STATE_name_known);
- }
-
- if (!s)
- {
- vec_add2 (mcm->stream_vector, s, 1);
- mc_stream_init (s);
- s->index = s - mcm->stream_vector;
- }
-
- {
- /* Save name since we could have already used it as hash key. */
- char *name_save = s->config.name;
-
- s->config = config[0];
-
- if (name_save)
- s->config.name = name_save;
- }
-
- if (s->config.window_size == 0)
- s->config.window_size = 8;
-
- if (s->config.retry_interval == 0.0)
- s->config.retry_interval = 1.0;
-
- /* Sanity. */
- ASSERT (s->config.retry_interval < 30);
-
- if (s->config.retry_limit == 0)
- s->config.retry_limit = 7;
-
- s->state = MC_STREAM_STATE_join_in_progress;
- if (!s->peer_index_by_id.hash)
- mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
-
- /* If we don't hear from someone in 5 seconds, we're alone */
- s->join_timeout = vlib_time_now (vm) + 5.0;
- mcm->joins_in_progress++;
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "stream index %d join request %s",
- .format_args = "i4s16",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 stream_index;
- char name[16];
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->stream_index = s->index;
- elog_stream_name (ed->name, sizeof (ed->name), s->config.name);
- }
-
- send_join_or_leave_request (mcm, s->index, 1 /* join */ );
-
- vlib_current_process_wait_for_one_time_event_vector
- (vm, &s->procs_waiting_for_join_done);
-
- if (MC_EVENT_LOGGING)
- {
- ELOG_TYPE (e, "join complete stream %d");
- ELOG (mcm->elog_main, e, s->index);
- }
-
- return s->index;
-}
-
-u32
-mc_stream_join (mc_main_t * mcm, mc_stream_config_t * config)
-{
- return mc_stream_join_helper (mcm, config, /* is_internal */ 0);
-}
-
-void
-mc_stream_leave (mc_main_t * mcm, u32 stream_index)
-{
- mc_stream_t *s = mc_stream_by_index (mcm, stream_index);
-
- if (!s)
- return;
-
- if (MC_EVENT_LOGGING)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (t) =
- {
- .format = "leave-stream: %d",.format_args = "i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 index;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, t);
- ed->index = stream_index;
- }
-
- send_join_or_leave_request (mcm, stream_index, 0 /* is_join */ );
- mc_stream_free (s);
- s->state = MC_STREAM_STATE_name_known;
-}
-
-void
-mc_msg_join_or_leave_request_handler (mc_main_t * mcm,
- mc_msg_join_or_leave_request_t * req,
- u32 buffer_index)
-{
- mc_stream_t *s;
- mc_msg_join_reply_t *rep;
- u32 bi;
-
- mc_byte_swap_msg_join_or_leave_request (req);
-
- s = mc_stream_by_index (mcm, req->stream_index);
- if (!s || s->state != MC_STREAM_STATE_ready)
- return;
-
- /* If the peer is joining, create it */
- if (req->is_join)
- {
- mc_stream_t *this_s;
-
- /* We're not in a position to catch up a peer until all
- stream joins are complete. */
- if (0)
- {
- /* XXX This is hard to test so we've. */
- vec_foreach (this_s, mcm->stream_vector)
- {
- if (this_s->state != MC_STREAM_STATE_ready
- && this_s->state != MC_STREAM_STATE_name_known)
- return;
- }
- }
- else if (mcm->joins_in_progress > 0)
- return;
-
- (void) get_or_create_peer_with_id (mcm, s, req->peer_id,
- /* created */ 0);
-
- rep = mc_get_vlib_buffer (mcm->vlib_main, sizeof (rep[0]), &bi);
- memset (rep, 0, sizeof (rep[0]));
- rep->type = MC_MSG_TYPE_join_reply;
- rep->stream_index = req->stream_index;
-
- mc_byte_swap_msg_join_reply (rep);
- /* These two are already in network byte order... */
- rep->peer_id = mcm->transport.our_ack_peer_id;
- rep->catchup_peer_id = mcm->transport.our_catchup_peer_id;
-
- mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi);
- }
- else
- {
- if (s->config.peer_died)
- s->config.peer_died (mcm, s, req->peer_id);
- }
-}
-
-void
-mc_msg_join_reply_handler (mc_main_t * mcm,
- mc_msg_join_reply_t * mp, u32 buffer_index)
-{
- mc_stream_t *s;
-
- mc_byte_swap_msg_join_reply (mp);
-
- s = mc_stream_by_index (mcm, mp->stream_index);
-
- if (!s || s->state != MC_STREAM_STATE_join_in_progress)
- return;
-
- /* Switch to catchup state; next join reply
- for this stream will be ignored. */
- s->state = MC_STREAM_STATE_catchup;
-
- mcm->joins_in_progress--;
- mcm->transport.catchup_request_fun (mcm->transport.opaque,
- mp->stream_index, mp->catchup_peer_id);
-}
-
-void
-mc_wait_for_stream_ready (mc_main_t * m, char *stream_name)
-{
- mc_stream_t *s;
-
- while (1)
- {
- s = mc_stream_by_name (m, stream_name);
- if (s)
- break;
- vlib_process_suspend (m->vlib_main, .1);
- }
-
- /* It's OK to send a message in catchup and ready states. */
- if (s->state == MC_STREAM_STATE_catchup
- || s->state == MC_STREAM_STATE_ready)
- return;
-
- /* Otherwise we are waiting for a join to finish. */
- vlib_current_process_wait_for_one_time_event_vector
- (m->vlib_main, &s->procs_waiting_for_join_done);
-}
-
-u32
-mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index)
-{
- mc_stream_t *s = mc_stream_by_index (mcm, stream_index);
- vlib_main_t *vm = mcm->vlib_main;
- mc_retry_t *r;
- mc_msg_user_request_t *mp;
- vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index);
- u32 ri;
-
- if (!s)
- return 0;
-
- if (s->state != MC_STREAM_STATE_ready)
- vlib_current_process_wait_for_one_time_event_vector
- (vm, &s->procs_waiting_for_join_done);
-
- while (pool_elts (s->retry_pool) >= s->config.window_size)
- {
- vlib_current_process_wait_for_one_time_event_vector
- (vm, &s->procs_waiting_for_open_window);
- }
-
- pool_get (s->retry_pool, r);
- ri = r - s->retry_pool;
-
- r->prev_index = s->retry_tail_index;
- r->next_index = ~0;
- s->retry_tail_index = ri;
-
- if (r->prev_index == ~0)
- s->retry_head_index = ri;
- else
- {
- mc_retry_t *p = pool_elt_at_index (s->retry_pool, r->prev_index);
- p->next_index = ri;
- }
-
- vlib_buffer_advance (b, -sizeof (mp[0]));
- mp = vlib_buffer_get_current (b);
-
- mp->peer_id = mcm->transport.our_ack_peer_id;
- /* mp->transport.global_sequence set by relay agent. */
- mp->global_sequence = 0xdeadbeef;
- mp->stream_index = s->index;
- mp->local_sequence = s->our_local_sequence++;
- mp->n_data_bytes =
- vlib_buffer_index_length_in_chain (vm, buffer_index) - sizeof (mp[0]);
-
- r->buffer_index = buffer_index;
- r->local_sequence = mp->local_sequence;
- r->sent_at = vlib_time_now (vm);
- r->n_retries = 0;
-
- /* Retry will be freed when all currently known peers have acked. */
- vec_validate (r->unacked_by_peer_bitmap, vec_len (s->all_peer_bitmap) - 1);
- vec_copy (r->unacked_by_peer_bitmap, s->all_peer_bitmap);
-
- hash_set (s->retry_index_by_local_sequence, r->local_sequence,
- r - s->retry_pool);
-
- elog_tx_msg (mcm, s->index, mp->local_sequence, r->n_retries);
-
- mc_byte_swap_msg_user_request (mp);
-
- mcm->transport.tx_buffer (mcm->transport.opaque,
- MC_TRANSPORT_USER_REQUEST_TO_RELAY, buffer_index);
-
- s->user_requests_sent++;
-
- /* return amount of window remaining */
- return s->config.window_size - pool_elts (s->retry_pool);
-}
-
-void
-mc_msg_user_request_handler (mc_main_t * mcm, mc_msg_user_request_t * mp,
- u32 buffer_index)
-{
- vlib_main_t *vm = mcm->vlib_main;
- mc_stream_t *s;
- mc_stream_peer_t *peer;
- i32 seq_cmp_result;
- static int once = 0;
-
- mc_byte_swap_msg_user_request (mp);
-
- s = mc_stream_by_index (mcm, mp->stream_index);
-
- /* Not signed up for this stream? Turf-o-matic */
- if (!s || s->state != MC_STREAM_STATE_ready)
- {
- vlib_buffer_free_one (vm, buffer_index);
- return;
- }
-
- /* Find peer, including ourselves. */
- peer = get_or_create_peer_with_id (mcm, s, mp->peer_id,
- /* created */ 0);
-
- seq_cmp_result = mc_seq_cmp (mp->local_sequence,
- peer->last_sequence_received + 1);
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "rx-msg: peer %s stream %d rx seq %d seq_cmp %d",
- .format_args = "T4i4i4i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 peer, stream_index, rx_sequence;
- i32 seq_cmp_result;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
- ed->stream_index = mp->stream_index;
- ed->rx_sequence = mp->local_sequence;
- ed->seq_cmp_result = seq_cmp_result;
- }
-
- if (0 && mp->stream_index == 1 && once == 0)
- {
- once = 1;
- ELOG_TYPE (e, "FAKE lost msg on stream 1");
- ELOG (mcm->elog_main, e, 0);
- return;
- }
-
- peer->last_sequence_received += seq_cmp_result == 0;
- s->user_requests_received++;
-
- if (seq_cmp_result > 0)
- peer->stats.n_msgs_from_future += 1;
-
- /* Send ack even if msg from future */
- if (1)
- {
- mc_msg_user_ack_t *rp;
- u32 bi;
-
- rp = mc_get_vlib_buffer (vm, sizeof (rp[0]), &bi);
- rp->peer_id = mcm->transport.our_ack_peer_id;
- rp->stream_index = s->index;
- rp->local_sequence = mp->local_sequence;
- rp->seq_cmp_result = seq_cmp_result;
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "tx-ack: stream %d local seq %d",
- .format_args = "i4i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 stream_index;
- u32 local_sequence;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->stream_index = rp->stream_index;
- ed->local_sequence = rp->local_sequence;
- }
-
- mc_byte_swap_msg_user_ack (rp);
-
- mcm->transport.tx_ack (mcm->transport.opaque, mp->peer_id, bi);
- /* Msg from past? If so, free the buffer... */
- if (seq_cmp_result < 0)
- {
- vlib_buffer_free_one (vm, buffer_index);
- peer->stats.n_msgs_from_past += 1;
- }
- }
-
- if (seq_cmp_result == 0)
- {
- vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index);
- switch (s->state)
- {
- case MC_STREAM_STATE_ready:
- vlib_buffer_advance (b, sizeof (mp[0]));
- s->config.rx_buffer (mcm, s, mp->peer_id, buffer_index);
-
- /* Stream vector can change address via rx callback for mc-internal
- stream. */
- s = mc_stream_by_index (mcm, mp->stream_index);
- ASSERT (s != 0);
- s->last_global_sequence_processed = mp->global_sequence;
- break;
-
- case MC_STREAM_STATE_catchup:
- clib_fifo_add1 (s->catchup_fifo, buffer_index);
- break;
-
- default:
- clib_warning ("stream in unknown state %U",
- format_mc_stream_state, s->state);
- break;
- }
- }
-}
-
-void
-mc_msg_user_ack_handler (mc_main_t * mcm, mc_msg_user_ack_t * mp,
- u32 buffer_index)
-{
- vlib_main_t *vm = mcm->vlib_main;
- uword *p;
- mc_stream_t *s;
- mc_stream_peer_t *peer;
- mc_retry_t *r;
- int peer_created = 0;
-
- mc_byte_swap_msg_user_ack (mp);
-
- s = mc_stream_by_index (mcm, mp->stream_index);
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (t) =
- {
- .format = "rx-ack: local seq %d peer %s seq_cmp_result %d",
- .format_args = "i4T4i4",
- };
- /* *INDENT-ON* */
-
- struct
- {
- u32 local_sequence;
- u32 peer;
- i32 seq_cmp_result;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, t);
- ed->local_sequence = mp->local_sequence;
- ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
- ed->seq_cmp_result = mp->seq_cmp_result;
- }
-
- /* Unknown stream? */
- if (!s)
- return;
-
- /* Find the peer which just ack'ed. */
- peer = get_or_create_peer_with_id (mcm, s, mp->peer_id,
- /* created */ &peer_created);
-
- /*
- * Peer reports message from the future. If it's not in the retry
- * fifo, look for a retired message.
- */
- if (mp->seq_cmp_result > 0)
- {
- p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence -
- mp->seq_cmp_result);
- if (p == 0)
- mc_resend_retired (mcm, s, mp->local_sequence - mp->seq_cmp_result);
-
- /* Normal retry should fix it... */
- return;
- }
-
- /*
- * Pointer to the indicated retry fifo entry.
- * Worth hashing because we could use a window size of 100 or 1000.
- */
- p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence);
-
- /*
- * Is this a duplicate ACK, received after we've retired the
- * fifo entry. This can happen when learning about new
- * peers.
- */
- if (p == 0)
- {
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (t) =
- {
- .format = "ack: for seq %d from peer %s no fifo elt",
- .format_args = "i4T4",
- };
- /* *INDENT-ON* */
-
- struct
- {
- u32 seq;
- u32 peer;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, t);
- ed->seq = mp->local_sequence;
- ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
- }
-
- return;
- }
-
- r = pool_elt_at_index (s->retry_pool, p[0]);
-
- /* Make sure that this new peer ACKs our msgs from now on */
- if (peer_created)
- {
- mc_retry_t *later_retry = next_retry (s, r);
-
- while (later_retry)
- {
- later_retry->unacked_by_peer_bitmap =
- clib_bitmap_ori (later_retry->unacked_by_peer_bitmap,
- peer - s->peers);
- later_retry = next_retry (s, later_retry);
- }
- }
-
- ASSERT (mp->local_sequence == r->local_sequence);
-
- /* If we weren't expecting to hear from this peer */
- if (!peer_created &&
- !clib_bitmap_get (r->unacked_by_peer_bitmap, peer - s->peers))
- {
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (t) =
- {
- .format = "dup-ack: for seq %d from peer %s",
- .format_args = "i4T4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 seq;
- u32 peer;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, t);
- ed->seq = r->local_sequence;
- ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
- }
- if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap))
- return;
- }
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (t) =
- {
- .format = "ack: for seq %d from peer %s",
- .format_args = "i4T4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 seq;
- u32 peer;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, t);
- ed->seq = mp->local_sequence;
- ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
- }
-
- r->unacked_by_peer_bitmap =
- clib_bitmap_andnoti (r->unacked_by_peer_bitmap, peer - s->peers);
-
- /* Not all clients have ack'ed */
- if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap))
- {
- return;
- }
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (t) =
- {
- .format = "ack: retire fifo elt loc seq %d after %d acks",
- .format_args = "i4i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 seq;
- u32 npeers;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, t);
- ed->seq = r->local_sequence;
- ed->npeers = pool_elts (s->peers);
- }
-
- hash_unset (s->retry_index_by_local_sequence, mp->local_sequence);
- mc_retry_free (mcm, s, r);
- remove_retry_from_pool (s, r);
- maybe_send_window_open_event (vm, s);
-}
-
-#define EVENT_MC_SEND_CATCHUP_DATA 0
-
-static uword
-mc_catchup_process (vlib_main_t * vm,
- vlib_node_runtime_t * node, vlib_frame_t * f)
-{
- mc_main_t *mcm = mc_node_get_main (node);
- uword *event_data = 0;
- mc_catchup_process_arg_t *args;
- int i;
-
- while (1)
- {
- if (event_data)
- _vec_len (event_data) = 0;
- vlib_process_wait_for_event_with_type (vm, &event_data,
- EVENT_MC_SEND_CATCHUP_DATA);
-
- for (i = 0; i < vec_len (event_data); i++)
- {
- args = pool_elt_at_index (mcm->catchup_process_args, event_data[i]);
-
- mcm->transport.catchup_send_fun (mcm->transport.opaque,
- args->catchup_opaque,
- args->catchup_snapshot);
-
- /* Send function will free snapshot data vector. */
- pool_put (mcm->catchup_process_args, args);
- }
- }
-
- return 0; /* not likely */
-}
-
-static void
-serialize_mc_stream (serialize_main_t * m, va_list * va)
-{
- mc_stream_t *s = va_arg (*va, mc_stream_t *);
- mc_stream_peer_t *p;
-
- serialize_integer (m, pool_elts (s->peers), sizeof (u32));
- /* *INDENT-OFF* */
- pool_foreach (p, s->peers, ({
- u8 * x = serialize_get (m, sizeof (p->id));
- clib_memcpy (x, p->id.as_u8, sizeof (p->id));
- serialize_integer (m, p->last_sequence_received,
- sizeof (p->last_sequence_received));
- }));
-/* *INDENT-ON* */
- serialize_bitmap (m, s->all_peer_bitmap);
-}
-
-void
-unserialize_mc_stream (serialize_main_t * m, va_list * va)
-{
- mc_stream_t *s = va_arg (*va, mc_stream_t *);
- u32 i, n_peers;
- mc_stream_peer_t *p;
-
- unserialize_integer (m, &n_peers, sizeof (u32));
- mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
- for (i = 0; i < n_peers; i++)
- {
- u8 *x;
- pool_get (s->peers, p);
- x = unserialize_get (m, sizeof (p->id));
- clib_memcpy (p->id.as_u8, x, sizeof (p->id));
- unserialize_integer (m, &p->last_sequence_received,
- sizeof (p->last_sequence_received));
- mhash_set (&s->peer_index_by_id, &p->id, p - s->peers, /* old_value */
- 0);
- }
- s->all_peer_bitmap = unserialize_bitmap (m);
-
- /* This is really bad. */
- if (!s->all_peer_bitmap)
- clib_warning ("BUG: stream %s all_peer_bitmap NULL", s->config.name);
-}
-
-void
-mc_msg_catchup_request_handler (mc_main_t * mcm,
- mc_msg_catchup_request_t * req,
- u32 catchup_opaque)
-{
- vlib_main_t *vm = mcm->vlib_main;
- mc_stream_t *s;
- mc_catchup_process_arg_t *args;
-
- mc_byte_swap_msg_catchup_request (req);
-
- s = mc_stream_by_index (mcm, req->stream_index);
- if (!s || s->state != MC_STREAM_STATE_ready)
- return;
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (t) =
- {
- .format = "catchup-request: from %s stream %d",
- .format_args = "T4i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 peer, stream;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, t);
- ed->peer = elog_id_for_peer_id (mcm, req->peer_id.as_u64);
- ed->stream = req->stream_index;
- }
-
- /*
- * The application has to snapshoot its data structures right
- * here, right now. If we process any messages after
- * noting the last global sequence we've processed, the client
- * won't be able to accurately reconstruct our data structures.
- *
- * Once the data structures are e.g. vec_dup()'ed, we
- * send the resulting messages from a separate process, to
- * make sure that we don't cause a bunch of message retransmissions
- */
- pool_get (mcm->catchup_process_args, args);
-
- args->stream_index = s - mcm->stream_vector;
- args->catchup_opaque = catchup_opaque;
- args->catchup_snapshot = 0;
-
- /* Construct catchup reply and snapshot state for stream to send as
- catchup reply payload. */
- {
- mc_msg_catchup_reply_t *rep;
- serialize_main_t m;
-
- vec_resize (args->catchup_snapshot, sizeof (rep[0]));
-
- rep = (void *) args->catchup_snapshot;
-
- rep->peer_id = req->peer_id;
- rep->stream_index = req->stream_index;
- rep->last_global_sequence_included = s->last_global_sequence_processed;
-
- /* Setup for serialize to append to catchup snapshot. */
- serialize_open_vector (&m, args->catchup_snapshot);
- m.stream.current_buffer_index = vec_len (m.stream.buffer);
-
- serialize (&m, serialize_mc_stream, s);
-
- args->catchup_snapshot = serialize_close_vector (&m);
-
- /* Actually copy internal state */
- args->catchup_snapshot = s->config.catchup_snapshot
- (mcm, args->catchup_snapshot, rep->last_global_sequence_included);
-
- rep = (void *) args->catchup_snapshot;
- rep->n_data_bytes = vec_len (args->catchup_snapshot) - sizeof (rep[0]);
-
- mc_byte_swap_msg_catchup_reply (rep);
- }
-
- /* now go send it... */
- vlib_process_signal_event (vm, mcm->catchup_process,
- EVENT_MC_SEND_CATCHUP_DATA,
- args - mcm->catchup_process_args);
-}
-
-#define EVENT_MC_UNSERIALIZE_BUFFER 0
-#define EVENT_MC_UNSERIALIZE_CATCHUP 1
-
-void
-mc_msg_catchup_reply_handler (mc_main_t * mcm, mc_msg_catchup_reply_t * mp,
- u32 catchup_opaque)
-{
- vlib_process_signal_event (mcm->vlib_main,
- mcm->unserialize_process,
- EVENT_MC_UNSERIALIZE_CATCHUP,
- pointer_to_uword (mp));
-}
-
-static void
-perform_catchup (mc_main_t * mcm, mc_msg_catchup_reply_t * mp)
-{
- mc_stream_t *s;
- i32 seq_cmp_result;
-
- mc_byte_swap_msg_catchup_reply (mp);
-
- s = mc_stream_by_index (mcm, mp->stream_index);
-
- /* Never heard of this stream or already caught up. */
- if (!s || s->state == MC_STREAM_STATE_ready)
- return;
-
- {
- serialize_main_t m;
- mc_stream_peer_t *p;
- u32 n_stream_bytes;
-
- /* For offline sim replay: save the entire catchup snapshot... */
- if (s->config.save_snapshot)
- s->config.save_snapshot (mcm, /* is_catchup */ 1, mp->data,
- mp->n_data_bytes);
-
- unserialize_open_data (&m, mp->data, mp->n_data_bytes);
- unserialize (&m, unserialize_mc_stream, s);
-
- /* Make sure we start numbering our messages as expected */
- /* *INDENT-OFF* */
- pool_foreach (p, s->peers, ({
- if (p->id.as_u64 == mcm->transport.our_ack_peer_id.as_u64)
- s->our_local_sequence = p->last_sequence_received + 1;
- }));
-/* *INDENT-ON* */
-
- n_stream_bytes = m.stream.current_buffer_index;
-
- /* No need to unserialize close; nothing to free. */
-
- /* After serialized stream is user's catchup data. */
- s->config.catchup (mcm, mp->data + n_stream_bytes,
- mp->n_data_bytes - n_stream_bytes);
- }
-
- /* Vector could have been moved by catchup.
- This can only happen for mc-internal stream. */
- s = mc_stream_by_index (mcm, mp->stream_index);
-
- s->last_global_sequence_processed = mp->last_global_sequence_included;
-
- while (clib_fifo_elts (s->catchup_fifo))
- {
- mc_msg_user_request_t *gp;
- u32 bi;
- vlib_buffer_t *b;
-
- clib_fifo_sub1 (s->catchup_fifo, bi);
-
- b = vlib_get_buffer (mcm->vlib_main, bi);
- gp = vlib_buffer_get_current (b);
-
- /* Make sure we're replaying "new" news */
- seq_cmp_result = mc_seq_cmp (gp->global_sequence,
- mp->last_global_sequence_included);
-
- if (seq_cmp_result > 0)
- {
- vlib_buffer_advance (b, sizeof (gp[0]));
- s->config.rx_buffer (mcm, s, gp->peer_id, bi);
- s->last_global_sequence_processed = gp->global_sequence;
-
- if (MC_EVENT_LOGGING)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (t) =
- {
- .format = "catchup replay local sequence 0x%x",
- .format_args = "i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 local_sequence;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, t);
- ed->local_sequence = gp->local_sequence;
- }
- }
- else
- {
- if (MC_EVENT_LOGGING)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (t) =
- {
- .format = "catchup discard local sequence 0x%x",
- .format_args = "i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 local_sequence;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, t);
- ed->local_sequence = gp->local_sequence;
- }
-
- vlib_buffer_free_one (mcm->vlib_main, bi);
- }
- }
-
- s->state = MC_STREAM_STATE_ready;
-
- /* Now that we are caught up wake up joining process. */
- {
- vlib_one_time_waiting_process_t *wp;
- vec_foreach (wp, s->procs_waiting_for_join_done)
- vlib_signal_one_time_waiting_process (mcm->vlib_main, wp);
- if (s->procs_waiting_for_join_done)
- _vec_len (s->procs_waiting_for_join_done) = 0;
- }
-}
-
-static void
-this_node_maybe_master (mc_main_t * mcm)
-{
- vlib_main_t *vm = mcm->vlib_main;
- mc_msg_master_assert_t *mp;
- uword event_type;
- int timeouts = 0;
- int is_master = mcm->relay_state == MC_RELAY_STATE_MASTER;
- clib_error_t *error;
- f64 now, time_last_master_assert = -1;
- u32 bi;
-
- while (1)
- {
- if (!mcm->we_can_be_relay_master)
- {
- mcm->relay_state = MC_RELAY_STATE_SLAVE;
- if (MC_EVENT_LOGGING)
- {
- ELOG_TYPE (e, "become slave (config)");
- ELOG (mcm->elog_main, e, 0);
- }
- return;
- }
-
- now = vlib_time_now (vm);
- if (now >= time_last_master_assert + 1)
- {
- time_last_master_assert = now;
- mp = mc_get_vlib_buffer (mcm->vlib_main, sizeof (mp[0]), &bi);
-
- mp->peer_id = mcm->transport.our_ack_peer_id;
- mp->global_sequence = mcm->relay_global_sequence;
-
- /*
- * these messages clog the event log, set MC_EVENT_LOGGING higher
- * if you want them
- */
- if (MC_EVENT_LOGGING > 1)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "tx-massert: peer %s global seq %u",
- .format_args = "T4i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 peer, global_sequence;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
- ed->global_sequence = mp->global_sequence;
- }
-
- mc_byte_swap_msg_master_assert (mp);
-
- error =
- mcm->transport.tx_buffer (mcm->transport.opaque,
- MC_TRANSPORT_MASTERSHIP, bi);
- if (error)
- clib_error_report (error);
- }
-
- vlib_process_wait_for_event_or_clock (vm, 1.0);
- event_type = vlib_process_get_events (vm, /* no event data */ 0);
-
- switch (event_type)
- {
- case ~0:
- if (!is_master && timeouts++ > 2)
- {
- mcm->relay_state = MC_RELAY_STATE_MASTER;
- mcm->relay_master_peer_id =
- mcm->transport.our_ack_peer_id.as_u64;
- if (MC_EVENT_LOGGING)
- {
- ELOG_TYPE (e, "become master (was maybe_master)");
- ELOG (mcm->elog_main, e, 0);
- }
- return;
- }
- break;
-
- case MC_RELAY_STATE_SLAVE:
- mcm->relay_state = MC_RELAY_STATE_SLAVE;
- if (MC_EVENT_LOGGING && mcm->relay_state != MC_RELAY_STATE_SLAVE)
- {
- ELOG_TYPE (e, "become slave (was maybe_master)");
- ELOG (mcm->elog_main, e, 0);
- }
- return;
- }
- }
-}
-
-static void
-this_node_slave (mc_main_t * mcm)
-{
- vlib_main_t *vm = mcm->vlib_main;
- uword event_type;
- int timeouts = 0;
-
- if (MC_EVENT_LOGGING)
- {
- ELOG_TYPE (e, "become slave");
- ELOG (mcm->elog_main, e, 0);
- }
-
- while (1)
- {
- vlib_process_wait_for_event_or_clock (vm, 1.0);
- event_type = vlib_process_get_events (vm, /* no event data */ 0);
-
- switch (event_type)
- {
- case ~0:
- if (timeouts++ > 2)
- {
- mcm->relay_state = MC_RELAY_STATE_NEGOTIATE;
- mcm->relay_master_peer_id = ~0ULL;
- if (MC_EVENT_LOGGING)
- {
- ELOG_TYPE (e, "timeouts; negoitate mastership");
- ELOG (mcm->elog_main, e, 0);
- }
- return;
- }
- break;
-
- case MC_RELAY_STATE_SLAVE:
- mcm->relay_state = MC_RELAY_STATE_SLAVE;
- timeouts = 0;
- break;
- }
- }
-}
-
-static uword
-mc_mastership_process (vlib_main_t * vm,
- vlib_node_runtime_t * node, vlib_frame_t * f)
-{
- mc_main_t *mcm = mc_node_get_main (node);
-
- while (1)
- {
- switch (mcm->relay_state)
- {
- case MC_RELAY_STATE_NEGOTIATE:
- case MC_RELAY_STATE_MASTER:
- this_node_maybe_master (mcm);
- break;
-
- case MC_RELAY_STATE_SLAVE:
- this_node_slave (mcm);
- break;
- }
- }
- return 0; /* not likely */
-}
-
-void
-mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master)
-{
- if (we_can_be_master != mcm->we_can_be_relay_master)
- {
- mcm->we_can_be_relay_master = we_can_be_master;
- vlib_process_signal_event (mcm->vlib_main,
- mcm->mastership_process,
- MC_RELAY_STATE_NEGOTIATE, 0);
- }
-}
-
-void
-mc_msg_master_assert_handler (mc_main_t * mcm, mc_msg_master_assert_t * mp,
- u32 buffer_index)
-{
- mc_peer_id_t his_peer_id, our_peer_id;
- i32 seq_cmp_result;
- u8 signal_slave = 0;
- u8 update_global_sequence = 0;
-
- mc_byte_swap_msg_master_assert (mp);
-
- his_peer_id = mp->peer_id;
- our_peer_id = mcm->transport.our_ack_peer_id;
-
- /* compare the incoming global sequence with ours */
- seq_cmp_result = mc_seq_cmp (mp->global_sequence,
- mcm->relay_global_sequence);
-
- /* If the sender has a lower peer id and the sender's sequence >=
- our global sequence, we become a slave. Otherwise we are master. */
- if (mc_peer_id_compare (his_peer_id, our_peer_id) < 0
- && seq_cmp_result >= 0)
- {
- vlib_process_signal_event (mcm->vlib_main,
- mcm->mastership_process,
- MC_RELAY_STATE_SLAVE, 0);
- signal_slave = 1;
- }
-
- /* Update our global sequence. */
- if (seq_cmp_result > 0)
- {
- mcm->relay_global_sequence = mp->global_sequence;
- update_global_sequence = 1;
- }
-
- {
- uword *q = mhash_get (&mcm->mastership_peer_index_by_id, &his_peer_id);
- mc_mastership_peer_t *p;
-
- if (q)
- p = vec_elt_at_index (mcm->mastership_peers, q[0]);
- else
- {
- vec_add2 (mcm->mastership_peers, p, 1);
- p->peer_id = his_peer_id;
- mhash_set (&mcm->mastership_peer_index_by_id, &p->peer_id,
- p - mcm->mastership_peers,
- /* old_value */ 0);
- }
- p->time_last_master_assert_received = vlib_time_now (mcm->vlib_main);
- }
-
- /*
- * these messages clog the event log, set MC_EVENT_LOGGING higher
- * if you want them.
- */
- if (MC_EVENT_LOGGING > 1)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "rx-massert: peer %s global seq %u upd %d slave %d",
- .format_args = "T4i4i1i1",
- };
- /* *INDENT-ON* */
-
- struct
- {
- u32 peer;
- u32 global_sequence;
- u8 update_sequence;
- u8 slave;
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->peer = elog_id_for_peer_id (mcm, his_peer_id.as_u64);
- ed->global_sequence = mp->global_sequence;
- ed->update_sequence = update_global_sequence;
- ed->slave = signal_slave;
- }
-}
-
-static void
-mc_serialize_init (mc_main_t * mcm)
-{
- mc_serialize_msg_t *m;
- vlib_main_t *vm = vlib_get_main ();
-
- mcm->global_msg_index_by_name
- = hash_create_string ( /* elts */ 0, sizeof (uword));
-
- m = vm->mc_msg_registrations;
-
- while (m)
- {
- m->global_index = vec_len (mcm->global_msgs);
- hash_set_mem (mcm->global_msg_index_by_name, m->name, m->global_index);
- vec_add1 (mcm->global_msgs, m);
- m = m->next_registration;
- }
-}
-
-clib_error_t *
-mc_serialize_va (mc_main_t * mc,
- u32 stream_index,
- u32 multiple_messages_per_vlib_buffer,
- mc_serialize_msg_t * msg, va_list * va)
-{
- mc_stream_t *s;
- clib_error_t *error;
- serialize_main_t *m = &mc->serialize_mains[VLIB_TX];
- vlib_serialize_buffer_main_t *sbm = &mc->serialize_buffer_mains[VLIB_TX];
- u32 bi, n_before, n_after, n_total, n_this_msg;
- u32 si, gi;
-
- if (!sbm->vlib_main)
- {
- sbm->tx.max_n_data_bytes_per_chain = 4096;
- sbm->tx.free_list_index = VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX;
- }
-
- if (sbm->first_buffer == 0)
- serialize_open_vlib_buffer (m, mc->vlib_main, sbm);
-
- n_before = serialize_vlib_buffer_n_bytes (m);
-
- s = mc_stream_by_index (mc, stream_index);
- gi = msg->global_index;
- ASSERT (msg == vec_elt (mc->global_msgs, gi));
-
- si = ~0;
- if (gi < vec_len (s->stream_msg_index_by_global_index))
- si = s->stream_msg_index_by_global_index[gi];
-
- serialize_likely_small_unsigned_integer (m, si);
-
- /* For first time message is sent, use name to identify message. */
- if (si == ~0 || MSG_ID_DEBUG)
- serialize_cstring (m, msg->name);
-
- if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "serialize-msg: %s index %d",
- .format_args = "T4i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 c[2];
- } *ed;
- ed = ELOG_DATA (mc->elog_main, e);
- ed->c[0] = elog_id_for_msg_name (mc, msg->name);
- ed->c[1] = si;
- }
-
- error = va_serialize (m, va);
-
- n_after = serialize_vlib_buffer_n_bytes (m);
- n_this_msg = n_after - n_before;
- n_total = n_after + sizeof (mc_msg_user_request_t);
-
- /* For max message size ignore first message where string name is sent. */
- if (si != ~0)
- msg->max_n_bytes_serialized =
- clib_max (msg->max_n_bytes_serialized, n_this_msg);
-
- if (!multiple_messages_per_vlib_buffer
- || si == ~0
- || n_total + msg->max_n_bytes_serialized >
- mc->transport.max_packet_size)
- {
- bi = serialize_close_vlib_buffer (m);
- sbm->first_buffer = 0;
- if (!error)
- mc_stream_send (mc, stream_index, bi);
- else if (bi != ~0)
- vlib_buffer_free_one (mc->vlib_main, bi);
- }
-
- return error;
-}
-
-clib_error_t *
-mc_serialize_internal (mc_main_t * mc,
- u32 stream_index,
- u32 multiple_messages_per_vlib_buffer,
- mc_serialize_msg_t * msg, ...)
-{
- vlib_main_t *vm = mc->vlib_main;
- va_list va;
- clib_error_t *error;
-
- if (stream_index == ~0)
- {
- if (vm->mc_main && vm->mc_stream_index == ~0)
- vlib_current_process_wait_for_one_time_event_vector
- (vm, &vm->procs_waiting_for_mc_stream_join);
- stream_index = vm->mc_stream_index;
- }
-
- va_start (va, msg);
- error = mc_serialize_va (mc, stream_index,
- multiple_messages_per_vlib_buffer, msg, &va);
- va_end (va);
- return error;
-}
-
-uword
-mc_unserialize_message (mc_main_t * mcm,
- mc_stream_t * s, serialize_main_t * m)
-{
- mc_serialize_stream_msg_t *sm;
- u32 gi, si;
-
- si = unserialize_likely_small_unsigned_integer (m);
-
- if (!(si == ~0 || MSG_ID_DEBUG))
- {
- sm = vec_elt_at_index (s->stream_msgs, si);
- gi = sm->global_index;
- }
- else
- {
- char *name;
-
- unserialize_cstring (m, &name);
-
- if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "unserialize-msg: %s rx index %d",
- .format_args = "T4i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 c[2];
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->c[0] = elog_id_for_msg_name (mcm, name);
- ed->c[1] = si;
- }
-
- {
- uword *p = hash_get_mem (mcm->global_msg_index_by_name, name);
- gi = p ? p[0] : ~0;
- }
-
- /* Unknown message? */
- if (gi == ~0)
- {
- vec_free (name);
- goto done;
- }
-
- vec_validate_init_empty (s->stream_msg_index_by_global_index, gi, ~0);
- si = s->stream_msg_index_by_global_index[gi];
-
- /* Stream local index unknown? Create it. */
- if (si == ~0)
- {
- vec_add2 (s->stream_msgs, sm, 1);
-
- si = sm - s->stream_msgs;
- sm->global_index = gi;
- s->stream_msg_index_by_global_index[gi] = si;
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "msg-bind: stream %d %s to index %d",
- .format_args = "i4T4i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 c[3];
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->c[0] = s->index;
- ed->c[1] = elog_id_for_msg_name (mcm, name);
- ed->c[2] = si;
- }
- }
- else
- {
- sm = vec_elt_at_index (s->stream_msgs, si);
- if (gi != sm->global_index && MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "msg-id-ERROR: %s index %d expected %d",
- .format_args = "T4i4i4",
- };
- /* *INDENT-ON* */
- struct
- {
- u32 c[3];
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->c[0] = elog_id_for_msg_name (mcm, name);
- ed->c[1] = si;
- ed->c[2] = ~0;
- if (sm->global_index <
- vec_len (s->stream_msg_index_by_global_index))
- ed->c[2] =
- s->stream_msg_index_by_global_index[sm->global_index];
- }
- }
-
- vec_free (name);
- }
-
- if (gi != ~0)
- {
- mc_serialize_msg_t *msg;
- msg = vec_elt (mcm->global_msgs, gi);
- unserialize (m, msg->unserialize, mcm);
- }
-
-done:
- return gi != ~0;
-}
-
-void
-mc_unserialize_internal (mc_main_t * mcm, u32 stream_and_buffer_index)
-{
- vlib_main_t *vm = mcm->vlib_main;
- serialize_main_t *m = &mcm->serialize_mains[VLIB_RX];
- vlib_serialize_buffer_main_t *sbm = &mcm->serialize_buffer_mains[VLIB_RX];
- mc_stream_and_buffer_t *sb;
- mc_stream_t *stream;
- u32 buffer_index;
-
- sb =
- pool_elt_at_index (mcm->mc_unserialize_stream_and_buffers,
- stream_and_buffer_index);
- buffer_index = sb->buffer_index;
- stream = vec_elt_at_index (mcm->stream_vector, sb->stream_index);
- pool_put (mcm->mc_unserialize_stream_and_buffers, sb);
-
- if (stream->config.save_snapshot)
- {
- u32 n_bytes = vlib_buffer_index_length_in_chain (vm, buffer_index);
- static u8 *contents;
- vec_reset_length (contents);
- vec_validate (contents, n_bytes - 1);
- vlib_buffer_contents (vm, buffer_index, contents);
- stream->config.save_snapshot (mcm, /* is_catchup */ 0, contents,
- n_bytes);
- }
-
- ASSERT (vlib_in_process_context (vm));
-
- unserialize_open_vlib_buffer (m, vm, sbm);
-
- clib_fifo_add1 (sbm->rx.buffer_fifo, buffer_index);
-
- while (unserialize_vlib_buffer_n_bytes (m) > 0)
- mc_unserialize_message (mcm, stream, m);
-
- /* Frees buffer. */
- unserialize_close_vlib_buffer (m);
-}
-
-void
-mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index)
-{
- vlib_main_t *vm = mcm->vlib_main;
- mc_stream_and_buffer_t *sb;
- pool_get (mcm->mc_unserialize_stream_and_buffers, sb);
- sb->stream_index = s->index;
- sb->buffer_index = buffer_index;
- vlib_process_signal_event (vm, mcm->unserialize_process,
- EVENT_MC_UNSERIALIZE_BUFFER,
- sb - mcm->mc_unserialize_stream_and_buffers);
-}
-
-static uword
-mc_unserialize_process (vlib_main_t * vm,
- vlib_node_runtime_t * node, vlib_frame_t * f)
-{
- mc_main_t *mcm = mc_node_get_main (node);
- uword event_type, *event_data = 0;
- int i;
-
- while (1)
- {
- if (event_data)
- _vec_len (event_data) = 0;
-
- vlib_process_wait_for_event (vm);
- event_type = vlib_process_get_events (vm, &event_data);
- switch (event_type)
- {
- case EVENT_MC_UNSERIALIZE_BUFFER:
- for (i = 0; i < vec_len (event_data); i++)
- mc_unserialize_internal (mcm, event_data[i]);
- break;
-
- case EVENT_MC_UNSERIALIZE_CATCHUP:
- for (i = 0; i < vec_len (event_data); i++)
- {
- u8 *mp = uword_to_pointer (event_data[i], u8 *);
- perform_catchup (mcm, (void *) mp);
- vec_free (mp);
- }
- break;
-
- default:
- break;
- }
- }
-
- return 0; /* not likely */
-}
-
-void
-serialize_mc_main (serialize_main_t * m, va_list * va)
-{
- mc_main_t *mcm = va_arg (*va, mc_main_t *);
- mc_stream_t *s;
- mc_serialize_stream_msg_t *sm;
- mc_serialize_msg_t *msg;
-
- serialize_integer (m, vec_len (mcm->stream_vector), sizeof (u32));
- vec_foreach (s, mcm->stream_vector)
- {
- /* Stream name. */
- serialize_cstring (m, s->config.name);
-
- /* Serialize global names for all sent messages. */
- serialize_integer (m, vec_len (s->stream_msgs), sizeof (u32));
- vec_foreach (sm, s->stream_msgs)
- {
- msg = vec_elt (mcm->global_msgs, sm->global_index);
- serialize_cstring (m, msg->name);
- }
- }
-}
-
-void
-unserialize_mc_main (serialize_main_t * m, va_list * va)
-{
- mc_main_t *mcm = va_arg (*va, mc_main_t *);
- u32 i, n_streams, n_stream_msgs;
- char *name;
- mc_stream_t *s;
- mc_serialize_stream_msg_t *sm;
-
- unserialize_integer (m, &n_streams, sizeof (u32));
- for (i = 0; i < n_streams; i++)
- {
- unserialize_cstring (m, &name);
- if (i != MC_STREAM_INDEX_INTERNAL && !mc_stream_by_name (mcm, name))
- {
- vec_validate (mcm->stream_vector, i);
- s = vec_elt_at_index (mcm->stream_vector, i);
- mc_stream_init (s);
- s->index = s - mcm->stream_vector;
- s->config.name = name;
- s->state = MC_STREAM_STATE_name_known;
- hash_set_mem (mcm->stream_index_by_name, s->config.name, s->index);
- }
- else
- vec_free (name);
-
- s = vec_elt_at_index (mcm->stream_vector, i);
-
- vec_free (s->stream_msgs);
- vec_free (s->stream_msg_index_by_global_index);
-
- unserialize_integer (m, &n_stream_msgs, sizeof (u32));
- vec_resize (s->stream_msgs, n_stream_msgs);
- vec_foreach (sm, s->stream_msgs)
- {
- uword *p;
- u32 si, gi;
-
- unserialize_cstring (m, &name);
- p = hash_get (mcm->global_msg_index_by_name, name);
- gi = p ? p[0] : ~0;
- si = sm - s->stream_msgs;
-
- if (MC_EVENT_LOGGING > 0)
- {
- /* *INDENT-OFF* */
- ELOG_TYPE_DECLARE (e) =
- {
- .format = "catchup-bind: %s to %d global index %d stream %d",
- .format_args = "T4i4i4i4",
- };
- /* *INDENT-ON* */
-
- struct
- {
- u32 c[4];
- } *ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->c[0] = elog_id_for_msg_name (mcm, name);
- ed->c[1] = si;
- ed->c[2] = gi;
- ed->c[3] = s->index;
- }
-
- vec_free (name);
-
- sm->global_index = gi;
- if (gi != ~0)
- {
- vec_validate_init_empty (s->stream_msg_index_by_global_index,
- gi, ~0);
- s->stream_msg_index_by_global_index[gi] = si;
- }
- }
- }
-}
-
-void
-mc_main_init (mc_main_t * mcm, char *tag)
-{
- vlib_main_t *vm = vlib_get_main ();
-
- mcm->vlib_main = vm;
- mcm->elog_main = &vm->elog_main;
-
- mcm->relay_master_peer_id = ~0ULL;
- mcm->relay_state = MC_RELAY_STATE_NEGOTIATE;
-
- mcm->stream_index_by_name
- = hash_create_string ( /* elts */ 0, /* value size */ sizeof (uword));
-
- {
- vlib_node_registration_t r;
-
- memset (&r, 0, sizeof (r));
-
- r.type = VLIB_NODE_TYPE_PROCESS;
-
- /* Point runtime data to main instance. */
- r.runtime_data = &mcm;
- r.runtime_data_bytes = sizeof (&mcm);
-
- r.name = (char *) format (0, "mc-mastership-%s", tag);
- r.function = mc_mastership_process;
- mcm->mastership_process = vlib_register_node (vm, &r);
-
- r.name = (char *) format (0, "mc-join-ager-%s", tag);
- r.function = mc_join_ager_process;
- mcm->join_ager_process = vlib_register_node (vm, &r);
-
- r.name = (char *) format (0, "mc-retry-%s", tag);
- r.function = mc_retry_process;
- mcm->retry_process = vlib_register_node (vm, &r);
-
- r.name = (char *) format (0, "mc-catchup-%s", tag);
- r.function = mc_catchup_process;
- mcm->catchup_process = vlib_register_node (vm, &r);
-
- r.name = (char *) format (0, "mc-unserialize-%s", tag);
- r.function = mc_unserialize_process;
- mcm->unserialize_process = vlib_register_node (vm, &r);
- }
-
- if (MC_EVENT_LOGGING > 0)
- mhash_init (&mcm->elog_id_by_peer_id, sizeof (uword),
- sizeof (mc_peer_id_t));
-
- mhash_init (&mcm->mastership_peer_index_by_id, sizeof (uword),
- sizeof (mc_peer_id_t));
- mc_serialize_init (mcm);
-}
-
-static u8 *
-format_mc_relay_state (u8 * s, va_list * args)
-{
- mc_relay_state_t state = va_arg (*args, mc_relay_state_t);
- char *t = 0;
- switch (state)
- {
- case MC_RELAY_STATE_NEGOTIATE:
- t = "negotiate";
- break;
- case MC_RELAY_STATE_MASTER:
- t = "master";
- break;
- case MC_RELAY_STATE_SLAVE:
- t = "slave";
- break;
- default:
- return format (s, "unknown 0x%x", state);
- }
-
- return format (s, "%s", t);
-}
-
-static u8 *
-format_mc_stream_state (u8 * s, va_list * args)
-{
- mc_stream_state_t state = va_arg (*args, mc_stream_state_t);
- char *t = 0;
- switch (state)
- {
-#define _(f) case MC_STREAM_STATE_##f: t = #f; break;
- foreach_mc_stream_state
-#undef _
- default:
- return format (s, "unknown 0x%x", state);
- }
-
- return format (s, "%s", t);
-}
-
-static int
-mc_peer_comp (void *a1, void *a2)
-{
- mc_stream_peer_t *p1 = a1;
- mc_stream_peer_t *p2 = a2;
-
- return mc_peer_id_compare (p1->id, p2->id);
-}
-
-u8 *
-format_mc_main (u8 * s, va_list * args)
-{
- mc_main_t *mcm = va_arg (*args, mc_main_t *);
- mc_stream_t *t;
- mc_stream_peer_t *p, *ps;
- u32 indent = format_get_indent (s);
-
- s = format (s, "MC state %U, %d streams joined, global sequence 0x%x",
- format_mc_relay_state, mcm->relay_state,
- vec_len (mcm->stream_vector), mcm->relay_global_sequence);
-
- {
- mc_mastership_peer_t *mp;
- f64 now = vlib_time_now (mcm->vlib_main);
- s = format (s, "\n%UMost recent mastership peers:",
- format_white_space, indent + 2);
- vec_foreach (mp, mcm->mastership_peers)
- {
- s = format (s, "\n%U%-30U%.4e",
- format_white_space, indent + 4,
- mcm->transport.format_peer_id, mp->peer_id,
- now - mp->time_last_master_assert_received);
- }
- }
-
- vec_foreach (t, mcm->stream_vector)
- {
- s = format (s, "\n%Ustream `%s' index %d",
- format_white_space, indent + 2, t->config.name, t->index);
-
- s = format (s, "\n%Ustate %U",
- format_white_space, indent + 4,
- format_mc_stream_state, t->state);
-
- s =
- format (s,
- "\n%Uretries: interval %.0f sec, limit %d, pool elts %d, %Ld sent",
- format_white_space, indent + 4, t->config.retry_interval,
- t->config.retry_limit, pool_elts (t->retry_pool),
- t->stats.n_retries - t->stats_last_clear.n_retries);
-
- s = format (s, "\n%U%Ld/%Ld user requests sent/received",
- format_white_space, indent + 4,
- t->user_requests_sent, t->user_requests_received);
-
- s = format (s, "\n%U%d peers, local/global sequence 0x%x/0x%x",
- format_white_space, indent + 4,
- pool_elts (t->peers),
- t->our_local_sequence, t->last_global_sequence_processed);
-
- ps = 0;
- /* *INDENT-OFF* */
- pool_foreach (p, t->peers,
- ({
- if (clib_bitmap_get (t->all_peer_bitmap, p - t->peers))
- vec_add1 (ps, p[0]);
- }));
- /* *INDENT-ON* */
- vec_sort_with_function (ps, mc_peer_comp);
- s = format (s, "\n%U%=30s%10s%16s%16s",
- format_white_space, indent + 6,
- "Peer", "Last seq", "Retries", "Future");
-
- vec_foreach (p, ps)
- {
- s = format (s, "\n%U%-30U0x%08x%16Ld%16Ld%s",
- format_white_space, indent + 6,
- mcm->transport.format_peer_id, p->id.as_u64,
- p->last_sequence_received,
- p->stats.n_msgs_from_past -
- p->stats_last_clear.n_msgs_from_past,
- p->stats.n_msgs_from_future -
- p->stats_last_clear.n_msgs_from_future,
- (mcm->transport.our_ack_peer_id.as_u64 ==
- p->id.as_u64 ? " (self)" : ""));
- }
- vec_free (ps);
- }
-
- return s;
-}
-
-/*
- * fd.io coding-style-patch-verification: ON
- *
- * Local Variables:
- * eval: (c-set-style "gnu")
- * End:
- */
diff --git a/src/vlib/mc.h b/src/vlib/mc.h
deleted file mode 100644
index 28f94350537..00000000000
--- a/src/vlib/mc.h
+++ /dev/null
@@ -1,695 +0,0 @@
-/*
- * mc.h: vlib reliable sequenced multicast distributed applications
- *
- * Copyright (c) 2010 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef included_vlib_mc_h
-#define included_vlib_mc_h
-
-#include <vppinfra/elog.h>
-#include <vppinfra/fifo.h>
-#include <vppinfra/mhash.h>
-#include <vlib/node.h>
-
-#ifndef MC_EVENT_LOGGING
-#define MC_EVENT_LOGGING 1
-#endif
-
-always_inline uword
-mc_need_byte_swap (void)
-{
- return CLIB_ARCH_IS_LITTLE_ENDIAN;
-}
-
-/*
- * Used to uniquely identify hosts.
- * For IP4 this would be ip4_address plus tcp/udp port.
- */
-typedef union
-{
- u8 as_u8[8];
- u64 as_u64;
-} mc_peer_id_t;
-
-always_inline mc_peer_id_t
-mc_byte_swap_peer_id (mc_peer_id_t i)
-{
- /* Peer id is already in network byte order. */
- return i;
-}
-
-always_inline int
-mc_peer_id_compare (mc_peer_id_t a, mc_peer_id_t b)
-{
- return memcmp (a.as_u8, b.as_u8, sizeof (a.as_u8));
-}
-
-/* Assert mastership. Lowest peer_id amount all peers wins mastership.
- Only sent/received over mastership channel (MC_TRANSPORT_MASTERSHIP).
- So, we don't need a message opcode. */
-typedef CLIB_PACKED (struct
- {
- /* Peer id asserting mastership. */
- mc_peer_id_t peer_id;
- /* Global sequence number asserted. */
- u32 global_sequence;}) mc_msg_master_assert_t;
-
-always_inline void
-mc_byte_swap_msg_master_assert (mc_msg_master_assert_t * r)
-{
- if (mc_need_byte_swap ())
- {
- r->peer_id = mc_byte_swap_peer_id (r->peer_id);
- r->global_sequence = clib_byte_swap_u32 (r->global_sequence);
- }
-}
-
-#define foreach_mc_msg_type \
- _ (master_assert) \
- _ (join_or_leave_request) \
- _ (join_reply) \
- _ (user_request) \
- _ (user_ack) \
- _ (catchup_request) \
- _ (catchup_reply)
-
-typedef enum
-{
-#define _(f) MC_MSG_TYPE_##f,
- foreach_mc_msg_type
-#undef _
-} mc_relay_msg_type_t;
-
-/* Request to join a given stream. Multicast over MC_TRANSPORT_JOIN. */
-typedef CLIB_PACKED (struct
- {
-mc_peer_id_t peer_id; mc_relay_msg_type_t type:32;
- /* MC_MSG_TYPE_join_or_leave_request */
- /* Stream to join or leave. */
- u32 stream_index;
- /* join = 1, leave = 0 */
- u8 is_join;}) mc_msg_join_or_leave_request_t;
-
-always_inline void
-mc_byte_swap_msg_join_or_leave_request (mc_msg_join_or_leave_request_t * r)
-{
- if (mc_need_byte_swap ())
- {
- r->peer_id = mc_byte_swap_peer_id (r->peer_id);
- r->type = clib_byte_swap_u32 (r->type);
- r->stream_index = clib_byte_swap_u32 (r->stream_index);
- }
-}
-
-/* Join reply. Multicast over MC_TRANSPORT_JOIN. */
-typedef CLIB_PACKED (struct
- {
-mc_peer_id_t peer_id; mc_relay_msg_type_t type:32;
- /* MC_MSG_TYPE_join_reply */
- u32 stream_index;
- /* Peer ID to contact to catchup with this stream. */
- mc_peer_id_t catchup_peer_id;}) mc_msg_join_reply_t;
-
-always_inline void
-mc_byte_swap_msg_join_reply (mc_msg_join_reply_t * r)
-{
- if (mc_need_byte_swap ())
- {
- r->peer_id = mc_byte_swap_peer_id (r->peer_id);
- r->type = clib_byte_swap_u32 (r->type);
- r->stream_index = clib_byte_swap_u32 (r->stream_index);
- r->catchup_peer_id = mc_byte_swap_peer_id (r->catchup_peer_id);
- }
-}
-
-/* Generic (application) request. Multicast over MC_TRANSPORT_USER_REQUEST_TO_RELAY and then
- relayed by relay master after filling in global sequence number. */
-typedef CLIB_PACKED (struct
- {
- mc_peer_id_t peer_id; u32 stream_index;
- /* Global sequence number as filled in by relay master. */
- u32 global_sequence;
- /* Local sequence number as filled in by peer sending message. */
- u32 local_sequence;
- /* Size of request data. */
- u32 n_data_bytes;
- /* Opaque request data. */
- u8 data[0];}) mc_msg_user_request_t;
-
-always_inline void
-mc_byte_swap_msg_user_request (mc_msg_user_request_t * r)
-{
- if (mc_need_byte_swap ())
- {
- r->peer_id = mc_byte_swap_peer_id (r->peer_id);
- r->stream_index = clib_byte_swap_u32 (r->stream_index);
- r->global_sequence = clib_byte_swap_u32 (r->global_sequence);
- r->local_sequence = clib_byte_swap_u32 (r->local_sequence);
- r->n_data_bytes = clib_byte_swap_u32 (r->n_data_bytes);
- }
-}
-
-/* Sent unicast over ACK channel. */
-typedef CLIB_PACKED (struct
- {
- mc_peer_id_t peer_id;
- u32 global_sequence; u32 stream_index;
- u32 local_sequence;
- i32 seq_cmp_result;}) mc_msg_user_ack_t;
-
-always_inline void
-mc_byte_swap_msg_user_ack (mc_msg_user_ack_t * r)
-{
- if (mc_need_byte_swap ())
- {
- r->peer_id = mc_byte_swap_peer_id (r->peer_id);
- r->stream_index = clib_byte_swap_u32 (r->stream_index);
- r->global_sequence = clib_byte_swap_u32 (r->global_sequence);
- r->local_sequence = clib_byte_swap_u32 (r->local_sequence);
- r->seq_cmp_result = clib_byte_swap_i32 (r->seq_cmp_result);
- }
-}
-
-/* Sent/received unicast over catchup channel (e.g. using TCP). */
-typedef CLIB_PACKED (struct
- {
- mc_peer_id_t peer_id;
- u32 stream_index;}) mc_msg_catchup_request_t;
-
-always_inline void
-mc_byte_swap_msg_catchup_request (mc_msg_catchup_request_t * r)
-{
- if (mc_need_byte_swap ())
- {
- r->peer_id = mc_byte_swap_peer_id (r->peer_id);
- r->stream_index = clib_byte_swap_u32 (r->stream_index);
- }
-}
-
-/* Sent/received unicast over catchup channel. */
-typedef CLIB_PACKED (struct
- {
- mc_peer_id_t peer_id; u32 stream_index;
- /* Last global sequence number included in catchup data. */
- u32 last_global_sequence_included;
- /* Size of catchup data. */
- u32 n_data_bytes;
- /* Catchup data. */
- u8 data[0];}) mc_msg_catchup_reply_t;
-
-always_inline void
-mc_byte_swap_msg_catchup_reply (mc_msg_catchup_reply_t * r)
-{
- if (mc_need_byte_swap ())
- {
- r->peer_id = mc_byte_swap_peer_id (r->peer_id);
- r->stream_index = clib_byte_swap_u32 (r->stream_index);
- r->last_global_sequence_included =
- clib_byte_swap_u32 (r->last_global_sequence_included);
- r->n_data_bytes = clib_byte_swap_u32 (r->n_data_bytes);
- }
-}
-
-typedef struct _mc_serialize_msg
-{
- /* Name for this type. */
- char *name;
-
- /* Functions to serialize/unserialize data. */
- serialize_function_t *serialize;
- serialize_function_t *unserialize;
-
- /* Maximum message size in bytes when serialized.
- If zero then this will be set to the largest sent message. */
- u32 max_n_bytes_serialized;
-
- /* Opaque to use for first argument to serialize/unserialize function. */
- u32 opaque;
-
- /* Index in global message vector. */
- u32 global_index;
-
- /* Registration list */
- struct _mc_serialize_msg *next_registration;
-} mc_serialize_msg_t;
-
-typedef struct
-{
- /* Index into global message vector. */
- u32 global_index;
-} mc_serialize_stream_msg_t;
-
-#define MC_SERIALIZE_MSG(x,...) \
- __VA_ARGS__ mc_serialize_msg_t x; \
-static void __mc_serialize_msg_registration_##x (void) \
- __attribute__((__constructor__)) ; \
-static void __mc_serialize_msg_registration_##x (void) \
-{ \
- vlib_main_t * vm = vlib_get_main(); \
- x.next_registration = vm->mc_msg_registrations; \
- vm->mc_msg_registrations = &x; \
-} \
-static void __mc_serialize_msg_unregistration_##x (void) \
- __attribute__((__destructor__)) ; \
-static void __mc_serialize_msg_unregistration_##x (void) \
-{ \
- vlib_main_t * vm = vlib_get_main(); \
- VLIB_REMOVE_FROM_LINKED_LIST (vm->mc_msg_registrations, &x, \
- next_registration); \
-} \
-__VA_ARGS__ mc_serialize_msg_t x
-
-typedef enum
-{
- MC_TRANSPORT_MASTERSHIP,
- MC_TRANSPORT_JOIN,
- MC_TRANSPORT_USER_REQUEST_TO_RELAY,
- MC_TRANSPORT_USER_REQUEST_FROM_RELAY,
- MC_N_TRANSPORT_TYPE,
-} mc_transport_type_t;
-
-typedef struct
-{
- clib_error_t *(*tx_buffer) (void *opaque, mc_transport_type_t type,
- u32 buffer_index);
-
- clib_error_t *(*tx_ack) (void *opaque, mc_peer_id_t peer_id,
- u32 buffer_index);
-
- /* Returns catchup opaque. */
- uword (*catchup_request_fun) (void *opaque, u32 stream_index,
- mc_peer_id_t catchup_peer_id);
-
- void (*catchup_send_fun) (void *opaque, uword catchup_opaque,
- u8 * data_vector);
-
- /* Opaque passed to callbacks. */
- void *opaque;
-
- mc_peer_id_t our_ack_peer_id;
- mc_peer_id_t our_catchup_peer_id;
-
- /* Max packet size (MTU) for this transport.
- For IP this is interface MTU less IP + UDP header size. */
- u32 max_packet_size;
-
- format_function_t *format_peer_id;
-} mc_transport_t;
-
-typedef struct
-{
- /* Count of messages received from this peer from the past/future
- (with seq_cmp != 0). */
- u64 n_msgs_from_past;
- u64 n_msgs_from_future;
-} mc_stream_peer_stats_t;
-
-typedef struct
-{
- /* ID of this peer. */
- mc_peer_id_t id;
-
- /* The last sequence we received from this peer. */
- u32 last_sequence_received;
-
- mc_stream_peer_stats_t stats, stats_last_clear;
-} mc_stream_peer_t;
-
-typedef struct
-{
- u32 buffer_index;
-
- /* Cached copy of local sequence number from buffer. */
- u32 local_sequence;
-
- /* Number of times this buffer has been sent (retried). */
- u32 n_retries;
-
- /* Previous/next retries in doubly-linked list. */
- u32 prev_index, next_index;
-
- /* Bitmap of all peers which have acked this msg */
- uword *unacked_by_peer_bitmap;
-
- /* Message send or resend time */
- f64 sent_at;
-} mc_retry_t;
-
-typedef struct
-{
- /* Number of retries sent for this stream. */
- u64 n_retries;
-} mc_stream_stats_t;
-
-struct mc_main_t;
-struct mc_stream_t;
-
-typedef struct
-{
- /* Stream name. */
- char *name;
-
- /* Number of outstanding messages. */
- u32 window_size;
-
- /* Retry interval, in seconds */
- f64 retry_interval;
-
- /* Retry limit */
- u32 retry_limit;
-
- /* User rx buffer callback */
- void (*rx_buffer) (struct mc_main_t * mc_main,
- struct mc_stream_t * stream,
- mc_peer_id_t peer_id, u32 buffer_index);
-
- /* User callback to create a snapshot */
- u8 *(*catchup_snapshot) (struct mc_main_t * mc_main,
- u8 * snapshot_vector,
- u32 last_global_sequence_included);
-
- /* User callback to replay a snapshot */
- void (*catchup) (struct mc_main_t * mc_main,
- u8 * snapshot_data, u32 n_snapshot_data_bytes);
-
- /* Callback to save a snapshot for offline replay */
- void (*save_snapshot) (struct mc_main_t * mc_main,
- u32 is_catchup,
- u8 * snapshot_data, u32 n_snapshot_data_bytes);
-
- /* Called when a peer dies */
- void (*peer_died) (struct mc_main_t * mc_main,
- struct mc_stream_t * stream, mc_peer_id_t peer_id);
-} mc_stream_config_t;
-
-#define foreach_mc_stream_state \
- _ (invalid) \
- _ (name_known) \
- _ (join_in_progress) \
- _ (catchup) \
- _ (ready)
-
-typedef enum
-{
-#define _(f) MC_STREAM_STATE_##f,
- foreach_mc_stream_state
-#undef _
-} mc_stream_state_t;
-
-typedef struct mc_stream_t
-{
- mc_stream_config_t config;
-
- mc_stream_state_t state;
-
- /* Index in stream pool. */
- u32 index;
-
- /* Stream index 0 is always for MC internal use. */
-#define MC_STREAM_INDEX_INTERNAL 0
-
- mc_retry_t *retry_pool;
-
- /* Head and tail index of retry pool. */
- u32 retry_head_index, retry_tail_index;
-
- /*
- * Country club for recently retired messages
- * If the set of peers is expanding and a new peer
- * misses a message, we can easily retire the FIFO
- * element before we even know about the new peer
- */
- mc_retry_t *retired_fifo;
-
- /* Hash mapping local sequence to retry pool index. */
- uword *retry_index_by_local_sequence;
-
- /* catch-up fifo of VLIB buffer indices.
- start recording when catching up. */
- u32 *catchup_fifo;
-
- mc_stream_stats_t stats, stats_last_clear;
-
- /* Peer pool. */
- mc_stream_peer_t *peers;
-
- /* Bitmap with ones for all peers in peer pool. */
- uword *all_peer_bitmap;
-
- /* Map of 64 bit id to index in stream pool. */
- mhash_t peer_index_by_id;
-
- /* Timeout, in case we're alone in the world */
- f64 join_timeout;
-
- vlib_one_time_waiting_process_t *procs_waiting_for_join_done;
-
- vlib_one_time_waiting_process_t *procs_waiting_for_open_window;
-
- /* Next sequence number to use */
- u32 our_local_sequence;
-
- /*
- * Last global sequence we processed.
- * When supplying catchup data, we need to tell
- * the client precisely where to start replaying
- */
- u32 last_global_sequence_processed;
-
- /* Vector of unique messages we've sent on this stream. */
- mc_serialize_stream_msg_t *stream_msgs;
-
- /* Vector global message index into per stream message index. */
- u32 *stream_msg_index_by_global_index;
-
- /* Hashed by message name. */
- uword *stream_msg_index_by_name;
-
- u64 user_requests_sent;
- u64 user_requests_received;
-} mc_stream_t;
-
-always_inline void
-mc_stream_free (mc_stream_t * s)
-{
- pool_free (s->retry_pool);
- hash_free (s->retry_index_by_local_sequence);
- clib_fifo_free (s->catchup_fifo);
- pool_free (s->peers);
- mhash_free (&s->peer_index_by_id);
- vec_free (s->procs_waiting_for_join_done);
- vec_free (s->procs_waiting_for_open_window);
-}
-
-always_inline void
-mc_stream_init (mc_stream_t * s)
-{
- memset (s, 0, sizeof (s[0]));
- s->retry_head_index = s->retry_tail_index = ~0;
-}
-
-typedef struct
-{
- u32 stream_index;
- u32 catchup_opaque;
- u8 *catchup_snapshot;
-} mc_catchup_process_arg_t;
-
-typedef enum
-{
- MC_RELAY_STATE_NEGOTIATE,
- MC_RELAY_STATE_MASTER,
- MC_RELAY_STATE_SLAVE,
-} mc_relay_state_t;
-
-typedef struct
-{
- mc_peer_id_t peer_id;
-
- f64 time_last_master_assert_received;
-} mc_mastership_peer_t;
-
-typedef struct
-{
- u32 stream_index;
- u32 buffer_index;
-} mc_stream_and_buffer_t;
-
-typedef struct mc_main_t
-{
- mc_relay_state_t relay_state;
-
- /* Mastership */
- u32 we_can_be_relay_master;
-
- u64 relay_master_peer_id;
-
- mc_mastership_peer_t *mastership_peers;
-
- /* Map of 64 bit id to index in stream pool. */
- mhash_t mastership_peer_index_by_id;
-
- /* The transport we're using. */
- mc_transport_t transport;
-
- /* Last-used global sequence number. */
- u32 relay_global_sequence;
-
- /* Vector of streams. */
- mc_stream_t *stream_vector;
-
- /* Hash table mapping stream name to pool index. */
- uword *stream_index_by_name;
-
- uword *procs_waiting_for_stream_name_by_name;
-
- vlib_one_time_waiting_process_t **procs_waiting_for_stream_name_pool;
-
- int joins_in_progress;
-
- mc_catchup_process_arg_t *catchup_process_args;
-
- /* Node indices for mastership, join ager,
- retry and catchup processes. */
- u32 mastership_process;
- u32 join_ager_process;
- u32 retry_process;
- u32 catchup_process;
- u32 unserialize_process;
-
- /* Global vector of messages. */
- mc_serialize_msg_t **global_msgs;
-
- /* Hash table mapping message name to index. */
- uword *global_msg_index_by_name;
-
- /* Shared serialize/unserialize main. */
- serialize_main_t serialize_mains[VLIB_N_RX_TX];
-
- vlib_serialize_buffer_main_t serialize_buffer_mains[VLIB_N_RX_TX];
-
- /* Convenience variables */
- struct vlib_main_t *vlib_main;
- elog_main_t *elog_main;
-
- /* Maps 64 bit peer id to elog string table offset for this formatted peer id. */
- mhash_t elog_id_by_peer_id;
-
- uword *elog_id_by_msg_name;
-
- /* For mc_unserialize. */
- mc_stream_and_buffer_t *mc_unserialize_stream_and_buffers;
-} mc_main_t;
-
-always_inline mc_stream_t *
-mc_stream_by_name (mc_main_t * m, char *name)
-{
- uword *p = hash_get (m->stream_index_by_name, name);
- return p ? vec_elt_at_index (m->stream_vector, p[0]) : 0;
-}
-
-always_inline mc_stream_t *
-mc_stream_by_index (mc_main_t * m, u32 i)
-{
- return i < vec_len (m->stream_vector) ? m->stream_vector + i : 0;
-}
-
-always_inline void
-mc_clear_stream_stats (mc_main_t * m)
-{
- mc_stream_t *s;
- mc_stream_peer_t *p;
- vec_foreach (s, m->stream_vector)
- {
- s->stats_last_clear = s->stats;
- /* *INDENT-OFF* */
- pool_foreach (p, s->peers, ({
- p->stats_last_clear = p->stats;
- }));
- /* *INDENT-ON* */
- }
-}
-
-/* Declare all message handlers. */
-#define _(f) void mc_msg_##f##_handler (mc_main_t * mcm, mc_msg_##f##_t * msg, u32 buffer_index);
-foreach_mc_msg_type
-#undef _
- u32 mc_stream_join (mc_main_t * mcm, mc_stream_config_t *);
-
-void mc_stream_leave (mc_main_t * mcm, u32 stream_index);
-
-void mc_wait_for_stream_ready (mc_main_t * m, char *stream_name);
-
-u32 mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index);
-
-void mc_main_init (mc_main_t * mcm, char *tag);
-
-void mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master);
-
-void *mc_get_vlib_buffer (struct vlib_main_t *vm, u32 n_bytes,
- u32 * bi_return);
-
-format_function_t format_mc_main;
-
-clib_error_t *mc_serialize_internal (mc_main_t * mc,
- u32 stream_index,
- u32 multiple_messages_per_vlib_buffer,
- mc_serialize_msg_t * msg, ...);
-
-clib_error_t *mc_serialize_va (mc_main_t * mc,
- u32 stream_index,
- u32 multiple_messages_per_vlib_buffer,
- mc_serialize_msg_t * msg, va_list * va);
-
-#define mc_serialize_stream(mc,si,msg,args...) \
- mc_serialize_internal((mc),(si),(0),(msg),(msg)->serialize,args)
-
-#define mc_serialize(mc,msg,args...) \
- mc_serialize_internal((mc),(~0),(0),(msg),(msg)->serialize,args)
-
-#define mc_serialize2(mc,add,msg,args...) \
- mc_serialize_internal((mc),(~0),(add),(msg),(msg)->serialize,args)
-
-void mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index);
-uword mc_unserialize_message (mc_main_t * mcm, mc_stream_t * s,
- serialize_main_t * m);
-
-serialize_function_t serialize_mc_main, unserialize_mc_main;
-
-always_inline uword
-mc_max_message_size_in_bytes (mc_main_t * mcm)
-{
- return mcm->transport.max_packet_size - sizeof (mc_msg_user_request_t);
-}
-
-always_inline word
-mc_serialize_n_bytes_left (mc_main_t * mcm, serialize_main_t * m)
-{
- return mc_max_message_size_in_bytes (mcm) -
- serialize_vlib_buffer_n_bytes (m);
-}
-
-void unserialize_mc_stream (serialize_main_t * m, va_list * va);
-void mc_stream_join_process_hold (void);
-
-#endif /* included_vlib_mc_h */
-
-/*
- * fd.io coding-style-patch-verification: ON
- *
- * Local Variables:
- * eval: (c-set-style "gnu")
- * End:
- */
diff --git a/src/vlib/threads.c b/src/vlib/threads.c
index 055998adac8..981209bf2e7 100644
--- a/src/vlib/threads.c
+++ b/src/vlib/threads.c
@@ -43,8 +43,8 @@ vlib_thread_main_t vlib_thread_main;
* imapacts observed timings.
*/
-static u32
-elog_id_for_msg_name (const char *msg_name)
+u32
+elog_global_id_for_msg_name (const char *msg_name)
{
uword *p, r;
static uword *h;
@@ -85,7 +85,8 @@ barrier_trace_sync (f64 t_entry, f64 t_open, f64 t_closed)
ed = ELOG_DATA (&vlib_global_main.elog_main, e);
ed->count = (int) vlib_worker_threads[0].barrier_sync_count;
- ed->caller = elog_id_for_msg_name (vlib_worker_threads[0].barrier_caller);
+ ed->caller = elog_global_id_for_msg_name
+ (vlib_worker_threads[0].barrier_caller);
ed->t_entry = (int) (1000000.0 * t_entry);
ed->t_open = (int) (1000000.0 * t_open);
ed->t_closed = (int) (1000000.0 * t_closed);
@@ -111,7 +112,8 @@ barrier_trace_sync_rec (f64 t_entry)
ed = ELOG_DATA (&vlib_global_main.elog_main, e);
ed->depth = (int) vlib_worker_threads[0].recursion_level - 1;
- ed->caller = elog_id_for_msg_name (vlib_worker_threads[0].barrier_caller);
+ ed->caller = elog_global_id_for_msg_name
+ (vlib_worker_threads[0].barrier_caller);
}
static inline void
diff --git a/src/vlib/threads.h b/src/vlib/threads.h
index 71b5d0c8261..bb7c164c2e3 100644
--- a/src/vlib/threads.h
+++ b/src/vlib/threads.h
@@ -597,6 +597,7 @@ vlib_process_signal_event_mt_helper (vlib_process_signal_event_mt_args_t *
args);
void vlib_rpc_call_main_thread (void *function, u8 * args, u32 size);
+u32 elog_global_id_for_msg_name (const char *msg_name);
#endif /* included_vlib_threads_h */
/*
diff --git a/src/vlib/threads_cli.c b/src/vlib/threads_cli.c
index a47d86401bc..86c02479892 100644
--- a/src/vlib/threads_cli.c
+++ b/src/vlib/threads_cli.c
@@ -532,29 +532,6 @@ VLIB_CLI_COMMAND (cmd_test_frame_queue_threshold,static) = {
};
/* *INDENT-ON* */
-static clib_error_t *
-test_threads_barrier_elog_command_fn (vlib_main_t * vm,
- unformat_input_t * input,
- vlib_cli_command_t * cmd)
-{
- if (unformat (input, "enable"))
- vlib_worker_threads->barrier_elog_enabled = 1;
- else if (unformat (input, "disable"))
- vlib_worker_threads->barrier_elog_enabled = 0;
- else
- return clib_error_return (0, "please choose enable or disable");
- return 0;
-}
-
-/* *INDENT-OFF* */
-VLIB_CLI_COMMAND (test_elog_vector_length_trigger, static) =
-{
- .path = "test threads barrier-elog",
- .short_help = "test threads barrier-elog",
- .function = test_threads_barrier_elog_command_fn,
-};
-/* *INDENT-ON* */
-
/*
* fd.io coding-style-patch-verification: ON
*
diff --git a/src/vlib/vlib.h b/src/vlib/vlib.h
index 21c5b2aa814..ba72f46aab2 100644
--- a/src/vlib/vlib.h
+++ b/src/vlib/vlib.h
@@ -56,7 +56,6 @@ struct vlib_main_t;
#include <vlib/counter.h>
#include <vlib/error.h>
#include <vlib/init.h>
-#include <vlib/mc.h>
#include <vlib/node.h>
#include <vlib/trace.h>
#include <vlib/log.h>