/* * mc.h: vlib reliable sequenced multicast distributed applications * * Copyright (c) 2010 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef included_vlib_mc_h #define included_vlib_mc_h #include #include #include #include #ifndef MC_EVENT_LOGGING #define MC_EVENT_LOGGING 1 #endif always_inline uword mc_need_byte_swap (void) { return CLIB_ARCH_IS_LITTLE_ENDIAN; } /* * Used to uniquely identify hosts. * For IP4 this would be ip4_address plus tcp/udp port. */ typedef union { u8 as_u8[8]; u64 as_u64; } mc_peer_id_t; always_inline mc_peer_id_t mc_byte_swap_peer_id (mc_peer_id_t i) { /* Peer id is already in network byte order. */ return i; } always_inline int mc_peer_id_compare (mc_peer_id_t a, mc_peer_id_t b) { return memcmp (a.as_u8, b.as_u8, sizeof (a.as_u8)); } /* Assert mastership. Lowest peer_id amount all peers wins mastership. Only sent/received over mastership channel (MC_TRANSPORT_MASTERSHIP). So, we don't need a message opcode. */ typedef CLIB_PACKED (struct { /* Peer id asserting mastership. */ mc_peer_id_t peer_id; /* Global sequence number asserted. */ u32 global_sequence;}) mc_msg_master_assert_t; always_inline void mc_byte_swap_msg_master_assert (mc_msg_master_assert_t * r) { if (mc_need_byte_swap ()) { r->peer_id = mc_byte_swap_peer_id (r->peer_id); r->global_sequence = clib_byte_swap_u32 (r->global_sequence); } } #define foreach_mc_msg_type \ _ (master_assert) \ _ (join_or_leave_request) \ _ (join_reply) \ _ (user_request) \ _ (user_ack) \ _ (catchup_request) \ _ (catchup_reply) typedef enum { #define _(f) MC_MSG_TYPE_##f, foreach_mc_msg_type #undef _ } mc_relay_msg_type_t; /* Request to join a given stream. Multicast over MC_TRANSPORT_JOIN. */ typedef CLIB_PACKED (struct { mc_peer_id_t peer_id; mc_relay_msg_type_t type:32; /* MC_MSG_TYPE_join_or_leave_request */ /* Stream to join or leave. */ u32 stream_index; /* join = 1, leave = 0 */ u8 is_join;}) mc_msg_join_or_leave_request_t; always_inline void mc_byte_swap_msg_join_or_leave_request (mc_msg_join_or_leave_request_t * r) { if (mc_need_byte_swap ()) { r->peer_id = mc_byte_swap_peer_id (r->peer_id); r->type = clib_byte_swap_u32 (r->type); r->stream_index = clib_byte_swap_u32 (r->stream_index); } } /* Join reply. Multicast over MC_TRANSPORT_JOIN. */ typedef CLIB_PACKED (struct { mc_peer_id_t peer_id; mc_relay_msg_type_t type:32; /* MC_MSG_TYPE_join_reply */ u32 stream_index; /* Peer ID to contact to catchup with this stream. */ mc_peer_id_t catchup_peer_id;}) mc_msg_join_reply_t; always_inline void mc_byte_swap_msg_join_reply (mc_msg_join_reply_t * r) { if (mc_need_byte_swap ()) { r->peer_id = mc_byte_swap_peer_id (r->peer_id); r->type = clib_byte_swap_u32 (r->type); r->stream_index = clib_byte_swap_u32 (r->stream_index); r->catchup_peer_id = mc_byte_swap_peer_id (r->catchup_peer_id); } } /* Generic (application) request. Multicast over MC_TRANSPORT_USER_REQUEST_TO_RELAY and then relayed by relay master after filling in global sequence number. */ typedef CLIB_PACKED (struct { mc_peer_id_t peer_id; u32 stream_index; /* Global sequence number as filled in by relay master. */ u32 global_sequence; /* Local sequence number as filled in by peer sending message. */ u32 local_sequence; /* Size of request data. */ u32 n_data_bytes; /* Opaque request data. */ u8 data[0];}) mc_msg_user_request_t; always_inline void mc_byte_swap_msg_user_request (mc_msg_user_request_t * r) { if (mc_need_byte_swap ()) { r->peer_id = mc_byte_swap_peer_id (r->peer_id); r->stream_index = clib_byte_swap_u32 (r->stream_index); r->global_sequence = clib_byte_swap_u32 (r->global_sequence); r->local_sequence = clib_byte_swap_u32 (r->local_sequence); r->n_data_bytes = clib_byte_swap_u32 (r->n_data_bytes); } } /* Sent unicast over ACK channel. */ typedef CLIB_PACKED (struct { mc_peer_id_t peer_id; u32 global_sequence; u32 stream_index; u32 local_sequence; i32 seq_cmp_result;}) mc_msg_user_ack_t; always_inline void mc_byte_swap_msg_user_ack (mc_msg_user_ack_t * r) { if (mc_need_byte_swap ()) { r->peer_id = mc_byte_swap_peer_id (r->peer_id); r->stream_index = clib_byte_swap_u32 (r->stream_index); r->global_sequence = clib_byte_swap_u32 (r->global_sequence); r->local_sequence = clib_byte_swap_u32 (r->local_sequence); r->seq_cmp_result = clib_byte_swap_i32 (r->seq_cmp_result); } } /* Sent/received unicast over catchup channel (e.g. using TCP). */ typedef CLIB_PACKED (struct { mc_peer_id_t peer_id; u32 stream_index;}) mc_msg_catchup_request_t; always_inline void mc_byte_swap_msg_catchup_request (mc_msg_catchup_request_t * r) { if (mc_need_byte_swap ()) { r->peer_id = mc_byte_swap_peer_id (r->peer_id); r->stream_index = clib_byte_swap_u32 (r->stream_index); } } /* Sent/received unicast over catchup channel. */ typedef CLIB_PACKED (struct { mc_peer_id_t peer_id; u32 stream_index; /* Last global sequence number included in catchup data. */ u32 last_global_sequence_included; /* Size of catchup data. */ u32 n_data_bytes; /* Catchup data. */ u8 data[0];}) mc_msg_catchup_reply_t; always_inline void mc_byte_swap_msg_catchup_reply (mc_msg_catchup_reply_t * r) { if (mc_need_byte_swap ()) { r->peer_id = mc_byte_swap_peer_id (r->peer_id); r->stream_index = clib_byte_swap_u32 (r->stream_index); r->last_global_sequence_included = clib_byte_swap_u32 (r->last_global_sequence_included); r->n_data_bytes = clib_byte_swap_u32 (r->n_data_bytes); } } typedef struct _mc_serialize_msg { /* Name for this type. */ char *name; /* Functions to serialize/unserialize data. */ serialize_function_t *serialize; serialize_function_t *unserialize; /* Maximum message size in bytes when serialized. If zero then this will be set to the largest sent message. */ u32 max_n_bytes_serialized; /* Opaque to use for first argument to serialize/unserialize function. */ u32 opaque; /* Index in global message vector. */ u32 global_index; /* Registration list */ struct _mc_serialize_msg *next_registration; } mc_serialize_msg_t; typedef struct { /* Index into global message vector. */ u32 global_index; } mc_serialize_stream_msg_t; #define MC_SERIALIZE_MSG(x,...) \ __VA_ARGS__ mc_serialize_msg_t x; \ static void __mc_serialize_msg_registration_##x (void) \ __attribute__((__constructor__)) ; \ static void __mc_serialize_msg_registration_##x (void) \ { \ vlib_main_t * vm = vlib_get_main(); \ x.next_registration = vm->mc_msg_registrations; \ vm->mc_msg_registrations = &x; \ } \ __VA_ARGS__ mc_serialize_msg_t x typedef enum { MC_TRANSPORT_MASTERSHIP, MC_TRANSPORT_JOIN, MC_TRANSPORT_USER_REQUEST_TO_RELAY, MC_TRANSPORT_USER_REQUEST_FROM_RELAY, MC_N_TRANSPORT_TYPE, } mc_transport_type_t; typedef struct { clib_error_t *(*tx_buffer) (void *opaque, mc_transport_type_t type, u32 buffer_index); clib_error_t *(*tx_ack) (void *opaque, mc_peer_id_t peer_id, u32 buffer_index); /* Returns catchup opaque. */ uword (*catchup_request_fun) (void *opaque, u32 stream_index, mc_peer_id_t catchup_peer_id); void (*catchup_send_fun) (void *opaque, uword catchup_opaque, u8 * data_vector); /* Opaque passed to callbacks. */ void *opaque; mc_peer_id_t our_ack_peer_id; mc_peer_id_t our_catchup_peer_id; /* Max packet size (MTU) for this transport. For IP this is interface MTU less IP + UDP header size. */ u32 max_packet_size; format_function_t *format_peer_id; } mc_transport_t; typedef struct { /* Count of messages received from this peer from the past/future (with seq_cmp != 0). */ u64 n_msgs_from_past; u64 n_msgs_from_future; } mc_stream_peer_stats_t; typedef struct { /* ID of this peer. */ mc_peer_id_t id; /* The last sequence we received from this peer. */ u32 last_sequence_received; mc_stream_peer_stats_t stats, stats_last_clear; } mc_stream_peer_t; typedef struct { u32 buffer_index; /* Cached copy of local sequence number from buffer.
/*
 * Copyright (c) 2015 Cisco and/or