diff options
author | Dave Barach <dave@barachs.net> | 2016-07-08 08:13:45 -0400 |
---|---|---|
committer | Damjan Marion <dmarion.lists@gmail.com> | 2016-07-08 14:18:22 +0000 |
commit | 9b8ffd99fb35d37250cfb7094a0a31515f1bb577 (patch) | |
tree | 604ebcf86671a86b9e3395dda8d9fda50db2fcbc /vlib/vlib/mc.h | |
parent | 0557a91ca727cee963a8179808d2d2108564ec56 (diff) |
fd-io-styleify pass
Change-Id: If2d57a213fc2fec996db26df332910c3d2105f97
Signed-off-by: Dave Barach <dave@barachs.net>
Diffstat (limited to 'vlib/vlib/mc.h')
-rw-r--r-- | vlib/vlib/mc.h | 366 |
1 files changed, 190 insertions, 176 deletions
diff --git a/vlib/vlib/mc.h b/vlib/vlib/mc.h index 55dce2822c6..8614050c241 100644 --- a/vlib/vlib/mc.h +++ b/vlib/vlib/mc.h @@ -29,20 +29,23 @@ always_inline uword mc_need_byte_swap (void) -{ return CLIB_ARCH_IS_LITTLE_ENDIAN; } +{ + return CLIB_ARCH_IS_LITTLE_ENDIAN; +} -/* - * Used to uniquely identify hosts. - * For IP4 this would be ip4_address plus tcp/udp port. +/* + * Used to uniquely identify hosts. + * For IP4 this would be ip4_address plus tcp/udp port. */ -typedef union { +typedef union +{ u8 as_u8[8]; u64 as_u64; } mc_peer_id_t; always_inline mc_peer_id_t mc_byte_swap_peer_id (mc_peer_id_t i) -{ +{ /* Peer id is already in network byte order. */ return i; } @@ -56,13 +59,12 @@ mc_peer_id_compare (mc_peer_id_t a, mc_peer_id_t b) /* Assert mastership. Lowest peer_id amount all peers wins mastership. Only sent/received over mastership channel (MC_TRANSPORT_MASTERSHIP). So, we don't need a message opcode. */ -typedef CLIB_PACKED (struct { - /* Peer id asserting mastership. */ - mc_peer_id_t peer_id; - - /* Global sequence number asserted. */ - u32 global_sequence; -}) mc_msg_master_assert_t; +typedef CLIB_PACKED (struct + { + /* Peer id asserting mastership. */ + mc_peer_id_t peer_id; + /* Global sequence number asserted. */ + u32 global_sequence;}) mc_msg_master_assert_t; always_inline void mc_byte_swap_msg_master_assert (mc_msg_master_assert_t * r) @@ -83,24 +85,22 @@ mc_byte_swap_msg_master_assert (mc_msg_master_assert_t * r) _ (catchup_request) \ _ (catchup_reply) -typedef enum { +typedef enum +{ #define _(f) MC_MSG_TYPE_##f, foreach_mc_msg_type #undef _ } mc_relay_msg_type_t; /* Request to join a given stream. Multicast over MC_TRANSPORT_JOIN. */ -typedef CLIB_PACKED (struct { - mc_peer_id_t peer_id; - - mc_relay_msg_type_t type : 32; /* MC_MSG_TYPE_join_or_leave_request */ - - /* Stream to join or leave. */ - u32 stream_index; - - /* join = 1, leave = 0 */ - u8 is_join; -}) mc_msg_join_or_leave_request_t; +typedef CLIB_PACKED (struct + { +mc_peer_id_t peer_id; mc_relay_msg_type_t type:32; + /* MC_MSG_TYPE_join_or_leave_request */ + /* Stream to join or leave. */ + u32 stream_index; + /* join = 1, leave = 0 */ + u8 is_join;}) mc_msg_join_or_leave_request_t; always_inline void mc_byte_swap_msg_join_or_leave_request (mc_msg_join_or_leave_request_t * r) @@ -114,16 +114,13 @@ mc_byte_swap_msg_join_or_leave_request (mc_msg_join_or_leave_request_t * r) } /* Join reply. Multicast over MC_TRANSPORT_JOIN. */ -typedef CLIB_PACKED (struct { - mc_peer_id_t peer_id; - - mc_relay_msg_type_t type : 32; /* MC_MSG_TYPE_join_reply */ - - u32 stream_index; - - /* Peer ID to contact to catchup with this stream. */ - mc_peer_id_t catchup_peer_id; -}) mc_msg_join_reply_t; +typedef CLIB_PACKED (struct + { +mc_peer_id_t peer_id; mc_relay_msg_type_t type:32; + /* MC_MSG_TYPE_join_reply */ + u32 stream_index; + /* Peer ID to contact to catchup with this stream. */ + mc_peer_id_t catchup_peer_id;}) mc_msg_join_reply_t; always_inline void mc_byte_swap_msg_join_reply (mc_msg_join_reply_t * r) @@ -139,23 +136,17 @@ mc_byte_swap_msg_join_reply (mc_msg_join_reply_t * r) /* Generic (application) request. Multicast over MC_TRANSPORT_USER_REQUEST_TO_RELAY and then relayed by relay master after filling in global sequence number. */ -typedef CLIB_PACKED (struct { - mc_peer_id_t peer_id; - - u32 stream_index; - - /* Global sequence number as filled in by relay master. */ - u32 global_sequence; - - /* Local sequence number as filled in by peer sending message. */ - u32 local_sequence; - - /* Size of request data. */ - u32 n_data_bytes; - - /* Opaque request data. */ - u8 data[0]; -}) mc_msg_user_request_t; +typedef CLIB_PACKED (struct + { + mc_peer_id_t peer_id; u32 stream_index; + /* Global sequence number as filled in by relay master. */ + u32 global_sequence; + /* Local sequence number as filled in by peer sending message. */ + u32 local_sequence; + /* Size of request data. */ + u32 n_data_bytes; + /* Opaque request data. */ + u8 data[0];}) mc_msg_user_request_t; always_inline void mc_byte_swap_msg_user_request (mc_msg_user_request_t * r) @@ -171,13 +162,13 @@ mc_byte_swap_msg_user_request (mc_msg_user_request_t * r) } /* Sent unicast over ACK channel. */ -typedef CLIB_PACKED (struct { - mc_peer_id_t peer_id; - u32 global_sequence; - u32 stream_index; - u32 local_sequence; - i32 seq_cmp_result; -}) mc_msg_user_ack_t; +typedef CLIB_PACKED (struct + { + mc_peer_id_t peer_id; + u32 global_sequence; + u32 stream_index; + u32 local_sequence; + i32 seq_cmp_result;}) mc_msg_user_ack_t; always_inline void mc_byte_swap_msg_user_ack (mc_msg_user_ack_t * r) @@ -193,10 +184,10 @@ mc_byte_swap_msg_user_ack (mc_msg_user_ack_t * r) } /* Sent/received unicast over catchup channel (e.g. using TCP). */ -typedef CLIB_PACKED (struct { - mc_peer_id_t peer_id; - u32 stream_index; -}) mc_msg_catchup_request_t; +typedef CLIB_PACKED (struct + { + mc_peer_id_t peer_id; + u32 stream_index;}) mc_msg_catchup_request_t; always_inline void mc_byte_swap_msg_catchup_request (mc_msg_catchup_request_t * r) @@ -209,20 +200,15 @@ mc_byte_swap_msg_catchup_request (mc_msg_catchup_request_t * r) } /* Sent/received unicast over catchup channel. */ -typedef CLIB_PACKED (struct { - mc_peer_id_t peer_id; - - u32 stream_index; - - /* Last global sequence number included in catchup data. */ - u32 last_global_sequence_included; - - /* Size of catchup data. */ - u32 n_data_bytes; - - /* Catchup data. */ - u8 data[0]; -}) mc_msg_catchup_reply_t; +typedef CLIB_PACKED (struct + { + mc_peer_id_t peer_id; u32 stream_index; + /* Last global sequence number included in catchup data. */ + u32 last_global_sequence_included; + /* Size of catchup data. */ + u32 n_data_bytes; + /* Catchup data. */ + u8 data[0];}) mc_msg_catchup_reply_t; always_inline void mc_byte_swap_msg_catchup_reply (mc_msg_catchup_reply_t * r) @@ -231,18 +217,20 @@ mc_byte_swap_msg_catchup_reply (mc_msg_catchup_reply_t * r) { r->peer_id = mc_byte_swap_peer_id (r->peer_id); r->stream_index = clib_byte_swap_u32 (r->stream_index); - r->last_global_sequence_included = clib_byte_swap_u32 (r->last_global_sequence_included); + r->last_global_sequence_included = + clib_byte_swap_u32 (r->last_global_sequence_included); r->n_data_bytes = clib_byte_swap_u32 (r->n_data_bytes); } } -typedef struct _mc_serialize_msg { +typedef struct _mc_serialize_msg +{ /* Name for this type. */ - char * name; + char *name; /* Functions to serialize/unserialize data. */ - serialize_function_t * serialize; - serialize_function_t * unserialize; + serialize_function_t *serialize; + serialize_function_t *unserialize; /* Maximum message size in bytes when serialized. If zero then this will be set to the largest sent message. */ @@ -255,10 +243,11 @@ typedef struct _mc_serialize_msg { u32 global_index; /* Registration list */ - struct _mc_serialize_msg * next_registration; + struct _mc_serialize_msg *next_registration; } mc_serialize_msg_t; -typedef struct { +typedef struct +{ /* Index into global message vector. */ u32 global_index; } mc_serialize_stream_msg_t; @@ -273,9 +262,10 @@ static void __mc_serialize_msg_registration_##x (void) \ x.next_registration = vm->mc_msg_registrations; \ vm->mc_msg_registrations = &x; \ } \ -__VA_ARGS__ mc_serialize_msg_t x +__VA_ARGS__ mc_serialize_msg_t x -typedef enum { +typedef enum +{ MC_TRANSPORT_MASTERSHIP, MC_TRANSPORT_JOIN, MC_TRANSPORT_USER_REQUEST_TO_RELAY, @@ -283,18 +273,23 @@ typedef enum { MC_N_TRANSPORT_TYPE, } mc_transport_type_t; -typedef struct { - clib_error_t * (* tx_buffer) (void * opaque, mc_transport_type_t type, u32 buffer_index); +typedef struct +{ + clib_error_t *(*tx_buffer) (void *opaque, mc_transport_type_t type, + u32 buffer_index); - clib_error_t * (* tx_ack) (void * opaque, mc_peer_id_t peer_id, u32 buffer_index); + clib_error_t *(*tx_ack) (void *opaque, mc_peer_id_t peer_id, + u32 buffer_index); /* Returns catchup opaque. */ - uword (* catchup_request_fun) (void * opaque, u32 stream_index, mc_peer_id_t catchup_peer_id); + uword (*catchup_request_fun) (void *opaque, u32 stream_index, + mc_peer_id_t catchup_peer_id); - void (* catchup_send_fun) (void * opaque, uword catchup_opaque, u8 * data_vector); + void (*catchup_send_fun) (void *opaque, uword catchup_opaque, + u8 * data_vector); /* Opaque passed to callbacks. */ - void * opaque; + void *opaque; mc_peer_id_t our_ack_peer_id; mc_peer_id_t our_catchup_peer_id; @@ -303,17 +298,19 @@ typedef struct { For IP this is interface MTU less IP + UDP header size. */ u32 max_packet_size; - format_function_t * format_peer_id; + format_function_t *format_peer_id; } mc_transport_t; -typedef struct { +typedef struct +{ /* Count of messages received from this peer from the past/future (with seq_cmp != 0). */ u64 n_msgs_from_past; u64 n_msgs_from_future; } mc_stream_peer_stats_t; -typedef struct { +typedef struct +{ /* ID of this peer. */ mc_peer_id_t id; @@ -323,7 +320,8 @@ typedef struct { mc_stream_peer_stats_t stats, stats_last_clear; } mc_stream_peer_t; -typedef struct { +typedef struct +{ u32 buffer_index; /* Cached copy of local sequence number from buffer. */ @@ -336,13 +334,14 @@ typedef struct { u32 prev_index, next_index; /* Bitmap of all peers which have acked this msg */ - uword * unacked_by_peer_bitmap; + uword *unacked_by_peer_bitmap; /* Message send or resend time */ f64 sent_at; } mc_retry_t; -typedef struct { +typedef struct +{ /* Number of retries sent for this stream. */ u64 n_retries; } mc_stream_stats_t; @@ -350,9 +349,10 @@ typedef struct { struct mc_main_t; struct mc_stream_t; -typedef struct { +typedef struct +{ /* Stream name. */ - char * name; + char *name; /* Number of outstanding messages. */ u32 window_size; @@ -364,31 +364,27 @@ typedef struct { u32 retry_limit; /* User rx buffer callback */ - void (* rx_buffer) (struct mc_main_t * mc_main, - struct mc_stream_t * stream, - mc_peer_id_t peer_id, - u32 buffer_index); + void (*rx_buffer) (struct mc_main_t * mc_main, + struct mc_stream_t * stream, + mc_peer_id_t peer_id, u32 buffer_index); /* User callback to create a snapshot */ - u8 * (* catchup_snapshot) (struct mc_main_t *mc_main, - u8 * snapshot_vector, - u32 last_global_sequence_included); + u8 *(*catchup_snapshot) (struct mc_main_t * mc_main, + u8 * snapshot_vector, + u32 last_global_sequence_included); /* User callback to replay a snapshot */ - void (* catchup) (struct mc_main_t *mc_main, - u8 * snapshot_data, - u32 n_snapshot_data_bytes); + void (*catchup) (struct mc_main_t * mc_main, + u8 * snapshot_data, u32 n_snapshot_data_bytes); /* Callback to save a snapshot for offline replay */ - void (* save_snapshot) (struct mc_main_t *mc_main, - u32 is_catchup, - u8 * snapshot_data, - u32 n_snapshot_data_bytes); + void (*save_snapshot) (struct mc_main_t * mc_main, + u32 is_catchup, + u8 * snapshot_data, u32 n_snapshot_data_bytes); /* Called when a peer dies */ - void (* peer_died) (struct mc_main_t * mc_main, - struct mc_stream_t * stream, - mc_peer_id_t peer_id); + void (*peer_died) (struct mc_main_t * mc_main, + struct mc_stream_t * stream, mc_peer_id_t peer_id); } mc_stream_config_t; #define foreach_mc_stream_state \ @@ -398,13 +394,15 @@ typedef struct { _ (catchup) \ _ (ready) -typedef enum { +typedef enum +{ #define _(f) MC_STREAM_STATE_##f, foreach_mc_stream_state #undef _ } mc_stream_state_t; -typedef struct mc_stream_t { +typedef struct mc_stream_t +{ mc_stream_config_t config; mc_stream_state_t state; @@ -415,33 +413,33 @@ typedef struct mc_stream_t { /* Stream index 0 is always for MC internal use. */ #define MC_STREAM_INDEX_INTERNAL 0 - mc_retry_t * retry_pool; + mc_retry_t *retry_pool; /* Head and tail index of retry pool. */ u32 retry_head_index, retry_tail_index; - /* + /* * Country club for recently retired messages * If the set of peers is expanding and a new peer * misses a message, we can easily retire the FIFO * element before we even know about the new peer */ - mc_retry_t * retired_fifo; + mc_retry_t *retired_fifo; /* Hash mapping local sequence to retry pool index. */ - uword * retry_index_by_local_sequence; + uword *retry_index_by_local_sequence; /* catch-up fifo of VLIB buffer indices. start recording when catching up. */ - u32 * catchup_fifo; + u32 *catchup_fifo; mc_stream_stats_t stats, stats_last_clear; /* Peer pool. */ - mc_stream_peer_t * peers; + mc_stream_peer_t *peers; /* Bitmap with ones for all peers in peer pool. */ - uword * all_peer_bitmap; + uword *all_peer_bitmap; /* Map of 64 bit id to index in stream pool. */ mhash_t peer_index_by_id; @@ -449,14 +447,14 @@ typedef struct mc_stream_t { /* Timeout, in case we're alone in the world */ f64 join_timeout; - vlib_one_time_waiting_process_t * procs_waiting_for_join_done; + vlib_one_time_waiting_process_t *procs_waiting_for_join_done; - vlib_one_time_waiting_process_t * procs_waiting_for_open_window; + vlib_one_time_waiting_process_t *procs_waiting_for_open_window; /* Next sequence number to use */ u32 our_local_sequence; - /* + /* * Last global sequence we processed. * When supplying catchup data, we need to tell * the client precisely where to start replaying @@ -464,13 +462,13 @@ typedef struct mc_stream_t { u32 last_global_sequence_processed; /* Vector of unique messages we've sent on this stream. */ - mc_serialize_stream_msg_t * stream_msgs; + mc_serialize_stream_msg_t *stream_msgs; /* Vector global message index into per stream message index. */ - u32 * stream_msg_index_by_global_index; + u32 *stream_msg_index_by_global_index; /* Hashed by message name. */ - uword * stream_msg_index_by_name; + uword *stream_msg_index_by_name; u64 user_requests_sent; u64 user_requests_received; @@ -495,30 +493,35 @@ mc_stream_init (mc_stream_t * s) s->retry_head_index = s->retry_tail_index = ~0; } -typedef struct { +typedef struct +{ u32 stream_index; u32 catchup_opaque; u8 *catchup_snapshot; } mc_catchup_process_arg_t; -typedef enum { +typedef enum +{ MC_RELAY_STATE_NEGOTIATE, MC_RELAY_STATE_MASTER, MC_RELAY_STATE_SLAVE, } mc_relay_state_t; -typedef struct { +typedef struct +{ mc_peer_id_t peer_id; f64 time_last_master_assert_received; } mc_mastership_peer_t; -typedef struct { +typedef struct +{ u32 stream_index; u32 buffer_index; } mc_stream_and_buffer_t; -typedef struct mc_main_t { +typedef struct mc_main_t +{ mc_relay_state_t relay_state; /* Mastership */ @@ -526,30 +529,30 @@ typedef struct mc_main_t { u64 relay_master_peer_id; - mc_mastership_peer_t * mastership_peers; + mc_mastership_peer_t *mastership_peers; /* Map of 64 bit id to index in stream pool. */ mhash_t mastership_peer_index_by_id; /* The transport we're using. */ mc_transport_t transport; - + /* Last-used global sequence number. */ u32 relay_global_sequence; /* Vector of streams. */ - mc_stream_t * stream_vector; + mc_stream_t *stream_vector; /* Hash table mapping stream name to pool index. */ - uword * stream_index_by_name; + uword *stream_index_by_name; - uword * procs_waiting_for_stream_name_by_name; + uword *procs_waiting_for_stream_name_by_name; - vlib_one_time_waiting_process_t ** procs_waiting_for_stream_name_pool; + vlib_one_time_waiting_process_t **procs_waiting_for_stream_name_pool; int joins_in_progress; - mc_catchup_process_arg_t * catchup_process_args; + mc_catchup_process_arg_t *catchup_process_args; /* Node indices for mastership, join ager, retry and catchup processes. */ @@ -560,10 +563,10 @@ typedef struct mc_main_t { u32 unserialize_process; /* Global vector of messages. */ - mc_serialize_msg_t ** global_msgs; + mc_serialize_msg_t **global_msgs; /* Hash table mapping message name to index. */ - uword * global_msg_index_by_name; + uword *global_msg_index_by_name; /* Shared serialize/unserialize main. */ serialize_main_t serialize_mains[VLIB_N_RX_TX]; @@ -571,8 +574,8 @@ typedef struct mc_main_t { vlib_serialize_buffer_main_t serialize_buffer_mains[VLIB_N_RX_TX]; /* Convenience variables */ - struct vlib_main_t * vlib_main; - elog_main_t * elog_main; + struct vlib_main_t *vlib_main; + elog_main_t *elog_main; /* Maps 64 bit peer id to elog string table offset for this formatted peer id. */ mhash_t elog_id_by_peer_id; @@ -580,13 +583,13 @@ typedef struct mc_main_t { uword *elog_id_by_msg_name; /* For mc_unserialize. */ - mc_stream_and_buffer_t * mc_unserialize_stream_and_buffers; + mc_stream_and_buffer_t *mc_unserialize_stream_and_buffers; } mc_main_t; always_inline mc_stream_t * -mc_stream_by_name (mc_main_t * m, char * name) +mc_stream_by_name (mc_main_t * m, char *name) { - uword * p = hash_get (m->stream_index_by_name, name); + uword *p = hash_get (m->stream_index_by_name, name); return p ? vec_elt_at_index (m->stream_vector, p[0]) : 0; } @@ -599,51 +602,49 @@ mc_stream_by_index (mc_main_t * m, u32 i) always_inline void mc_clear_stream_stats (mc_main_t * m) { - mc_stream_t * s; - mc_stream_peer_t * p; + mc_stream_t *s; + mc_stream_peer_t *p; vec_foreach (s, m->stream_vector) - { - s->stats_last_clear = s->stats; + { + s->stats_last_clear = s->stats; + /* *INDENT-OFF* */ pool_foreach (p, s->peers, ({ p->stats_last_clear = p->stats; })); - } + /* *INDENT-ON* */ + } } /* Declare all message handlers. */ #define _(f) void mc_msg_##f##_handler (mc_main_t * mcm, mc_msg_##f##_t * msg, u32 buffer_index); foreach_mc_msg_type #undef _ - -u32 mc_stream_join (mc_main_t * mcm, mc_stream_config_t *); + u32 mc_stream_join (mc_main_t * mcm, mc_stream_config_t *); void mc_stream_leave (mc_main_t * mcm, u32 stream_index); -void mc_wait_for_stream_ready (mc_main_t * m, char * stream_name); +void mc_wait_for_stream_ready (mc_main_t * m, char *stream_name); u32 mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index); -void mc_main_init (mc_main_t * mcm, char * tag); +void mc_main_init (mc_main_t * mcm, char *tag); void mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master); -void * mc_get_vlib_buffer (struct vlib_main_t * vm, u32 n_bytes, u32 * bi_return); +void *mc_get_vlib_buffer (struct vlib_main_t *vm, u32 n_bytes, + u32 * bi_return); format_function_t format_mc_main; -clib_error_t * -mc_serialize_internal (mc_main_t * mc, - u32 stream_index, - u32 multiple_messages_per_vlib_buffer, - mc_serialize_msg_t * msg, - ...); +clib_error_t *mc_serialize_internal (mc_main_t * mc, + u32 stream_index, + u32 multiple_messages_per_vlib_buffer, + mc_serialize_msg_t * msg, ...); -clib_error_t * -mc_serialize_va (mc_main_t * mc, - u32 stream_index, - u32 multiple_messages_per_vlib_buffer, - mc_serialize_msg_t * msg, - va_list * va); +clib_error_t *mc_serialize_va (mc_main_t * mc, + u32 stream_index, + u32 multiple_messages_per_vlib_buffer, + mc_serialize_msg_t * msg, va_list * va); #define mc_serialize_stream(mc,si,msg,args...) \ mc_serialize_internal((mc),(si),(0),(msg),(msg)->serialize,args) @@ -656,19 +657,32 @@ mc_serialize_va (mc_main_t * mc, void mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index); uword mc_unserialize_message (mc_main_t * mcm, mc_stream_t * s, - serialize_main_t * m); + serialize_main_t * m); serialize_function_t serialize_mc_main, unserialize_mc_main; always_inline uword mc_max_message_size_in_bytes (mc_main_t * mcm) -{ return mcm->transport.max_packet_size - sizeof (mc_msg_user_request_t); } +{ + return mcm->transport.max_packet_size - sizeof (mc_msg_user_request_t); +} always_inline word mc_serialize_n_bytes_left (mc_main_t * mcm, serialize_main_t * m) -{ return mc_max_message_size_in_bytes (mcm) - serialize_vlib_buffer_n_bytes (m); } +{ + return mc_max_message_size_in_bytes (mcm) - + serialize_vlib_buffer_n_bytes (m); +} void unserialize_mc_stream (serialize_main_t * m, va_list * va); void mc_stream_join_process_hold (void); #endif /* included_vlib_mc_h */ + +/* + * fd.io coding-style-patch-verification: ON + * + * Local Variables: + * eval: (c-set-style "gnu") + * End: + */ |