summaryrefslogtreecommitdiffstats
path: root/src/vnet/session
diff options
context:
space:
mode:
Diffstat (limited to 'src/vnet/session')
-rw-r--r--src/vnet/session/session.c13
-rw-r--r--src/vnet/session/session.h5
-rw-r--r--src/vnet/session/session_node.c76
-rw-r--r--src/vnet/session/transport.c18
-rw-r--r--src/vnet/session/transport.h48
-rw-r--r--src/vnet/session/transport_types.h12
6 files changed, 123 insertions, 49 deletions
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index e9cda361f37..15d949c76b5 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -145,6 +145,19 @@ session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
}
}
+void
+sesssion_reschedule_tx (transport_connection_t * tc)
+{
+ session_worker_t *wrk = session_main_get_worker (tc->thread_index);
+ session_evt_elt_t *elt;
+
+ ASSERT (tc->thread_index == vlib_get_thread_index ());
+
+ elt = session_evt_alloc_new (wrk);
+ elt->evt.session_index = tc->s_index;
+ elt->evt.event_type = SESSION_IO_EVT_TX;
+}
+
static void
session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
{
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index e85637283a7..777984b519b 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -48,14 +48,12 @@ typedef struct session_tx_context_
session_t *s;
transport_proto_vft_t *transport_vft;
transport_connection_t *tc;
+ transport_send_params_t sp;
u32 max_dequeue;
- u32 snd_space;
u32 left_to_snd;
- u32 tx_offset;
u32 max_len_to_snd;
u16 deq_per_first_buf;
u16 deq_per_buf;
- u16 snd_mss;
u16 n_segs_per_evt;
u8 n_bufs_per_seg;
CLIB_CACHE_LINE_ALIGN_MARK (cacheline1);
@@ -429,6 +427,7 @@ void session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
void *rpc_args);
void session_add_self_custom_tx_evt (transport_connection_t * tc,
u8 has_prio);
+void sesssion_reschedule_tx (transport_connection_t * tc);
transport_connection_t *session_get_transport (session_t * s);
void session_get_endpoint (session_t * s, transport_endpoint_t * tep,
u8 is_lcl);
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index ad24f429fbc..b1c2428874e 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -567,7 +567,7 @@ session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
b->total_length_not_including_first_buffer = 0;
chain_b = b;
- left_from_seg = clib_min (ctx->snd_mss - b->current_length,
+ left_from_seg = clib_min (ctx->sp.snd_mss - b->current_length,
ctx->left_to_snd);
to_deq = left_from_seg;
for (j = 1; j < ctx->n_bufs_per_seg; j++)
@@ -583,8 +583,8 @@ session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
if (peek_data)
{
n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo,
- ctx->tx_offset, len_to_deq, data);
- ctx->tx_offset += n_bytes_read;
+ ctx->sp.tx_offset, len_to_deq, data);
+ ctx->sp.tx_offset += n_bytes_read;
}
else
{
@@ -651,12 +651,12 @@ session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx,
if (peek_data)
{
- n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->tx_offset,
+ n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset,
len_to_deq, data0);
ASSERT (n_bytes_read > 0);
/* Keep track of progress locally, transport is also supposed to
* increment it independently when pushing the header */
- ctx->tx_offset += n_bytes_read;
+ ctx->sp.tx_offset += n_bytes_read;
}
else
{
@@ -756,13 +756,12 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
if (peek_data)
{
/* Offset in rx fifo from where to peek data */
- ctx->tx_offset = ctx->transport_vft->tx_fifo_offset (ctx->tc);
- if (PREDICT_FALSE (ctx->tx_offset >= ctx->max_dequeue))
+ if (PREDICT_FALSE (ctx->sp.tx_offset >= ctx->max_dequeue))
{
ctx->max_len_to_snd = 0;
return;
}
- ctx->max_dequeue -= ctx->tx_offset;
+ ctx->max_dequeue -= ctx->sp.tx_offset;
}
else
{
@@ -782,34 +781,34 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
ASSERT (ctx->max_dequeue > 0);
/* Ensure we're not writing more than transport window allows */
- if (ctx->max_dequeue < ctx->snd_space)
+ if (ctx->max_dequeue < ctx->sp.snd_space)
{
/* Constrained by tx queue. Try to send only fully formed segments */
- ctx->max_len_to_snd =
- (ctx->max_dequeue > ctx->snd_mss) ?
- ctx->max_dequeue - ctx->max_dequeue % ctx->snd_mss : ctx->max_dequeue;
+ ctx->max_len_to_snd = (ctx->max_dequeue > ctx->sp.snd_mss) ?
+ (ctx->max_dequeue - (ctx->max_dequeue % ctx->sp.snd_mss)) :
+ ctx->max_dequeue;
/* TODO Nagle ? */
}
else
{
/* Expectation is that snd_space0 is already a multiple of snd_mss */
- ctx->max_len_to_snd = ctx->snd_space;
+ ctx->max_len_to_snd = ctx->sp.snd_space;
}
/* Check if we're tx constrained by the node */
- ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->snd_mss);
+ ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->sp.snd_mss);
if (ctx->n_segs_per_evt > max_segs)
{
ctx->n_segs_per_evt = max_segs;
- ctx->max_len_to_snd = max_segs * ctx->snd_mss;
+ ctx->max_len_to_snd = max_segs * ctx->sp.snd_mss;
}
n_bytes_per_buf = vlib_buffer_get_default_data_size (vm);
ASSERT (n_bytes_per_buf > TRANSPORT_MAX_HDRS_LEN);
- n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->snd_mss;
+ n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->sp.snd_mss;
ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
- ctx->deq_per_buf = clib_min (ctx->snd_mss, n_bytes_per_buf);
- ctx->deq_per_first_buf = clib_min (ctx->snd_mss,
+ ctx->deq_per_buf = clib_min (ctx->sp.snd_mss, n_bytes_per_buf);
+ ctx->deq_per_first_buf = clib_min (ctx->sp.snd_mss,
n_bytes_per_buf -
TRANSPORT_MAX_HDRS_LEN);
}
@@ -817,12 +816,12 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
always_inline void
session_tx_maybe_reschedule (session_worker_t * wrk,
session_tx_context_t * ctx,
- session_evt_elt_t * elt, u8 is_peek)
+ session_evt_elt_t * elt)
{
session_t *s = ctx->s;
svm_fifo_unset_event (s->tx_fifo);
- if (svm_fifo_max_dequeue_cons (s->tx_fifo) > (is_peek ? ctx->tx_offset : 0))
+ if (svm_fifo_max_dequeue_cons (s->tx_fifo) > ctx->sp.tx_offset)
if (svm_fifo_set_event (s->tx_fifo))
session_evt_add_head_old (wrk, elt);
}
@@ -880,20 +879,23 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
}
}
- ctx->snd_mss = ctx->transport_vft->send_mss (ctx->tc);
- if (PREDICT_FALSE (ctx->snd_mss == 0))
- {
- session_evt_add_old (wrk, elt);
- return SESSION_TX_NO_DATA;
- }
-
- ctx->snd_space = transport_connection_snd_space (ctx->tc);
+ transport_connection_snd_params (ctx->tc, &ctx->sp);
- /* This flow queue is "empty" so it should be re-evaluated before
- * the ones that have data to send. */
- if (!ctx->snd_space)
+ if (!ctx->sp.snd_space)
{
- session_evt_add_head_old (wrk, elt);
+ /* This flow queue is "empty" so it should be re-evaluated before
+ * the ones that have data to send. */
+ if (PREDICT_TRUE (!ctx->sp.flags))
+ session_evt_add_head_old (wrk, elt);
+ /* Request to postpone the session, e.g., zero-wnd and transport
+ * is not currently probing */
+ else if (ctx->sp.flags & TRANSPORT_SND_F_POSTPONE)
+ session_evt_add_old (wrk, elt);
+ /* If the deschedule flag was set, remove session from scheduler.
+ * Transport is responsible for rescheduling this session. */
+ else
+ transport_connection_deschedule (ctx->tc);
+
return SESSION_TX_NO_DATA;
}
@@ -905,9 +907,9 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
session_evt_add_head_old (wrk, elt);
return SESSION_TX_NO_DATA;
}
- snd_space = clib_min (ctx->snd_space, snd_space);
- ctx->snd_space = snd_space >= ctx->snd_mss ?
- snd_space - snd_space % ctx->snd_mss : snd_space;
+ snd_space = clib_min (ctx->sp.snd_space, snd_space);
+ ctx->sp.snd_space = snd_space >= ctx->sp.snd_mss ?
+ snd_space - snd_space % ctx->sp.snd_mss : snd_space;
}
/* Check how much we can pull. */
@@ -916,7 +918,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
if (PREDICT_FALSE (!ctx->max_len_to_snd))
{
transport_connection_tx_pacer_reset_bucket (ctx->tc, 0);
- session_tx_maybe_reschedule (wrk, ctx, elt, peek_data);
+ session_tx_maybe_reschedule (wrk, ctx, elt);
return SESSION_TX_NO_DATA;
}
@@ -1019,7 +1021,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
if (ctx->max_len_to_snd < ctx->max_dequeue)
session_evt_add_old (wrk, elt);
else
- session_tx_maybe_reschedule (wrk, ctx, elt, peek_data);
+ session_tx_maybe_reschedule (wrk, ctx, elt);
if (!peek_data
&& ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c
index c8c58357afd..e27aaf3ff6b 100644
--- a/src/vnet/session/transport.c
+++ b/src/vnet/session/transport.c
@@ -103,6 +103,8 @@ format_transport_connection (u8 * s, va_list * args)
indent = format_get_indent (s) + 1;
s = format (s, "%Upacer: %U\n", format_white_space, indent,
format_transport_pacer, &tc->pacer, tc->thread_index);
+ s = format (s, "%Utransport: flags 0x%x\n", format_white_space, indent,
+ tc->flags);
}
return s;
}
@@ -720,6 +722,22 @@ transport_connection_tx_pacer_update_bytes (transport_connection_t * tc,
}
void
+transport_connection_reschedule (transport_connection_t * tc)
+{
+ tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;
+ if (transport_max_tx_dequeue (tc))
+ sesssion_reschedule_tx (tc);
+ else
+ {
+ session_t *s = session_get (tc->s_index, tc->thread_index);
+ svm_fifo_unset_event (s->tx_fifo);
+ if (svm_fifo_max_dequeue_cons (s->tx_fifo))
+ if (svm_fifo_set_event (s->tx_fifo))
+ sesssion_reschedule_tx (tc);
+ }
+}
+
+void
transport_update_time (clib_time_type_t time_now, u8 thread_index)
{
transport_proto_vft_t *vft;
diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h
index adc695f5e5a..b2be990947c 100644
--- a/src/vnet/session/transport.h
+++ b/src/vnet/session/transport.h
@@ -32,6 +32,21 @@ typedef struct _transport_options_t
u8 half_open_has_fifos;
} transport_options_t;
+typedef enum transport_snd_flags_
+{
+ TRANSPORT_SND_F_DESCHED = 1 << 0,
+ TRANSPORT_SND_F_POSTPONE = 1 << 1,
+ TRANSPORT_SND_N_FLAGS
+} __clib_packed transport_snd_flags_t;
+
+typedef struct transport_send_params_
+{
+ u32 snd_space;
+ u32 tx_offset;
+ u16 snd_mss;
+ transport_snd_flags_t flags;
+} transport_send_params_t;
+
/*
* Transport protocol virtual function table
*/
@@ -54,9 +69,8 @@ typedef struct _transport_proto_vft
*/
u32 (*push_header) (transport_connection_t * tconn, vlib_buffer_t * b);
- u16 (*send_mss) (transport_connection_t * tc);
- u32 (*send_space) (transport_connection_t * tc);
- u32 (*tx_fifo_offset) (transport_connection_t * tc);
+ int (*send_params) (transport_connection_t * tconn,
+ transport_send_params_t *sp);
void (*update_time) (f64 time_now, u8 thread_index);
void (*flush_data) (transport_connection_t *tconn);
int (*custom_tx) (void *session, u32 max_burst_size);
@@ -151,16 +165,38 @@ transport_app_rx_evt (transport_proto_t tp, u32 conn_index, u32 thread_index)
}
/**
- * Get maximum tx burst allowed for transport connection
+ * Get send parameters for transport connection
+ *
+ * These include maximum tx burst, mss, tx offset and other flags
+ * transport might want to provide to sessin layer
*
* @param tc transport connection
+ * @param sp send paramaters
+ *
*/
static inline u32
-transport_connection_snd_space (transport_connection_t * tc)
+transport_connection_snd_params (transport_connection_t * tc,
+ transport_send_params_t * sp)
{
- return tp_vfts[tc->proto].send_space (tc);
+ return tp_vfts[tc->proto].send_params (tc, sp);
}
+static inline u8
+transport_connection_is_descheduled (transport_connection_t * tc)
+{
+ if (tc->flags & TRANSPORT_CONNECTION_F_DESCHED)
+ return 1;
+ return 0;
+}
+
+static inline void
+transport_connection_deschedule (transport_connection_t * tc)
+{
+ tc->flags |= TRANSPORT_CONNECTION_F_DESCHED;
+}
+
+void transport_connection_reschedule (transport_connection_t * tc);
+
void transport_register_protocol (transport_proto_t transport_proto,
const transport_proto_vft_t * vft,
fib_protocol_t fib_proto, u32 output_node);
diff --git a/src/vnet/session/transport_types.h b/src/vnet/session/transport_types.h
index 459fb0c5833..323d261ad89 100644
--- a/src/vnet/session/transport_types.h
+++ b/src/vnet/session/transport_types.h
@@ -43,9 +43,15 @@ typedef enum transport_service_type_
typedef enum transport_connection_flags_
{
TRANSPORT_CONNECTION_F_IS_TX_PACED = 1 << 0,
- TRANSPORT_CONNECTION_F_NO_LOOKUP = 1 << 1, /**< Don't register connection in lookup
- Does not apply to local apps and
- transports using the network layer (udp/tcp) */
+ /**
+ * Don't register connection in lookup. Does not apply to local apps
+ * and transports using the network layer (udp/tcp)
+ */
+ TRANSPORT_CONNECTION_F_NO_LOOKUP = 1 << 1,
+ /**
+ * Connection descheduled by the session layer.
+ */
+ TRANSPORT_CONNECTION_F_DESCHED = 1 << 2,
} transport_connection_flags_t;
typedef struct _spacer