summary | refs | log | tree | commit | diff | stats
path: root/src/vnet/session
diff options
context:
space:
mode:
author: Florin Coras <fcoras@cisco.com> 2020-03-13 17:54:42 +0000
committer: Dave Barach <openvpp@barachs.net> 2020-03-19 14:46:01 +0000
commit: 70f879d2852dfc042ad0911a4a6e4a1714c0eb83 (patch)
tree: d7ea7d76b8ec034d41ead0b9ada2db18d9676670 /src/vnet/session
parent: 7fd59cc79c9fb0cccd0cb5c0b4579d0f0a004f6b (diff)
session tcp udp: consolidate transport snd apis
Type: improvement

Use only one api to retrieve transport send parameters. Additionally, allow transports to request postponing and descheduling of events. With this, tcp now requests descheduling of sessions when the connections are stuck probing for zero snd_wnd.

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: I722c974f3e68fa15424c519a1fffacda43af050c
Diffstat (limited to 'src/vnet/session')
-rw-r--r-- src/vnet/session/session.c 13
-rw-r--r-- src/vnet/session/session.h 5
-rw-r--r-- src/vnet/session/session_node.c 76
-rw-r--r-- src/vnet/session/transport.c 18
-rw-r--r-- src/vnet/session/transport.h 48
-rw-r--r-- src/vnet/session/transport_types.h 12
6 files changed, 123 insertions, 49 deletions
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index e9cda361f37..15d949c76b5 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -145,6 +145,19 @@ session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
}
}
+void
+sesssion_reschedule_tx (transport_connection_t * tc)
+{
+ session_worker_t *wrk = session_main_get_worker (tc->thread_index);
+ session_evt_elt_t *elt;
+
+ ASSERT (tc->thread_index == vlib_get_thread_index ());
+
+ elt = session_evt_alloc_new (wrk);
+ elt->evt.session_index = tc->s_index;
+ elt->evt.event_type = SESSION_IO_EVT_TX;
+}
+
static void
session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
{
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index e85637283a7..777984b519b 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -48,14 +48,12 @@ typedef struct session_tx_context_
session_t *s;
transport_proto_vft_t *transport_vft;
transport_connection_t *tc;
+ transport_send_params_t sp;
u32 max_dequeue;
- u32 snd_space;
u32 left_to_snd;
- u32 tx_offset;
u32 max_len_to_snd;
u16 deq_per_first_buf;
u16 deq_per_buf;
- u16 snd_mss;
u16 n_segs_per_evt;
u8 n_bufs_per_seg;
CLIB_CACHE_LINE_ALIGN_MARK (cacheline1);
@@ -429,6 +427,7 @@ void session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
void *rpc_args);
void session_add_self_custom_tx_evt (transport_connection_t * tc,
u8 has_prio);
+void sesssion_reschedule_tx (transport_connection_t * tc);
transport_connection_t *session_get_transport (session_t * s);
void session_get_endpoint (session_t * s, transport_endpoint_t * tep,
u8 is_lcl);
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index ad24f429fbc..b1c2428874e 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -567,7 +567,7 @@ session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
b->total_length_not_including_first_buffer = 0;
chain_b = b;
- left_from_seg = clib_min (ctx->snd_mss - b->current_length,
+ left_from_seg = clib_min (ctx->sp.snd_mss - b->current_length,
ctx->left_to_snd);
to_deq = left_from_seg;
for (j = 1; j < ctx->n_bufs_per_seg; j++)
@@ -583,8 +583,8 @@ session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
if (peek_data)
{
n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo,
- ctx->tx_offset, len_to_deq, data);
- ctx->tx_offset += n_bytes_read;
+ ctx->sp.tx_offset, len_to_deq, data);
+ ctx->sp.tx_offset += n_bytes_read;
}
else
{
@@ -651,12 +651,12 @@ session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx,
if (peek_data)
{
- n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->tx_offset,
+ n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset,
len_to_deq, data0);
ASSERT (n_bytes_read > 0);
/* Keep track of progress locally, transport is also supposed to
* increment it independently when pushing the header */
- ctx->tx_offset += n_bytes_read;
+ ctx->sp.tx_offset += n_bytes_read;
}
else
{
@@ -756,13 +756,12 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
if (peek_data)
{
/* Offset in rx fifo from where to peek data */
- ctx->tx_offset = ctx->transport_vft->tx_fifo_offset (ctx->tc);
- if (PREDICT_FALSE (ctx->tx_offset >= ctx->max_dequeue))
+ if (PREDICT_FALSE (ctx->sp.tx_offset >= ctx->max_dequeue))
{
ctx->max_len_to_snd = 0;
return;
}
- ctx->max_dequeue -= ctx->tx_offset;
+ ctx->max_dequeue -= ctx->sp.tx_offset;
}
else
{
@@ -782,34 +781,34 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
ASSERT (ctx->max_dequeue > 0);
/* Ensure we're not writing more than transport window allows */
- if (ctx->max_dequeue < ctx->snd_space)
+ if (ctx->max_dequeue < ctx->sp.snd_space)
{
/* Constrained by tx queue. Try to send only fully formed segments */
- ctx->max_len_to_snd =
- (ctx->max_dequeue > ctx->snd_mss) ?
- ctx->max_dequeue - ctx->max_dequeue % ctx->snd_mss : ctx->max_dequeue;
+ ctx->max_len_to_snd = (ctx->max_dequeue > ctx->sp.snd_mss) ?
+ (ctx->max_dequeue - (ctx->max_dequeue % ctx->sp.snd_mss)) :
+ ctx->max_dequeue;
/* TODO Nagle ? */
}
else
{
/* Expectation is that snd_space0 is already a multiple of snd_mss */
- ctx->max_len_to_snd = ctx->snd_space;
+ ctx->max_len_to_snd = ctx->sp.snd_space;
}
/* Check if we're tx constrained by the node */
- ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->snd_mss);
+ ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->sp.snd_mss);
if (ctx->n_segs_per_evt > max_segs)
{
ctx->n_segs_per_evt = max_segs;
- ctx->max_len_to_snd = max_segs * ctx->snd_mss;
+ ctx->max_len_to_snd = max_segs * ctx->sp.snd_mss;
}
n_bytes_per_buf = vlib_buffer_get_default_data_size (vm);
ASSERT (n_bytes_per_buf > TRANSPORT_MAX_HDRS_LEN);
- n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->snd_mss;
+ n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->sp.snd_mss;
ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
- ctx->deq_per_buf = clib_min (ctx->snd_mss, n_bytes_per_buf);
- ctx->deq_per_first_buf = clib_min (ctx->snd_mss,
+ ctx->deq_per_buf = clib_min (ctx->sp.snd_mss, n_bytes_per_buf);
+ ctx->deq_per_first_buf = clib_min (ctx->sp.snd_mss,
n_bytes_per_buf -
TRANSPORT_MAX_HDRS_LEN);
}
@@ -817,12 +816,12 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
always_inline void
session_tx_maybe_reschedule (session_worker_t * wrk,
session_tx_context_t * ctx,
- session_evt_elt_t * elt, u8 is_peek)
+ session_evt_elt_t * elt)
{
session_t *s = ctx->s;
svm_fifo_unset_event (s->tx_fifo);
- if (svm_fifo_max_dequeue_cons (s->tx_fifo) > (is_peek ? ctx->tx_offset : 0))
+ if (svm_fifo_max_dequeue_cons (s->tx_fifo) > ctx->sp.tx_offset)
if (svm_fifo_set_event (s->tx_fifo))
session_evt_add_head_old (wrk, elt);
}
@@ -880,20 +879,23 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
}
}
- ctx->snd_mss = ctx->transport_vft->send_mss (ctx->tc);
- if (PREDICT_FALSE (ctx->snd_mss == 0))
- {
- session_evt_add_old (wrk, elt);
- return SESSION_TX_NO_DATA;
- }
-
- ctx->snd_space = transport_connection_snd_space (ctx->tc);
+ transport_connection_snd_params (ctx->tc, &ctx->sp);
- /* This flow queue is "empty" so it should be re-evaluated before
- * the ones that have data to send. */
- if (!ctx->snd_space)
+ if (!ctx->sp.snd_space)
{
- session_evt_add_head_old (wrk, elt);
+ /* This flow queue is "empty" so it should be re-evaluated before
+ * the ones that have data to send. */
+ if (PREDICT_TRUE (!ctx->sp.flags))
+ session_evt_add_head_old (wrk, elt);
+ /* Request to postpone the session, e.g., zero-wnd and transport
+ * is not currently probing */
+ else if (ctx->sp.flags & TRANSPORT_SND_F_POSTPONE)
+ session_evt_add_old (wrk, elt);
+ /* If the deschedule flag was set, remove session from scheduler.
+ * Transport is responsible for rescheduling this session. */
+ else
+ transport_connection_deschedule (ctx->tc);
+
return SESSION_TX_NO_DATA;
}
@@ -905,9 +907,9 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
session_evt_add_head_old (wrk, elt);
return SESSION_TX_NO_DATA;
}
- snd_space = clib_min (ctx->snd_space, snd_space);
- ctx->snd_space = snd_space >= ctx->snd_mss ?
- snd_space - snd_space % ctx->snd_mss : snd_space;
+ snd_space = clib_min (ctx->sp.snd_space, snd_space);
+ ctx->sp.snd_space = snd_space >= ctx->sp.snd_mss ?
+ snd_space - snd_space % ctx->sp.snd_mss : snd_space;
}
/* Check how much we can pull. */
@@ -916,7 +918,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
if (PREDICT_FALSE (!ctx->max_len_to_snd))
{
transport_connection_tx_pacer_reset_bucket (ctx->tc, 0);
- session_tx_maybe_reschedule (wrk, ctx, elt, peek_data);
+ session_tx_maybe_reschedule (wrk, ctx, elt);
return SESSION_TX_NO_DATA;
}
@@ -1019,7 +1021,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
if (ctx->max_len_to_snd < ctx->max_dequeue)
session_evt_add_old (wrk, elt);
else
- session_tx_maybe_reschedule (wrk, ctx, elt, peek_data);
+ session_tx_maybe_reschedule (wrk, ctx, elt);
if (!peek_data
&& ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c
index c8c58357afd..e27aaf3ff6b 100644
--- a/src/vnet/session/transport.c
+++ b/src/vnet/session/transport.c
@@ -103,6 +103,8 @@ format_transport_connection (u8 * s, va_list * args)
indent = format_get_indent (s) + 1;
s = format (s, "%Upacer: %U\n", format_white_space, indent,
format_transport_pacer, &tc->pacer, tc->thread_index);
+ s = format (s, "%Utransport: flags 0x%x\n", format_white_space, indent,
+ tc->flags);
}
return s;
}
@@ -720,6 +722,22 @@ transport_connection_tx_pacer_update_bytes (transport_connection_t * tc,
}
void
+transport_connection_reschedule (transport_connection_t * tc)
+{
+ tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;
+ if (transport_max_tx_dequeue (tc))
+ sesssion_reschedule_tx (tc);
+ else
+ {
+ session_t *s = session_get (tc->s_index, tc->thread_index);
+ svm_fifo_unset_event (s->tx_fifo);
+ if (svm_fifo_max_dequeue_cons (s->tx_fifo))
+ if (svm_fifo_set_event (s->tx_fifo))
+ sesssion_reschedule_tx (tc);
+ }
+}
+
+void
transport_update_time (clib_time_type_t time_now, u8 thread_index)
{
transport_proto_vft_t *vft;
diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h
index adc695f5e5a..b2be990947c 100644
--- a/src/vnet/session/transport.h
+++ b/src/vnet/session/transport.h
@@ -32,6 +32,21 @@ typedef struct _transport_options_t
u8 half_open_has_fifos;
} transport_options_t;
+typedef enum transport_snd_flags_
+{
+ TRANSPORT_SND_F_DESCHED = 1 << 0,
+ TRANSPORT_SND_F_POSTPONE = 1 << 1,
+ TRANSPORT_SND_N_FLAGS
+} __clib_packed transport_snd_flags_t;
+
+typedef struct transport_send_params_
+{
+ u32 snd_space;
+ u32 tx_offset;
+ u16 snd_mss;
+ transport_snd_flags_t flags;
+} transport_send_params_t;
+
/*
* Transport protocol virtual function table
*/
@@ -54,9 +69,8 @@ typedef struct _transport_proto_vft
*/
u32 (*push_header) (transport_connection_t * tconn, vlib_buffer_t * b);
- u16 (*send_mss) (transport_connection_t * tc);
- u32 (*send_space) (transport_connection_t * tc);
- u32 (*tx_fifo_offset) (transport_connection_t * tc);
+ int (*send_params) (transport_connection_t * tconn,
+ transport_send_params_t *sp);
void (*update_time) (f64 time_now, u8 thread_index);
void (*flush_data) (transport_connection_t *tconn);
int (*custom_tx) (void *session, u32 max_burst_size);
@@ -151,16 +165,38 @@ transport_app_rx_evt (transport_proto_t tp, u32 conn_index, u32 thread_index)
}
/**
- * Get maximum tx burst allowed for transport connection
+ * Get send parameters for transport connection
+ *
+ * These include maximum tx burst, mss, tx offset and other flags
+ * transport might want to provide to the session layer
*
* @param tc transport connection
+ * @param sp send parameters
+ *
*/
static inline u32
-transport_connection_snd_space (transport_connection_t * tc)
+transport_connection_snd_params (transport_connection_t * tc,
+ transport_send_params_t * sp)
{
- return tp_vfts[tc->proto].send_space (tc);
+ return tp_vfts[tc->proto].send_params (tc, sp);
}
+static inline u8
+transport_connection_is_descheduled (transport_connection_t * tc)
+{
+ if (tc->flags & TRANSPORT_CONNECTION_F_DESCHED)
+ return 1;
+ return 0;
+}
+
+static inline void
+transport_connection_deschedule (transport_connection_t * tc)
+{
+ tc->flags |= TRANSPORT_CONNECTION_F_DESCHED;
+}
+
+void transport_connection_reschedule (transport_connection_t * tc);
+
void transport_register_protocol (transport_proto_t transport_proto,
const transport_proto_vft_t * vft,
fib_protocol_t fib_proto, u32 output_node);
diff --git a/src/vnet/session/transport_types.h b/src/vnet/session/transport_types.h
index 459fb0c5833..323d261ad89 100644
--- a/src/vnet/session/transport_types.h
+++ b/src/vnet/session/transport_types.h
@@ -43,9 +43,15 @@ typedef enum transport_service_type_
typedef enum transport_connection_flags_
{
TRANSPORT_CONNECTION_F_IS_TX_PACED = 1 << 0,
- TRANSPORT_CONNECTION_F_NO_LOOKUP = 1 << 1, /**< Don't register connection in lookup
- Does not apply to local apps and
- transports using the network layer (udp/tcp) */
+ /**
+ * Don't register connection in lookup. Does not apply to local apps
+ * and transports using the network layer (udp/tcp)
+ */
+ TRANSPORT_CONNECTION_F_NO_LOOKUP = 1 << 1,
+ /**
+ * Connection descheduled by the session layer.
+ */
+ TRANSPORT_CONNECTION_F_DESCHED = 1 << 2,
} transport_connection_flags_t;
typedef struct _spacer