aboutsummaryrefslogtreecommitdiffstats
path: root/src/vlib/mc.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/vlib/mc.h')
-rw-r--r--src/vlib/mc.h695
1 files changed, 0 insertions, 695 deletions
diff --git a/src/vlib/mc.h b/src/vlib/mc.h
deleted file mode 100644
index 28f94350537..00000000000
--- a/src/vlib/mc.h
+++ /dev/null
@@ -1,695 +0,0 @@
-/*
- * mc.h: vlib reliable sequenced multicast distributed applications
- *
- * Copyright (c) 2010 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef included_vlib_mc_h
-#define included_vlib_mc_h
-
-#include <vppinfra/elog.h>
-#include <vppinfra/fifo.h>
-#include <vppinfra/mhash.h>
-#include <vlib/node.h>
-
-#ifndef MC_EVENT_LOGGING
-#define MC_EVENT_LOGGING 1
-#endif
-
-always_inline uword
-mc_need_byte_swap (void)
-{
- return CLIB_ARCH_IS_LITTLE_ENDIAN;
-}
-
-/*
- * Used to uniquely identify hosts.
- * For IP4 this would be ip4_address plus tcp/udp port.
- */
-typedef union
-{
- u8 as_u8[8];
- u64 as_u64;
-} mc_peer_id_t;
-
-always_inline mc_peer_id_t
-mc_byte_swap_peer_id (mc_peer_id_t i)
-{
- /* Peer id is already in network byte order. */
- return i;
-}
-
-always_inline int
-mc_peer_id_compare (mc_peer_id_t a, mc_peer_id_t b)
-{
- return memcmp (a.as_u8, b.as_u8, sizeof (a.as_u8));
-}
-
-/* Assert mastership. Lowest peer_id amount all peers wins mastership.
- Only sent/received over mastership channel (MC_TRANSPORT_MASTERSHIP).
- So, we don't need a message opcode. */
-typedef CLIB_PACKED (struct
- {
- /* Peer id asserting mastership. */
- mc_peer_id_t peer_id;
- /* Global sequence number asserted. */
- u32 global_sequence;}) mc_msg_master_assert_t;
-
-always_inline void
-mc_byte_swap_msg_master_assert (mc_msg_master_assert_t * r)
-{
- if (mc_need_byte_swap ())
- {
- r->peer_id = mc_byte_swap_peer_id (r->peer_id);
- r->global_sequence = clib_byte_swap_u32 (r->global_sequence);
- }
-}
-
-#define foreach_mc_msg_type \
- _ (master_assert) \
- _ (join_or_leave_request) \
- _ (join_reply) \
- _ (user_request) \
- _ (user_ack) \
- _ (catchup_request) \
- _ (catchup_reply)
-
-typedef enum
-{
-#define _(f) MC_MSG_TYPE_##f,
- foreach_mc_msg_type
-#undef _
-} mc_relay_msg_type_t;
-
-/* Request to join a given stream. Multicast over MC_TRANSPORT_JOIN. */
-typedef CLIB_PACKED (struct
- {
-mc_peer_id_t peer_id; mc_relay_msg_type_t type:32;
- /* MC_MSG_TYPE_join_or_leave_request */
- /* Stream to join or leave. */
- u32 stream_index;
- /* join = 1, leave = 0 */
- u8 is_join;}) mc_msg_join_or_leave_request_t;
-
-always_inline void
-mc_byte_swap_msg_join_or_leave_request (mc_msg_join_or_leave_request_t * r)
-{
- if (mc_need_byte_swap ())
- {
- r->peer_id = mc_byte_swap_peer_id (r->peer_id);
- r->type = clib_byte_swap_u32 (r->type);
- r->stream_index = clib_byte_swap_u32 (r->stream_index);
- }
-}
-
-/* Join reply. Multicast over MC_TRANSPORT_JOIN. */
-typedef CLIB_PACKED (struct
- {
-mc_peer_id_t peer_id; mc_relay_msg_type_t type:32;
- /* MC_MSG_TYPE_join_reply */
- u32 stream_index;
- /* Peer ID to contact to catchup with this stream. */
- mc_peer_id_t catchup_peer_id;}) mc_msg_join_reply_t;
-
-always_inline void
-mc_byte_swap_msg_join_reply (mc_msg_join_reply_t * r)
-{
- if (mc_need_byte_swap ())
- {
- r->peer_id = mc_byte_swap_peer_id (r->peer_id);
- r->type = clib_byte_swap_u32 (r->type);
- r->stream_index = clib_byte_swap_u32 (r->stream_index);
- r->catchup_peer_id = mc_byte_swap_peer_id (r->catchup_peer_id);
- }
-}
-
-/* Generic (application) request. Multicast over MC_TRANSPORT_USER_REQUEST_TO_RELAY and then
- relayed by relay master after filling in global sequence number. */
-typedef CLIB_PACKED (struct
- {
- mc_peer_id_t peer_id; u32 stream_index;
- /* Global sequence number as filled in by relay master. */
- u32 global_sequence;
- /* Local sequence number as filled in by peer sending message. */
- u32 local_sequence;
- /* Size of request data. */
- u32 n_data_bytes;
- /* Opaque request data. */
- u8 data[0];}) mc_msg_user_request_t;
-
-always_inline void
-mc_byte_swap_msg_user_request (mc_msg_user_request_t * r)
-{
- if (mc_need_byte_swap ())
- {
- r->peer_id = mc_byte_swap_peer_id (r->peer_id);
- r->stream_index = clib_byte_swap_u32 (r->stream_index);
- r->global_sequence = clib_byte_swap_u32 (r->global_sequence);
- r->local_sequence = clib_byte_swap_u32 (r->local_sequence);
- r->n_data_bytes = clib_byte_swap_u32 (r->n_data_bytes);
- }
-}
-
-/* Sent unicast over ACK channel. */
-typedef CLIB_PACKED (struct
- {
- mc_peer_id_t peer_id;
- u32 global_sequence; u32 stream_index;
- u32 local_sequence;
- i32 seq_cmp_result;}) mc_msg_user_ack_t;
-
-always_inline void
-mc_byte_swap_msg_user_ack (mc_msg_user_ack_t * r)
-{
- if (mc_need_byte_swap ())
- {
- r->peer_id = mc_byte_swap_peer_id (r->peer_id);
- r->stream_index = clib_byte_swap_u32 (r->stream_index);
- r->global_sequence = clib_byte_swap_u32 (r->global_sequence);
- r->local_sequence = clib_byte_swap_u32 (r->local_sequence);
- r->seq_cmp_result = clib_byte_swap_i32 (r->seq_cmp_result);
- }
-}
-
-/* Sent/received unicast over catchup channel (e.g. using TCP). */
-typedef CLIB_PACKED (struct
- {
- mc_peer_id_t peer_id;
- u32 stream_index;}) mc_msg_catchup_request_t;
-
-always_inline void
-mc_byte_swap_msg_catchup_request (mc_msg_catchup_request_t * r)
-{
- if (mc_need_byte_swap ())
- {
- r->peer_id = mc_byte_swap_peer_id (r->peer_id);
- r->stream_index = clib_byte_swap_u32 (r->stream_index);
- }
-}
-
-/* Sent/received unicast over catchup channel. */
-typedef CLIB_PACKED (struct
- {
- mc_peer_id_t peer_id; u32 stream_index;
- /* Last global sequence number included in catchup data. */
- u32 last_global_sequence_included;
- /* Size of catchup data. */
- u32 n_data_bytes;
- /* Catchup data. */
- u8 data[0];}) mc_msg_catchup_reply_t;
-
-always_inline void
-mc_byte_swap_msg_catchup_reply (mc_msg_catchup_reply_t * r)
-{
- if (mc_need_byte_swap ())
- {
- r->peer_id = mc_byte_swap_peer_id (r->peer_id);
- r->stream_index = clib_byte_swap_u32 (r->stream_index);
- r->last_global_sequence_included =
- clib_byte_swap_u32 (r->last_global_sequence_included);
- r->n_data_bytes = clib_byte_swap_u32 (r->n_data_bytes);
- }
-}
-
-typedef struct _mc_serialize_msg
-{
- /* Name for this type. */
- char *name;
-
- /* Functions to serialize/unserialize data. */
- serialize_function_t *serialize;
- serialize_function_t *unserialize;
-
- /* Maximum message size in bytes when serialized.
- If zero then this will be set to the largest sent message. */
- u32 max_n_bytes_serialized;
-
- /* Opaque to use for first argument to serialize/unserialize function. */
- u32 opaque;
-
- /* Index in global message vector. */
- u32 global_index;
-
- /* Registration list */
- struct _mc_serialize_msg *next_registration;
-} mc_serialize_msg_t;
-
-typedef struct
-{
- /* Index into global message vector. */
- u32 global_index;
-} mc_serialize_stream_msg_t;
-
-#define MC_SERIALIZE_MSG(x,...) \
- __VA_ARGS__ mc_serialize_msg_t x; \
-static void __mc_serialize_msg_registration_##x (void) \
- __attribute__((__constructor__)) ; \
-static void __mc_serialize_msg_registration_##x (void) \
-{ \
- vlib_main_t * vm = vlib_get_main(); \
- x.next_registration = vm->mc_msg_registrations; \
- vm->mc_msg_registrations = &x; \
-} \
-static void __mc_serialize_msg_unregistration_##x (void) \
- __attribute__((__destructor__)) ; \
-static void __mc_serialize_msg_unregistration_##x (void) \
-{ \
- vlib_main_t * vm = vlib_get_main(); \
- VLIB_REMOVE_FROM_LINKED_LIST (vm->mc_msg_registrations, &x, \
- next_registration); \
-} \
-__VA_ARGS__ mc_serialize_msg_t x
-
-typedef enum
-{
- MC_TRANSPORT_MASTERSHIP,
- MC_TRANSPORT_JOIN,
- MC_TRANSPORT_USER_REQUEST_TO_RELAY,
- MC_TRANSPORT_USER_REQUEST_FROM_RELAY,
- MC_N_TRANSPORT_TYPE,
-} mc_transport_type_t;
-
-typedef struct
-{
- clib_error_t *(*tx_buffer) (void *opaque, mc_transport_type_t type,
- u32 buffer_index);
-
- clib_error_t *(*tx_ack) (void *opaque, mc_peer_id_t peer_id,
- u32 buffer_index);
-
- /* Returns catchup opaque. */
- uword (*catchup_request_fun) (void *opaque, u32 stream_index,
- mc_peer_id_t catchup_peer_id);
-
- void (*catchup_send_fun) (void *opaque, uword catchup_opaque,
- u8 * data_vector);
-
- /* Opaque passed to callbacks. */
- void *opaque;
-
- mc_peer_id_t our_ack_peer_id;
- mc_peer_id_t our_catchup_peer_id;
-
- /* Max packet size (MTU) for this transport.
- For IP this is interface MTU less IP + UDP header size. */
- u32 max_packet_size;
-
- format_function_t *format_peer_id;
-} mc_transport_t;
-
-typedef struct
-{
- /* Count of messages received from this peer from the past/future
- (with seq_cmp != 0). */
- u64 n_msgs_from_past;
- u64 n_msgs_from_future;
-} mc_stream_peer_stats_t;
-
-typedef struct
-{
- /* ID of this peer. */
- mc_peer_id_t id;
-
- /* The last sequence we received from this peer. */
- u32 last_sequence_received;
-
- mc_stream_peer_stats_t stats, stats_last_clear;
-} mc_stream_peer_t;
-
-typedef struct
-{
- u32 buffer_index;
-
- /* Cached copy of local sequence number from buffer. */
- u32 local_sequence;
-
- /* Number of times this buffer has been sent (retried). */
- u32 n_retries;
-
- /* Previous/next retries in doubly-linked list. */
- u32 prev_index, next_index;
-
- /* Bitmap of all peers which have acked this msg */
- uword *unacked_by_peer_bitmap;
-
- /* Message send or resend time */
- f64 sent_at;
-} mc_retry_t;
-
-typedef struct
-{
- /* Number of retries sent for this stream. */
- u64 n_retries;
-} mc_stream_stats_t;
-
-struct mc_main_t;
-struct mc_stream_t;
-
-typedef struct
-{
- /* Stream name. */
- char *name;
-
- /* Number of outstanding messages. */
- u32 window_size;
-
- /* Retry interval, in seconds */
- f64 retry_interval;
-
- /* Retry limit */
- u32 retry_limit;
-
- /* User rx buffer callback */
- void (*rx_buffer) (struct mc_main_t * mc_main,
- struct mc_stream_t * stream,
- mc_peer_id_t peer_id, u32 buffer_index);
-
- /* User callback to create a snapshot */
- u8 *(*catchup_snapshot) (struct mc_main_t * mc_main,
- u8 * snapshot_vector,
- u32 last_global_sequence_included);
-
- /* User callback to replay a snapshot */
- void (*catchup) (struct mc_main_t * mc_main,
- u8 * snapshot_data, u32 n_snapshot_data_bytes);
-
- /* Callback to save a snapshot for offline replay */
- void (*save_snapshot) (struct mc_main_t * mc_main,
- u32 is_catchup,
- u8 * snapshot_data, u32 n_snapshot_data_bytes);
-
- /* Called when a peer dies */
- void (*peer_died) (struct mc_main_t * mc_main,
- struct mc_stream_t * stream, mc_peer_id_t peer_id);
-} mc_stream_config_t;
-
-#define foreach_mc_stream_state \
- _ (invalid) \
- _ (name_known) \
- _ (join_in_progress) \
- _ (catchup) \
- _ (ready)
-
-typedef enum
-{
-#define _(f) MC_STREAM_STATE_##f,
- foreach_mc_stream_state
-#undef _
-} mc_stream_state_t;
-
-typedef struct mc_stream_t
-{
- mc_stream_config_t config;
-
- mc_stream_state_t state;
-
- /* Index in stream pool. */
- u32 index;
-
- /* Stream index 0 is always for MC internal use. */
-#define MC_STREAM_INDEX_INTERNAL 0
-
- mc_retry_t *retry_pool;
-
- /* Head and tail index of retry pool. */
- u32 retry_head_index, retry_tail_index;
-
- /*
- * Country club for recently retired messages
- * If the set of peers is expanding and a new peer
- * misses a message, we can easily retire the FIFO
- * element before we even know about the new peer
- */
- mc_retry_t *retired_fifo;
-
- /* Hash mapping local sequence to retry pool index. */
- uword *retry_index_by_local_sequence;
-
- /* catch-up fifo of VLIB buffer indices.
- start recording when catching up. */
- u32 *catchup_fifo;
-
- mc_stream_stats_t stats, stats_last_clear;
-
- /* Peer pool. */
- mc_stream_peer_t *peers;
-
- /* Bitmap with ones for all peers in peer pool. */
- uword *all_peer_bitmap;
-
- /* Map of 64 bit id to index in stream pool. */
- mhash_t peer_index_by_id;
-
- /* Timeout, in case we're alone in the world */
- f64 join_timeout;
-
- vlib_one_time_waiting_process_t *procs_waiting_for_join_done;
-
- vlib_one_time_waiting_process_t *procs_waiting_for_open_window;
-
- /* Next sequence number to use */
- u32 our_local_sequence;
-
- /*
- * Last global sequence we processed.
- * When supplying catchup data, we need to tell
- * the client precisely where to start replaying
- */
- u32 last_global_sequence_processed;
-
- /* Vector of unique messages we've sent on this stream. */
- mc_serialize_stream_msg_t *stream_msgs;
-
- /* Vector global message index into per stream message index. */
- u32 *stream_msg_index_by_global_index;
-
- /* Hashed by message name. */
- uword *stream_msg_index_by_name;
-
- u64 user_requests_sent;
- u64 user_requests_received;
-} mc_stream_t;
-
-always_inline void
-mc_stream_free (mc_stream_t * s)
-{
- pool_free (s->retry_pool);
- hash_free (s->retry_index_by_local_sequence);
- clib_fifo_free (s->catchup_fifo);
- pool_free (s->peers);
- mhash_free (&s->peer_index_by_id);
- vec_free (s->procs_waiting_for_join_done);
- vec_free (s->procs_waiting_for_open_window);
-}
-
-always_inline void
-mc_stream_init (mc_stream_t * s)
-{
- memset (s, 0, sizeof (s[0]));
- s->retry_head_index = s->retry_tail_index = ~0;
-}
-
-typedef struct
-{
- u32 stream_index;
- u32 catchup_opaque;
- u8 *catchup_snapshot;
-} mc_catchup_process_arg_t;
-
-typedef enum
-{
- MC_RELAY_STATE_NEGOTIATE,
- MC_RELAY_STATE_MASTER,
- MC_RELAY_STATE_SLAVE,
-} mc_relay_state_t;
-
-typedef struct
-{
- mc_peer_id_t peer_id;
-
- f64 time_last_master_assert_received;
-} mc_mastership_peer_t;
-
-typedef struct
-{
- u32 stream_index;
- u32 buffer_index;
-} mc_stream_and_buffer_t;
-
-typedef struct mc_main_t
-{
- mc_relay_state_t relay_state;
-
- /* Mastership */
- u32 we_can_be_relay_master;
-
- u64 relay_master_peer_id;
-
- mc_mastership_peer_t *mastership_peers;
-
- /* Map of 64 bit id to index in stream pool. */
- mhash_t mastership_peer_index_by_id;
-
- /* The transport we're using. */
- mc_transport_t transport;
-
- /* Last-used global sequence number. */
- u32 relay_global_sequence;
-
- /* Vector of streams. */
- mc_stream_t *stream_vector;
-
- /* Hash table mapping stream name to pool index. */
- uword *stream_index_by_name;
-
- uword *procs_waiting_for_stream_name_by_name;
-
- vlib_one_time_waiting_process_t **procs_waiting_for_stream_name_pool;
-
- int joins_in_progress;
-
- mc_catchup_process_arg_t *catchup_process_args;
-
- /* Node indices for mastership, join ager,
- retry and catchup processes. */
- u32 mastership_process;
- u32 join_ager_process;
- u32 retry_process;
- u32 catchup_process;
- u32 unserialize_process;
-
- /* Global vector of messages. */
- mc_serialize_msg_t **global_msgs;
-
- /* Hash table mapping message name to index. */
- uword *global_msg_index_by_name;
-
- /* Shared serialize/unserialize main. */
- serialize_main_t serialize_mains[VLIB_N_RX_TX];
-
- vlib_serialize_buffer_main_t serialize_buffer_mains[VLIB_N_RX_TX];
-
- /* Convenience variables */
- struct vlib_main_t *vlib_main;
- elog_main_t *elog_main;
-
- /* Maps 64 bit peer id to elog string table offset for this formatted peer id. */
- mhash_t elog_id_by_peer_id;
-
- uword *elog_id_by_msg_name;
-
- /* For mc_unserialize. */
- mc_stream_and_buffer_t *mc_unserialize_stream_and_buffers;
-} mc_main_t;
-
-always_inline mc_stream_t *
-mc_stream_by_name (mc_main_t * m, char *name)
-{
- uword *p = hash_get (m->stream_index_by_name, name);
- return p ? vec_elt_at_index (m->stream_vector, p[0]) : 0;
-}
-
-always_inline mc_stream_t *
-mc_stream_by_index (mc_main_t * m, u32 i)
-{
- return i < vec_len (m->stream_vector) ? m->stream_vector + i : 0;
-}
-
-always_inline void
-mc_clear_stream_stats (mc_main_t * m)
-{
- mc_stream_t *s;
- mc_stream_peer_t *p;
- vec_foreach (s, m->stream_vector)
- {
- s->stats_last_clear = s->stats;
- /* *INDENT-OFF* */
- pool_foreach (p, s->peers, ({
- p->stats_last_clear = p->stats;
- }));
- /* *INDENT-ON* */
- }
-}
-
-/* Declare all message handlers. */
-#define _(f) void mc_msg_##f##_handler (mc_main_t * mcm, mc_msg_##f##_t * msg, u32 buffer_index);
-foreach_mc_msg_type
-#undef _
- u32 mc_stream_join (mc_main_t * mcm, mc_stream_config_t *);
-
-void mc_stream_leave (mc_main_t * mcm, u32 stream_index);
-
-void mc_wait_for_stream_ready (mc_main_t * m, char *stream_name);
-
-u32 mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index);
-
-void mc_main_init (mc_main_t * mcm, char *tag);
-
-void mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master);
-
-void *mc_get_vlib_buffer (struct vlib_main_t *vm, u32 n_bytes,
- u32 * bi_return);
-
-format_function_t format_mc_main;
-
-clib_error_t *mc_serialize_internal (mc_main_t * mc,
- u32 stream_index,
- u32 multiple_messages_per_vlib_buffer,
- mc_serialize_msg_t * msg, ...);
-
-clib_error_t *mc_serialize_va (mc_main_t * mc,
- u32 stream_index,
- u32 multiple_messages_per_vlib_buffer,
- mc_serialize_msg_t * msg, va_list * va);
-
-#define mc_serialize_stream(mc,si,msg,args...) \
- mc_serialize_internal((mc),(si),(0),(msg),(msg)->serialize,args)
-
-#define mc_serialize(mc,msg,args...) \
- mc_serialize_internal((mc),(~0),(0),(msg),(msg)->serialize,args)
-
-#define mc_serialize2(mc,add,msg,args...) \
- mc_serialize_internal((mc),(~0),(add),(msg),(msg)->serialize,args)
-
-void mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index);
-uword mc_unserialize_message (mc_main_t * mcm, mc_stream_t * s,
- serialize_main_t * m);
-
-serialize_function_t serialize_mc_main, unserialize_mc_main;
-
-always_inline uword
-mc_max_message_size_in_bytes (mc_main_t * mcm)
-{
- return mcm->transport.max_packet_size - sizeof (mc_msg_user_request_t);
-}
-
-always_inline word
-mc_serialize_n_bytes_left (mc_main_t * mcm, serialize_main_t * m)
-{
- return mc_max_message_size_in_bytes (mcm) -
- serialize_vlib_buffer_n_bytes (m);
-}
-
-void unserialize_mc_stream (serialize_main_t * m, va_list * va);
-void mc_stream_join_process_hold (void);
-
-#endif /* included_vlib_mc_h */
-
-/*
- * fd.io coding-style-patch-verification: ON
- *
- * Local Variables:
- * eval: (c-set-style "gnu")
- * End:
- */