summaryrefslogtreecommitdiffstats
path: root/src/vnet/session
diff options
context:
space:
mode:
Diffstat (limited to 'src/vnet/session')
-rw-r--r--src/vnet/session/session.c7
-rw-r--r--src/vnet/session/session.h18
-rw-r--r--src/vnet/session/session_node.c16
-rw-r--r--src/vnet/session/transport.c111
-rw-r--r--src/vnet/session/transport.h22
-rw-r--r--src/vnet/session/transport_interface.h61
6 files changed, 233 insertions, 2 deletions
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 189c5375fbb..1d421b978c1 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -1355,6 +1355,8 @@ session_manager_main_enable (vlib_main_t * vm)
vec_validate (smm->free_event_vector, num_threads - 1);
vec_validate (smm->vpp_event_queues, num_threads - 1);
vec_validate (smm->peekers_rw_locks, num_threads - 1);
+ vec_validate (smm->dispatch_period, num_threads - 1);
+ vec_validate (smm->last_vlib_time, num_threads - 1);
vec_validate_aligned (smm->ctx, num_threads - 1, CLIB_CACHE_LINE_BYTES);
for (i = 0; i < TRANSPORT_N_PROTO; i++)
@@ -1373,6 +1375,9 @@ session_manager_main_enable (vlib_main_t * vm)
_vec_len (smm->pending_event_vector[i]) = 0;
vec_validate (smm->pending_disconnects[i], 0);
_vec_len (smm->pending_disconnects[i]) = 0;
+
+ smm->last_vlib_time[i] = vlib_time_now (vlib_mains[i]);
+
if (num_threads > 1)
clib_rwlock_init (&smm->peekers_rw_locks[i]);
}
@@ -1419,7 +1424,7 @@ session_manager_main_enable (vlib_main_t * vm)
/* Enable transports */
transport_enable_disable (vm, 1);
-
+ transport_init_tx_pacers_period ();
return 0;
}
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index 914e0581fec..f0aa36cc1bc 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -215,6 +215,12 @@ struct _session_manager_main
/** per-worker session context */
session_tx_context_t *ctx;
+ /** Our approximation of a "complete" dispatch loop period */
+ f64 *dispatch_period;
+
+ /** vlib_time_now last time around the track */
+ f64 *last_vlib_time;
+
/** vpp fifo event queue */
svm_msg_q_t **vpp_event_queues;
@@ -494,6 +500,18 @@ transport_tx_fifo_size (transport_connection_t * tc)
return s->server_tx_fifo->nitems;
}
+always_inline f64
+transport_dispatch_period (u32 thread_index)
+{
+ return session_manager_main.dispatch_period[thread_index];
+}
+
+always_inline f64
+transport_time_now (u32 thread_index)
+{
+ return session_manager_main.last_vlib_time[thread_index];
+}
+
always_inline u32
session_get_index (stream_session_t * s)
{
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index c1aea6723b6..eb974397165 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -568,7 +568,8 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
ctx->transport_vft = transport_protocol_get_vft (tp);
ctx->tc = session_tx_get_transport (ctx, peek_data);
ctx->snd_mss = ctx->transport_vft->send_mss (ctx->tc);
- ctx->snd_space = ctx->transport_vft->send_space (ctx->tc);
+ ctx->snd_space =
+ transport_connection_max_tx_burst (ctx->tc, vm->clib_time.last_cpu_time);
if (ctx->snd_space == 0 || ctx->snd_mss == 0)
{
vec_add1 (smm->pending_event_vector[thread_index], *e);
@@ -685,6 +686,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
_vec_len (smm->tx_buffers[thread_index]) = n_bufs;
*n_tx_packets += ctx->n_segs_per_evt;
+ transport_connection_update_tx_stats (ctx->tc, ctx->max_len_to_snd);
vlib_put_next_frame (vm, node, next_index, n_left_to_next);
/* If we couldn't dequeue all bytes mark as partially read */
@@ -743,6 +745,17 @@ session_event_get_session (session_event_t * e, u8 thread_index)
return session_get_if_valid (e->fifo->master_session_index, thread_index);
}
+static void
+session_update_dispatch_period (session_manager_main_t * smm, f64 now,
+ u32 thread_index)
+{
+ f64 sample, prev_period = smm->dispatch_period[thread_index], a = 0.8;
+
+ sample = now - smm->last_vlib_time[thread_index];
+ smm->dispatch_period[thread_index] = a * sample + (1 - a) * prev_period;
+ smm->last_vlib_time[thread_index] = now;
+}
+
static uword
session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_frame_t * frame)
@@ -764,6 +777,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
/*
* Update transport time
*/
+ session_update_dispatch_period (smm, now, thread_index);
transport_update_time (now, thread_index);
/*
diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c
index d74a218a5bf..c333c4161d4 100644
--- a/src/vnet/session/transport.c
+++ b/src/vnet/session/transport.c
@@ -42,6 +42,13 @@ static transport_endpoint_t *local_endpoints;
*/
static clib_spinlock_t local_endpoints_lock;
+/*
+ * Period used by transport pacers. Initialized by session layer
+ */
+static double transport_pacer_period;
+
+#define TRANSPORT_PACER_MIN_MSS 1460
+
u8 *
format_transport_proto (u8 * s, va_list * args)
{
@@ -376,6 +383,110 @@ transport_alloc_local_endpoint (u8 proto, transport_endpoint_t * rmt,
return 0;
}
+#define SPACER_CPU_TICKS_PER_PERIOD_SHIFT 10
+#define SPACER_CPU_TICKS_PER_PERIOD (1 << SPACER_CPU_TICKS_PER_PERIOD_SHIFT)
+
+u8 *
+format_transport_pacer (u8 * s, va_list * args)
+{
+ spacer_t *pacer = va_arg (*args, spacer_t *);
+
+ s = format (s, "bucket %u max_burst %u tokens/period %.3f last_update %x",
+ pacer->bucket, pacer->max_burst_size, pacer->tokens_per_period,
+ pacer->last_update);
+ return s;
+}
+
+static inline u32
+spacer_max_burst (spacer_t * pacer, u64 norm_time_now)
+{
+ u64 n_periods = norm_time_now - pacer->last_update;
+
+ pacer->last_update = norm_time_now;
+ pacer->bucket += n_periods * pacer->tokens_per_period;
+ return clib_min (pacer->bucket, pacer->max_burst_size);
+}
+
+static inline void
+spacer_update_bucket (spacer_t * pacer, u32 bytes)
+{
+ ASSERT (pacer->bucket >= bytes);
+ pacer->bucket -= bytes;
+}
+
+static inline void
+spacer_update_max_burst_size (spacer_t * pacer, u32 max_burst_bytes)
+{
+ pacer->max_burst_size = clib_max (max_burst_bytes, TRANSPORT_PACER_MIN_MSS);
+}
+
+static inline void
+spacer_set_pace_rate (spacer_t * pacer, u64 rate_bytes_per_sec)
+{
+ ASSERT (rate_bytes_per_sec != 0);
+ pacer->tokens_per_period = rate_bytes_per_sec / transport_pacer_period;
+}
+
+void
+transport_connection_tx_pacer_init (transport_connection_t * tc,
+ u32 rate_bytes_per_sec, u32 burst_bytes)
+{
+ vlib_main_t *vm = vlib_get_main ();
+ u64 time_now = vm->clib_time.last_cpu_time;
+ spacer_t *pacer = &tc->pacer;
+
+ tc->flags |= TRANSPORT_CONNECTION_F_IS_TX_PACED;
+ spacer_update_max_burst_size (&tc->pacer, burst_bytes);
+ spacer_set_pace_rate (&tc->pacer, rate_bytes_per_sec);
+ pacer->last_update = time_now >> SPACER_CPU_TICKS_PER_PERIOD_SHIFT;
+ pacer->bucket = burst_bytes;
+}
+
+void
+transport_connection_tx_pacer_update (transport_connection_t * tc,
+ u64 bytes_per_sec)
+{
+ u32 burst_size;
+
+ burst_size = bytes_per_sec * transport_dispatch_period (tc->thread_index);
+ spacer_set_pace_rate (&tc->pacer, bytes_per_sec);
+ spacer_update_max_burst_size (&tc->pacer, burst_size);
+}
+
+u32
+transport_connection_max_tx_burst (transport_connection_t * tc, u64 time_now)
+{
+ u32 snd_space, max_paced_burst;
+ u32 mss;
+
+ snd_space = tp_vfts[tc->proto].send_space (tc);
+ if (transport_connection_is_tx_paced (tc))
+ {
+ time_now >>= SPACER_CPU_TICKS_PER_PERIOD_SHIFT;
+ max_paced_burst = spacer_max_burst (&tc->pacer, time_now);
+ mss = tp_vfts[tc->proto].send_mss (tc);
+ max_paced_burst = (max_paced_burst < mss) ? 0 : max_paced_burst;
+ snd_space = clib_min (snd_space, max_paced_burst);
+ snd_space = snd_space - snd_space % mss;
+ }
+ return snd_space;
+}
+
+void
+transport_connection_update_tx_stats (transport_connection_t * tc, u32 bytes)
+{
+ tc->stats.tx_bytes += bytes;
+ if (transport_connection_is_tx_paced (tc))
+ spacer_update_bucket (&tc->pacer, bytes);
+}
+
+void
+transport_init_tx_pacers_period (void)
+{
+ f64 cpu_freq = os_cpu_clock_frequency ();
+ transport_pacer_period = cpu_freq / SPACER_CPU_TICKS_PER_PERIOD;
+}
+
void
transport_update_time (f64 time_now, u8 thread_index)
{
diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h
index e29f3ca9557..07366692c3e 100644
--- a/src/vnet/session/transport.h
+++ b/src/vnet/session/transport.h
@@ -23,6 +23,19 @@
/*
* Protocol independent transport properties associated to a session
*/
+typedef struct _transport_stats
+{
+ u64 tx_bytes;
+} transport_stats_t;
+
+typedef struct _spacer
+{
+ u64 bucket;
+ u32 max_burst_size;
+ f32 tokens_per_period;
+ u64 last_update;
+} spacer_t;
+
typedef struct _transport_connection
{
/** Connection ID */
@@ -54,6 +67,10 @@ typedef struct _transport_connection
/*fib_node_index_t rmt_fei;
dpo_id_t rmt_dpo; */
+ u8 flags; /**< Transport specific flags */
+ transport_stats_t stats; /**< Transport connection stats */
+ spacer_t pacer; /**< Simple transport pacer */
+
#if TRANSPORT_DEBUG
elog_track_t elog_track; /**< Event logging */
u32 cc_stat_tstamp; /**< CC stats timestamp */
@@ -79,8 +96,13 @@ typedef struct _transport_connection
#define c_rmt_fei connection.rmt_fei
#define c_rmt_dpo connection.rmt_dpo
#define c_opaque_id connection.opaque_conn_id
+#define c_stats connection.stats
+#define c_pacer connection.pacer
+#define c_flags connection.flags
} transport_connection_t;
+#define TRANSPORT_CONNECTION_F_IS_TX_PACED 1 << 0
+
typedef enum _transport_proto
{
TRANSPORT_PROTO_TCP,
diff --git a/src/vnet/session/transport_interface.h b/src/vnet/session/transport_interface.h
index 745a7db444c..ec9bd43e30f 100644
--- a/src/vnet/session/transport_interface.h
+++ b/src/vnet/session/transport_interface.h
@@ -102,6 +102,67 @@ transport_tx_fn_type_t transport_protocol_tx_fn_type (transport_proto_t tp);
void transport_update_time (f64 time_now, u8 thread_index);
void transport_enable_disable (vlib_main_t * vm, u8 is_en);
+/**
+ * Initialize tx pacer for connection
+ *
+ * @param tc transport connection
+ * @param rate_bytes_per_second initial byte rate
+ * @param burst_bytes initial burst size in bytes
+ */
+void transport_connection_tx_pacer_init (transport_connection_t * tc,
+ u32 rate_bytes_per_sec,
+ u32 burst_bytes);
+
+/**
+ * Update tx pacer pacing rate
+ *
+ * @param tc transport connection
+ * @param bytes_per_sec new pacing rate
+ */
+void transport_connection_tx_pacer_update (transport_connection_t * tc,
+ u64 bytes_per_sec);
+
+/**
+ * Get maximum tx burst allowed for transport connection
+ *
+ * @param tc transport connection
+ * @param time_now current cpu time as returned by @ref clib_cpu_time_now
+ */
+u32 transport_connection_max_tx_burst (transport_connection_t * tc,
+ u64 time_now);
+
+/**
+ * Initialize period for tx pacers
+ *
+ * Defines a unit of time with respect to number of cpu cycles that is to
+ * be used by all tx pacers.
+ */
+void transport_init_tx_pacers_period (void);
+
+/**
+ * Check if transport connection is paced
+ */
+always_inline u8
+transport_connection_is_tx_paced (transport_connection_t * tc)
+{
+ return (tc->flags & TRANSPORT_CONNECTION_F_IS_TX_PACED);
+}
+
+u8 *format_transport_pacer (u8 * s, va_list * args);
+
+/**
+ * Update tx byte stats for transport connection
+ *
+ * If tx pacing is enabled, this also updates pacer bucket to account for the
+ * amount of bytes that have been sent.
+ *
+ * @param tc transport connection
+ * @param pkts packets recently sent
+ * @param bytes bytes recently sent
+ */
+void transport_connection_update_tx_stats (transport_connection_t * tc,
+ u32 bytes);
+
#endif /* SRC_VNET_SESSION_TRANSPORT_INTERFACE_H_ */
/*