diff options
Diffstat (limited to 'src/vnet/session')
-rw-r--r-- | src/vnet/session/session.c | 7 | ||||
-rw-r--r-- | src/vnet/session/session.h | 18 | ||||
-rw-r--r-- | src/vnet/session/session_node.c | 16 | ||||
-rw-r--r-- | src/vnet/session/transport.c | 111 | ||||
-rw-r--r-- | src/vnet/session/transport.h | 22 | ||||
-rw-r--r-- | src/vnet/session/transport_interface.h | 61 |
6 files changed, 233 insertions, 2 deletions
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 189c5375fbb..1d421b978c1 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -1355,6 +1355,8 @@ session_manager_main_enable (vlib_main_t * vm) vec_validate (smm->free_event_vector, num_threads - 1); vec_validate (smm->vpp_event_queues, num_threads - 1); vec_validate (smm->peekers_rw_locks, num_threads - 1); + vec_validate (smm->dispatch_period, num_threads - 1); + vec_validate (smm->last_vlib_time, num_threads - 1); vec_validate_aligned (smm->ctx, num_threads - 1, CLIB_CACHE_LINE_BYTES); for (i = 0; i < TRANSPORT_N_PROTO; i++) @@ -1373,6 +1375,9 @@ session_manager_main_enable (vlib_main_t * vm) _vec_len (smm->pending_event_vector[i]) = 0; vec_validate (smm->pending_disconnects[i], 0); _vec_len (smm->pending_disconnects[i]) = 0; + + smm->last_vlib_time[i] = vlib_time_now (vlib_mains[i]); + if (num_threads > 1) clib_rwlock_init (&smm->peekers_rw_locks[i]); } @@ -1419,7 +1424,7 @@ session_manager_main_enable (vlib_main_t * vm) /* Enable transports */ transport_enable_disable (vm, 1); - + transport_init_tx_pacers_period (); return 0; } diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index 914e0581fec..f0aa36cc1bc 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -215,6 +215,12 @@ struct _session_manager_main /** per-worker session context */ session_tx_context_t *ctx; + /** Our approximation of a "complete" dispatch loop period */ + f64 *dispatch_period; + + /** vlib_time_now last time around the track */ + f64 *last_vlib_time; + /** vpp fifo event queue */ svm_msg_q_t **vpp_event_queues; @@ -494,6 +500,18 @@ transport_tx_fifo_size (transport_connection_t * tc) return s->server_tx_fifo->nitems; } +always_inline f64 +transport_dispatch_period (u32 thread_index) +{ + return session_manager_main.dispatch_period[thread_index]; +} + +always_inline f64 +transport_time_now (u32 thread_index) +{ + return session_manager_main.last_vlib_time[thread_index]; +} + always_inline u32 session_get_index (stream_session_t * s) { diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index c1aea6723b6..eb974397165 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -568,7 +568,8 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, ctx->transport_vft = transport_protocol_get_vft (tp); ctx->tc = session_tx_get_transport (ctx, peek_data); ctx->snd_mss = ctx->transport_vft->send_mss (ctx->tc); - ctx->snd_space = ctx->transport_vft->send_space (ctx->tc); + ctx->snd_space = + transport_connection_max_tx_burst (ctx->tc, vm->clib_time.last_cpu_time); if (ctx->snd_space == 0 || ctx->snd_mss == 0) { vec_add1 (smm->pending_event_vector[thread_index], *e); @@ -685,6 +686,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, _vec_len (smm->tx_buffers[thread_index]) = n_bufs; *n_tx_packets += ctx->n_segs_per_evt; + transport_connection_update_tx_stats (ctx->tc, ctx->max_len_to_snd); vlib_put_next_frame (vm, node, next_index, n_left_to_next); /* If we couldn't dequeue all bytes mark as partially read */ @@ -743,6 +745,17 @@ session_event_get_session (session_event_t * e, u8 thread_index) return session_get_if_valid (e->fifo->master_session_index, thread_index); } +static void +session_update_dispatch_period (session_manager_main_t * smm, f64 now, + u32 thread_index) +{ + f64 sample, prev_period = smm->dispatch_period[thread_index], a = 0.8; + + sample = now - smm->last_vlib_time[thread_index]; + smm->dispatch_period[thread_index] = a * sample + (1 - a) * prev_period; + smm->last_vlib_time[thread_index] = now; +} + static uword session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, vlib_frame_t * frame) @@ -764,6 +777,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, /* * Update transport time */ + session_update_dispatch_period (smm, now, thread_index); transport_update_time (now, thread_index); /* diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c index d74a218a5bf..c333c4161d4 100644 --- a/src/vnet/session/transport.c +++ b/src/vnet/session/transport.c @@ -42,6 +42,13 @@ static transport_endpoint_t *local_endpoints; */ static clib_spinlock_t local_endpoints_lock; +/* + * Period used by transport pacers. Initialized by session layer + */ +static double transport_pacer_period; + +#define TRANSPORT_PACER_MIN_MSS 1460 + u8 * format_transport_proto (u8 * s, va_list * args) { @@ -376,6 +383,110 @@ transport_alloc_local_endpoint (u8 proto, transport_endpoint_t * rmt, return 0; } +#define SPACER_CPU_TICKS_PER_PERIOD_SHIFT 10 +#define SPACER_CPU_TICKS_PER_PERIOD (1 << SPACER_CPU_TICKS_PER_PERIOD_SHIFT) + +u8 * +format_transport_pacer (u8 * s, va_list * args) +{ + spacer_t *pacer = va_arg (*args, spacer_t *); + + s = format (s, "bucket %u max_burst %u tokens/period %.3f last_update %x", + pacer->bucket, pacer->max_burst_size, pacer->tokens_per_period, + pacer->last_update); + return s; +} + +static inline u32 +spacer_max_burst (spacer_t * pacer, u64 norm_time_now) +{ + u64 n_periods = norm_time_now - pacer->last_update; + + pacer->last_update = norm_time_now; + pacer->bucket += n_periods * pacer->tokens_per_period; + return clib_min (pacer->bucket, pacer->max_burst_size); +} + +static inline void +spacer_update_bucket (spacer_t * pacer, u32 bytes) +{ + ASSERT (pacer->bucket >= bytes); + pacer->bucket -= bytes; +} + +static inline void +spacer_update_max_burst_size (spacer_t * pacer, u32 max_burst_bytes) +{ + pacer->max_burst_size = clib_max (max_burst_bytes, TRANSPORT_PACER_MIN_MSS); +} + +static inline void +spacer_set_pace_rate (spacer_t * pacer, u64 rate_bytes_per_sec) +{ + ASSERT (rate_bytes_per_sec != 0); + pacer->tokens_per_period = rate_bytes_per_sec / transport_pacer_period; +} + +void +transport_connection_tx_pacer_init (transport_connection_t * tc, + u32 rate_bytes_per_sec, u32 burst_bytes) +{ + vlib_main_t *vm = vlib_get_main (); + u64 time_now = vm->clib_time.last_cpu_time; + spacer_t *pacer = &tc->pacer; + + tc->flags |= TRANSPORT_CONNECTION_F_IS_TX_PACED; + spacer_update_max_burst_size (&tc->pacer, burst_bytes); + spacer_set_pace_rate (&tc->pacer, rate_bytes_per_sec); + pacer->last_update = time_now >> SPACER_CPU_TICKS_PER_PERIOD_SHIFT; + pacer->bucket = burst_bytes; +} + +void +transport_connection_tx_pacer_update (transport_connection_t * tc, + u64 bytes_per_sec) +{ + u32 burst_size; + + burst_size = bytes_per_sec * transport_dispatch_period (tc->thread_index); + spacer_set_pace_rate (&tc->pacer, bytes_per_sec); + spacer_update_max_burst_size (&tc->pacer, burst_size); +} + +u32 +transport_connection_max_tx_burst (transport_connection_t * tc, u64 time_now) +{ + u32 snd_space, max_paced_burst; + u32 mss; + + snd_space = tp_vfts[tc->proto].send_space (tc); + if (transport_connection_is_tx_paced (tc)) + { + time_now >>= SPACER_CPU_TICKS_PER_PERIOD_SHIFT; + max_paced_burst = spacer_max_burst (&tc->pacer, time_now); + mss = tp_vfts[tc->proto].send_mss (tc); + max_paced_burst = (max_paced_burst < mss) ? 0 : max_paced_burst; + snd_space = clib_min (snd_space, max_paced_burst); + snd_space = snd_space - snd_space % mss; + } + return snd_space; +} + +void +transport_connection_update_tx_stats (transport_connection_t * tc, u32 bytes) +{ + tc->stats.tx_bytes += bytes; + if (transport_connection_is_tx_paced (tc)) + spacer_update_bucket (&tc->pacer, bytes); +} + +void +transport_init_tx_pacers_period (void) +{ + f64 cpu_freq = os_cpu_clock_frequency (); + transport_pacer_period = cpu_freq / SPACER_CPU_TICKS_PER_PERIOD; +} + void transport_update_time (f64 time_now, u8 thread_index) { diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h index e29f3ca9557..07366692c3e 100644 --- a/src/vnet/session/transport.h +++ b/src/vnet/session/transport.h @@ -23,6 +23,19 @@ /* * Protocol independent transport properties associated to a session */ +typedef struct _transport_stats +{ + u64 tx_bytes; +} transport_stats_t; + +typedef struct _spacer +{ + u64 bucket; + u32 max_burst_size; + f32 tokens_per_period; + u64 last_update; +} spacer_t; + typedef struct _transport_connection { /** Connection ID */ @@ -54,6 +67,10 @@ typedef struct _transport_connection /*fib_node_index_t rmt_fei; dpo_id_t rmt_dpo; */ + u8 flags; /**< Transport specific flags */ + transport_stats_t stats; /**< Transport connection stats */ + spacer_t pacer; /**< Simple transport pacer */ + #if TRANSPORT_DEBUG elog_track_t elog_track; /**< Event logging */ u32 cc_stat_tstamp; /**< CC stats timestamp */ @@ -79,8 +96,13 @@ typedef struct _transport_connection #define c_rmt_fei connection.rmt_fei #define c_rmt_dpo connection.rmt_dpo #define c_opaque_id connection.opaque_conn_id +#define c_stats connection.stats +#define c_pacer connection.pacer +#define c_flags connection.flags } transport_connection_t; +#define TRANSPORT_CONNECTION_F_IS_TX_PACED 1 << 0 + typedef enum _transport_proto { TRANSPORT_PROTO_TCP, diff --git a/src/vnet/session/transport_interface.h b/src/vnet/session/transport_interface.h index 745a7db444c..ec9bd43e30f 100644 --- a/src/vnet/session/transport_interface.h +++ b/src/vnet/session/transport_interface.h @@ -102,6 +102,67 @@ transport_tx_fn_type_t transport_protocol_tx_fn_type (transport_proto_t tp); void transport_update_time (f64 time_now, u8 thread_index); void transport_enable_disable (vlib_main_t * vm, u8 is_en); +/** + * Initialize tx pacer for connection + * + * @param tc transport connection + * @param rate_bytes_per_second initial byte rate + * @param burst_bytes initial burst size in bytes + */ +void transport_connection_tx_pacer_init (transport_connection_t * tc, + u32 rate_bytes_per_sec, + u32 burst_bytes); + +/** + * Update tx pacer pacing rate + * + * @param tc transport connection + * @param bytes_per_sec new pacing rate + */ +void transport_connection_tx_pacer_update (transport_connection_t * tc, + u64 bytes_per_sec); + +/** + * Get maximum tx burst allowed for transport connection + * + * @param tc transport connection + * @param time_now current cpu time as returned by @ref clib_cpu_time_now + */ +u32 transport_connection_max_tx_burst (transport_connection_t * tc, + u64 time_now); + +/** + * Initialize period for tx pacers + * + * Defines a unit of time with respect to number of cpu cycles that is to + * be used by all tx pacers. + */ +void transport_init_tx_pacers_period (void); + +/** + * Check if transport connection is paced + */ +always_inline u8 +transport_connection_is_tx_paced (transport_connection_t * tc) +{ + return (tc->flags & TRANSPORT_CONNECTION_F_IS_TX_PACED); +} + +u8 *format_transport_pacer (u8 * s, va_list * args); + +/** + * Update tx byte stats for transport connection + * + * If tx pacing is enabled, this also updates pacer bucket to account for the + * amount of bytes that have been sent. + * + * @param tc transport connection + * @param pkts packets recently sent + * @param bytes bytes recently sent + */ +void transport_connection_update_tx_stats (transport_connection_t * tc, + u32 bytes); + #endif /* SRC_VNET_SESSION_TRANSPORT_INTERFACE_H_ */ /* |