diff options
-rw-r--r-- | src/vnet/session/session.c | 13 | ||||
-rw-r--r-- | src/vnet/session/session.h | 5 | ||||
-rw-r--r-- | src/vnet/session/session_node.c | 76 | ||||
-rw-r--r-- | src/vnet/session/transport.c | 18 | ||||
-rw-r--r-- | src/vnet/session/transport.h | 48 | ||||
-rw-r--r-- | src/vnet/session/transport_types.h | 12 | ||||
-rw-r--r-- | src/vnet/tcp/tcp.c | 65 | ||||
-rw-r--r-- | src/vnet/tcp/tcp.h | 2 | ||||
-rwxr-xr-x | src/vnet/tcp/tcp_input.c | 4 | ||||
-rw-r--r-- | src/vnet/tcp/tcp_output.c | 10 | ||||
-rw-r--r-- | src/vnet/udp/udp.c | 25 |
11 files changed, 175 insertions, 103 deletions
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index e9cda361f37..15d949c76b5 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -145,6 +145,19 @@ session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio) } } +void +sesssion_reschedule_tx (transport_connection_t * tc) +{ + session_worker_t *wrk = session_main_get_worker (tc->thread_index); + session_evt_elt_t *elt; + + ASSERT (tc->thread_index == vlib_get_thread_index ()); + + elt = session_evt_alloc_new (wrk); + elt->evt.session_index = tc->s_index; + elt->evt.event_type = SESSION_IO_EVT_TX; +} + static void session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt) { diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index e85637283a7..777984b519b 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -48,14 +48,12 @@ typedef struct session_tx_context_ session_t *s; transport_proto_vft_t *transport_vft; transport_connection_t *tc; + transport_send_params_t sp; u32 max_dequeue; - u32 snd_space; u32 left_to_snd; - u32 tx_offset; u32 max_len_to_snd; u16 deq_per_first_buf; u16 deq_per_buf; - u16 snd_mss; u16 n_segs_per_evt; u8 n_bufs_per_seg; CLIB_CACHE_LINE_ALIGN_MARK (cacheline1); @@ -429,6 +427,7 @@ void session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp, void *rpc_args); void session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio); +void sesssion_reschedule_tx (transport_connection_t * tc); transport_connection_t *session_get_transport (session_t * s); void session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl); diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index ad24f429fbc..b1c2428874e 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -567,7 +567,7 @@ session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx, b->total_length_not_including_first_buffer = 0; chain_b = b; - left_from_seg = clib_min (ctx->snd_mss - b->current_length, + left_from_seg = clib_min (ctx->sp.snd_mss - b->current_length, ctx->left_to_snd); to_deq = left_from_seg; for (j = 1; j < ctx->n_bufs_per_seg; j++) @@ -583,8 +583,8 @@ session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx, if (peek_data) { n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, - ctx->tx_offset, len_to_deq, data); - ctx->tx_offset += n_bytes_read; + ctx->sp.tx_offset, len_to_deq, data); + ctx->sp.tx_offset += n_bytes_read; } else { @@ -651,12 +651,12 @@ session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx, if (peek_data) { - n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->tx_offset, + n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset, len_to_deq, data0); ASSERT (n_bytes_read > 0); /* Keep track of progress locally, transport is also supposed to * increment it independently when pushing the header */ - ctx->tx_offset += n_bytes_read; + ctx->sp.tx_offset += n_bytes_read; } else { @@ -756,13 +756,12 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx, if (peek_data) { /* Offset in rx fifo from where to peek data */ - ctx->tx_offset = ctx->transport_vft->tx_fifo_offset (ctx->tc); - if (PREDICT_FALSE (ctx->tx_offset >= ctx->max_dequeue)) + if (PREDICT_FALSE (ctx->sp.tx_offset >= ctx->max_dequeue)) { ctx->max_len_to_snd = 0; return; } - ctx->max_dequeue -= ctx->tx_offset; + ctx->max_dequeue -= ctx->sp.tx_offset; } else { @@ -782,34 +781,34 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx, ASSERT (ctx->max_dequeue > 0); /* Ensure we're not writing more than transport window allows */ - if (ctx->max_dequeue < ctx->snd_space) + if (ctx->max_dequeue < ctx->sp.snd_space) { /* Constrained by tx queue. Try to send only fully formed segments */ - ctx->max_len_to_snd = - (ctx->max_dequeue > ctx->snd_mss) ? - ctx->max_dequeue - ctx->max_dequeue % ctx->snd_mss : ctx->max_dequeue; + ctx->max_len_to_snd = (ctx->max_dequeue > ctx->sp.snd_mss) ? + (ctx->max_dequeue - (ctx->max_dequeue % ctx->sp.snd_mss)) : + ctx->max_dequeue; /* TODO Nagle ? */ } else { /* Expectation is that snd_space0 is already a multiple of snd_mss */ - ctx->max_len_to_snd = ctx->snd_space; + ctx->max_len_to_snd = ctx->sp.snd_space; } /* Check if we're tx constrained by the node */ - ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->snd_mss); + ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->sp.snd_mss); if (ctx->n_segs_per_evt > max_segs) { ctx->n_segs_per_evt = max_segs; - ctx->max_len_to_snd = max_segs * ctx->snd_mss; + ctx->max_len_to_snd = max_segs * ctx->sp.snd_mss; } n_bytes_per_buf = vlib_buffer_get_default_data_size (vm); ASSERT (n_bytes_per_buf > TRANSPORT_MAX_HDRS_LEN); - n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->snd_mss; + n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->sp.snd_mss; ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf); - ctx->deq_per_buf = clib_min (ctx->snd_mss, n_bytes_per_buf); - ctx->deq_per_first_buf = clib_min (ctx->snd_mss, + ctx->deq_per_buf = clib_min (ctx->sp.snd_mss, n_bytes_per_buf); + ctx->deq_per_first_buf = clib_min (ctx->sp.snd_mss, n_bytes_per_buf - TRANSPORT_MAX_HDRS_LEN); } @@ -817,12 +816,12 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx, always_inline void session_tx_maybe_reschedule (session_worker_t * wrk, session_tx_context_t * ctx, - session_evt_elt_t * elt, u8 is_peek) + session_evt_elt_t * elt) { session_t *s = ctx->s; svm_fifo_unset_event (s->tx_fifo); - if (svm_fifo_max_dequeue_cons (s->tx_fifo) > (is_peek ? ctx->tx_offset : 0)) + if (svm_fifo_max_dequeue_cons (s->tx_fifo) > ctx->sp.tx_offset) if (svm_fifo_set_event (s->tx_fifo)) session_evt_add_head_old (wrk, elt); } @@ -880,20 +879,23 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, } } - ctx->snd_mss = ctx->transport_vft->send_mss (ctx->tc); - if (PREDICT_FALSE (ctx->snd_mss == 0)) - { - session_evt_add_old (wrk, elt); - return SESSION_TX_NO_DATA; - } - - ctx->snd_space = transport_connection_snd_space (ctx->tc); + transport_connection_snd_params (ctx->tc, &ctx->sp); - /* This flow queue is "empty" so it should be re-evaluated before - * the ones that have data to send. */ - if (!ctx->snd_space) + if (!ctx->sp.snd_space) { - session_evt_add_head_old (wrk, elt); + /* This flow queue is "empty" so it should be re-evaluated before + * the ones that have data to send. */ + if (PREDICT_TRUE (!ctx->sp.flags)) + session_evt_add_head_old (wrk, elt); + /* Request to postpone the session, e.g., zero-wnd and transport + * is not currently probing */ + else if (ctx->sp.flags & TRANSPORT_SND_F_POSTPONE) + session_evt_add_old (wrk, elt); + /* If the deschedule flag was set, remove session from scheduler. + * Transport is responsible for rescheduling this session. */ + else + transport_connection_deschedule (ctx->tc); + return SESSION_TX_NO_DATA; } @@ -905,9 +907,9 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, session_evt_add_head_old (wrk, elt); return SESSION_TX_NO_DATA; } - snd_space = clib_min (ctx->snd_space, snd_space); - ctx->snd_space = snd_space >= ctx->snd_mss ? - snd_space - snd_space % ctx->snd_mss : snd_space; + snd_space = clib_min (ctx->sp.snd_space, snd_space); + ctx->sp.snd_space = snd_space >= ctx->sp.snd_mss ? + snd_space - snd_space % ctx->sp.snd_mss : snd_space; } /* Check how much we can pull. */ @@ -916,7 +918,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, if (PREDICT_FALSE (!ctx->max_len_to_snd)) { transport_connection_tx_pacer_reset_bucket (ctx->tc, 0); - session_tx_maybe_reschedule (wrk, ctx, elt, peek_data); + session_tx_maybe_reschedule (wrk, ctx, elt); return SESSION_TX_NO_DATA; } @@ -1019,7 +1021,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, if (ctx->max_len_to_snd < ctx->max_dequeue) session_evt_add_old (wrk, elt); else - session_tx_maybe_reschedule (wrk, ctx, elt, peek_data); + session_tx_maybe_reschedule (wrk, ctx, elt); if (!peek_data && ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM) diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c index c8c58357afd..e27aaf3ff6b 100644 --- a/src/vnet/session/transport.c +++ b/src/vnet/session/transport.c @@ -103,6 +103,8 @@ format_transport_connection (u8 * s, va_list * args) indent = format_get_indent (s) + 1; s = format (s, "%Upacer: %U\n", format_white_space, indent, format_transport_pacer, &tc->pacer, tc->thread_index); + s = format (s, "%Utransport: flags 0x%x\n", format_white_space, indent, + tc->flags); } return s; } @@ -720,6 +722,22 @@ transport_connection_tx_pacer_update_bytes (transport_connection_t * tc, } void +transport_connection_reschedule (transport_connection_t * tc) +{ + tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED; + if (transport_max_tx_dequeue (tc)) + sesssion_reschedule_tx (tc); + else + { + session_t *s = session_get (tc->s_index, tc->thread_index); + svm_fifo_unset_event (s->tx_fifo); + if (svm_fifo_max_dequeue_cons (s->tx_fifo)) + if (svm_fifo_set_event (s->tx_fifo)) + sesssion_reschedule_tx (tc); + } +} + +void transport_update_time (clib_time_type_t time_now, u8 thread_index) { transport_proto_vft_t *vft; diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h index adc695f5e5a..b2be990947c 100644 --- a/src/vnet/session/transport.h +++ b/src/vnet/session/transport.h @@ -32,6 +32,21 @@ typedef struct _transport_options_t u8 half_open_has_fifos; } transport_options_t; +typedef enum transport_snd_flags_ +{ + TRANSPORT_SND_F_DESCHED = 1 << 0, + TRANSPORT_SND_F_POSTPONE = 1 << 1, + TRANSPORT_SND_N_FLAGS +} __clib_packed transport_snd_flags_t; + +typedef struct transport_send_params_ +{ + u32 snd_space; + u32 tx_offset; + u16 snd_mss; + transport_snd_flags_t flags; +} transport_send_params_t; + /* * Transport protocol virtual function table */ @@ -54,9 +69,8 @@ typedef struct _transport_proto_vft */ u32 (*push_header) (transport_connection_t * tconn, vlib_buffer_t * b); - u16 (*send_mss) (transport_connection_t * tc); - u32 (*send_space) (transport_connection_t * tc); - u32 (*tx_fifo_offset) (transport_connection_t * tc); + int (*send_params) (transport_connection_t * tconn, + transport_send_params_t *sp); void (*update_time) (f64 time_now, u8 thread_index); void (*flush_data) (transport_connection_t *tconn); int (*custom_tx) (void *session, u32 max_burst_size); @@ -151,16 +165,38 @@ transport_app_rx_evt (transport_proto_t tp, u32 conn_index, u32 thread_index) } /** - * Get maximum tx burst allowed for transport connection + * Get send parameters for transport connection + * + * These include maximum tx burst, mss, tx offset and other flags + * transport might want to provide to sessin layer * * @param tc transport connection + * @param sp send paramaters + * */ static inline u32 -transport_connection_snd_space (transport_connection_t * tc) +transport_connection_snd_params (transport_connection_t * tc, + transport_send_params_t * sp) { - return tp_vfts[tc->proto].send_space (tc); + return tp_vfts[tc->proto].send_params (tc, sp); } +static inline u8 +transport_connection_is_descheduled (transport_connection_t * tc) +{ + if (tc->flags & TRANSPORT_CONNECTION_F_DESCHED) + return 1; + return 0; +} + +static inline void +transport_connection_deschedule (transport_connection_t * tc) +{ + tc->flags |= TRANSPORT_CONNECTION_F_DESCHED; +} + +void transport_connection_reschedule (transport_connection_t * tc); + void transport_register_protocol (transport_proto_t transport_proto, const transport_proto_vft_t * vft, fib_protocol_t fib_proto, u32 output_node); diff --git a/src/vnet/session/transport_types.h b/src/vnet/session/transport_types.h index 459fb0c5833..323d261ad89 100644 --- a/src/vnet/session/transport_types.h +++ b/src/vnet/session/transport_types.h @@ -43,9 +43,15 @@ typedef enum transport_service_type_ typedef enum transport_connection_flags_ { TRANSPORT_CONNECTION_F_IS_TX_PACED = 1 << 0, - TRANSPORT_CONNECTION_F_NO_LOOKUP = 1 << 1, /**< Don't register connection in lookup - Does not apply to local apps and - transports using the network layer (udp/tcp) */ + /** + * Don't register connection in lookup. Does not apply to local apps + * and transports using the network layer (udp/tcp) + */ + TRANSPORT_CONNECTION_F_NO_LOOKUP = 1 << 1, + /** + * Connection descheduled by the session layer. + */ + TRANSPORT_CONNECTION_F_DESCHED = 1 << 2, } transport_connection_flags_t; typedef struct _spacer diff --git a/src/vnet/tcp/tcp.c b/src/vnet/tcp/tcp.c index 7712ade4416..4a0ffc137e5 100644 --- a/src/vnet/tcp/tcp.c +++ b/src/vnet/tcp/tcp.c @@ -1195,29 +1195,6 @@ tcp_session_cal_goal_size (tcp_connection_t * tc) return goal_size > tc->snd_mss ? goal_size : tc->snd_mss; } -/** - * Compute maximum segment size for session layer. - * - * Since the result needs to be the actual data length, it first computes - * the tcp options to be used in the next burst and subtracts their - * length from the connection's snd_mss. - */ -static u16 -tcp_session_send_mss (transport_connection_t * trans_conn) -{ - tcp_connection_t *tc = (tcp_connection_t *) trans_conn; - - /* Ensure snd_mss does accurately reflect the amount of data we can push - * in a segment. This also makes sure that options are updated according to - * the current state of the connection. */ - tcp_update_burst_snd_vars (tc); - - if (PREDICT_FALSE (tc->cfg_flags & TCP_CFG_F_TSO)) - return tcp_session_cal_goal_size (tc); - - return tc->snd_mss; -} - always_inline u32 tcp_round_snd_space (tcp_connection_t * tc, u32 snd_space) { @@ -1281,23 +1258,39 @@ tcp_snd_space (tcp_connection_t * tc) return tcp_snd_space_inline (tc); } -static u32 -tcp_session_send_space (transport_connection_t * trans_conn) +static int +tcp_session_send_params (transport_connection_t * trans_conn, + transport_send_params_t * sp) { tcp_connection_t *tc = (tcp_connection_t *) trans_conn; - return clib_min (tcp_snd_space_inline (tc), - tc->snd_wnd - (tc->snd_nxt - tc->snd_una)); -} -static u32 -tcp_session_tx_fifo_offset (transport_connection_t * trans_conn) -{ - tcp_connection_t *tc = (tcp_connection_t *) trans_conn; + /* Ensure snd_mss does accurately reflect the amount of data we can push + * in a segment. This also makes sure that options are updated according to + * the current state of the connection. */ + tcp_update_burst_snd_vars (tc); - ASSERT (seq_geq (tc->snd_nxt, tc->snd_una)); + if (PREDICT_FALSE (tc->cfg_flags & TCP_CFG_F_TSO)) + sp->snd_mss = tcp_session_cal_goal_size (tc); + else + sp->snd_mss = tc->snd_mss; + + sp->snd_space = clib_min (tcp_snd_space_inline (tc), + tc->snd_wnd - (tc->snd_nxt - tc->snd_una)); + ASSERT (seq_geq (tc->snd_nxt, tc->snd_una)); /* This still works if fast retransmit is on */ - return (tc->snd_nxt - tc->snd_una); + sp->tx_offset = tc->snd_nxt - tc->snd_una; + + sp->flags = 0; + if (!tc->snd_wnd) + { + if (tcp_timer_is_active (tc, TCP_TIMER_PERSIST)) + sp->flags = TRANSPORT_SND_F_DESCHED; + else + sp->flags = TRANSPORT_SND_F_POSTPONE; + } + + return 0; } static void @@ -1509,10 +1502,8 @@ const static transport_proto_vft_t tcp_proto = { .close = tcp_session_close, .cleanup = tcp_session_cleanup, .reset = tcp_session_reset, - .send_mss = tcp_session_send_mss, - .send_space = tcp_session_send_space, + .send_params = tcp_session_send_params, .update_time = tcp_update_time, - .tx_fifo_offset = tcp_session_tx_fifo_offset, .flush_data = tcp_session_flush_data, .custom_tx = tcp_session_custom_tx, .format_connection = format_tcp_session, diff --git a/src/vnet/tcp/tcp.h b/src/vnet/tcp/tcp.h index 6094ac5110a..8fa9013e31a 100644 --- a/src/vnet/tcp/tcp.h +++ b/src/vnet/tcp/tcp.h @@ -1244,6 +1244,8 @@ tcp_persist_timer_update (tcp_connection_t * tc) always_inline void tcp_persist_timer_reset (tcp_connection_t * tc) { + if (transport_connection_is_descheduled (&tc->connection)) + transport_connection_reschedule (&tc->connection); tcp_timer_reset (tc, TCP_TIMER_PERSIST); } diff --git a/src/vnet/tcp/tcp_input.c b/src/vnet/tcp/tcp_input.c index 0e8e68b1339..4f31d21c3c1 100755 --- a/src/vnet/tcp/tcp_input.c +++ b/src/vnet/tcp/tcp_input.c @@ -1312,7 +1312,9 @@ tcp_update_snd_wnd (tcp_connection_t * tc, u32 seq, u32 ack, u32 snd_wnd) } else { - tcp_persist_timer_reset (tc); + if (PREDICT_FALSE (tcp_timer_is_active (tc, TCP_TIMER_PERSIST))) + tcp_persist_timer_reset (tc); + if (PREDICT_FALSE (!tcp_in_recovery (tc) && tc->rto_boff > 0)) { tc->rto_boff = 0; diff --git a/src/vnet/tcp/tcp_output.c b/src/vnet/tcp/tcp_output.c index 6ed478fd1bc..b77713e1538 100644 --- a/src/vnet/tcp/tcp_output.c +++ b/src/vnet/tcp/tcp_output.c @@ -1637,7 +1637,7 @@ tcp_timer_persist_handler (tcp_connection_t * tc) /* Problem already solved or worse */ if (tc->state == TCP_STATE_CLOSED || tc->snd_wnd > tc->snd_mss || (tc->flags & TCP_CONN_FINSNT)) - return; + goto update_scheduler; available_bytes = transport_max_tx_dequeue (&tc->connection); offset = tc->snd_nxt - tc->snd_una; @@ -1651,7 +1651,7 @@ tcp_timer_persist_handler (tcp_connection_t * tc) } if (available_bytes <= offset) - return; + goto update_scheduler; /* Increment RTO backoff */ tc->rto_boff += 1; @@ -1665,6 +1665,7 @@ tcp_timer_persist_handler (tcp_connection_t * tc) tcp_persist_timer_set (tc); return; } + b = vlib_get_buffer (vm, bi); data = tcp_init_buffer (vm, b); @@ -1693,6 +1694,11 @@ tcp_timer_persist_handler (tcp_connection_t * tc) /* Just sent new data, enable retransmit */ tcp_retransmit_timer_update (tc); + +update_scheduler: + + if (transport_connection_is_descheduled (&tc->connection)) + transport_connection_reschedule (&tc->connection); } /** diff --git a/src/vnet/udp/udp.c b/src/vnet/udp/udp.c index 34cebec93d2..109f4683e7f 100644 --- a/src/vnet/udp/udp.c +++ b/src/vnet/udp/udp.c @@ -273,18 +273,17 @@ format_udp_listener_session (u8 * s, va_list * args) return format (s, "%U", format_udp_connection, uc, verbose); } -u16 -udp_send_mss (transport_connection_t * t) -{ - /* TODO figure out MTU of output interface */ - return 1460; -} - -u32 -udp_send_space (transport_connection_t * t) +static int +udp_session_send_params (transport_connection_t * tconn, + transport_send_params_t * sp) { /* No constraint on TX window */ - return ~0; + sp->snd_space = ~0; + /* TODO figure out MTU of output interface */ + sp->snd_mss = 1460; + sp->tx_offset = 0; + sp->flags = 0; + return 0; } int @@ -357,8 +356,7 @@ static const transport_proto_vft_t udp_proto = { .get_half_open = udp_session_get_half_open, .close = udp_session_close, .cleanup = udp_session_cleanup, - .send_mss = udp_send_mss, - .send_space = udp_send_space, + .send_params = udp_session_send_params, .format_connection = format_udp_session, .format_half_open = format_udp_half_open_session, .format_listener = format_udp_listener_session, @@ -412,8 +410,7 @@ static const transport_proto_vft_t udpc_proto = { .get_half_open = udp_session_get_half_open, .close = udp_session_close, .cleanup = udp_session_cleanup, - .send_mss = udp_send_mss, - .send_space = udp_send_space, + .send_params = udp_session_send_params, .format_connection = format_udp_session, .format_half_open = format_udp_half_open_session, .format_listener = format_udp_listener_session, |