summaryrefslogtreecommitdiffstats
path: root/vlib/vlib/mc.c
diff options
context:
space:
mode:
Diffstat (limited to 'vlib/vlib/mc.c')
-rw-r--r--vlib/vlib/mc.c1716
1 files changed, 981 insertions, 735 deletions
diff --git a/vlib/vlib/mc.c b/vlib/vlib/mc.c
index 0942666713e..8fde091389e 100644
--- a/vlib/vlib/mc.c
+++ b/vlib/vlib/mc.c
@@ -17,7 +17,7 @@
#include <vlib/vlib.h>
-/*
+/*
* 1 to enable msg id training wheels, which are useful for tracking
* down catchup and/or partitioned network problems
*/
@@ -25,32 +25,32 @@
static format_function_t format_mc_stream_state;
-static u32 elog_id_for_peer_id (mc_main_t * m, u64 peer_id)
+static u32
+elog_id_for_peer_id (mc_main_t * m, u64 peer_id)
{
- uword * p, r;
- mhash_t * h = &m->elog_id_by_peer_id;
+ uword *p, r;
+ mhash_t *h = &m->elog_id_by_peer_id;
- if (! m->elog_id_by_peer_id.hash)
+ if (!m->elog_id_by_peer_id.hash)
mhash_init (h, sizeof (uword), sizeof (mc_peer_id_t));
-
+
p = mhash_get (h, &peer_id);
if (p)
return p[0];
- r = elog_string (m->elog_main, "%U",
- m->transport.format_peer_id, peer_id);
+ r = elog_string (m->elog_main, "%U", m->transport.format_peer_id, peer_id);
mhash_set (h, &peer_id, r, /* old_value */ 0);
return r;
}
-static u32 elog_id_for_msg_name (mc_main_t * m, char *msg_name)
+static u32
+elog_id_for_msg_name (mc_main_t * m, char *msg_name)
{
- uword * p, r;
- uword * h = m->elog_id_by_msg_name;
+ uword *p, r;
+ uword *h = m->elog_id_by_msg_name;
u8 *name_copy;
- if (! h)
- h = m->elog_id_by_msg_name
- = hash_create_string (0, sizeof (uword));
+ if (!h)
+ h = m->elog_id_by_msg_name = hash_create_string (0, sizeof (uword));
p = hash_get_mem (h, msg_name);
if (p)
@@ -65,15 +65,23 @@ static u32 elog_id_for_msg_name (mc_main_t * m, char *msg_name)
return r;
}
-static void elog_tx_msg (mc_main_t * m, u32 stream_id, u32 local_sequence, u32 retry_count)
+static void
+elog_tx_msg (mc_main_t * m, u32 stream_id, u32 local_sequence,
+ u32 retry_count)
{
if (MC_EVENT_LOGGING > 0)
{
- ELOG_TYPE_DECLARE (e) = {
- .format = "tx-msg: stream %d local seq %d attempt %d",
- .format_args = "i4i4i4",
- };
- struct { u32 stream_id, local_sequence, retry_count; } * ed;
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (e) =
+ {
+ .format = "tx-msg: stream %d local seq %d attempt %d",
+ .format_args = "i4i4i4",
+ };
+ /* *INDENT-ON* */
+ struct
+ {
+ u32 stream_id, local_sequence, retry_count;
+ } *ed;
ed = ELOG_DATA (m->elog_main, e);
ed->stream_id = stream_id;
ed->local_sequence = local_sequence;
@@ -83,7 +91,7 @@ static void elog_tx_msg (mc_main_t * m, u32 stream_id, u32 local_sequence, u32 r
/*
* seq_cmp
- * correctly compare two unsigned sequence numbers.
+ * correctly compare two unsigned sequence numbers.
* This function works so long as x and y are within 2**(n-1) of each
* other, where n = bits(x, y).
*
@@ -92,13 +100,17 @@ static void elog_tx_msg (mc_main_t * m, u32 stream_id, u32 local_sequence, u32 r
* seq_cmp < 0 => x is "in the past" with respect to y
* seq_cmp > 0 => x is "in the future" with respect to y
*/
-always_inline i32 mc_seq_cmp (u32 x, u32 y)
-{ return (i32) x - (i32) y;}
+always_inline i32
+mc_seq_cmp (u32 x, u32 y)
+{
+ return (i32) x - (i32) y;
+}
-void * mc_get_vlib_buffer (vlib_main_t * vm, u32 n_bytes, u32 * bi_return)
+void *
+mc_get_vlib_buffer (vlib_main_t * vm, u32 n_bytes, u32 * bi_return)
{
u32 n_alloc, bi;
- vlib_buffer_t * b;
+ vlib_buffer_t *b;
n_alloc = vlib_buffer_alloc (vm, &bi, 1);
ASSERT (n_alloc == 1);
@@ -111,10 +123,9 @@ void * mc_get_vlib_buffer (vlib_main_t * vm, u32 n_bytes, u32 * bi_return)
static void
delete_peer_with_index (mc_main_t * mcm, mc_stream_t * s,
- uword index,
- int notify_application)
+ uword index, int notify_application)
{
- mc_stream_peer_t * p = pool_elt_at_index (s->peers, index);
+ mc_stream_peer_t *p = pool_elt_at_index (s->peers, index);
ASSERT (p != 0);
if (s->config.peer_died && notify_application)
s->config.peer_died (mcm, s, p->id);
@@ -123,11 +134,17 @@ delete_peer_with_index (mc_main_t * mcm, mc_stream_t * s,
if (MC_EVENT_LOGGING > 0)
{
- ELOG_TYPE_DECLARE (e) = {
- .format = "delete peer %s from all_peer_bitmap",
- .format_args = "T4",
- };
- struct { u32 peer; } * ed = 0;
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (e) =
+ {
+ .format = "delete peer %s from all_peer_bitmap",
+ .format_args = "T4",
+ };
+ /* *INDENT-ON* */
+ struct
+ {
+ u32 peer;
+ } *ed = 0;
ed = ELOG_DATA (mcm->elog_main, e);
ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
@@ -137,11 +154,10 @@ delete_peer_with_index (mc_main_t * mcm, mc_stream_t * s,
static mc_stream_peer_t *
get_or_create_peer_with_id (mc_main_t * mcm,
- mc_stream_t * s, mc_peer_id_t id,
- int * created)
+ mc_stream_t * s, mc_peer_id_t id, int *created)
{
- uword * q = mhash_get (&s->peer_index_by_id, &id);
- mc_stream_peer_t * p;
+ uword *q = mhash_get (&s->peer_index_by_id, &id);
+ mc_stream_peer_t *p;
if (q)
{
@@ -157,16 +173,24 @@ get_or_create_peer_with_id (mc_main_t * mcm,
if (created)
*created = 1;
- done:
+done:
if (MC_EVENT_LOGGING > 0)
{
- ELOG_TYPE_DECLARE (e) = {
- .format = "get_or_create %s peer %s stream %d seq %d",
- .format_args = "t4T4i4i4",
- .n_enum_strings = 2,
- .enum_strings = { "old", "new", },
- };
- struct { u32 is_new, peer, stream_index, rx_sequence; } * ed = 0;
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (e) =
+ {
+ .format = "get_or_create %s peer %s stream %d seq %d",
+ .format_args = "t4T4i4i4",
+ .n_enum_strings = 2,
+ .enum_strings = {
+ "old", "new",
+ },
+ };
+ /* *INDENT-ON* */
+ struct
+ {
+ u32 is_new, peer, stream_index, rx_sequence;
+ } *ed = 0;
ed = ELOG_DATA (mcm->elog_main, e);
ed->is_new = q ? 0 : 1;
@@ -179,9 +203,10 @@ get_or_create_peer_with_id (mc_main_t * mcm,
return p;
}
-static void maybe_send_window_open_event (vlib_main_t * vm, mc_stream_t * stream)
+static void
+maybe_send_window_open_event (vlib_main_t * vm, mc_stream_t * stream)
{
- vlib_one_time_waiting_process_t * p;
+ vlib_one_time_waiting_process_t *p;
if (pool_elts (stream->retry_pool) >= stream->config.window_size)
return;
@@ -193,64 +218,77 @@ static void maybe_send_window_open_event (vlib_main_t * vm, mc_stream_t * stream
_vec_len (stream->procs_waiting_for_open_window) = 0;
}
-static void mc_retry_free (mc_main_t * mcm, mc_stream_t *s, mc_retry_t * r)
+static void
+mc_retry_free (mc_main_t * mcm, mc_stream_t * s, mc_retry_t * r)
{
mc_retry_t record, *retp;
-
+
if (r->unacked_by_peer_bitmap)
_vec_len (r->unacked_by_peer_bitmap) = 0;
- if (clib_fifo_elts (s->retired_fifo) >= 2 * s->config.window_size)
+ if (clib_fifo_elts (s->retired_fifo) >= 2 * s->config.window_size)
{
clib_fifo_sub1 (s->retired_fifo, record);
vlib_buffer_free_one (mcm->vlib_main, record.buffer_index);
}
-
+
clib_fifo_add2 (s->retired_fifo, retp);
-
+
retp->buffer_index = r->buffer_index;
retp->local_sequence = r->local_sequence;
r->buffer_index = ~0; /* poison buffer index in this retry */
}
-static void mc_resend_retired (mc_main_t *mcm, mc_stream_t *s, u32 local_sequence)
+static void
+mc_resend_retired (mc_main_t * mcm, mc_stream_t * s, u32 local_sequence)
{
mc_retry_t *retry;
if (MC_EVENT_LOGGING > 0)
{
- ELOG_TYPE_DECLARE (e) = {
- .format = "resend-retired: search for local seq %d",
- .format_args = "i4",
- };
- struct { u32 local_sequence; } * ed;
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (e) =
+ {
+ .format = "resend-retired: search for local seq %d",
+ .format_args = "i4",
+ };
+ /* *INDENT-ON* */
+ struct
+ {
+ u32 local_sequence;
+ } *ed;
ed = ELOG_DATA (mcm->elog_main, e);
ed->local_sequence = local_sequence;
}
- clib_fifo_foreach
- (retry, s->retired_fifo,
- ({
- if (retry->local_sequence == local_sequence)
- {
- elog_tx_msg (mcm, s->index, retry->local_sequence, -13);
-
- mcm->transport.tx_buffer
- (mcm->transport.opaque,
- MC_TRANSPORT_USER_REQUEST_TO_RELAY,
- retry->buffer_index);
- return;
- }
- }));
+ /* *INDENT-OFF* */
+ clib_fifo_foreach (retry, s->retired_fifo,
+ ({
+ if (retry->local_sequence == local_sequence)
+ {
+ elog_tx_msg (mcm, s->index, retry-> local_sequence, -13);
+ mcm->transport.tx_buffer (mcm->transport.opaque,
+ MC_TRANSPORT_USER_REQUEST_TO_RELAY,
+ retry->buffer_index);
+ return;
+ }
+ }));
+ /* *INDENT-ON* */
if (MC_EVENT_LOGGING > 0)
{
- ELOG_TYPE_DECLARE (e) = {
- .format = "resend-retired: FAILED search for local seq %d",
- .format_args = "i4",
- };
- struct { u32 local_sequence; } * ed;
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (e) =
+ {
+ .format = "resend-retired: FAILED search for local seq %d",
+ .format_args = "i4",
+ };
+ /* *INDENT-ON* */
+ struct
+ {
+ u32 local_sequence;
+ } *ed;
ed = ELOG_DATA (mcm->elog_main, e);
ed->local_sequence = local_sequence;
}
@@ -259,11 +297,11 @@ static void mc_resend_retired (mc_main_t *mcm, mc_stream_t *s, u32 local_sequenc
static uword *
delete_retry_fifo_elt (mc_main_t * mcm,
mc_stream_t * stream,
- mc_retry_t * r,
- uword * dead_peer_bitmap)
+ mc_retry_t * r, uword * dead_peer_bitmap)
{
- mc_stream_peer_t * p;
+ mc_stream_peer_t *p;
+ /* *INDENT-OFF* */
pool_foreach (p, stream->peers, ({
uword pi = p - stream->peers;
uword is_alive = 0 == clib_bitmap_get (r->unacked_by_peer_bitmap, pi);
@@ -285,7 +323,8 @@ delete_retry_fifo_elt (mc_main_t * mcm,
ed->is_alive = is_alive;
}
}));
-
+ /* *INDENT-ON* */
+
hash_unset (stream->retry_index_by_local_sequence, r->local_sequence);
mc_retry_free (mcm, stream, r);
@@ -296,23 +335,21 @@ always_inline mc_retry_t *
prev_retry (mc_stream_t * s, mc_retry_t * r)
{
return (r->prev_index != ~0
- ? pool_elt_at_index (s->retry_pool, r->prev_index)
- : 0);
+ ? pool_elt_at_index (s->retry_pool, r->prev_index) : 0);
}
always_inline mc_retry_t *
next_retry (mc_stream_t * s, mc_retry_t * r)
{
return (r->next_index != ~0
- ? pool_elt_at_index (s->retry_pool, r->next_index)
- : 0);
+ ? pool_elt_at_index (s->retry_pool, r->next_index) : 0);
}
always_inline void
remove_retry_from_pool (mc_stream_t * s, mc_retry_t * r)
{
- mc_retry_t * p = prev_retry (s, r);
- mc_retry_t * n = next_retry (s, r);
+ mc_retry_t *p = prev_retry (s, r);
+ mc_retry_t *n = next_retry (s, r);
if (p)
p->next_index = r->next_index;
@@ -326,12 +363,13 @@ remove_retry_from_pool (mc_stream_t * s, mc_retry_t * r)
pool_put_index (s->retry_pool, r - s->retry_pool);
}
-static void check_retry (mc_main_t * mcm, mc_stream_t * s)
+static void
+check_retry (mc_main_t * mcm, mc_stream_t * s)
{
- mc_retry_t * r;
- vlib_main_t * vm = mcm->vlib_main;
- f64 now = vlib_time_now(vm);
- uword * dead_peer_bitmap = 0;
+ mc_retry_t *r;
+ vlib_main_t *vm = mcm->vlib_main;
+ f64 now = vlib_time_now (vm);
+ uword *dead_peer_bitmap = 0;
u32 ri, ri_next;
for (ri = s->retry_head_index; ri != ~0; ri = ri_next)
@@ -353,12 +391,17 @@ static void check_retry (mc_main_t * mcm, mc_stream_t * s)
{
if (MC_EVENT_LOGGING > 0)
{
- mc_stream_peer_t * p;
- ELOG_TYPE_DECLARE (t) = {
- .format = "resend local seq %d attempt %d",
- .format_args = "i4i4",
- };
+ mc_stream_peer_t *p;
+
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (t) =
+ {
+ .format = "resend local seq %d attempt %d",
+ .format_args = "i4i4",
+ };
+ /* *INDENT-ON* */
+ /* *INDENT-OFF* */
pool_foreach (p, s->peers, ({
if (clib_bitmap_get (r->unacked_by_peer_bitmap, p - s->peers))
{
@@ -372,8 +415,13 @@ static void check_retry (mc_main_t * mcm, mc_stream_t * s)
ed->rx_sequence = r->local_sequence;
}
}));
+ /* *INDENT-ON* */
- struct { u32 sequence; u32 trail; } * ed;
+ struct
+ {
+ u32 sequence;
+ u32 trail;
+ } *ed;
ed = ELOG_DATA (mcm->elog_main, t);
ed->sequence = r->local_sequence;
ed->trail = r->n_retries;
@@ -385,19 +433,19 @@ static void check_retry (mc_main_t * mcm, mc_stream_t * s)
elog_tx_msg (mcm, s->index, r->local_sequence, r->n_retries);
mcm->transport.tx_buffer
- (mcm->transport.opaque,
- MC_TRANSPORT_USER_REQUEST_TO_RELAY,
- r->buffer_index);
+ (mcm->transport.opaque,
+ MC_TRANSPORT_USER_REQUEST_TO_RELAY, r->buffer_index);
}
}
maybe_send_window_open_event (mcm->vlib_main, s);
/* Delete any dead peers we've found. */
- if (! clib_bitmap_is_zero (dead_peer_bitmap))
+ if (!clib_bitmap_is_zero (dead_peer_bitmap))
{
uword i;
+ /* *INDENT-OFF* */
clib_bitmap_foreach (i, dead_peer_bitmap, ({
delete_peer_with_index (mcm, s, i, /* notify_application */ 1);
@@ -407,6 +455,7 @@ static void check_retry (mc_main_t * mcm, mc_stream_t * s)
clib_bitmap_andnoti (r->unacked_by_peer_bitmap, i);
}));
}));
+/* *INDENT-ON* */
clib_bitmap_free (dead_peer_bitmap);
}
}
@@ -414,38 +463,38 @@ static void check_retry (mc_main_t * mcm, mc_stream_t * s)
always_inline mc_main_t *
mc_node_get_main (vlib_node_runtime_t * node)
{
- mc_main_t ** p = (void *) node->runtime_data;
+ mc_main_t **p = (void *) node->runtime_data;
return p[0];
}
static uword
mc_retry_process (vlib_main_t * vm,
- vlib_node_runtime_t * node,
- vlib_frame_t * f)
+ vlib_node_runtime_t * node, vlib_frame_t * f)
{
- mc_main_t * mcm = mc_node_get_main (node);
- mc_stream_t * s;
-
+ mc_main_t *mcm = mc_node_get_main (node);
+ mc_stream_t *s;
+
while (1)
{
vlib_process_suspend (vm, 1.0);
vec_foreach (s, mcm->stream_vector)
- {
- if (s->state != MC_STREAM_STATE_invalid)
- check_retry (mcm, s);
- }
+ {
+ if (s->state != MC_STREAM_STATE_invalid)
+ check_retry (mcm, s);
+ }
}
- return 0; /* not likely */
+ return 0; /* not likely */
}
-static void send_join_or_leave_request (mc_main_t * mcm, u32 stream_index, u32 is_join)
+static void
+send_join_or_leave_request (mc_main_t * mcm, u32 stream_index, u32 is_join)
{
- vlib_main_t * vm = mcm->vlib_main;
- mc_msg_join_or_leave_request_t * mp;
+ vlib_main_t *vm = mcm->vlib_main;
+ mc_msg_join_or_leave_request_t *mp;
u32 bi;
mp = mc_get_vlib_buffer (vm, sizeof (mp[0]), &bi);
- memset(mp, 0, sizeof (*mp));
+ memset (mp, 0, sizeof (*mp));
mp->type = MC_MSG_TYPE_join_or_leave_request;
mp->peer_id = mcm->transport.our_ack_peer_id;
mp->stream_index = stream_index;
@@ -453,104 +502,111 @@ static void send_join_or_leave_request (mc_main_t * mcm, u32 stream_index, u32 i
mc_byte_swap_msg_join_or_leave_request (mp);
- /*
+ /*
* These msgs are unnumbered, unordered so send on the from-relay
- * channel.
+ * channel.
*/
mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi);
-}
+}
static uword
mc_join_ager_process (vlib_main_t * vm,
- vlib_node_runtime_t * node,
- vlib_frame_t * f)
+ vlib_node_runtime_t * node, vlib_frame_t * f)
{
- mc_main_t * mcm = mc_node_get_main (node);
-
+ mc_main_t *mcm = mc_node_get_main (node);
+
while (1)
{
if (mcm->joins_in_progress)
{
- mc_stream_t * s;
- vlib_one_time_waiting_process_t * p;
+ mc_stream_t *s;
+ vlib_one_time_waiting_process_t *p;
f64 now = vlib_time_now (vm);
vec_foreach (s, mcm->stream_vector)
- {
- if (s->state != MC_STREAM_STATE_join_in_progress)
- continue;
+ {
+ if (s->state != MC_STREAM_STATE_join_in_progress)
+ continue;
- if (now > s->join_timeout)
- {
- s->state = MC_STREAM_STATE_ready;
-
- if (MC_EVENT_LOGGING > 0)
- {
- ELOG_TYPE_DECLARE (e) = {
- .format = "stream %d join timeout",
- };
- ELOG (mcm->elog_main, e, s->index);
- }
- /* Make sure that this app instance exists as a stream peer,
- or we may answer a catchup request with a NULL
- all_peer_bitmap... */
- (void) get_or_create_peer_with_id
- (mcm, s, mcm->transport.our_ack_peer_id, /* created */ 0);
-
- vec_foreach (p, s->procs_waiting_for_join_done)
- vlib_signal_one_time_waiting_process (vm, p);
- if (s->procs_waiting_for_join_done)
- _vec_len (s->procs_waiting_for_join_done) = 0;
-
- mcm->joins_in_progress--;
- ASSERT (mcm->joins_in_progress >= 0);
- }
- else
- {
- /* Resent join request which may have been lost. */
- send_join_or_leave_request (mcm, s->index,
- 1 /* is_join */);
-
- /* We're *not* alone, retry for as long as it takes */
- if (mcm->relay_state == MC_RELAY_STATE_SLAVE)
- s->join_timeout = vlib_time_now (vm) + 2.0;
-
-
- if (MC_EVENT_LOGGING > 0)
- {
- ELOG_TYPE_DECLARE (e) = {
- .format = "stream %d resend join request",
- };
- ELOG (mcm->elog_main, e, s->index);
- }
- }
- }
+ if (now > s->join_timeout)
+ {
+ s->state = MC_STREAM_STATE_ready;
+
+ if (MC_EVENT_LOGGING > 0)
+ {
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (e) =
+ {
+ .format = "stream %d join timeout",
+ };
+ /* *INDENT-ON* */
+ ELOG (mcm->elog_main, e, s->index);
+ }
+ /* Make sure that this app instance exists as a stream peer,
+ or we may answer a catchup request with a NULL
+ all_peer_bitmap... */
+ (void) get_or_create_peer_with_id
+ (mcm, s, mcm->transport.our_ack_peer_id, /* created */ 0);
+
+ vec_foreach (p, s->procs_waiting_for_join_done)
+ vlib_signal_one_time_waiting_process (vm, p);
+ if (s->procs_waiting_for_join_done)
+ _vec_len (s->procs_waiting_for_join_done) = 0;
+
+ mcm->joins_in_progress--;
+ ASSERT (mcm->joins_in_progress >= 0);
+ }
+ else
+ {
+ /* Resent join request which may have been lost. */
+ send_join_or_leave_request (mcm, s->index, 1 /* is_join */ );
+
+ /* We're *not* alone, retry for as long as it takes */
+ if (mcm->relay_state == MC_RELAY_STATE_SLAVE)
+ s->join_timeout = vlib_time_now (vm) + 2.0;
+
+
+ if (MC_EVENT_LOGGING > 0)
+ {
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (e) =
+ {
+ .format = "stream %d resend join request",
+ };
+ /* *INDENT-ON* */
+ ELOG (mcm->elog_main, e, s->index);
+ }
+ }
+ }
}
vlib_process_suspend (vm, .5);
}
- return 0; /* not likely */
+ return 0; /* not likely */
}
-static void serialize_mc_register_stream_name (serialize_main_t * m, va_list * va)
+static void
+serialize_mc_register_stream_name (serialize_main_t * m, va_list * va)
{
- char * name = va_arg (*va, char *);
+ char *name = va_arg (*va, char *);
serialize_cstring (m, name);
}
-static void elog_stream_name (char * buf, int n_buf_bytes, char * v)
+static void
+elog_stream_name (char *buf, int n_buf_bytes, char *v)
{
clib_memcpy (buf, v, clib_min (n_buf_bytes - 1, vec_len (v)));
buf[n_buf_bytes - 1] = 0;
}
-static void unserialize_mc_register_stream_name (serialize_main_t * m, va_list * va)
+static void
+unserialize_mc_register_stream_name (serialize_main_t * m, va_list * va)
{
- mc_main_t * mcm = va_arg (*va, mc_main_t *);
- char * name;
- mc_stream_t * s;
- uword * p;
+ mc_main_t *mcm = va_arg (*va, mc_main_t *);
+ char *name;
+ mc_stream_t *s;
+ uword *p;
unserialize_cstring (m, &name);
@@ -558,11 +614,18 @@ static void unserialize_mc_register_stream_name (serialize_main_t * m, va_list *
{
if (MC_EVENT_LOGGING > 0)
{
- ELOG_TYPE_DECLARE (e) = {
- .format = "stream index %d already named %s",
- .format_args = "i4s16",
- };
- struct { u32 stream_index; char name[16]; } * ed;
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (e) =
+ {
+ .format = "stream index %d already named %s",
+ .format_args = "i4s16",
+ };
+ /* *INDENT-ON* */
+ struct
+ {
+ u32 stream_index;
+ char name[16];
+ } *ed;
ed = ELOG_DATA (mcm->elog_main, e);
ed->stream_index = p[0];
elog_stream_name (ed->name, sizeof (ed->name), name);
@@ -580,11 +643,18 @@ static void unserialize_mc_register_stream_name (serialize_main_t * m, va_list *
if (MC_EVENT_LOGGING > 0)
{
- ELOG_TYPE_DECLARE (e) = {
- .format = "stream index %d named %s",
- .format_args = "i4s16",
- };
- struct { u32 stream_index; char name[16]; } * ed;
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (e) =
+ {
+ .format = "stream index %d named %s",
+ .format_args = "i4s16",
+ };
+ /* *INDENT-ON* */
+ struct
+ {
+ u32 stream_index;
+ char name[16];
+ } *ed;
ed = ELOG_DATA (mcm->elog_main, e);
ed->stream_index = s->index;
elog_stream_name (ed->name, sizeof (ed->name), name);
@@ -595,7 +665,7 @@ static void unserialize_mc_register_stream_name (serialize_main_t * m, va_list *
p = hash_get (mcm->procs_waiting_for_stream_name_by_name, name);
if (p)
{
- vlib_one_time_waiting_process_t * wp, ** w;
+ vlib_one_time_waiting_process_t *wp, **w;
w = pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool, p[0]);
vec_foreach (wp, w[0])
vlib_signal_one_time_waiting_process (mcm->vlib_main, wp);
@@ -604,18 +674,22 @@ static void unserialize_mc_register_stream_name (serialize_main_t * m, va_list *
}
}
-MC_SERIALIZE_MSG (mc_register_stream_name_msg, static) = {
+/* *INDENT-OFF* */
+MC_SERIALIZE_MSG (mc_register_stream_name_msg, static) =
+{
.name = "mc_register_stream_name",
.serialize = serialize_mc_register_stream_name,
.unserialize = unserialize_mc_register_stream_name,
};
+/* *INDENT-ON* */
void
mc_rx_buffer_unserialize (mc_main_t * mcm,
mc_stream_t * stream,
- mc_peer_id_t peer_id,
- u32 buffer_index)
-{ return mc_unserialize (mcm, stream, buffer_index); }
+ mc_peer_id_t peer_id, u32 buffer_index)
+{
+ return mc_unserialize (mcm, stream, buffer_index);
+}
static u8 *
mc_internal_catchup_snapshot (mc_main_t * mcm,
@@ -633,9 +707,7 @@ mc_internal_catchup_snapshot (mc_main_t * mcm,
}
static void
-mc_internal_catchup (mc_main_t * mcm,
- u8 * data,
- u32 n_data_bytes)
+mc_internal_catchup (mc_main_t * mcm, u8 * data, u32 n_data_bytes)
{
serialize_main_t s;
@@ -646,20 +718,22 @@ mc_internal_catchup (mc_main_t * mcm,
/* Overridden from the application layer, not actually used here */
void mc_stream_join_process_hold (void) __attribute__ ((weak));
-void mc_stream_join_process_hold (void) { }
+void
+mc_stream_join_process_hold (void)
+{
+}
static u32
mc_stream_join_helper (mc_main_t * mcm,
- mc_stream_config_t * config,
- u32 is_internal)
+ mc_stream_config_t * config, u32 is_internal)
{
- mc_stream_t * s;
- vlib_main_t * vm = mcm->vlib_main;
-
+ mc_stream_t *s;
+ vlib_main_t *vm = mcm->vlib_main;
+
s = 0;
- if (! is_internal)
+ if (!is_internal)
{
- uword * p;
+ uword *p;
/* Already have a stream with given name? */
if ((s = mc_stream_by_name (mcm, config->name)))
@@ -670,10 +744,10 @@ mc_stream_join_helper (mc_main_t * mcm,
}
/* First join MC internal stream. */
- if (! mcm->stream_vector
- || (mcm->stream_vector[MC_STREAM_INDEX_INTERNAL].state
- == MC_STREAM_STATE_invalid))
- {
+ if (!mcm->stream_vector
+ || (mcm->stream_vector[MC_STREAM_INDEX_INTERNAL].state
+ == MC_STREAM_STATE_invalid))
+ {
static mc_stream_config_t c = {
.name = "mc-internal",
.rx_buffer = mc_rx_buffer_unserialize,
@@ -681,35 +755,39 @@ mc_stream_join_helper (mc_main_t * mcm,
.catchup_snapshot = mc_internal_catchup_snapshot,
};
- c.save_snapshot = config->save_snapshot;
+ c.save_snapshot = config->save_snapshot;
mc_stream_join_helper (mcm, &c, /* is_internal */ 1);
}
/* If stream is still unknown register this name and wait for
- sequenced message to name stream. This way all peers agree
- on stream name to index mappings. */
+ sequenced message to name stream. This way all peers agree
+ on stream name to index mappings. */
s = mc_stream_by_name (mcm, config->name);
- if (! s)
+ if (!s)
{
- vlib_one_time_waiting_process_t * wp, ** w;
- u8 * name_copy = format (0, "%s", config->name);
+ vlib_one_time_waiting_process_t *wp, **w;
+ u8 *name_copy = format (0, "%s", config->name);
mc_serialize_stream (mcm,
MC_STREAM_INDEX_INTERNAL,
- &mc_register_stream_name_msg,
- config->name);
+ &mc_register_stream_name_msg, config->name);
/* Wait for this stream to be named. */
- p = hash_get_mem (mcm->procs_waiting_for_stream_name_by_name, name_copy);
+ p =
+ hash_get_mem (mcm->procs_waiting_for_stream_name_by_name,
+ name_copy);
if (p)
- w = pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool, p[0]);
+ w =
+ pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool,
+ p[0]);
else
{
pool_get (mcm->procs_waiting_for_stream_name_pool, w);
- if (! mcm->procs_waiting_for_stream_name_by_name)
- mcm->procs_waiting_for_stream_name_by_name
- = hash_create_string (/* elts */ 0, /* value size */ sizeof (uword));
+ if (!mcm->procs_waiting_for_stream_name_by_name)
+ mcm->procs_waiting_for_stream_name_by_name = hash_create_string ( /* elts */ 0, /* value size */
+ sizeof
+ (uword));
hash_set_mem (mcm->procs_waiting_for_stream_name_by_name,
name_copy,
w - mcm->procs_waiting_for_stream_name_pool);
@@ -727,7 +805,7 @@ mc_stream_join_helper (mc_main_t * mcm,
ASSERT (s->state == MC_STREAM_STATE_name_known);
}
- if (! s)
+ if (!s)
{
vec_add2 (mcm->stream_vector, s, 1);
mc_stream_init (s);
@@ -736,7 +814,7 @@ mc_stream_join_helper (mc_main_t * mcm,
{
/* Save name since we could have already used it as hash key. */
- char * name_save = s->config.name;
+ char *name_save = s->config.name;
s->config = config[0];
@@ -748,16 +826,16 @@ mc_stream_join_helper (mc_main_t * mcm,
s->config.window_size = 8;
if (s->config.retry_interval == 0.0)
- s->config.retry_interval = 1.0;
+ s->config.retry_interval = 1.0;
/* Sanity. */
ASSERT (s->config.retry_interval < 30);
if (s->config.retry_limit == 0)
- s->config.retry_limit = 7;
+ s->config.retry_limit = 7;
s->state = MC_STREAM_STATE_join_in_progress;
- if (! s->peer_index_by_id.hash)
+ if (!s->peer_index_by_id.hash)
mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
/* If we don't hear from someone in 5 seconds, we're alone */
@@ -766,21 +844,28 @@ mc_stream_join_helper (mc_main_t * mcm,
if (MC_EVENT_LOGGING > 0)
{
- ELOG_TYPE_DECLARE (e) = {
- .format = "stream index %d join request %s",
- .format_args = "i4s16",
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (e) =
+ {
+ .format = "stream index %d join request %s",
+ .format_args = "i4s16",
};
- struct { u32 stream_index; char name[16]; } * ed;
+ /* *INDENT-ON* */
+ struct
+ {
+ u32 stream_index;
+ char name[16];
+ } *ed;
ed = ELOG_DATA (mcm->elog_main, e);
ed->stream_index = s->index;
elog_stream_name (ed->name, sizeof (ed->name), s->config.name);
}
- send_join_or_leave_request (mcm, s->index, 1 /* join */);
-
+ send_join_or_leave_request (mcm, s->index, 1 /* join */ );
+
vlib_current_process_wait_for_one_time_event_vector
(vm, &s->procs_waiting_for_join_done);
-
+
if (MC_EVENT_LOGGING)
{
ELOG_TYPE (e, "join complete stream %d");
@@ -790,77 +875,84 @@ mc_stream_join_helper (mc_main_t * mcm,
return s->index;
}
-u32 mc_stream_join (mc_main_t * mcm, mc_stream_config_t * config)
-{ return mc_stream_join_helper (mcm, config, /* is_internal */ 0); }
+u32
+mc_stream_join (mc_main_t * mcm, mc_stream_config_t * config)
+{
+ return mc_stream_join_helper (mcm, config, /* is_internal */ 0);
+}
-void mc_stream_leave (mc_main_t * mcm, u32 stream_index)
+void
+mc_stream_leave (mc_main_t * mcm, u32 stream_index)
{
- mc_stream_t * s = mc_stream_by_index (mcm, stream_index);
-
- if (! s)
+ mc_stream_t *s = mc_stream_by_index (mcm, stream_index);
+
+ if (!s)
return;
if (MC_EVENT_LOGGING)
{
- ELOG_TYPE_DECLARE (t) = {
- .format = "leave-stream: %d",
- .format_args = "i4",
- };
- struct { u32 index; } * ed;
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (t) =
+ {
+ .format = "leave-stream: %d",.format_args = "i4",
+ };
+ /* *INDENT-ON* */
+ struct
+ {
+ u32 index;
+ } *ed;
ed = ELOG_DATA (mcm->elog_main, t);
ed->index = stream_index;
}
- send_join_or_leave_request (mcm, stream_index, 0 /* is_join */);
+ send_join_or_leave_request (mcm, stream_index, 0 /* is_join */ );
mc_stream_free (s);
s->state = MC_STREAM_STATE_name_known;
}
-void mc_msg_join_or_leave_request_handler (mc_main_t * mcm,
- mc_msg_join_or_leave_request_t * req,
- u32 buffer_index)
+void
+mc_msg_join_or_leave_request_handler (mc_main_t * mcm,
+ mc_msg_join_or_leave_request_t * req,
+ u32 buffer_index)
{
- mc_stream_t * s;
- mc_msg_join_reply_t * rep;
+ mc_stream_t *s;
+ mc_msg_join_reply_t *rep;
u32 bi;
mc_byte_swap_msg_join_or_leave_request (req);
s = mc_stream_by_index (mcm, req->stream_index);
- if (! s || s->state != MC_STREAM_STATE_ready)
+ if (!s || s->state != MC_STREAM_STATE_ready)
return;
/* If the peer is joining, create it */
- if (req->is_join)
+ if (req->is_join)
{
- mc_stream_t * this_s;
+ mc_stream_t *this_s;
/* We're not in a position to catch up a peer until all
stream joins are complete. */
if (0)
- {
- /* XXX This is hard to test so we've. */
- vec_foreach (this_s, mcm->stream_vector)
- {
- if (this_s->state != MC_STREAM_STATE_ready
- && this_s->state != MC_STREAM_STATE_name_known)
- return;
- }
- }
- else
- if (mcm->joins_in_progress > 0)
- return;
-
- (void) get_or_create_peer_with_id (mcm,
- s,
- req->peer_id,
- /* created */ 0);
+ {
+ /* XXX This is hard to test so we've. */
+ vec_foreach (this_s, mcm->stream_vector)
+ {
+ if (this_s->state != MC_STREAM_STATE_ready
+ && this_s->state != MC_STREAM_STATE_name_known)
+ return;
+ }
+ }
+ else if (mcm->joins_in_progress > 0)
+ return;
+
+ (void) get_or_create_peer_with_id (mcm, s, req->peer_id,
+ /* created */ 0);
rep = mc_get_vlib_buffer (mcm->vlib_main, sizeof (rep[0]), &bi);
memset (rep, 0, sizeof (rep[0]));
rep->type = MC_MSG_TYPE_join_reply;
rep->stream_index = req->stream_index;
-
+
mc_byte_swap_msg_join_reply (rep);
/* These two are already in network byte order... */
rep->peer_id = mcm->transport.our_ack_peer_id;
@@ -875,32 +967,32 @@ void mc_msg_join_or_leave_request_handler (mc_main_t * mcm,
}
}
-void mc_msg_join_reply_handler (mc_main_t * mcm,
- mc_msg_join_reply_t * mp,
- u32 buffer_index)
+void
+mc_msg_join_reply_handler (mc_main_t * mcm,
+ mc_msg_join_reply_t * mp, u32 buffer_index)
{
- mc_stream_t * s;
+ mc_stream_t *s;
mc_byte_swap_msg_join_reply (mp);
s = mc_stream_by_index (mcm, mp->stream_index);
- if (! s || s->state != MC_STREAM_STATE_join_in_progress)
+ if (!s || s->state != MC_STREAM_STATE_join_in_progress)
return;
- /* Switch to catchup state; next join reply
+ /* Switch to catchup state; next join reply
for this stream will be ignored. */
s->state = MC_STREAM_STATE_catchup;
mcm->joins_in_progress--;
mcm->transport.catchup_request_fun (mcm->transport.opaque,
- mp->stream_index,
- mp->catchup_peer_id);
+ mp->stream_index, mp->catchup_peer_id);
}
-void mc_wait_for_stream_ready (mc_main_t * m, char * stream_name)
+void
+mc_wait_for_stream_ready (mc_main_t * m, char *stream_name)
{
- mc_stream_t * s;
+ mc_stream_t *s;
while (1)
{
@@ -920,16 +1012,17 @@ void mc_wait_for_stream_ready (mc_main_t * m, char * stream_name)
(m->vlib_main, &s->procs_waiting_for_join_done);
}
-u32 mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index)
+u32
+mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index)
{
- mc_stream_t * s = mc_stream_by_index (mcm, stream_index);
- vlib_main_t * vm = mcm->vlib_main;
- mc_retry_t * r;
- mc_msg_user_request_t * mp;
- vlib_buffer_t * b = vlib_get_buffer (vm, buffer_index);
+ mc_stream_t *s = mc_stream_by_index (mcm, stream_index);
+ vlib_main_t *vm = mcm->vlib_main;
+ mc_retry_t *r;
+ mc_msg_user_request_t *mp;
+ vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index);
u32 ri;
- if (! s)
+ if (!s)
return 0;
if (s->state != MC_STREAM_STATE_ready)
@@ -953,7 +1046,7 @@ u32 mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index)
s->retry_head_index = ri;
else
{
- mc_retry_t * p = pool_elt_at_index (s->retry_pool, r->prev_index);
+ mc_retry_t *p = pool_elt_at_index (s->retry_pool, r->prev_index);
p->next_index = ri;
}
@@ -965,24 +1058,27 @@ u32 mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index)
mp->global_sequence = 0xdeadbeef;
mp->stream_index = s->index;
mp->local_sequence = s->our_local_sequence++;
- mp->n_data_bytes = vlib_buffer_index_length_in_chain (vm, buffer_index) - sizeof (mp[0]);
+ mp->n_data_bytes =
+ vlib_buffer_index_length_in_chain (vm, buffer_index) - sizeof (mp[0]);
r->buffer_index = buffer_index;
r->local_sequence = mp->local_sequence;
- r->sent_at = vlib_time_now(vm);
+ r->sent_at = vlib_time_now (vm);
r->n_retries = 0;
/* Retry will be freed when all currently known peers have acked. */
vec_validate (r->unacked_by_peer_bitmap, vec_len (s->all_peer_bitmap) - 1);
vec_copy (r->unacked_by_peer_bitmap, s->all_peer_bitmap);
- hash_set (s->retry_index_by_local_sequence, r->local_sequence, r - s->retry_pool);
+ hash_set (s->retry_index_by_local_sequence, r->local_sequence,
+ r - s->retry_pool);
elog_tx_msg (mcm, s->index, mp->local_sequence, r->n_retries);
mc_byte_swap_msg_user_request (mp);
- mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_USER_REQUEST_TO_RELAY, buffer_index);
+ mcm->transport.tx_buffer (mcm->transport.opaque,
+ MC_TRANSPORT_USER_REQUEST_TO_RELAY, buffer_index);
s->user_requests_sent++;
@@ -990,40 +1086,48 @@ u32 mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index)
return s->config.window_size - pool_elts (s->retry_pool);
}
-void mc_msg_user_request_handler (mc_main_t * mcm, mc_msg_user_request_t * mp, u32 buffer_index)
+void
+mc_msg_user_request_handler (mc_main_t * mcm, mc_msg_user_request_t * mp,
+ u32 buffer_index)
{
- vlib_main_t * vm = mcm->vlib_main;
- mc_stream_t * s;
- mc_stream_peer_t * peer;
+ vlib_main_t *vm = mcm->vlib_main;
+ mc_stream_t *s;
+ mc_stream_peer_t *peer;
i32 seq_cmp_result;
- static int once=0;
+ static int once = 0;
mc_byte_swap_msg_user_request (mp);
s = mc_stream_by_index (mcm, mp->stream_index);
/* Not signed up for this stream? Turf-o-matic */
- if (! s || s->state != MC_STREAM_STATE_ready)
+ if (!s || s->state != MC_STREAM_STATE_ready)
{
vlib_buffer_free_one (vm, buffer_index);
return;
}
/* Find peer, including ourselves. */
- peer = get_or_create_peer_with_id (mcm,
- s, mp->peer_id,
+ peer = get_or_create_peer_with_id (mcm, s, mp->peer_id,
/* created */ 0);
- seq_cmp_result = mc_seq_cmp (mp->local_sequence,
+ seq_cmp_result = mc_seq_cmp (mp->local_sequence,
peer->last_sequence_received + 1);
if (MC_EVENT_LOGGING > 0)
{
- ELOG_TYPE_DECLARE (e) = {
- .format = "rx-msg: peer %s stream %d rx seq %d seq_cmp %d",
- .format_args = "T4i4i4i4",
- };
- struct { u32 peer, stream_index, rx_sequence; i32 seq_cmp_result; } * ed;
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (e) =
+ {
+ .format = "rx-msg: peer %s stream %d rx seq %d seq_cmp %d",
+ .format_args = "T4i4i4i4",
+ };
+ /* *INDENT-ON* */
+ struct
+ {
+ u32 peer, stream_index, rx_sequence;
+ i32 seq_cmp_result;
+ } *ed;
ed = ELOG_DATA (mcm->elog_main, e);
ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
ed->stream_index = mp->stream_index;
@@ -1031,11 +1135,11 @@ void mc_msg_user_request_handler (mc_main_t * mcm, mc_msg_user_request_t * mp, u
ed->seq_cmp_result = seq_cmp_result;
}
- if (0 && mp->stream_index == 1 && once == 0)
+ if (0 && mp->stream_index == 1 && once == 0)
{
once = 1;
ELOG_TYPE (e, "FAKE lost msg on stream 1");
- ELOG (mcm->elog_main,e,0);
+ ELOG (mcm->elog_main, e, 0);
return;
}
@@ -1043,12 +1147,12 @@ void mc_msg_user_request_handler (mc_main_t * mcm, mc_msg_user_request_t * mp, u
s->user_requests_received++;
if (seq_cmp_result > 0)
- peer->stats.n_msgs_from_future += 1;
+ peer->stats.n_msgs_from_future += 1;
/* Send ack even if msg from future */
if (1)
{
- mc_msg_user_ack_t * rp;
+ mc_msg_user_ack_t *rp;
u32 bi;
rp = mc_get_vlib_buffer (vm, sizeof (rp[0]), &bi);
@@ -1059,11 +1163,18 @@ void mc_msg_user_request_handler (mc_main_t * mcm, mc_msg_user_request_t * mp, u
if (MC_EVENT_LOGGING > 0)
{
- ELOG_TYPE_DECLARE (e) = {
- .format = "tx-ack: stream %d local seq %d",
- .format_args = "i4i4",
- };
- struct { u32 stream_index; u32 local_sequence; } * ed;
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (e) =
+ {
+ .format = "tx-ack: stream %d local seq %d",
+ .format_args = "i4i4",
+ };
+ /* *INDENT-ON* */
+ struct
+ {
+ u32 stream_index;
+ u32 local_sequence;
+ } *ed;
ed = ELOG_DATA (mcm->elog_main, e);
ed->stream_index = rp->stream_index;
ed->local_sequence = rp->local_sequence;
@@ -1073,21 +1184,21 @@ void mc_msg_user_request_handler (mc_main_t * mcm, mc_msg_user_request_t * mp, u
mcm->transport.tx_ack (mcm->transport.opaque, mp->peer_id, bi);
/* Msg from past? If so, free the buffer... */
- if (seq_cmp_result < 0)
- {
- vlib_buffer_free_one (vm, buffer_index);
- peer->stats.n_msgs_from_past += 1;
- }
+ if (seq_cmp_result < 0)
+ {
+ vlib_buffer_free_one (vm, buffer_index);
+ peer->stats.n_msgs_from_past += 1;
+ }
}
-
+
if (seq_cmp_result == 0)
{
- vlib_buffer_t * b = vlib_get_buffer (vm, buffer_index);
+ vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index);
switch (s->state)
{
case MC_STREAM_STATE_ready:
vlib_buffer_advance (b, sizeof (mp[0]));
- s->config.rx_buffer(mcm, s, mp->peer_id, buffer_index);
+ s->config.rx_buffer (mcm, s, mp->peer_id, buffer_index);
/* Stream vector can change address via rx callback for mc-internal
stream. */
@@ -1108,13 +1219,15 @@ void mc_msg_user_request_handler (mc_main_t * mcm, mc_msg_user_request_t * mp, u
}
}
-void mc_msg_user_ack_handler (mc_main_t * mcm, mc_msg_user_ack_t * mp, u32 buffer_index)
+void
+mc_msg_user_ack_handler (mc_main_t * mcm, mc_msg_user_ack_t * mp,
+ u32 buffer_index)
{
- vlib_main_t * vm = mcm->vlib_main;
+ vlib_main_t *vm = mcm->vlib_main;
uword *p;
- mc_stream_t * s;
- mc_stream_peer_t * peer;
- mc_retry_t * r;
+ mc_stream_t *s;
+ mc_stream_peer_t *peer;
+ mc_retry_t *r;
int peer_created = 0;
mc_byte_swap_msg_user_ack (mp);
@@ -1123,11 +1236,20 @@ void mc_msg_user_ack_handler (mc_main_t * mcm, mc_msg_user_ack_t * mp, u32 buffe
if (MC_EVENT_LOGGING > 0)
{
- ELOG_TYPE_DECLARE (t) = {
- .format = "rx-ack: local seq %d peer %s seq_cmp_result %d",
- .format_args = "i4T4i4",
- };
- struct { u32 local_sequence; u32 peer; i32 seq_cmp_result;} * ed;
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (t) =
+ {
+ .format = "rx-ack: local seq %d peer %s seq_cmp_result %d",
+ .format_args = "i4T4i4",
+ };
+ /* *INDENT-ON* */
+
+ struct
+ {
+ u32 local_sequence;
+ u32 peer;
+ i32 seq_cmp_result;
+ } *ed;
ed = ELOG_DATA (mcm->elog_main, t);
ed->local_sequence = mp->local_sequence;
ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
@@ -1135,35 +1257,35 @@ void mc_msg_user_ack_handler (mc_main_t * mcm, mc_msg_user_ack_t * mp, u32 buffe
}
/* Unknown stream? */
- if (! s)
+ if (!s)
return;
/* Find the peer which just ack'ed. */
peer = get_or_create_peer_with_id (mcm, s, mp->peer_id,
/* created */ &peer_created);
- /*
+ /*
* Peer reports message from the future. If it's not in the retry
- * fifo, look for a retired message.
+ * fifo, look for a retired message.
*/
if (mp->seq_cmp_result > 0)
{
- p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence -
- mp->seq_cmp_result);
+ p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence -
+ mp->seq_cmp_result);
if (p == 0)
- mc_resend_retired (mcm, s, mp->local_sequence - mp->seq_cmp_result);
+ mc_resend_retired (mcm, s, mp->local_sequence - mp->seq_cmp_result);
/* Normal retry should fix it... */
return;
}
- /*
+ /*
* Pointer to the indicated retry fifo entry.
* Worth hashing because we could use a window size of 100 or 1000.
*/
p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence);
- /*
+ /*
* Is this a duplicate ACK, received after we've retired the
* fifo entry. This can happen when learning about new
* peers.
@@ -1171,21 +1293,28 @@ void mc_msg_user_ack_handler (mc_main_t * mcm, mc_msg_user_ack_t * mp, u32 buffe
if (p == 0)
{
if (MC_EVENT_LOGGING > 0)
- {
- ELOG_TYPE_DECLARE (t) =
+ {
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (t) =
{
.format = "ack: for seq %d from peer %s no fifo elt",
.format_args = "i4T4",
- };
- struct { u32 seq; u32 peer; } * ed;
- ed = ELOG_DATA (mcm->elog_main, t);
- ed->seq = mp->local_sequence;
- ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
- }
+ };
+ /* *INDENT-ON* */
+
+ struct
+ {
+ u32 seq;
+ u32 peer;
+ } *ed;
+ ed = ELOG_DATA (mcm->elog_main, t);
+ ed->seq = mp->local_sequence;
+ ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
+ }
return;
}
-
+
r = pool_elt_at_index (s->retry_pool, p[0]);
/* Make sure that this new peer ACKs our msgs from now on */
@@ -1203,61 +1332,79 @@ void mc_msg_user_ack_handler (mc_main_t * mcm, mc_msg_user_ack_t * mp, u32 buffe
}
ASSERT (mp->local_sequence == r->local_sequence);
-
+
/* If we weren't expecting to hear from this peer */
- if (!peer_created &&
- ! clib_bitmap_get (r->unacked_by_peer_bitmap, peer - s->peers))
+ if (!peer_created &&
+ !clib_bitmap_get (r->unacked_by_peer_bitmap, peer - s->peers))
{
if (MC_EVENT_LOGGING > 0)
- {
- ELOG_TYPE_DECLARE (t) =
+ {
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (t) =
{
.format = "dup-ack: for seq %d from peer %s",
.format_args = "i4T4",
- };
- struct { u32 seq; u32 peer; } * ed;
- ed = ELOG_DATA (mcm->elog_main, t);
- ed->seq = r->local_sequence;
- ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
- }
- if (! clib_bitmap_is_zero (r->unacked_by_peer_bitmap))
+ };
+ /* *INDENT-ON* */
+ struct
+ {
+ u32 seq;
+ u32 peer;
+ } *ed;
+ ed = ELOG_DATA (mcm->elog_main, t);
+ ed->seq = r->local_sequence;
+ ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
+ }
+ if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap))
return;
}
if (MC_EVENT_LOGGING > 0)
{
- ELOG_TYPE_DECLARE (t) =
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (t) =
{
.format = "ack: for seq %d from peer %s",
.format_args = "i4T4",
};
- struct { u32 seq; u32 peer; } * ed;
+ /* *INDENT-ON* */
+ struct
+ {
+ u32 seq;
+ u32 peer;
+ } *ed;
ed = ELOG_DATA (mcm->elog_main, t);
ed->seq = mp->local_sequence;
ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
}
- r->unacked_by_peer_bitmap =
+ r->unacked_by_peer_bitmap =
clib_bitmap_andnoti (r->unacked_by_peer_bitmap, peer - s->peers);
-
+
/* Not all clients have ack'ed */
- if (! clib_bitmap_is_zero (r->unacked_by_peer_bitmap))
+ if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap))
{
return;
}
if (MC_EVENT_LOGGING > 0)
{
- ELOG_TYPE_DECLARE (t) =
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (t) =
{
.format = "ack: retire fifo elt loc seq %d after %d acks",
.format_args = "i4i4",
};
- struct { u32 seq; u32 npeers; } * ed;
+ /* *INDENT-ON* */
+ struct
+ {
+ u32 seq;
+ u32 npeers;
+ } *ed;
ed = ELOG_DATA (mcm->elog_main, t);
ed->seq = r->local_sequence;
ed->npeers = pool_elts (s->peers);
}
-
+
hash_unset (s->retry_index_by_local_sequence, mp->local_sequence);
mc_retry_free (mcm, s, r);
remove_retry_from_pool (s, r);
@@ -1268,25 +1415,24 @@ void mc_msg_user_ack_handler (mc_main_t * mcm, mc_msg_user_ack_t * mp, u32 buffe
static uword
mc_catchup_process (vlib_main_t * vm,
- vlib_node_runtime_t * node,
- vlib_frame_t * f)
+ vlib_node_runtime_t * node, vlib_frame_t * f)
{
- mc_main_t * mcm = mc_node_get_main (node);
+ mc_main_t *mcm = mc_node_get_main (node);
uword *event_data = 0;
- mc_catchup_process_arg_t * args;
+ mc_catchup_process_arg_t *args;
int i;
-
+
while (1)
{
if (event_data)
- _vec_len(event_data) = 0;
- vlib_process_wait_for_event_with_type (vm, &event_data, EVENT_MC_SEND_CATCHUP_DATA);
+ _vec_len (event_data) = 0;
+ vlib_process_wait_for_event_with_type (vm, &event_data,
+ EVENT_MC_SEND_CATCHUP_DATA);
- for (i = 0; i < vec_len(event_data); i++)
+ for (i = 0; i < vec_len (event_data); i++)
{
- args = pool_elt_at_index (mcm->catchup_process_args,
- event_data[i]);
-
+ args = pool_elt_at_index (mcm->catchup_process_args, event_data[i]);
+
mcm->transport.catchup_send_fun (mcm->transport.opaque,
args->catchup_opaque,
args->catchup_snapshot);
@@ -1296,40 +1442,46 @@ mc_catchup_process (vlib_main_t * vm,
}
}
- return 0; /* not likely */
+ return 0; /* not likely */
}
-static void serialize_mc_stream (serialize_main_t * m, va_list * va)
+static void
+serialize_mc_stream (serialize_main_t * m, va_list * va)
{
- mc_stream_t * s = va_arg (*va, mc_stream_t *);
- mc_stream_peer_t * p;
+ mc_stream_t *s = va_arg (*va, mc_stream_t *);
+ mc_stream_peer_t *p;
serialize_integer (m, pool_elts (s->peers), sizeof (u32));
+ /* *INDENT-OFF* */
pool_foreach (p, s->peers, ({
u8 * x = serialize_get (m, sizeof (p->id));
clib_memcpy (x, p->id.as_u8, sizeof (p->id));
- serialize_integer (m, p->last_sequence_received,
+ serialize_integer (m, p->last_sequence_received,
sizeof (p->last_sequence_received));
}));
+/* *INDENT-ON* */
serialize_bitmap (m, s->all_peer_bitmap);
}
-void unserialize_mc_stream (serialize_main_t * m, va_list * va)
+void
+unserialize_mc_stream (serialize_main_t * m, va_list * va)
{
- mc_stream_t * s = va_arg (*va, mc_stream_t *);
+ mc_stream_t *s = va_arg (*va, mc_stream_t *);
u32 i, n_peers;
- mc_stream_peer_t * p;
+ mc_stream_peer_t *p;
unserialize_integer (m, &n_peers, sizeof (u32));
mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
for (i = 0; i < n_peers; i++)
{
- u8 * x;
+ u8 *x;
pool_get (s->peers, p);
x = unserialize_get (m, sizeof (p->id));
clib_memcpy (p->id.as_u8, x, sizeof (p->id));
- unserialize_integer (m, &p->last_sequence_received, sizeof (p->last_sequence_received));
- mhash_set (&s->peer_index_by_id, &p->id, p - s->peers, /* old_value */ 0);
+ unserialize_integer (m, &p->last_sequence_received,
+ sizeof (p->last_sequence_received));
+ mhash_set (&s->peer_index_by_id, &p->id, p - s->peers, /* old_value */
+ 0);
}
s->all_peer_bitmap = unserialize_bitmap (m);
@@ -1338,38 +1490,46 @@ void unserialize_mc_stream (serialize_main_t * m, va_list * va)
clib_warning ("BUG: stream %s all_peer_bitmap NULL", s->config.name);
}
-void mc_msg_catchup_request_handler (mc_main_t * mcm, mc_msg_catchup_request_t * req, u32 catchup_opaque)
+void
+mc_msg_catchup_request_handler (mc_main_t * mcm,
+ mc_msg_catchup_request_t * req,
+ u32 catchup_opaque)
{
- vlib_main_t * vm = mcm->vlib_main;
- mc_stream_t * s;
- mc_catchup_process_arg_t * args;
+ vlib_main_t *vm = mcm->vlib_main;
+ mc_stream_t *s;
+ mc_catchup_process_arg_t *args;
mc_byte_swap_msg_catchup_request (req);
s = mc_stream_by_index (mcm, req->stream_index);
- if (! s || s->state != MC_STREAM_STATE_ready)
+ if (!s || s->state != MC_STREAM_STATE_ready)
return;
-
+
if (MC_EVENT_LOGGING > 0)
{
- ELOG_TYPE_DECLARE (t) =
- {
- .format = "catchup-request: from %s stream %d",
- .format_args = "T4i4",
- };
- struct { u32 peer, stream; } * ed;
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (t) =
+ {
+ .format = "catchup-request: from %s stream %d",
+ .format_args = "T4i4",
+ };
+ /* *INDENT-ON* */
+ struct
+ {
+ u32 peer, stream;
+ } *ed;
ed = ELOG_DATA (mcm->elog_main, t);
ed->peer = elog_id_for_peer_id (mcm, req->peer_id.as_u64);
ed->stream = req->stream_index;
}
- /*
- * The application has to snapshoot its data structures right
- * here, right now. If we process any messages after
- * noting the last global sequence we've processed, the client
+ /*
+ * The application has to snapshoot its data structures right
+ * here, right now. If we process any messages after
+ * noting the last global sequence we've processed, the client
* won't be able to accurately reconstruct our data structures.
*
- * Once the data structures are e.g. vec_dup()'ed, we
+ * Once the data structures are e.g. vec_dup()'ed, we
* send the resulting messages from a separate process, to
* make sure that we don't cause a bunch of message retransmissions
*/
@@ -1382,7 +1542,7 @@ void mc_msg_catchup_request_handler (mc_main_t * mcm, mc_msg_catchup_request_t *
/* Construct catchup reply and snapshot state for stream to send as
catchup reply payload. */
{
- mc_msg_catchup_reply_t * rep;
+ mc_msg_catchup_reply_t *rep;
serialize_main_t m;
vec_resize (args->catchup_snapshot, sizeof (rep[0]));
@@ -1403,9 +1563,7 @@ void mc_msg_catchup_request_handler (mc_main_t * mcm, mc_msg_catchup_request_t *
/* Actually copy internal state */
args->catchup_snapshot = s->config.catchup_snapshot
- (mcm,
- args->catchup_snapshot,
- rep->last_global_sequence_included);
+ (mcm, args->catchup_snapshot, rep->last_global_sequence_included);
rep = (void *) args->catchup_snapshot;
rep->n_data_bytes = vec_len (args->catchup_snapshot) - sizeof (rep[0]);
@@ -1415,14 +1573,16 @@ void mc_msg_catchup_request_handler (mc_main_t * mcm, mc_msg_catchup_request_t *
/* now go send it... */
vlib_process_signal_event (vm, mcm->catchup_process,
- EVENT_MC_SEND_CATCHUP_DATA,
- args - mcm->catchup_process_args);
+ EVENT_MC_SEND_CATCHUP_DATA,
+ args - mcm->catchup_process_args);
}
#define EVENT_MC_UNSERIALIZE_BUFFER 0
#define EVENT_MC_UNSERIALIZE_CATCHUP 1
-void mc_msg_catchup_reply_handler (mc_main_t * mcm, mc_msg_catchup_reply_t * mp, u32 catchup_opaque)
+void
+mc_msg_catchup_reply_handler (mc_main_t * mcm, mc_msg_catchup_reply_t * mp,
+ u32 catchup_opaque)
{
vlib_process_signal_event (mcm->vlib_main,
mcm->unserialize_process,
@@ -1430,36 +1590,40 @@ void mc_msg_catchup_reply_handler (mc_main_t * mcm, mc_msg_catchup_reply_t * mp,
pointer_to_uword (mp));
}
-static void perform_catchup (mc_main_t * mcm, mc_msg_catchup_reply_t * mp)
+static void
+perform_catchup (mc_main_t * mcm, mc_msg_catchup_reply_t * mp)
{
- mc_stream_t * s;
+ mc_stream_t *s;
i32 seq_cmp_result;
-
+
mc_byte_swap_msg_catchup_reply (mp);
s = mc_stream_by_index (mcm, mp->stream_index);
/* Never heard of this stream or already caught up. */
- if (! s || s->state == MC_STREAM_STATE_ready)
+ if (!s || s->state == MC_STREAM_STATE_ready)
return;
-
+
{
serialize_main_t m;
- mc_stream_peer_t * p;
+ mc_stream_peer_t *p;
u32 n_stream_bytes;
/* For offline sim replay: save the entire catchup snapshot... */
if (s->config.save_snapshot)
- s->config.save_snapshot (mcm, /* is_catchup */ 1, mp->data, mp->n_data_bytes);
+ s->config.save_snapshot (mcm, /* is_catchup */ 1, mp->data,
+ mp->n_data_bytes);
unserialize_open_data (&m, mp->data, mp->n_data_bytes);
unserialize (&m, unserialize_mc_stream, s);
/* Make sure we start numbering our messages as expected */
+ /* *INDENT-OFF* */
pool_foreach (p, s->peers, ({
if (p->id.as_u64 == mcm->transport.our_ack_peer_id.as_u64)
s->our_local_sequence = p->last_sequence_received + 1;
}));
+/* *INDENT-ON* */
n_stream_bytes = m.stream.current_buffer_index;
@@ -1467,7 +1631,7 @@ static void perform_catchup (mc_main_t * mcm, mc_msg_catchup_reply_t * mp)
/* After serialized stream is user's catchup data. */
s->config.catchup (mcm, mp->data + n_stream_bytes,
- mp->n_data_bytes - n_stream_bytes);
+ mp->n_data_bytes - n_stream_bytes);
}
/* Vector could have been moved by catchup.
@@ -1478,12 +1642,12 @@ static void perform_catchup (mc_main_t * mcm, mc_msg_catchup_reply_t * mp)
while (clib_fifo_elts (s->catchup_fifo))
{
- mc_msg_user_request_t * gp;
+ mc_msg_user_request_t *gp;
u32 bi;
- vlib_buffer_t * b;
+ vlib_buffer_t *b;
+
+ clib_fifo_sub1 (s->catchup_fifo, bi);
- clib_fifo_sub1(s->catchup_fifo, bi);
-
b = vlib_get_buffer (mcm->vlib_main, bi);
gp = vlib_buffer_get_current (b);
@@ -1499,11 +1663,17 @@ static void perform_catchup (mc_main_t * mcm, mc_msg_catchup_reply_t * mp)
if (MC_EVENT_LOGGING)
{
- ELOG_TYPE_DECLARE (t) = {
- .format = "catchup replay local sequence 0x%x",
- .format_args = "i4",
- };
- struct { u32 local_sequence; } * ed;
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (t) =
+ {
+ .format = "catchup replay local sequence 0x%x",
+ .format_args = "i4",
+ };
+ /* *INDENT-ON* */
+ struct
+ {
+ u32 local_sequence;
+ } *ed;
ed = ELOG_DATA (mcm->elog_main, t);
ed->local_sequence = gp->local_sequence;
}
@@ -1512,11 +1682,17 @@ static void perform_catchup (mc_main_t * mcm, mc_msg_catchup_reply_t * mp)
{
if (MC_EVENT_LOGGING)
{
- ELOG_TYPE_DECLARE (t) = {
- .format = "catchup discard local sequence 0x%x",
- .format_args = "i4",
- };
- struct { u32 local_sequence; } * ed;
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (t) =
+ {
+ .format = "catchup discard local sequence 0x%x",
+ .format_args = "i4",
+ };
+ /* *INDENT-ON* */
+ struct
+ {
+ u32 local_sequence;
+ } *ed;
ed = ELOG_DATA (mcm->elog_main, t);
ed->local_sequence = gp->local_sequence;
}
@@ -1529,7 +1705,7 @@ static void perform_catchup (mc_main_t * mcm, mc_msg_catchup_reply_t * mp)
/* Now that we are caught up wake up joining process. */
{
- vlib_one_time_waiting_process_t * wp;
+ vlib_one_time_waiting_process_t *wp;
vec_foreach (wp, s->procs_waiting_for_join_done)
vlib_signal_one_time_waiting_process (mcm->vlib_main, wp);
if (s->procs_waiting_for_join_done)
@@ -1537,20 +1713,21 @@ static void perform_catchup (mc_main_t * mcm, mc_msg_catchup_reply_t * mp)
}
}
-static void this_node_maybe_master (mc_main_t * mcm)
+static void
+this_node_maybe_master (mc_main_t * mcm)
{
- vlib_main_t * vm = mcm->vlib_main;
- mc_msg_master_assert_t * mp;
+ vlib_main_t *vm = mcm->vlib_main;
+ mc_msg_master_assert_t *mp;
uword event_type;
int timeouts = 0;
int is_master = mcm->relay_state == MC_RELAY_STATE_MASTER;
- clib_error_t * error;
+ clib_error_t *error;
f64 now, time_last_master_assert = -1;
u32 bi;
while (1)
{
- if (! mcm->we_can_be_relay_master)
+ if (!mcm->we_can_be_relay_master)
{
mcm->relay_state = MC_RELAY_STATE_SLAVE;
if (MC_EVENT_LOGGING)
@@ -1570,25 +1747,33 @@ static void this_node_maybe_master (mc_main_t * mcm)
mp->peer_id = mcm->transport.our_ack_peer_id;
mp->global_sequence = mcm->relay_global_sequence;
- /*
- * these messages clog the event log, set MC_EVENT_LOGGING higher
- * if you want them
- */
+ /*
+ * these messages clog the event log, set MC_EVENT_LOGGING higher
+ * if you want them
+ */
if (MC_EVENT_LOGGING > 1)
{
- ELOG_TYPE_DECLARE (e) = {
- .format = "tx-massert: peer %s global seq %u",
- .format_args = "T4i4",
- };
- struct { u32 peer, global_sequence; } * ed;
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (e) =
+ {
+ .format = "tx-massert: peer %s global seq %u",
+ .format_args = "T4i4",
+ };
+ /* *INDENT-ON* */
+ struct
+ {
+ u32 peer, global_sequence;
+ } *ed;
ed = ELOG_DATA (mcm->elog_main, e);
ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
ed->global_sequence = mp->global_sequence;
}
- mc_byte_swap_msg_master_assert (mp);
+ mc_byte_swap_msg_master_assert (mp);
- error = mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_MASTERSHIP, bi);
+ error =
+ mcm->transport.tx_buffer (mcm->transport.opaque,
+ MC_TRANSPORT_MASTERSHIP, bi);
if (error)
clib_error_report (error);
}
@@ -1599,10 +1784,11 @@ static void this_node_maybe_master (mc_main_t * mcm)
switch (event_type)
{
case ~0:
- if (! is_master && timeouts++ > 2)
+ if (!is_master && timeouts++ > 2)
{
mcm->relay_state = MC_RELAY_STATE_MASTER;
- mcm->relay_master_peer_id = mcm->transport.our_ack_peer_id.as_u64;
+ mcm->relay_master_peer_id =
+ mcm->transport.our_ack_peer_id.as_u64;
if (MC_EVENT_LOGGING)
{
ELOG_TYPE (e, "become master (was maybe_master)");
@@ -1611,7 +1797,7 @@ static void this_node_maybe_master (mc_main_t * mcm)
return;
}
break;
-
+
case MC_RELAY_STATE_SLAVE:
mcm->relay_state = MC_RELAY_STATE_SLAVE;
if (MC_EVENT_LOGGING && mcm->relay_state != MC_RELAY_STATE_SLAVE)
@@ -1624,9 +1810,10 @@ static void this_node_maybe_master (mc_main_t * mcm)
}
}
-static void this_node_slave (mc_main_t * mcm)
+static void
+this_node_slave (mc_main_t * mcm)
{
- vlib_main_t * vm = mcm->vlib_main;
+ vlib_main_t *vm = mcm->vlib_main;
uword event_type;
int timeouts = 0;
@@ -1667,18 +1854,17 @@ static void this_node_slave (mc_main_t * mcm)
static uword
mc_mastership_process (vlib_main_t * vm,
- vlib_node_runtime_t * node,
- vlib_frame_t * f)
+ vlib_node_runtime_t * node, vlib_frame_t * f)
{
- mc_main_t * mcm = mc_node_get_main (node);
-
+ mc_main_t *mcm = mc_node_get_main (node);
+
while (1)
{
switch (mcm->relay_state)
{
case MC_RELAY_STATE_NEGOTIATE:
case MC_RELAY_STATE_MASTER:
- this_node_maybe_master(mcm);
+ this_node_maybe_master (mcm);
break;
case MC_RELAY_STATE_SLAVE:
@@ -1686,21 +1872,24 @@ mc_mastership_process (vlib_main_t * vm,
break;
}
}
- return 0; /* not likely */
+ return 0; /* not likely */
}
-void mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master)
+void
+mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master)
{
if (we_can_be_master != mcm->we_can_be_relay_master)
{
mcm->we_can_be_relay_master = we_can_be_master;
- vlib_process_signal_event (mcm->vlib_main,
+ vlib_process_signal_event (mcm->vlib_main,
mcm->mastership_process,
MC_RELAY_STATE_NEGOTIATE, 0);
}
}
-void mc_msg_master_assert_handler (mc_main_t * mcm, mc_msg_master_assert_t * mp, u32 buffer_index)
+void
+mc_msg_master_assert_handler (mc_main_t * mcm, mc_msg_master_assert_t * mp,
+ u32 buffer_index)
{
mc_peer_id_t his_peer_id, our_peer_id;
i32 seq_cmp_result;
@@ -1718,9 +1907,10 @@ void mc_msg_master_assert_handler (mc_main_t * mcm, mc_msg_master_assert_t * mp,
/* If the sender has a lower peer id and the sender's sequence >=
our global sequence, we become a slave. Otherwise we are master. */
- if (mc_peer_id_compare (his_peer_id, our_peer_id) < 0 && seq_cmp_result >= 0)
+ if (mc_peer_id_compare (his_peer_id, our_peer_id) < 0
+ && seq_cmp_result >= 0)
{
- vlib_process_signal_event (mcm->vlib_main,
+ vlib_process_signal_event (mcm->vlib_main,
mcm->mastership_process,
MC_RELAY_STATE_SLAVE, 0);
signal_slave = 1;
@@ -1734,8 +1924,8 @@ void mc_msg_master_assert_handler (mc_main_t * mcm, mc_msg_master_assert_t * mp,
}
{
- uword * q = mhash_get (&mcm->mastership_peer_index_by_id, &his_peer_id);
- mc_mastership_peer_t * p;
+ uword *q = mhash_get (&mcm->mastership_peer_index_by_id, &his_peer_id);
+ mc_mastership_peer_t *p;
if (q)
p = vec_elt_at_index (mcm->mastership_peers, q[0]);
@@ -1743,28 +1933,34 @@ void mc_msg_master_assert_handler (mc_main_t * mcm, mc_msg_master_assert_t * mp,
{
vec_add2 (mcm->mastership_peers, p, 1);
p->peer_id = his_peer_id;
- mhash_set (&mcm->mastership_peer_index_by_id, &p->peer_id, p - mcm->mastership_peers,
+ mhash_set (&mcm->mastership_peer_index_by_id, &p->peer_id,
+ p - mcm->mastership_peers,
/* old_value */ 0);
}
p->time_last_master_assert_received = vlib_time_now (mcm->vlib_main);
}
- /*
+ /*
* these messages clog the event log, set MC_EVENT_LOGGING higher
* if you want them.
*/
if (MC_EVENT_LOGGING > 1)
{
- ELOG_TYPE_DECLARE (e) = {
- .format = "rx-massert: peer %s global seq %u upd %d slave %d",
- .format_args = "T4i4i1i1",
- };
- struct {
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (e) =
+ {
+ .format = "rx-massert: peer %s global seq %u upd %d slave %d",
+ .format_args = "T4i4i1i1",
+ };
+ /* *INDENT-ON* */
+
+ struct
+ {
u32 peer;
- u32 global_sequence;
+ u32 global_sequence;
u8 update_sequence;
u8 slave;
- } * ed;
+ } *ed;
ed = ELOG_DATA (mcm->elog_main, e);
ed->peer = elog_id_for_peer_id (mcm, his_peer_id.as_u64);
ed->global_sequence = mp->global_sequence;
@@ -1776,20 +1972,18 @@ void mc_msg_master_assert_handler (mc_main_t * mcm, mc_msg_master_assert_t * mp,
static void
mc_serialize_init (mc_main_t * mcm)
{
- mc_serialize_msg_t * m;
- vlib_main_t * vm = vlib_get_main();
+ mc_serialize_msg_t *m;
+ vlib_main_t *vm = vlib_get_main ();
mcm->global_msg_index_by_name
- = hash_create_string (/* elts */ 0, sizeof (uword));
+ = hash_create_string ( /* elts */ 0, sizeof (uword));
m = vm->mc_msg_registrations;
-
+
while (m)
{
m->global_index = vec_len (mcm->global_msgs);
- hash_set_mem (mcm->global_msg_index_by_name,
- m->name,
- m->global_index);
+ hash_set_mem (mcm->global_msg_index_by_name, m->name, m->global_index);
vec_add1 (mcm->global_msgs, m);
m = m->next_registration;
}
@@ -1797,19 +1991,18 @@ mc_serialize_init (mc_main_t * mcm)
clib_error_t *
mc_serialize_va (mc_main_t * mc,
- u32 stream_index,
+ u32 stream_index,
u32 multiple_messages_per_vlib_buffer,
- mc_serialize_msg_t * msg,
- va_list * va)
+ mc_serialize_msg_t * msg, va_list * va)
{
- mc_stream_t * s;
- clib_error_t * error;
- serialize_main_t * m = &mc->serialize_mains[VLIB_TX];
- vlib_serialize_buffer_main_t * sbm = &mc->serialize_buffer_mains[VLIB_TX];
+ mc_stream_t *s;
+ clib_error_t *error;
+ serialize_main_t *m = &mc->serialize_mains[VLIB_TX];
+ vlib_serialize_buffer_main_t *sbm = &mc->serialize_buffer_mains[VLIB_TX];
u32 bi, n_before, n_after, n_total, n_this_msg;
u32 si, gi;
- if (! sbm->vlib_main)
+ if (!sbm->vlib_main)
{
sbm->tx.max_n_data_bytes_per_chain = 4096;
sbm->tx.free_list_index = VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX;
@@ -1831,16 +2024,22 @@ mc_serialize_va (mc_main_t * mc,
serialize_likely_small_unsigned_integer (m, si);
/* For first time message is sent, use name to identify message. */
- if (si == ~0 || MSG_ID_DEBUG)
+ if (si == ~0 || MSG_ID_DEBUG)
serialize_cstring (m, msg->name);
if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0)
{
- ELOG_TYPE_DECLARE (e) = {
- .format = "serialize-msg: %s index %d",
- .format_args = "T4i4",
- };
- struct { u32 c[2]; } * ed;
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (e) =
+ {
+ .format = "serialize-msg: %s index %d",
+ .format_args = "T4i4",
+ };
+ /* *INDENT-ON* */
+ struct
+ {
+ u32 c[2];
+ } *ed;
ed = ELOG_DATA (mc->elog_main, e);
ed->c[0] = elog_id_for_msg_name (mc, msg->name);
ed->c[1] = si;
@@ -1854,15 +2053,17 @@ mc_serialize_va (mc_main_t * mc,
/* For max message size ignore first message where string name is sent. */
if (si != ~0)
- msg->max_n_bytes_serialized = clib_max (msg->max_n_bytes_serialized, n_this_msg);
+ msg->max_n_bytes_serialized =
+ clib_max (msg->max_n_bytes_serialized, n_this_msg);
- if (! multiple_messages_per_vlib_buffer
+ if (!multiple_messages_per_vlib_buffer
|| si == ~0
- || n_total + msg->max_n_bytes_serialized > mc->transport.max_packet_size)
+ || n_total + msg->max_n_bytes_serialized >
+ mc->transport.max_packet_size)
{
bi = serialize_close_vlib_buffer (m);
sbm->first_buffer = 0;
- if (! error)
+ if (!error)
mc_stream_send (mc, stream_index, bi);
else if (bi != ~0)
vlib_buffer_free_one (mc->vlib_main, bi);
@@ -1875,12 +2076,11 @@ clib_error_t *
mc_serialize_internal (mc_main_t * mc,
u32 stream_index,
u32 multiple_messages_per_vlib_buffer,
- mc_serialize_msg_t * msg,
- ...)
+ mc_serialize_msg_t * msg, ...)
{
- vlib_main_t * vm = mc->vlib_main;
+ vlib_main_t *vm = mc->vlib_main;
va_list va;
- clib_error_t * error;
+ clib_error_t *error;
if (stream_index == ~0)
{
@@ -1892,125 +2092,146 @@ mc_serialize_internal (mc_main_t * mc,
va_start (va, msg);
error = mc_serialize_va (mc, stream_index,
- multiple_messages_per_vlib_buffer,
- msg, &va);
+ multiple_messages_per_vlib_buffer, msg, &va);
va_end (va);
return error;
}
-uword mc_unserialize_message (mc_main_t * mcm,
- mc_stream_t * s,
- serialize_main_t * m)
+uword
+mc_unserialize_message (mc_main_t * mcm,
+ mc_stream_t * s, serialize_main_t * m)
{
- mc_serialize_stream_msg_t * sm;
+ mc_serialize_stream_msg_t *sm;
u32 gi, si;
si = unserialize_likely_small_unsigned_integer (m);
- if (! (si == ~0 || MSG_ID_DEBUG))
+ if (!(si == ~0 || MSG_ID_DEBUG))
{
sm = vec_elt_at_index (s->stream_msgs, si);
gi = sm->global_index;
}
else
{
- char * name;
+ char *name;
unserialize_cstring (m, &name);
if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0)
- {
- ELOG_TYPE_DECLARE (e) = {
- .format = "unserialize-msg: %s rx index %d",
- .format_args = "T4i4",
- };
- struct { u32 c[2]; } * ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->c[0] = elog_id_for_msg_name (mcm, name);
- ed->c[1] = si;
- }
+ {
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (e) =
+ {
+ .format = "unserialize-msg: %s rx index %d",
+ .format_args = "T4i4",
+ };
+ /* *INDENT-ON* */
+ struct
+ {
+ u32 c[2];
+ } *ed;
+ ed = ELOG_DATA (mcm->elog_main, e);
+ ed->c[0] = elog_id_for_msg_name (mcm, name);
+ ed->c[1] = si;
+ }
{
- uword * p = hash_get_mem (mcm->global_msg_index_by_name, name);
- gi = p ? p[0] : ~0;
+ uword *p = hash_get_mem (mcm->global_msg_index_by_name, name);
+ gi = p ? p[0] : ~0;
}
/* Unknown message? */
if (gi == ~0)
- {
- vec_free (name);
- goto done;
- }
+ {
+ vec_free (name);
+ goto done;
+ }
vec_validate_init_empty (s->stream_msg_index_by_global_index, gi, ~0);
si = s->stream_msg_index_by_global_index[gi];
/* Stream local index unknown? Create it. */
if (si == ~0)
- {
- vec_add2 (s->stream_msgs, sm, 1);
+ {
+ vec_add2 (s->stream_msgs, sm, 1);
- si = sm - s->stream_msgs;
- sm->global_index = gi;
- s->stream_msg_index_by_global_index[gi] = si;
+ si = sm - s->stream_msgs;
+ sm->global_index = gi;
+ s->stream_msg_index_by_global_index[gi] = si;
- if (MC_EVENT_LOGGING > 0)
- {
- ELOG_TYPE_DECLARE (e) = {
- .format = "msg-bind: stream %d %s to index %d",
- .format_args = "i4T4i4",
- };
- struct { u32 c[3]; } * ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->c[0] = s->index;
- ed->c[1] = elog_id_for_msg_name (mcm, name);
- ed->c[2] = si;
- }
- }
+ if (MC_EVENT_LOGGING > 0)
+ {
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (e) =
+ {
+ .format = "msg-bind: stream %d %s to index %d",
+ .format_args = "i4T4i4",
+ };
+ /* *INDENT-ON* */
+ struct
+ {
+ u32 c[3];
+ } *ed;
+ ed = ELOG_DATA (mcm->elog_main, e);
+ ed->c[0] = s->index;
+ ed->c[1] = elog_id_for_msg_name (mcm, name);
+ ed->c[2] = si;
+ }
+ }
else
- {
- sm = vec_elt_at_index (s->stream_msgs, si);
- if (gi != sm->global_index && MC_EVENT_LOGGING > 0)
- {
- ELOG_TYPE_DECLARE (e) = {
- .format = "msg-id-ERROR: %s index %d expected %d",
- .format_args = "T4i4i4",
- };
- struct { u32 c[3]; } * ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->c[0] = elog_id_for_msg_name (mcm, name);
- ed->c[1] = si;
- ed->c[2] = ~0;
- if (sm->global_index < vec_len (s->stream_msg_index_by_global_index))
- ed->c[2] = s->stream_msg_index_by_global_index[sm->global_index];
- }
- }
+ {
+ sm = vec_elt_at_index (s->stream_msgs, si);
+ if (gi != sm->global_index && MC_EVENT_LOGGING > 0)
+ {
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (e) =
+ {
+ .format = "msg-id-ERROR: %s index %d expected %d",
+ .format_args = "T4i4i4",
+ };
+ /* *INDENT-ON* */
+ struct
+ {
+ u32 c[3];
+ } *ed;
+ ed = ELOG_DATA (mcm->elog_main, e);
+ ed->c[0] = elog_id_for_msg_name (mcm, name);
+ ed->c[1] = si;
+ ed->c[2] = ~0;
+ if (sm->global_index <
+ vec_len (s->stream_msg_index_by_global_index))
+ ed->c[2] =
+ s->stream_msg_index_by_global_index[sm->global_index];
+ }
+ }
vec_free (name);
}
if (gi != ~0)
{
- mc_serialize_msg_t * msg;
+ mc_serialize_msg_t *msg;
msg = vec_elt (mcm->global_msgs, gi);
unserialize (m, msg->unserialize, mcm);
}
- done:
+done:
return gi != ~0;
}
void
mc_unserialize_internal (mc_main_t * mcm, u32 stream_and_buffer_index)
{
- vlib_main_t * vm = mcm->vlib_main;
- serialize_main_t * m = &mcm->serialize_mains[VLIB_RX];
- vlib_serialize_buffer_main_t * sbm = &mcm->serialize_buffer_mains[VLIB_RX];
- mc_stream_and_buffer_t * sb;
- mc_stream_t * stream;
+ vlib_main_t *vm = mcm->vlib_main;
+ serialize_main_t *m = &mcm->serialize_mains[VLIB_RX];
+ vlib_serialize_buffer_main_t *sbm = &mcm->serialize_buffer_mains[VLIB_RX];
+ mc_stream_and_buffer_t *sb;
+ mc_stream_t *stream;
u32 buffer_index;
- sb = pool_elt_at_index (mcm->mc_unserialize_stream_and_buffers, stream_and_buffer_index);
+ sb =
+ pool_elt_at_index (mcm->mc_unserialize_stream_and_buffers,
+ stream_and_buffer_index);
buffer_index = sb->buffer_index;
stream = vec_elt_at_index (mcm->stream_vector, sb->stream_index);
pool_put (mcm->mc_unserialize_stream_and_buffers, sb);
@@ -2018,11 +2239,12 @@ mc_unserialize_internal (mc_main_t * mcm, u32 stream_and_buffer_index)
if (stream->config.save_snapshot)
{
u32 n_bytes = vlib_buffer_index_length_in_chain (vm, buffer_index);
- static u8 * contents;
+ static u8 *contents;
vec_reset_length (contents);
vec_validate (contents, n_bytes - 1);
vlib_buffer_contents (vm, buffer_index, contents);
- stream->config.save_snapshot (mcm, /* is_catchup */ 0, contents, n_bytes);
+ stream->config.save_snapshot (mcm, /* is_catchup */ 0, contents,
+ n_bytes);
}
ASSERT (vlib_in_process_context (vm));
@@ -2041,28 +2263,28 @@ mc_unserialize_internal (mc_main_t * mcm, u32 stream_and_buffer_index)
void
mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index)
{
- vlib_main_t * vm = mcm->vlib_main;
- mc_stream_and_buffer_t * sb;
+ vlib_main_t *vm = mcm->vlib_main;
+ mc_stream_and_buffer_t *sb;
pool_get (mcm->mc_unserialize_stream_and_buffers, sb);
sb->stream_index = s->index;
sb->buffer_index = buffer_index;
- vlib_process_signal_event (vm, mcm->unserialize_process,
- EVENT_MC_UNSERIALIZE_BUFFER, sb - mcm->mc_unserialize_stream_and_buffers);
+ vlib_process_signal_event (vm, mcm->unserialize_process,
+ EVENT_MC_UNSERIALIZE_BUFFER,
+ sb - mcm->mc_unserialize_stream_and_buffers);
}
static uword
mc_unserialize_process (vlib_main_t * vm,
- vlib_node_runtime_t * node,
- vlib_frame_t * f)
+ vlib_node_runtime_t * node, vlib_frame_t * f)
{
- mc_main_t * mcm = mc_node_get_main (node);
- uword event_type, * event_data = 0;
+ mc_main_t *mcm = mc_node_get_main (node);
+ uword event_type, *event_data = 0;
int i;
-
+
while (1)
{
if (event_data)
- _vec_len(event_data) = 0;
+ _vec_len (event_data) = 0;
vlib_process_wait_for_event (vm);
event_type = vlib_process_get_events (vm, &event_data);
@@ -2076,7 +2298,7 @@ mc_unserialize_process (vlib_main_t * vm,
case EVENT_MC_UNSERIALIZE_CATCHUP:
for (i = 0; i < vec_len (event_data); i++)
{
- u8 * mp = uword_to_pointer (event_data[i], u8 *);
+ u8 *mp = uword_to_pointer (event_data[i], u8 *);
perform_catchup (mcm, (void *) mp);
vec_free (mp);
}
@@ -2087,57 +2309,58 @@ mc_unserialize_process (vlib_main_t * vm,
}
}
- return 0; /* not likely */
+ return 0; /* not likely */
}
-void serialize_mc_main (serialize_main_t * m, va_list * va)
+void
+serialize_mc_main (serialize_main_t * m, va_list * va)
{
- mc_main_t * mcm = va_arg (*va, mc_main_t *);
- mc_stream_t * s;
- mc_serialize_stream_msg_t * sm;
- mc_serialize_msg_t * msg;
+ mc_main_t *mcm = va_arg (*va, mc_main_t *);
+ mc_stream_t *s;
+ mc_serialize_stream_msg_t *sm;
+ mc_serialize_msg_t *msg;
serialize_integer (m, vec_len (mcm->stream_vector), sizeof (u32));
vec_foreach (s, mcm->stream_vector)
- {
- /* Stream name. */
- serialize_cstring (m, s->config.name);
+ {
+ /* Stream name. */
+ serialize_cstring (m, s->config.name);
- /* Serialize global names for all sent messages. */
- serialize_integer (m, vec_len (s->stream_msgs), sizeof (u32));
- vec_foreach (sm, s->stream_msgs)
- {
- msg = vec_elt (mcm->global_msgs, sm->global_index);
- serialize_cstring (m, msg->name);
- }
+ /* Serialize global names for all sent messages. */
+ serialize_integer (m, vec_len (s->stream_msgs), sizeof (u32));
+ vec_foreach (sm, s->stream_msgs)
+ {
+ msg = vec_elt (mcm->global_msgs, sm->global_index);
+ serialize_cstring (m, msg->name);
}
+ }
}
-void unserialize_mc_main (serialize_main_t * m, va_list * va)
+void
+unserialize_mc_main (serialize_main_t * m, va_list * va)
{
- mc_main_t * mcm = va_arg (*va, mc_main_t *);
+ mc_main_t *mcm = va_arg (*va, mc_main_t *);
u32 i, n_streams, n_stream_msgs;
- char * name;
- mc_stream_t * s;
- mc_serialize_stream_msg_t * sm;
+ char *name;
+ mc_stream_t *s;
+ mc_serialize_stream_msg_t *sm;
unserialize_integer (m, &n_streams, sizeof (u32));
for (i = 0; i < n_streams; i++)
{
unserialize_cstring (m, &name);
- if (i != MC_STREAM_INDEX_INTERNAL
- && ! mc_stream_by_name (mcm, name))
- {
- vec_validate (mcm->stream_vector, i);
- s = vec_elt_at_index (mcm->stream_vector, i);
- mc_stream_init (s);
- s->index = s - mcm->stream_vector;
- s->config.name = name;
- s->state = MC_STREAM_STATE_name_known;
- hash_set_mem (mcm->stream_index_by_name, s->config.name, s->index);
- }
+ if (i != MC_STREAM_INDEX_INTERNAL && !mc_stream_by_name (mcm, name))
+ {
+ vec_validate (mcm->stream_vector, i);
+ s = vec_elt_at_index (mcm->stream_vector, i);
+ mc_stream_init (s);
+ s->index = s - mcm->stream_vector;
+ s->config.name = name;
+ s->state = MC_STREAM_STATE_name_known;
+ hash_set_mem (mcm->stream_index_by_name, s->config.name, s->index);
+ }
else
- vec_free (name);
+ vec_free (name);
s = vec_elt_at_index (mcm->stream_vector, i);
@@ -2147,45 +2370,53 @@ void unserialize_mc_main (serialize_main_t * m, va_list * va)
unserialize_integer (m, &n_stream_msgs, sizeof (u32));
vec_resize (s->stream_msgs, n_stream_msgs);
vec_foreach (sm, s->stream_msgs)
- {
- uword * p;
- u32 si, gi;
-
- unserialize_cstring (m, &name);
- p = hash_get (mcm->global_msg_index_by_name, name);
- gi = p ? p[0] : ~0;
- si = sm - s->stream_msgs;
-
- if (MC_EVENT_LOGGING > 0)
- {
- ELOG_TYPE_DECLARE (e) = {
+ {
+ uword *p;
+ u32 si, gi;
+
+ unserialize_cstring (m, &name);
+ p = hash_get (mcm->global_msg_index_by_name, name);
+ gi = p ? p[0] : ~0;
+ si = sm - s->stream_msgs;
+
+ if (MC_EVENT_LOGGING > 0)
+ {
+ /* *INDENT-OFF* */
+ ELOG_TYPE_DECLARE (e) =
+ {
.format = "catchup-bind: %s to %d global index %d stream %d",
.format_args = "T4i4i4i4",
};
- struct { u32 c[4]; } * ed;
- ed = ELOG_DATA (mcm->elog_main, e);
- ed->c[0] = elog_id_for_msg_name (mcm, name);
- ed->c[1] = si;
- ed->c[2] = gi;
- ed->c[3] = s->index;
- }
-
- vec_free (name);
-
- sm->global_index = gi;
- if (gi != ~0)
+ /* *INDENT-ON* */
+
+ struct
{
- vec_validate_init_empty (s->stream_msg_index_by_global_index,
- gi, ~0);
- s->stream_msg_index_by_global_index[gi] = si;
- }
- }
+ u32 c[4];
+ } *ed;
+ ed = ELOG_DATA (mcm->elog_main, e);
+ ed->c[0] = elog_id_for_msg_name (mcm, name);
+ ed->c[1] = si;
+ ed->c[2] = gi;
+ ed->c[3] = s->index;
+ }
+
+ vec_free (name);
+
+ sm->global_index = gi;
+ if (gi != ~0)
+ {
+ vec_validate_init_empty (s->stream_msg_index_by_global_index,
+ gi, ~0);
+ s->stream_msg_index_by_global_index[gi] = si;
+ }
+ }
}
}
-void mc_main_init (mc_main_t * mcm, char * tag)
+void
+mc_main_init (mc_main_t * mcm, char *tag)
{
- vlib_main_t * vm = vlib_get_main();
+ vlib_main_t *vm = vlib_get_main ();
mcm->vlib_main = vm;
mcm->elog_main = &vm->elog_main;
@@ -2194,7 +2425,7 @@ void mc_main_init (mc_main_t * mcm, char * tag)
mcm->relay_state = MC_RELAY_STATE_NEGOTIATE;
mcm->stream_index_by_name
- = hash_create_string (/* elts */ 0, /* value size */ sizeof (uword));
+ = hash_create_string ( /* elts */ 0, /* value size */ sizeof (uword));
{
vlib_node_registration_t r;
@@ -2229,16 +2460,19 @@ void mc_main_init (mc_main_t * mcm, char * tag)
}
if (MC_EVENT_LOGGING > 0)
- mhash_init (&mcm->elog_id_by_peer_id, sizeof (uword), sizeof (mc_peer_id_t));
+ mhash_init (&mcm->elog_id_by_peer_id, sizeof (uword),
+ sizeof (mc_peer_id_t));
- mhash_init (&mcm->mastership_peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
+ mhash_init (&mcm->mastership_peer_index_by_id, sizeof (uword),
+ sizeof (mc_peer_id_t));
mc_serialize_init (mcm);
}
-static u8 * format_mc_relay_state (u8 * s, va_list * args)
+static u8 *
+format_mc_relay_state (u8 * s, va_list * args)
{
mc_relay_state_t state = va_arg (*args, mc_relay_state_t);
- char * t = 0;
+ char *t = 0;
switch (state)
{
case MC_RELAY_STATE_NEGOTIATE:
@@ -2257,10 +2491,11 @@ static u8 * format_mc_relay_state (u8 * s, va_list * args)
return format (s, "%s", t);
}
-static u8 * format_mc_stream_state (u8 * s, va_list * args)
+static u8 *
+format_mc_stream_state (u8 * s, va_list * args)
{
mc_stream_state_t state = va_arg (*args, mc_stream_state_t);
- char * t = 0;
+ char *t = 0;
switch (state)
{
#define _(f) case MC_STREAM_STATE_##f: t = #f; break;
@@ -2274,90 +2509,101 @@ static u8 * format_mc_stream_state (u8 * s, va_list * args)
}
static int
-mc_peer_comp (void * a1, void * a2)
+mc_peer_comp (void *a1, void *a2)
{
- mc_stream_peer_t * p1 = a1;
- mc_stream_peer_t * p2 = a2;
+ mc_stream_peer_t *p1 = a1;
+ mc_stream_peer_t *p2 = a2;
return mc_peer_id_compare (p1->id, p2->id);
}
-u8 * format_mc_main (u8 * s, va_list * args)
+u8 *
+format_mc_main (u8 * s, va_list * args)
{
- mc_main_t * mcm = va_arg (*args, mc_main_t *);
- mc_stream_t * t;
- mc_stream_peer_t * p, * ps;
+ mc_main_t *mcm = va_arg (*args, mc_main_t *);
+ mc_stream_t *t;
+ mc_stream_peer_t *p, *ps;
uword indent = format_get_indent (s);
- s = format (s, "MC state %U, %d streams joined, global sequence 0x%x",
- format_mc_relay_state, mcm->relay_state,
- vec_len (mcm->stream_vector),
- mcm->relay_global_sequence);
+ s = format (s, "MC state %U, %d streams joined, global sequence 0x%x",
+ format_mc_relay_state, mcm->relay_state,
+ vec_len (mcm->stream_vector), mcm->relay_global_sequence);
{
- mc_mastership_peer_t * mp;
+ mc_mastership_peer_t *mp;
f64 now = vlib_time_now (mcm->vlib_main);
s = format (s, "\n%UMost recent mastership peers:",
format_white_space, indent + 2);
vec_foreach (mp, mcm->mastership_peers)
- {
- s = format (s, "\n%U%-30U%.4e",
- format_white_space, indent + 4,
- mcm->transport.format_peer_id, mp->peer_id,
- now - mp->time_last_master_assert_received);
- }
- }
-
- vec_foreach (t, mcm->stream_vector)
{
- s = format (s, "\n%Ustream `%s' index %d",
- format_white_space, indent + 2,
- t->config.name, t->index);
-
- s = format (s, "\n%Ustate %U",
- format_white_space, indent + 4,
- format_mc_stream_state, t->state);
-
- s = format (s, "\n%Uretries: interval %.0f sec, limit %d, pool elts %d, %Ld sent",
- format_white_space, indent + 4, t->config.retry_interval,
- t->config.retry_limit,
- pool_elts (t->retry_pool),
- t->stats.n_retries - t->stats_last_clear.n_retries);
-
- s = format (s, "\n%U%Ld/%Ld user requests sent/received",
+ s = format (s, "\n%U%-30U%.4e",
format_white_space, indent + 4,
- t->user_requests_sent, t->user_requests_received);
-
- s = format (s, "\n%U%d peers, local/global sequence 0x%x/0x%x",
- format_white_space, indent + 4,
- pool_elts (t->peers),
- t->our_local_sequence,
- t->last_global_sequence_processed);
-
- ps = 0;
- pool_foreach (p, t->peers,
- ({
- if (clib_bitmap_get (t->all_peer_bitmap, p - t->peers))
- vec_add1 (ps, p[0]);
- }));
- vec_sort_with_function (ps, mc_peer_comp);
- s = format (s, "\n%U%=30s%10s%16s%16s",
- format_white_space, indent + 6,
- "Peer", "Last seq", "Retries", "Future");
-
- vec_foreach (p, ps)
- {
- s = format (s, "\n%U%-30U0x%08x%16Ld%16Ld%s",
- format_white_space, indent + 6,
- mcm->transport.format_peer_id, p->id.as_u64,
- p->last_sequence_received,
- p->stats.n_msgs_from_past - p->stats_last_clear.n_msgs_from_past,
- p->stats.n_msgs_from_future - p->stats_last_clear.n_msgs_from_future,
- (mcm->transport.our_ack_peer_id.as_u64 == p->id.as_u64
- ? " (self)" : ""));
- }
- vec_free (ps);
+ mcm->transport.format_peer_id, mp->peer_id,
+ now - mp->time_last_master_assert_received);
}
+ }
+
+ vec_foreach (t, mcm->stream_vector)
+ {
+ s = format (s, "\n%Ustream `%s' index %d",
+ format_white_space, indent + 2, t->config.name, t->index);
+
+ s = format (s, "\n%Ustate %U",
+ format_white_space, indent + 4,
+ format_mc_stream_state, t->state);
+
+ s =
+ format (s,
+ "\n%Uretries: interval %.0f sec, limit %d, pool elts %d, %Ld sent",
+ format_white_space, indent + 4, t->config.retry_interval,
+ t->config.retry_limit, pool_elts (t->retry_pool),
+ t->stats.n_retries - t->stats_last_clear.n_retries);
+
+ s = format (s, "\n%U%Ld/%Ld user requests sent/received",
+ format_white_space, indent + 4,
+ t->user_requests_sent, t->user_requests_received);
+
+ s = format (s, "\n%U%d peers, local/global sequence 0x%x/0x%x",
+ format_white_space, indent + 4,
+ pool_elts (t->peers),
+ t->our_local_sequence, t->last_global_sequence_processed);
+
+ ps = 0;
+ /* *INDENT-OFF* */
+ pool_foreach (p, t->peers,
+ ({
+ if (clib_bitmap_get (t->all_peer_bitmap, p - t->peers))
+ vec_add1 (ps, p[0]);
+ }));
+ /* *INDENT-ON* */
+ vec_sort_with_function (ps, mc_peer_comp);
+ s = format (s, "\n%U%=30s%10s%16s%16s",
+ format_white_space, indent + 6,
+ "Peer", "Last seq", "Retries", "Future");
+
+ vec_foreach (p, ps)
+ {
+ s = format (s, "\n%U%-30U0x%08x%16Ld%16Ld%s",
+ format_white_space, indent + 6,
+ mcm->transport.format_peer_id, p->id.as_u64,
+ p->last_sequence_received,
+ p->stats.n_msgs_from_past -
+ p->stats_last_clear.n_msgs_from_past,
+ p->stats.n_msgs_from_future -
+ p->stats_last_clear.n_msgs_from_future,
+ (mcm->transport.our_ack_peer_id.as_u64 ==
+ p->id.as_u64 ? " (self)" : ""));
+ }
+ vec_free (ps);
+ }
return s;
}
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */