diff options
-rw-r--r-- | src/vnet/session/session.c | 7 | ||||
-rw-r--r-- | src/vnet/session/session.h | 18 | ||||
-rw-r--r-- | src/vnet/session/session_node.c | 16 | ||||
-rw-r--r-- | src/vnet/session/transport.c | 111 | ||||
-rw-r--r-- | src/vnet/session/transport.h | 22 | ||||
-rw-r--r-- | src/vnet/session/transport_interface.h | 61 | ||||
-rw-r--r-- | src/vnet/tcp/tcp.c | 44 | ||||
-rw-r--r-- | src/vnet/tcp/tcp.h | 23 | ||||
-rw-r--r-- | src/vnet/tcp/tcp_input.c | 22 | ||||
-rw-r--r-- | src/vnet/tcp/tcp_output.c | 2 |
10 files changed, 305 insertions, 21 deletions
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 189c5375fbb..1d421b978c1 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -1355,6 +1355,8 @@ session_manager_main_enable (vlib_main_t * vm) vec_validate (smm->free_event_vector, num_threads - 1); vec_validate (smm->vpp_event_queues, num_threads - 1); vec_validate (smm->peekers_rw_locks, num_threads - 1); + vec_validate (smm->dispatch_period, num_threads - 1); + vec_validate (smm->last_vlib_time, num_threads - 1); vec_validate_aligned (smm->ctx, num_threads - 1, CLIB_CACHE_LINE_BYTES); for (i = 0; i < TRANSPORT_N_PROTO; i++) @@ -1373,6 +1375,9 @@ session_manager_main_enable (vlib_main_t * vm) _vec_len (smm->pending_event_vector[i]) = 0; vec_validate (smm->pending_disconnects[i], 0); _vec_len (smm->pending_disconnects[i]) = 0; + + smm->last_vlib_time[i] = vlib_time_now (vlib_mains[i]); + if (num_threads > 1) clib_rwlock_init (&smm->peekers_rw_locks[i]); } @@ -1419,7 +1424,7 @@ session_manager_main_enable (vlib_main_t * vm) /* Enable transports */ transport_enable_disable (vm, 1); - + transport_init_tx_pacers_period (); return 0; } diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index 914e0581fec..f0aa36cc1bc 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -215,6 +215,12 @@ struct _session_manager_main /** per-worker session context */ session_tx_context_t *ctx; + /** Our approximation of a "complete" dispatch loop period */ + f64 *dispatch_period; + + /** vlib_time_now last time around the track */ + f64 *last_vlib_time; + /** vpp fifo event queue */ svm_msg_q_t **vpp_event_queues; @@ -494,6 +500,18 @@ transport_tx_fifo_size (transport_connection_t * tc) return s->server_tx_fifo->nitems; } +always_inline f64 +transport_dispatch_period (u32 thread_index) +{ + return session_manager_main.dispatch_period[thread_index]; +} + +always_inline f64 +transport_time_now (u32 thread_index) +{ + return session_manager_main.last_vlib_time[thread_index]; +} + always_inline u32 session_get_index (stream_session_t * s) { diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index c1aea6723b6..eb974397165 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -568,7 +568,8 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, ctx->transport_vft = transport_protocol_get_vft (tp); ctx->tc = session_tx_get_transport (ctx, peek_data); ctx->snd_mss = ctx->transport_vft->send_mss (ctx->tc); - ctx->snd_space = ctx->transport_vft->send_space (ctx->tc); + ctx->snd_space = + transport_connection_max_tx_burst (ctx->tc, vm->clib_time.last_cpu_time); if (ctx->snd_space == 0 || ctx->snd_mss == 0) { vec_add1 (smm->pending_event_vector[thread_index], *e); @@ -685,6 +686,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, _vec_len (smm->tx_buffers[thread_index]) = n_bufs; *n_tx_packets += ctx->n_segs_per_evt; + transport_connection_update_tx_stats (ctx->tc, ctx->max_len_to_snd); vlib_put_next_frame (vm, node, next_index, n_left_to_next); /* If we couldn't dequeue all bytes mark as partially read */ @@ -743,6 +745,17 @@ session_event_get_session (session_event_t * e, u8 thread_index) return session_get_if_valid (e->fifo->master_session_index, thread_index); } +static void +session_update_dispatch_period (session_manager_main_t * smm, f64 now, + u32 thread_index) +{ + f64 sample, prev_period = smm->dispatch_period[thread_index], a = 0.8; + + sample = now - smm->last_vlib_time[thread_index]; + smm->dispatch_period[thread_index] = a * sample + (1 - a) * prev_period; + smm->last_vlib_time[thread_index] = now; +} + static uword session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, vlib_frame_t * frame) @@ -764,6 +777,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, /* * Update transport time */ + session_update_dispatch_period (smm, now, thread_index); transport_update_time (now, thread_index); /* diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c index d74a218a5bf..c333c4161d4 100644 --- a/src/vnet/session/transport.c +++ b/src/vnet/session/transport.c @@ -42,6 +42,13 @@ static transport_endpoint_t *local_endpoints; */ static clib_spinlock_t local_endpoints_lock; +/* + * Period used by transport pacers. Initialized by session layer + */ +static double transport_pacer_period; + +#define TRANSPORT_PACER_MIN_MSS 1460 + u8 * format_transport_proto (u8 * s, va_list * args) { @@ -376,6 +383,110 @@ transport_alloc_local_endpoint (u8 proto, transport_endpoint_t * rmt, return 0; } +#define SPACER_CPU_TICKS_PER_PERIOD_SHIFT 10 +#define SPACER_CPU_TICKS_PER_PERIOD (1 << SPACER_CPU_TICKS_PER_PERIOD_SHIFT) + +u8 * +format_transport_pacer (u8 * s, va_list * args) +{ + spacer_t *pacer = va_arg (*args, spacer_t *); + + s = format (s, "bucket %u max_burst %u tokens/period %.3f last_update %x", + pacer->bucket, pacer->max_burst_size, pacer->tokens_per_period, + pacer->last_update); + return s; +} + +static inline u32 +spacer_max_burst (spacer_t * pacer, u64 norm_time_now) +{ + u64 n_periods = norm_time_now - pacer->last_update; + + pacer->last_update = norm_time_now; + pacer->bucket += n_periods * pacer->tokens_per_period; + return clib_min (pacer->bucket, pacer->max_burst_size); +} + +static inline void +spacer_update_bucket (spacer_t * pacer, u32 bytes) +{ + ASSERT (pacer->bucket >= bytes); + pacer->bucket -= bytes; +} + +static inline void +spacer_update_max_burst_size (spacer_t * pacer, u32 max_burst_bytes) +{ + pacer->max_burst_size = clib_max (max_burst_bytes, TRANSPORT_PACER_MIN_MSS); +} + +static inline void +spacer_set_pace_rate (spacer_t * pacer, u64 rate_bytes_per_sec) +{ + ASSERT (rate_bytes_per_sec != 0); + pacer->tokens_per_period = rate_bytes_per_sec / transport_pacer_period; +} + +void +transport_connection_tx_pacer_init (transport_connection_t * tc, + u32 rate_bytes_per_sec, u32 burst_bytes) +{ + vlib_main_t *vm = vlib_get_main (); + u64 time_now = vm->clib_time.last_cpu_time; + spacer_t *pacer = &tc->pacer; + + tc->flags |= TRANSPORT_CONNECTION_F_IS_TX_PACED; + spacer_update_max_burst_size (&tc->pacer, burst_bytes); + spacer_set_pace_rate (&tc->pacer, rate_bytes_per_sec); + pacer->last_update = time_now >> SPACER_CPU_TICKS_PER_PERIOD_SHIFT; + pacer->bucket = burst_bytes; +} + +void +transport_connection_tx_pacer_update (transport_connection_t * tc, + u64 bytes_per_sec) +{ + u32 burst_size; + + burst_size = bytes_per_sec * transport_dispatch_period (tc->thread_index); + spacer_set_pace_rate (&tc->pacer, bytes_per_sec); + spacer_update_max_burst_size (&tc->pacer, burst_size); +} + +u32 +transport_connection_max_tx_burst (transport_connection_t * tc, u64 time_now) +{ + u32 snd_space, max_paced_burst; + u32 mss; + + snd_space = tp_vfts[tc->proto].send_space (tc); + if (transport_connection_is_tx_paced (tc)) + { + time_now >>= SPACER_CPU_TICKS_PER_PERIOD_SHIFT; + max_paced_burst = spacer_max_burst (&tc->pacer, time_now); + mss = tp_vfts[tc->proto].send_mss (tc); + max_paced_burst = (max_paced_burst < mss) ? 0 : max_paced_burst; + snd_space = clib_min (snd_space, max_paced_burst); + snd_space = snd_space - snd_space % mss; + } + return snd_space; +} + +void +transport_connection_update_tx_stats (transport_connection_t * tc, u32 bytes) +{ + tc->stats.tx_bytes += bytes; + if (transport_connection_is_tx_paced (tc)) + spacer_update_bucket (&tc->pacer, bytes); +} + +void +transport_init_tx_pacers_period (void) +{ + f64 cpu_freq = os_cpu_clock_frequency (); + transport_pacer_period = cpu_freq / SPACER_CPU_TICKS_PER_PERIOD; +} + void transport_update_time (f64 time_now, u8 thread_index) { diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h index e29f3ca9557..07366692c3e 100644 --- a/src/vnet/session/transport.h +++ b/src/vnet/session/transport.h @@ -23,6 +23,19 @@ /* * Protocol independent transport properties associated to a session */ +typedef struct _transport_stats +{ + u64 tx_bytes; +} transport_stats_t; + +typedef struct _spacer +{ + u64 bucket; + u32 max_burst_size; + f32 tokens_per_period; + u64 last_update; +} spacer_t; + typedef struct _transport_connection { /** Connection ID */ @@ -54,6 +67,10 @@ typedef struct _transport_connection /*fib_node_index_t rmt_fei; dpo_id_t rmt_dpo; */ + u8 flags; /**< Transport specific flags */ + transport_stats_t stats; /**< Transport connection stats */ + spacer_t pacer; /**< Simple transport pacer */ + #if TRANSPORT_DEBUG elog_track_t elog_track; /**< Event logging */ u32 cc_stat_tstamp; /**< CC stats timestamp */ @@ -79,8 +96,13 @@ typedef struct _transport_connection #define c_rmt_fei connection.rmt_fei #define c_rmt_dpo connection.rmt_dpo #define c_opaque_id connection.opaque_conn_id +#define c_stats connection.stats +#define c_pacer connection.pacer +#define c_flags connection.flags } transport_connection_t; +#define TRANSPORT_CONNECTION_F_IS_TX_PACED 1 << 0 + typedef enum _transport_proto { TRANSPORT_PROTO_TCP, diff --git a/src/vnet/session/transport_interface.h b/src/vnet/session/transport_interface.h index 745a7db444c..ec9bd43e30f 100644 --- a/src/vnet/session/transport_interface.h +++ b/src/vnet/session/transport_interface.h @@ -102,6 +102,67 @@ transport_tx_fn_type_t transport_protocol_tx_fn_type (transport_proto_t tp); void transport_update_time (f64 time_now, u8 thread_index); void transport_enable_disable (vlib_main_t * vm, u8 is_en); +/** + * Initialize tx pacer for connection + * + * @param tc transport connection + * @param rate_bytes_per_second initial byte rate + * @param burst_bytes initial burst size in bytes + */ +void transport_connection_tx_pacer_init (transport_connection_t * tc, + u32 rate_bytes_per_sec, + u32 burst_bytes); + +/** + * Update tx pacer pacing rate + * + * @param tc transport connection + * @param bytes_per_sec new pacing rate + */ +void transport_connection_tx_pacer_update (transport_connection_t * tc, + u64 bytes_per_sec); + +/** + * Get maximum tx burst allowed for transport connection + * + * @param tc transport connection + * @param time_now current cpu time as returned by @ref clib_cpu_time_now + */ +u32 transport_connection_max_tx_burst (transport_connection_t * tc, + u64 time_now); + +/** + * Initialize period for tx pacers + * + * Defines a unit of time with respect to number of cpu cycles that is to + * be used by all tx pacers. + */ +void transport_init_tx_pacers_period (void); + +/** + * Check if transport connection is paced + */ +always_inline u8 +transport_connection_is_tx_paced (transport_connection_t * tc) +{ + return (tc->flags & TRANSPORT_CONNECTION_F_IS_TX_PACED); +} + +u8 *format_transport_pacer (u8 * s, va_list * args); + +/** + * Update tx byte stats for transport connection + * + * If tx pacing is enabled, this also updates pacer bucket to account for the + * amount of bytes that have been sent. + * + * @param tc transport connection + * @param pkts packets recently sent + * @param bytes bytes recently sent + */ +void transport_connection_update_tx_stats (transport_connection_t * tc, + u32 bytes); + #endif /* SRC_VNET_SESSION_TRANSPORT_INTERFACE_H_ */ /* diff --git a/src/vnet/tcp/tcp.c b/src/vnet/tcp/tcp.c index cb05b8c0533..626b49997ac 100644 --- a/src/vnet/tcp/tcp.c +++ b/src/vnet/tcp/tcp.c @@ -555,6 +555,16 @@ tcp_init_snd_vars (tcp_connection_t * tc) tc->snd_una_max = tc->snd_nxt; } +void +tcp_enable_pacing (tcp_connection_t * tc) +{ + u32 max_burst, byte_rate; + max_burst = 16 * tc->snd_mss; + byte_rate = 2 << 16; + transport_connection_tx_pacer_init (&tc->connection, byte_rate, max_burst); + tc->mrtt_us = (u32) ~ 0; +} + /** Initialize tcp connection variables * * Should be called after having received a msg from the peer, i.e., a SYN or @@ -572,7 +582,11 @@ tcp_connection_init_vars (tcp_connection_t * tc) if (!tc->c_is_ip4 && ip6_address_is_link_local_unicast (&tc->c_rmt_ip6)) tcp_add_del_adjacency (tc, 1); - // tcp_connection_fib_attach (tc); + /* tcp_connection_fib_attach (tc); */ + + if (transport_connection_is_tx_paced (&tc->connection) + || tcp_main.tx_pacing) + tcp_enable_pacing (tc); } static int @@ -784,14 +798,19 @@ format_tcp_vars (u8 * s, va_list * args) s = format (s, " limited_transmit %u\n", tc->limited_transmit - tc->iss); s = format (s, " tsecr %u tsecr_last_ack %u\n", tc->rcv_opts.tsecr, tc->tsecr_last_ack); - s = format (s, " rto %u rto_boff %u srtt %u rttvar %u rtt_ts %u ", tc->rto, - tc->rto_boff, tc->srtt, tc->rttvar, tc->rtt_ts); + s = format (s, " rto %u rto_boff %u srtt %u rttvar %u rtt_ts %2.5f ", + tc->rto, tc->rto_boff, tc->srtt, tc->rttvar, tc->rtt_ts); s = format (s, "rtt_seq %u\n", tc->rtt_seq); s = format (s, " tsval_recent %u tsval_recent_age %u\n", tc->tsval_recent, tcp_time_now () - tc->tsval_recent_age); if (tc->state >= TCP_STATE_ESTABLISHED) - s = format (s, " scoreboard: %U\n", format_tcp_scoreboard, &tc->sack_sb, - tc); + { + s = format (s, " scoreboard: %U\n", format_tcp_scoreboard, &tc->sack_sb, + tc); + if (transport_connection_is_tx_paced (&tc->connection)) + s = format (s, " pacer: %U\n", format_transport_pacer, + &tc->connection.pacer); + } if (vec_len (tc->snd_sacks)) s = format (s, " sacks tx: %U\n", format_tcp_sacks, tc); @@ -1129,6 +1148,19 @@ const static transport_proto_vft_t tcp_proto = { }; /* *INDENT-ON* */ +void +tcp_update_pacer (tcp_connection_t * tc) +{ + f64 srtt; + + if (!transport_connection_is_tx_paced (&tc->connection)) + return; + + srtt = clib_min ((f64) tc->srtt * TCP_TICK, tc->mrtt_us); + transport_connection_tx_pacer_update (&tc->connection, + ((f64) tc->cwnd) / srtt); +} + static void tcp_timer_keep_handler (u32 conn_index) { @@ -1408,6 +1440,8 @@ tcp_config_fn (vlib_main_t * vm, unformat_input_t * input) else if (unformat (input, "max-rx-fifo %U", unformat_memory_size, &tm->max_rx_fifo)) ; + else if (unformat (input, "tx-pacing")) + tm->tx_pacing = 1; else return clib_error_return (0, "unknown input `%U'", format_unformat_error, input); diff --git a/src/vnet/tcp/tcp.h b/src/vnet/tcp/tcp.h index a036072e546..4ba3d5e9d87 100644 --- a/src/vnet/tcp/tcp.h +++ b/src/vnet/tcp/tcp.h @@ -160,7 +160,7 @@ enum }; #define TCP_SCOREBOARD_TRACE (0) -#define TCP_MAX_SACK_BLOCKS 15 /**< Max number of SACK blocks stored */ +#define TCP_MAX_SACK_BLOCKS 32 /**< Max number of SACK blocks stored */ #define TCP_INVALID_SACK_HOLE_INDEX ((u32)~0) typedef struct _scoreboard_trace_elt @@ -319,8 +319,9 @@ typedef struct _tcp_connection u32 rto_boff; /**< Index for RTO backoff */ u32 srtt; /**< Smoothed RTT */ u32 rttvar; /**< Smoothed mean RTT difference. Approximates variance */ - u32 rtt_ts; /**< Timestamp for tracked ACK */ u32 rtt_seq; /**< Sequence number for tracked ACK */ + f64 rtt_ts; /**< Timestamp for tracked ACK */ + f64 mrtt_us; /**< High precision mrtt from tracked acks */ u16 mss; /**< Our max seg size that includes options */ u32 limited_transmit; /**< snd_nxt when limited transmit starts */ @@ -444,6 +445,9 @@ typedef struct _tcp_main u32 last_v6_address_rotor; ip6_address_t *ip6_src_addresses; + /** Enable tx pacing for new connections */ + u8 tx_pacing; + u8 punt_unknown4; u8 punt_unknown6; @@ -692,6 +696,12 @@ tcp_time_now (void) return tcp_main.wrk_ctx[vlib_get_thread_index ()].time_now; } +always_inline f64 +tcp_time_now_us (u32 thread_index) +{ + return transport_time_now (thread_index); +} + always_inline u32 tcp_set_time_now (u32 thread_index) { @@ -706,6 +716,15 @@ void tcp_connection_timers_init (tcp_connection_t * tc); void tcp_connection_timers_reset (tcp_connection_t * tc); void tcp_init_snd_vars (tcp_connection_t * tc); void tcp_connection_init_vars (tcp_connection_t * tc); +void tcp_update_pacer (tcp_connection_t * tc); + +always_inline void +tcp_cc_rcv_ack (tcp_connection_t * tc) +{ + tc->cc_algo->rcv_ack (tc); + tcp_update_pacer (tc); + tc->tsecr_last_ack = tc->rcv_opts.tsecr; +} always_inline void tcp_connection_force_ack (tcp_connection_t * tc, vlib_buffer_t * b) diff --git a/src/vnet/tcp/tcp_input.c b/src/vnet/tcp/tcp_input.c index 39a538ba681..ac0e996567e 100644 --- a/src/vnet/tcp/tcp_input.c +++ b/src/vnet/tcp/tcp_input.c @@ -462,14 +462,15 @@ tcp_update_rtt (tcp_connection_t * tc, u32 ack) if (tc->rtt_ts && seq_geq (ack, tc->rtt_seq)) { - mrtt = tcp_time_now () - tc->rtt_ts; + tc->mrtt_us = tcp_time_now_us (tc->c_thread_index) - tc->rtt_ts; + mrtt = clib_max ((u32) (tc->mrtt_us * THZ), 1); } /* As per RFC7323 TSecr can be used for RTTM only if the segment advances * snd_una, i.e., the left side of the send window: * seq_lt (tc->snd_una, ack). This is a condition for calling update_rtt */ else if (tcp_opts_tstamp (&tc->rcv_opts) && tc->rcv_opts.tsecr) { - mrtt = tcp_time_now () - tc->rcv_opts.tsecr; + mrtt = clib_max (tcp_time_now () - tc->rcv_opts.tsecr, 1); } /* Ignore dubious measurements */ @@ -1079,12 +1080,14 @@ tcp_cc_fastrecovery_exit (tcp_connection_t * tc) tc->snd_nxt = tc->snd_una_max; tc->snd_rxt_bytes = 0; - /* HACK: since we don't have an output pacer, force slow start */ - tc->cwnd = 20 * tc->snd_mss; - tcp_fastrecovery_off (tc); tcp_fastrecovery_1_smss_off (tc); tcp_fastrecovery_first_off (tc); + + /* Update pacer because our cwnd changed. Also makes sure + * that we recompute the max burst size */ + tcp_update_pacer (tc); + TCP_EVT_DBG (TCP_EVT_CC_EVT, tc, 3); } @@ -1153,8 +1156,7 @@ tcp_cc_update (tcp_connection_t * tc, vlib_buffer_t * b) ASSERT (!tcp_in_cong_recovery (tc) || tcp_is_lost_fin (tc)); /* Congestion avoidance */ - tc->cc_algo->rcv_ack (tc); - tc->tsecr_last_ack = tc->rcv_opts.tsecr; + tcp_cc_rcv_ack (tc); /* If a cumulative ack, make sure dupacks is 0 */ tc->rcv_dupacks = 0; @@ -1372,8 +1374,7 @@ partial_ack: tc->snd_nxt = tc->snd_una_max; /* Treat as congestion avoidance ack */ - tc->cc_algo->rcv_ack (tc); - tc->tsecr_last_ack = tc->rcv_opts.tsecr; + tcp_cc_rcv_ack (tc); return; } @@ -1391,8 +1392,7 @@ partial_ack: /* Post RTO timeout don't try anything fancy */ if (tcp_in_recovery (tc)) { - tc->cc_algo->rcv_ack (tc); - tc->tsecr_last_ack = tc->rcv_opts.tsecr; + tcp_cc_rcv_ack (tc); transport_add_tx_event (&tc->connection); return; } diff --git a/src/vnet/tcp/tcp_output.c b/src/vnet/tcp/tcp_output.c index 2e6036b410a..f14a61263d4 100644 --- a/src/vnet/tcp/tcp_output.c +++ b/src/vnet/tcp/tcp_output.c @@ -1202,7 +1202,7 @@ tcp_push_header (tcp_connection_t * tc, vlib_buffer_t * b) /* If not tracking an ACK, start tracking */ if (tc->rtt_ts == 0 && !tcp_in_cong_recovery (tc)) { - tc->rtt_ts = tcp_time_now (); + tc->rtt_ts = tcp_time_now_us (tc->c_thread_index); tc->rtt_seq = tc->snd_nxt; } if (PREDICT_FALSE (!tcp_timer_is_active (tc, TCP_TIMER_RETRANSMIT))) |