diff options
Diffstat (limited to 'src/vnet/session')
-rw-r--r-- | src/vnet/session/session.c | 13 | ||||
-rw-r--r-- | src/vnet/session/session.h | 5 | ||||
-rw-r--r-- | src/vnet/session/session_node.c | 76 | ||||
-rw-r--r-- | src/vnet/session/transport.c | 18 | ||||
-rw-r--r-- | src/vnet/session/transport.h | 48 | ||||
-rw-r--r-- | src/vnet/session/transport_types.h | 12 |
6 files changed, 123 insertions, 49 deletions
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index e9cda361f37..15d949c76b5 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -145,6 +145,19 @@ session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio) } } +void +sesssion_reschedule_tx (transport_connection_t * tc) +{ + session_worker_t *wrk = session_main_get_worker (tc->thread_index); + session_evt_elt_t *elt; + + ASSERT (tc->thread_index == vlib_get_thread_index ()); + + elt = session_evt_alloc_new (wrk); + elt->evt.session_index = tc->s_index; + elt->evt.event_type = SESSION_IO_EVT_TX; +} + static void session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt) { diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index e85637283a7..777984b519b 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -48,14 +48,12 @@ typedef struct session_tx_context_ session_t *s; transport_proto_vft_t *transport_vft; transport_connection_t *tc; + transport_send_params_t sp; u32 max_dequeue; - u32 snd_space; u32 left_to_snd; - u32 tx_offset; u32 max_len_to_snd; u16 deq_per_first_buf; u16 deq_per_buf; - u16 snd_mss; u16 n_segs_per_evt; u8 n_bufs_per_seg; CLIB_CACHE_LINE_ALIGN_MARK (cacheline1); @@ -429,6 +427,7 @@ void session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp, void *rpc_args); void session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio); +void sesssion_reschedule_tx (transport_connection_t * tc); transport_connection_t *session_get_transport (session_t * s); void session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl); diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index ad24f429fbc..b1c2428874e 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -567,7 +567,7 @@ session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx, b->total_length_not_including_first_buffer = 0; chain_b = b; - left_from_seg = clib_min (ctx->snd_mss - b->current_length, + left_from_seg = clib_min (ctx->sp.snd_mss - b->current_length, ctx->left_to_snd); to_deq = left_from_seg; for (j = 1; j < ctx->n_bufs_per_seg; j++) @@ -583,8 +583,8 @@ session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx, if (peek_data) { n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, - ctx->tx_offset, len_to_deq, data); - ctx->tx_offset += n_bytes_read; + ctx->sp.tx_offset, len_to_deq, data); + ctx->sp.tx_offset += n_bytes_read; } else { @@ -651,12 +651,12 @@ session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx, if (peek_data) { - n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->tx_offset, + n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset, len_to_deq, data0); ASSERT (n_bytes_read > 0); /* Keep track of progress locally, transport is also supposed to * increment it independently when pushing the header */ - ctx->tx_offset += n_bytes_read; + ctx->sp.tx_offset += n_bytes_read; } else { @@ -756,13 +756,12 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx, if (peek_data) { /* Offset in rx fifo from where to peek data */ - ctx->tx_offset = ctx->transport_vft->tx_fifo_offset (ctx->tc); - if (PREDICT_FALSE (ctx->tx_offset >= ctx->max_dequeue)) + if (PREDICT_FALSE (ctx->sp.tx_offset >= ctx->max_dequeue)) { ctx->max_len_to_snd = 0; return; } - ctx->max_dequeue -= ctx->tx_offset; + ctx->max_dequeue -= ctx->sp.tx_offset; } else { @@ -782,34 +781,34 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx, ASSERT (ctx->max_dequeue > 0); /* Ensure we're not writing more than transport window allows */ - if (ctx->max_dequeue < ctx->snd_space) + if (ctx->max_dequeue < ctx->sp.snd_space) { /* Constrained by tx queue. Try to send only fully formed segments */ - ctx->max_len_to_snd = - (ctx->max_dequeue > ctx->snd_mss) ? - ctx->max_dequeue - ctx->max_dequeue % ctx->snd_mss : ctx->max_dequeue; + ctx->max_len_to_snd = (ctx->max_dequeue > ctx->sp.snd_mss) ? + (ctx->max_dequeue - (ctx->max_dequeue % ctx->sp.snd_mss)) : + ctx->max_dequeue; /* TODO Nagle ? */ } else { /* Expectation is that snd_space0 is already a multiple of snd_mss */ - ctx->max_len_to_snd = ctx->snd_space; + ctx->max_len_to_snd = ctx->sp.snd_space; } /* Check if we're tx constrained by the node */ - ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->snd_mss); + ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->sp.snd_mss); if (ctx->n_segs_per_evt > max_segs) { ctx->n_segs_per_evt = max_segs; - ctx->max_len_to_snd = max_segs * ctx->snd_mss; + ctx->max_len_to_snd = max_segs * ctx->sp.snd_mss; } n_bytes_per_buf = vlib_buffer_get_default_data_size (vm); ASSERT (n_bytes_per_buf > TRANSPORT_MAX_HDRS_LEN); - n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->snd_mss; + n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->sp.snd_mss; ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf); - ctx->deq_per_buf = clib_min (ctx->snd_mss, n_bytes_per_buf); - ctx->deq_per_first_buf = clib_min (ctx->snd_mss, + ctx->deq_per_buf = clib_min (ctx->sp.snd_mss, n_bytes_per_buf); + ctx->deq_per_first_buf = clib_min (ctx->sp.snd_mss, n_bytes_per_buf - TRANSPORT_MAX_HDRS_LEN); } @@ -817,12 +816,12 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx, always_inline void session_tx_maybe_reschedule (session_worker_t * wrk, session_tx_context_t * ctx, - session_evt_elt_t * elt, u8 is_peek) + session_evt_elt_t * elt) { session_t *s = ctx->s; svm_fifo_unset_event (s->tx_fifo); - if (svm_fifo_max_dequeue_cons (s->tx_fifo) > (is_peek ? ctx->tx_offset : 0)) + if (svm_fifo_max_dequeue_cons (s->tx_fifo) > ctx->sp.tx_offset) if (svm_fifo_set_event (s->tx_fifo)) session_evt_add_head_old (wrk, elt); } @@ -880,20 +879,23 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, } } - ctx->snd_mss = ctx->transport_vft->send_mss (ctx->tc); - if (PREDICT_FALSE (ctx->snd_mss == 0)) - { - session_evt_add_old (wrk, elt); - return SESSION_TX_NO_DATA; - } - - ctx->snd_space = transport_connection_snd_space (ctx->tc); + transport_connection_snd_params (ctx->tc, &ctx->sp); - /* This flow queue is "empty" so it should be re-evaluated before - * the ones that have data to send. */ - if (!ctx->snd_space) + if (!ctx->sp.snd_space) { - session_evt_add_head_old (wrk, elt); + /* This flow queue is "empty" so it should be re-evaluated before + * the ones that have data to send. */ + if (PREDICT_TRUE (!ctx->sp.flags)) + session_evt_add_head_old (wrk, elt); + /* Request to postpone the session, e.g., zero-wnd and transport + * is not currently probing */ + else if (ctx->sp.flags & TRANSPORT_SND_F_POSTPONE) + session_evt_add_old (wrk, elt); + /* If the deschedule flag was set, remove session from scheduler. + * Transport is responsible for rescheduling this session. */ + else + transport_connection_deschedule (ctx->tc); + return SESSION_TX_NO_DATA; } @@ -905,9 +907,9 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, session_evt_add_head_old (wrk, elt); return SESSION_TX_NO_DATA; } - snd_space = clib_min (ctx->snd_space, snd_space); - ctx->snd_space = snd_space >= ctx->snd_mss ? - snd_space - snd_space % ctx->snd_mss : snd_space; + snd_space = clib_min (ctx->sp.snd_space, snd_space); + ctx->sp.snd_space = snd_space >= ctx->sp.snd_mss ? + snd_space - snd_space % ctx->sp.snd_mss : snd_space; } /* Check how much we can pull. */ @@ -916,7 +918,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, if (PREDICT_FALSE (!ctx->max_len_to_snd)) { transport_connection_tx_pacer_reset_bucket (ctx->tc, 0); - session_tx_maybe_reschedule (wrk, ctx, elt, peek_data); + session_tx_maybe_reschedule (wrk, ctx, elt); return SESSION_TX_NO_DATA; } @@ -1019,7 +1021,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, if (ctx->max_len_to_snd < ctx->max_dequeue) session_evt_add_old (wrk, elt); else - session_tx_maybe_reschedule (wrk, ctx, elt, peek_data); + session_tx_maybe_reschedule (wrk, ctx, elt); if (!peek_data && ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM) diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c index c8c58357afd..e27aaf3ff6b 100644 --- a/src/vnet/session/transport.c +++ b/src/vnet/session/transport.c @@ -103,6 +103,8 @@ format_transport_connection (u8 * s, va_list * args) indent = format_get_indent (s) + 1; s = format (s, "%Upacer: %U\n", format_white_space, indent, format_transport_pacer, &tc->pacer, tc->thread_index); + s = format (s, "%Utransport: flags 0x%x\n", format_white_space, indent, + tc->flags); } return s; } @@ -720,6 +722,22 @@ transport_connection_tx_pacer_update_bytes (transport_connection_t * tc, } void +transport_connection_reschedule (transport_connection_t * tc) +{ + tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED; + if (transport_max_tx_dequeue (tc)) + sesssion_reschedule_tx (tc); + else + { + session_t *s = session_get (tc->s_index, tc->thread_index); + svm_fifo_unset_event (s->tx_fifo); + if (svm_fifo_max_dequeue_cons (s->tx_fifo)) + if (svm_fifo_set_event (s->tx_fifo)) + sesssion_reschedule_tx (tc); + } +} + +void transport_update_time (clib_time_type_t time_now, u8 thread_index) { transport_proto_vft_t *vft; diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h index adc695f5e5a..b2be990947c 100644 --- a/src/vnet/session/transport.h +++ b/src/vnet/session/transport.h @@ -32,6 +32,21 @@ typedef struct _transport_options_t u8 half_open_has_fifos; } transport_options_t; +typedef enum transport_snd_flags_ +{ + TRANSPORT_SND_F_DESCHED = 1 << 0, + TRANSPORT_SND_F_POSTPONE = 1 << 1, + TRANSPORT_SND_N_FLAGS +} __clib_packed transport_snd_flags_t; + +typedef struct transport_send_params_ +{ + u32 snd_space; + u32 tx_offset; + u16 snd_mss; + transport_snd_flags_t flags; +} transport_send_params_t; + /* * Transport protocol virtual function table */ @@ -54,9 +69,8 @@ typedef struct _transport_proto_vft */ u32 (*push_header) (transport_connection_t * tconn, vlib_buffer_t * b); - u16 (*send_mss) (transport_connection_t * tc); - u32 (*send_space) (transport_connection_t * tc); - u32 (*tx_fifo_offset) (transport_connection_t * tc); + int (*send_params) (transport_connection_t * tconn, + transport_send_params_t *sp); void (*update_time) (f64 time_now, u8 thread_index); void (*flush_data) (transport_connection_t *tconn); int (*custom_tx) (void *session, u32 max_burst_size); @@ -151,16 +165,38 @@ transport_app_rx_evt (transport_proto_t tp, u32 conn_index, u32 thread_index) } /** - * Get maximum tx burst allowed for transport connection + * Get send parameters for transport connection + * + * These include maximum tx burst, mss, tx offset and other flags + * transport might want to provide to sessin layer * * @param tc transport connection + * @param sp send paramaters + * */ static inline u32 -transport_connection_snd_space (transport_connection_t * tc) +transport_connection_snd_params (transport_connection_t * tc, + transport_send_params_t * sp) { - return tp_vfts[tc->proto].send_space (tc); + return tp_vfts[tc->proto].send_params (tc, sp); } +static inline u8 +transport_connection_is_descheduled (transport_connection_t * tc) +{ + if (tc->flags & TRANSPORT_CONNECTION_F_DESCHED) + return 1; + return 0; +} + +static inline void +transport_connection_deschedule (transport_connection_t * tc) +{ + tc->flags |= TRANSPORT_CONNECTION_F_DESCHED; +} + +void transport_connection_reschedule (transport_connection_t * tc); + void transport_register_protocol (transport_proto_t transport_proto, const transport_proto_vft_t * vft, fib_protocol_t fib_proto, u32 output_node); diff --git a/src/vnet/session/transport_types.h b/src/vnet/session/transport_types.h index 459fb0c5833..323d261ad89 100644 --- a/src/vnet/session/transport_types.h +++ b/src/vnet/session/transport_types.h @@ -43,9 +43,15 @@ typedef enum transport_service_type_ typedef enum transport_connection_flags_ { TRANSPORT_CONNECTION_F_IS_TX_PACED = 1 << 0, - TRANSPORT_CONNECTION_F_NO_LOOKUP = 1 << 1, /**< Don't register connection in lookup - Does not apply to local apps and - transports using the network layer (udp/tcp) */ + /** + * Don't register connection in lookup. Does not apply to local apps + * and transports using the network layer (udp/tcp) + */ + TRANSPORT_CONNECTION_F_NO_LOOKUP = 1 << 1, + /** + * Connection descheduled by the session layer. + */ + TRANSPORT_CONNECTION_F_DESCHED = 1 << 2, } transport_connection_flags_t; typedef struct _spacer |