From 26dd6de91b4d36ac04154c7eb6339684db6684a0 Mon Sep 17 00:00:00 2001 From: Florin Coras Date: Tue, 23 Jul 2019 23:54:47 -0700 Subject: session tcp: handle rxt and acks as custom events Type: feature Control ack generation and retransmissions with session layer scheduler. Change-Id: Iacdf9f84ab81f44851980aa45a83e75f29be2b7b Signed-off-by: Florin Coras --- src/plugins/quic/quic.c | 2 +- src/plugins/unittest/session_test.c | 3 +- src/vnet/session/application_local.c | 2 +- src/vnet/session/session.c | 25 ++++++ src/vnet/session/session.h | 22 +++++ src/vnet/session/session_node.c | 23 ++++- src/vnet/session/session_types.h | 1 + src/vnet/session/transport.h | 6 +- src/vnet/tcp/tcp.c | 11 +-- src/vnet/tcp/tcp.h | 24 ++--- src/vnet/tcp/tcp_input.c | 109 ++++------------------- src/vnet/tcp/tcp_output.c | 167 ++++++++++++++++++++++++----------- src/vnet/tls/tls.c | 2 +- 13 files changed, 212 insertions(+), 185 deletions(-) diff --git a/src/plugins/quic/quic.c b/src/plugins/quic/quic.c index 8bb94221002..435113a8cd2 100644 --- a/src/plugins/quic/quic.c +++ b/src/plugins/quic/quic.c @@ -2100,7 +2100,7 @@ quic_custom_app_rx_callback (transport_connection_t * tc) } static int -quic_custom_tx_callback (void *s) +quic_custom_tx_callback (void *s, u32 max_burst_size) { session_t *stream_session = (session_t *) s; quicly_stream_t *stream; diff --git a/src/plugins/unittest/session_test.c b/src/plugins/unittest/session_test.c index 00a70b878dc..819c42661ae 100644 --- a/src/plugins/unittest/session_test.c +++ b/src/plugins/unittest/session_test.c @@ -375,8 +375,7 @@ session_test_endpoint_cfg (vlib_main_t * vm, unformat_input_t * input) SESSION_TEST ((error == 0), "connect should work"); /* wait for stuff to happen */ - while ((connected_session_index == ~0 - || vec_len (tcp_main.wrk_ctx[0].pending_acks)) && ++tries < 100) + while (connected_session_index == ~0 && ++tries < 100) vlib_process_suspend (vm, 100e-3); clib_warning ("waited %.1f seconds for connections", tries / 10.0); SESSION_TEST ((connected_session_index != ~0), "session should exist"); diff --git a/src/vnet/session/application_local.c b/src/vnet/session/application_local.c index 4a268c7d30b..7d8fb468f5a 100644 --- a/src/vnet/session/application_local.c +++ b/src/vnet/session/application_local.c @@ -467,7 +467,7 @@ format_ct_connection_id (u8 * s, va_list * args) } static int -ct_custom_tx (void *session) +ct_custom_tx (void *session, u32 max_burst_size) { session_t *s = (session_t *) session; if (session_has_transport (s)) diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 477215e3d34..1c8b7fb4be4 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -118,6 +118,31 @@ session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args) } } +void +session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio) +{ + session_t *s; + + s = session_get (tc->s_index, tc->thread_index); + ASSERT (s->thread_index == vlib_get_thread_index ()); + if (!(s->flags & SESSION_F_CUSTOM_TX)) + { + s->flags |= SESSION_F_CUSTOM_TX; + if (svm_fifo_set_event (s->tx_fifo)) + { + session_worker_t *wrk; + session_evt_elt_t *elt; + wrk = session_main_get_worker (tc->thread_index); + if (has_prio) + elt = session_evt_alloc_new (wrk); + else + elt = session_evt_alloc_old (wrk); + elt->evt.session_index = tc->s_index; + elt->evt.event_type = SESSION_IO_EVT_TX; + } + } +} + static void session_program_transport_close (session_t * s) { diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index a3f2a01929b..8f7bd6c999c 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -243,6 +243,26 @@ session_evt_add_pending_disconnects (session_worker_t * wrk, session_evt_pending_disconnects_head (wrk)); } +static inline session_evt_elt_t * +session_evt_alloc_new (session_worker_t * wrk) +{ + session_evt_elt_t *elt; + elt = session_evt_elt_alloc (wrk); + clib_llist_add_tail (wrk->event_elts, evt_list, elt, + pool_elt_at_index (wrk->event_elts, wrk->new_head)); + return elt; +} + +static inline session_evt_elt_t * +session_evt_alloc_old (session_worker_t * wrk) +{ + session_evt_elt_t *elt; + elt = session_evt_elt_alloc (wrk); + clib_llist_add_tail (wrk->event_elts, evt_list, elt, + pool_elt_at_index (wrk->event_elts, wrk->old_head)); + return elt; +} + always_inline u8 session_is_valid (u32 si, u8 thread_index) { @@ -394,6 +414,8 @@ void session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args); void session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp, void *rpc_args); +void session_add_self_custom_tx_evt (transport_connection_t * tc, + u8 has_prio); transport_connection_t *session_get_transport (session_t * s); void session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl); diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index 999776f02bd..1bb5cb2717f 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -618,7 +618,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, session_evt_elt_t * elt, int *n_tx_packets, u8 peek_data) { - u32 next_index, next0, next1, *to_next, n_left_to_next; + u32 next_index, next0, next1, *to_next, n_left_to_next, max_burst; u32 n_trace, n_bufs_needed = 0, n_left, pbi; session_tx_context_t *ctx = &wrk->ctx; session_main_t *smm = &session_main; @@ -637,6 +637,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, next_index = smm->session_type_to_next[ctx->s->session_type]; next0 = next1 = next_index; + max_burst = VLIB_FRAME_SIZE - *n_tx_packets; tp = session_get_transport_proto (ctx->s); ctx->transport_vft = transport_protocol_get_vft (tp); @@ -649,6 +650,20 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, ctx->transport_vft->flush_data (ctx->tc); } + if (ctx->s->flags & SESSION_F_CUSTOM_TX) + { + u32 n_custom_tx; + ctx->s->flags &= ~SESSION_F_CUSTOM_TX; + n_custom_tx = ctx->transport_vft->custom_tx (ctx->tc, max_burst); + *n_tx_packets += n_custom_tx; + max_burst -= n_custom_tx; + if (!max_burst) + { + session_evt_add_old (wrk, elt); + return SESSION_TX_OK; + } + } + ctx->snd_space = transport_connection_snd_space (ctx->tc, wrk->vm->clib_time. last_cpu_time, @@ -664,8 +679,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, svm_fifo_unset_event (ctx->s->tx_fifo); /* Check how much we can pull. */ - session_tx_set_dequeue_params (vm, ctx, VLIB_FRAME_SIZE - *n_tx_packets, - peek_data); + session_tx_set_dequeue_params (vm, ctx, max_burst, peek_data); if (PREDICT_FALSE (!ctx->max_len_to_snd)) return SESSION_TX_NO_DATA; @@ -823,7 +837,8 @@ session_tx_fifo_dequeue_internal (session_worker_t * wrk, if (PREDICT_FALSE (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)) return 0; svm_fifo_unset_event (s->tx_fifo); - return transport_custom_tx (session_get_transport_proto (s), s); + return transport_custom_tx (session_get_transport_proto (s), s, + VLIB_FRAME_SIZE - *n_tx_packets); } always_inline session_t * diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h index 25b61166b11..9a5bc768808 100644 --- a/src/vnet/session/session_types.h +++ b/src/vnet/session/session_types.h @@ -142,6 +142,7 @@ typedef enum session_flags_ { SESSION_F_RX_EVT = 1, SESSION_F_PROXY = (1 << 1), + SESSION_F_CUSTOM_TX = (1 << 2), } session_flags_t; typedef struct session_ diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h index 6e2feb02ba4..058a9aee34c 100644 --- a/src/vnet/session/transport.h +++ b/src/vnet/session/transport.h @@ -52,7 +52,7 @@ typedef struct _transport_proto_vft u32 (*tx_fifo_offset) (transport_connection_t * tc); void (*update_time) (f64 time_now, u8 thread_index); void (*flush_data) (transport_connection_t *tconn); - int (*custom_tx) (void *session); + int (*custom_tx) (void *session, u32 max_burst_size); int (*app_rx_evt) (transport_connection_t *tconn); /* @@ -127,9 +127,9 @@ transport_get_half_open (transport_proto_t tp, u32 conn_index) } static inline int -transport_custom_tx (transport_proto_t tp, void *s) +transport_custom_tx (transport_proto_t tp, void *s, u32 max_burst_size) { - return tp_vfts[tp].custom_tx (s); + return tp_vfts[tp].custom_tx (s, max_burst_size); } static inline int diff --git a/src/vnet/tcp/tcp.c b/src/vnet/tcp/tcp.c index 928d1bac257..5a215b658a0 100644 --- a/src/vnet/tcp/tcp.c +++ b/src/vnet/tcp/tcp.c @@ -1196,8 +1196,6 @@ tcp_update_time (f64 now, u8 thread_index) tcp_set_time_now (wrk); tw_timer_expire_timers_16t_2w_512sl (&wrk->timer_wheel, now); - tcp_do_fastretransmits (wrk); - tcp_send_acks (wrk); tcp_flush_frames_to_output (wrk); } @@ -1228,6 +1226,7 @@ const static transport_proto_vft_t tcp_proto = { .update_time = tcp_update_time, .tx_fifo_offset = tcp_session_tx_fifo_offset, .flush_data = tcp_session_flush_data, + .custom_tx = tcp_session_custom_tx, .format_connection = format_tcp_session, .format_listener = format_tcp_listener_session, .format_half_open = format_tcp_half_open_session, @@ -1476,17 +1475,9 @@ tcp_main_enable (vlib_main_t * vm) for (thread = 0; thread < num_threads; thread++) { - vec_validate (tm->wrk_ctx[thread].pending_fast_rxt, 255); - vec_validate (tm->wrk_ctx[thread].ongoing_fast_rxt, 255); - vec_validate (tm->wrk_ctx[thread].postponed_fast_rxt, 255); vec_validate (tm->wrk_ctx[thread].pending_deq_acked, 255); - vec_validate (tm->wrk_ctx[thread].pending_acks, 255); vec_validate (tm->wrk_ctx[thread].pending_disconnects, 255); - vec_reset_length (tm->wrk_ctx[thread].pending_fast_rxt); - vec_reset_length (tm->wrk_ctx[thread].ongoing_fast_rxt); - vec_reset_length (tm->wrk_ctx[thread].postponed_fast_rxt); vec_reset_length (tm->wrk_ctx[thread].pending_deq_acked); - vec_reset_length (tm->wrk_ctx[thread].pending_acks); vec_reset_length (tm->wrk_ctx[thread].pending_disconnects); tm->wrk_ctx[thread].vm = vlib_mains[thread]; diff --git a/src/vnet/tcp/tcp.h b/src/vnet/tcp/tcp.h index 7ccc06aea9c..8f2665d2a6a 100644 --- a/src/vnet/tcp/tcp.h +++ b/src/vnet/tcp/tcp.h @@ -464,21 +464,9 @@ typedef struct tcp_worker_ctx_ /** tx frames for ip 4/6 lookup nodes */ vlib_frame_t *ip_lookup_tx_frames[2]; - /** vector of connections needing fast rxt */ - u32 *pending_fast_rxt; - - /** vector of connections now doing fast rxt */ - u32 *ongoing_fast_rxt; - - /** vector of connections that will do fast rxt */ - u32 *postponed_fast_rxt; - /** vector of pending ack dequeues */ u32 *pending_deq_acked; - /** vector of pending acks */ - u32 *pending_acks; - /** vector of pending disconnect notifications */ u32 *pending_disconnects; @@ -700,15 +688,12 @@ void tcp_update_burst_snd_vars (tcp_connection_t * tc); void tcp_update_rto (tcp_connection_t * tc); void tcp_flush_frame_to_output (tcp_worker_ctx_t * wrk, u8 is_ip4); void tcp_flush_frames_to_output (tcp_worker_ctx_t * wrk); -void tcp_program_fastretransmit (tcp_worker_ctx_t * wrk, - tcp_connection_t * tc); -void tcp_do_fastretransmits (tcp_worker_ctx_t * wrk); - -void tcp_program_ack (tcp_worker_ctx_t * wrk, tcp_connection_t * tc); -void tcp_program_dupack (tcp_worker_ctx_t * wrk, tcp_connection_t * tc); -void tcp_send_acks (tcp_worker_ctx_t * wrk); void tcp_send_window_update_ack (tcp_connection_t * tc); +void tcp_program_ack (tcp_connection_t * tc); +void tcp_program_dupack (tcp_connection_t * tc); +void tcp_program_fastretransmit (tcp_connection_t * tc); + /* * Rate estimation */ @@ -961,6 +946,7 @@ tcp_set_time_now (tcp_worker_ctx_t * wrk) u32 tcp_session_push_header (transport_connection_t * tconn, vlib_buffer_t * b); +int tcp_session_custom_tx (void *conn, u32 max_burst_size); void tcp_connection_timers_init (tcp_connection_t * tc); void tcp_connection_timers_reset (tcp_connection_t * tc); diff --git a/src/vnet/tcp/tcp_input.c b/src/vnet/tcp/tcp_input.c index 5f3764893c9..4695fbb1161 100755 --- a/src/vnet/tcp/tcp_input.c +++ b/src/vnet/tcp/tcp_input.c @@ -317,7 +317,7 @@ tcp_segment_validate (tcp_worker_ctx_t * wrk, tcp_connection_t * tc0, * SEG.TSval */ else if (!tcp_rst (th0)) { - tcp_program_ack (wrk, tc0); + tcp_program_ack (tc0); TCP_EVT_DBG (TCP_EVT_DUPACK_SENT, tc0, vnet_buffer (b0)->tcp); goto error; } @@ -340,7 +340,7 @@ tcp_segment_validate (tcp_worker_ctx_t * wrk, tcp_connection_t * tc0, } else { - tcp_program_ack (wrk, tc0); + tcp_program_ack (tc0); TCP_EVT_DBG (TCP_EVT_SYNACK_RCVD, tc0); *error0 = TCP_ERROR_SYN_ACKS_RCVD; } @@ -368,7 +368,7 @@ tcp_segment_validate (tcp_worker_ctx_t * wrk, tcp_connection_t * tc0, /* If not RST, send dup ack */ if (!tcp_rst (th0)) { - tcp_program_dupack (wrk, tc0); + tcp_program_dupack (tc0); TCP_EVT_DBG (TCP_EVT_DUPACK_SENT, tc0, vnet_buffer (b0)->tcp); } goto error; @@ -391,7 +391,7 @@ tcp_segment_validate (tcp_worker_ctx_t * wrk, tcp_connection_t * tc0, if (PREDICT_FALSE (tcp_syn (th0))) { /* As per RFC5961 send challenge ack instead of reset */ - tcp_program_ack (wrk, tc0); + tcp_program_ack (tc0); *error0 = TCP_ERROR_SPURIOUS_SYN; goto error; } @@ -1199,6 +1199,7 @@ tcp_cc_fastrecovery_clear (tcp_connection_t * tc) tcp_fastrecovery_off (tc); tcp_fastrecovery_first_off (tc); + tc->flags &= ~TCP_CONN_FRXT_PENDING; TCP_EVT_DBG (TCP_EVT_CC_EVT, tc, 3); } @@ -1305,81 +1306,6 @@ tcp_should_fastrecover (tcp_connection_t * tc) || tcp_should_fastrecover_sack (tc)); } -#ifndef CLIB_MARCH_VARIANT -void -tcp_program_fastretransmit (tcp_worker_ctx_t * wrk, tcp_connection_t * tc) -{ - if (!(tc->flags & TCP_CONN_FRXT_PENDING)) - { - vec_add1 (wrk->pending_fast_rxt, tc->c_c_index); - tc->flags |= TCP_CONN_FRXT_PENDING; - } -} - -void -tcp_do_fastretransmits (tcp_worker_ctx_t * wrk) -{ - u32 *ongoing_fast_rxt, burst_bytes, sent_bytes, thread_index; - u32 max_burst_size, burst_size, n_segs = 0, n_segs_now; - tcp_connection_t *tc; - u64 last_cpu_time; - int i; - - if (vec_len (wrk->pending_fast_rxt) == 0 - && vec_len (wrk->postponed_fast_rxt) == 0) - return; - - thread_index = wrk->vm->thread_index; - last_cpu_time = wrk->vm->clib_time.last_cpu_time; - ongoing_fast_rxt = wrk->ongoing_fast_rxt; - vec_append (ongoing_fast_rxt, wrk->postponed_fast_rxt); - vec_append (ongoing_fast_rxt, wrk->pending_fast_rxt); - - _vec_len (wrk->postponed_fast_rxt) = 0; - _vec_len (wrk->pending_fast_rxt) = 0; - - max_burst_size = VLIB_FRAME_SIZE / vec_len (ongoing_fast_rxt); - max_burst_size = clib_max (max_burst_size, 1); - - for (i = 0; i < vec_len (ongoing_fast_rxt); i++) - { - tc = tcp_connection_get (ongoing_fast_rxt[i], thread_index); - if (!tc) - continue; - if (!tcp_in_fastrecovery (tc)) - { - tc->flags &= ~TCP_CONN_FRXT_PENDING; - continue; - } - - if (n_segs >= VLIB_FRAME_SIZE) - { - vec_add1 (wrk->postponed_fast_rxt, ongoing_fast_rxt[i]); - continue; - } - - tc->flags &= ~TCP_CONN_FRXT_PENDING; - burst_size = clib_min (max_burst_size, VLIB_FRAME_SIZE - n_segs); - burst_bytes = transport_connection_tx_pacer_burst (&tc->connection, - last_cpu_time); - burst_size = clib_min (burst_size, burst_bytes / tc->snd_mss); - if (!burst_size) - { - tcp_program_fastretransmit (wrk, tc); - continue; - } - - n_segs_now = tcp_fast_retransmit (wrk, tc, burst_size); - sent_bytes = clib_min (n_segs_now * tc->snd_mss, burst_bytes); - transport_connection_tx_pacer_update_bytes (&tc->connection, - sent_bytes); - n_segs += n_segs_now; - } - _vec_len (ongoing_fast_rxt) = 0; - wrk->ongoing_fast_rxt = ongoing_fast_rxt; -} -#endif /* CLIB_MARCH_VARIANT */ - /** * One function to rule them all ... and in the darkness bind them */ @@ -1393,7 +1319,7 @@ tcp_cc_handle_event (tcp_connection_t * tc, tcp_rate_sample_t * rs, { if (tc->bytes_acked) goto partial_ack; - tcp_program_fastretransmit (tcp_get_worker (tc->c_thread_index), tc); + tcp_program_fastretransmit (tc); return; } /* @@ -1449,8 +1375,7 @@ tcp_cc_handle_event (tcp_connection_t * tc, tcp_rate_sample_t * rs, pacer_wnd = clib_max (0.1 * tc->cwnd, 2 * tc->snd_mss); tcp_connection_tx_pacer_reset (tc, pacer_wnd, 0 /* start bucket */ ); - tcp_program_fastretransmit (tcp_get_worker (tc->c_thread_index), - tc); + tcp_program_fastretransmit (tc); return; } else if (!tc->bytes_acked @@ -1571,7 +1496,7 @@ partial_ack: /* * Since this was a partial ack, try to retransmit some more data */ - tcp_program_fastretransmit (tcp_get_worker (tc->c_thread_index), tc); + tcp_program_fastretransmit (tc); } /** @@ -1712,7 +1637,7 @@ tcp_rcv_fin (tcp_worker_ctx_t * wrk, tcp_connection_t * tc, vlib_buffer_t * b, /* Account for the FIN and send ack */ tc->rcv_nxt += 1; - tcp_program_ack (wrk, tc); + tcp_program_ack (tc); /* Enter CLOSE-WAIT and notify session. To avoid lingering * in CLOSE-WAIT, set timer (reuse WAITCLOSE). */ tcp_connection_set_state (tc, TCP_STATE_CLOSE_WAIT); @@ -1976,7 +1901,7 @@ tcp_segment_rcv (tcp_worker_ctx_t * wrk, tcp_connection_t * tc, * retransmissions since we may not have any data to send */ if (seq_leq (vnet_buffer (b)->tcp.seq_end, tc->rcv_nxt)) { - tcp_program_ack (wrk, tc); + tcp_program_ack (tc); error = TCP_ERROR_SEGMENT_OLD; goto done; } @@ -1996,7 +1921,7 @@ tcp_segment_rcv (tcp_worker_ctx_t * wrk, tcp_connection_t * tc, /* RFC2581: Enqueue and send DUPACK for fast retransmit */ error = tcp_session_enqueue_ooo (tc, b, n_data_bytes); - tcp_program_dupack (wrk, tc); + tcp_program_dupack (tc); TCP_EVT_DBG (TCP_EVT_DUPACK_SENT, tc, vnet_buffer (b)->tcp); goto done; } @@ -2013,7 +1938,7 @@ in_order: goto done; } - tcp_program_ack (wrk, tc); + tcp_program_ack (tc); done: return error; @@ -2591,7 +2516,7 @@ tcp46_syn_sent_inline (vlib_main_t * vm, vlib_node_runtime_t * node, } else { - tcp_program_ack (wrk, new_tc0); + tcp_program_ack (new_tc0); } drop: @@ -2921,7 +2846,7 @@ tcp46_rcv_process_inline (vlib_main_t * vm, vlib_node_runtime_t * node, if (!is_fin0) goto drop; - tcp_program_ack (wrk, tc0); + tcp_program_ack (tc0); tcp_timer_update (tc0, TCP_TIMER_WAITCLOSE, TCP_TIMEWAIT_TIME); goto drop; @@ -2961,7 +2886,7 @@ tcp46_rcv_process_inline (vlib_main_t * vm, vlib_node_runtime_t * node, case TCP_STATE_ESTABLISHED: /* Account for the FIN and send ack */ tc0->rcv_nxt += 1; - tcp_program_ack (wrk, tc0); + tcp_program_ack (tc0); tcp_connection_set_state (tc0, TCP_STATE_CLOSE_WAIT); tcp_program_disconnect (wrk, tc0); tcp_timer_update (tc0, TCP_TIMER_WAITCLOSE, TCP_CLOSEWAIT_TIME); @@ -2995,7 +2920,7 @@ tcp46_rcv_process_inline (vlib_main_t * vm, vlib_node_runtime_t * node, else { tcp_connection_set_state (tc0, TCP_STATE_CLOSING); - tcp_program_ack (wrk, tc0); + tcp_program_ack (tc0); /* Wait for ACK for our FIN but not forever */ tcp_timer_update (tc0, TCP_TIMER_WAITCLOSE, TCP_2MSL_TIME); } @@ -3006,7 +2931,7 @@ tcp46_rcv_process_inline (vlib_main_t * vm, vlib_node_runtime_t * node, tcp_connection_set_state (tc0, TCP_STATE_TIME_WAIT); tcp_connection_timers_reset (tc0); tcp_timer_set (tc0, TCP_TIMER_WAITCLOSE, TCP_TIMEWAIT_TIME); - tcp_program_ack (wrk, tc0); + tcp_program_ack (tc0); session_transport_closed_notify (&tc0->connection); break; case TCP_STATE_TIME_WAIT: diff --git a/src/vnet/tcp/tcp_output.c b/src/vnet/tcp/tcp_output.c index 7b0303f3033..79cc95e9f0b 100644 --- a/src/vnet/tcp/tcp_output.c +++ b/src/vnet/tcp/tcp_output.c @@ -1188,21 +1188,21 @@ tcp_send_ack (tcp_connection_t * tc) } void -tcp_program_ack (tcp_worker_ctx_t * wrk, tcp_connection_t * tc) +tcp_program_ack (tcp_connection_t * tc) { if (!(tc->flags & TCP_CONN_SNDACK)) { - vec_add1 (wrk->pending_acks, tc->c_c_index); + session_add_self_custom_tx_evt (&tc->connection, 1); tc->flags |= TCP_CONN_SNDACK; } } void -tcp_program_dupack (tcp_worker_ctx_t * wrk, tcp_connection_t * tc) +tcp_program_dupack (tcp_connection_t * tc) { if (!(tc->flags & TCP_CONN_SNDACK)) { - vec_add1 (wrk->pending_acks, tc->c_c_index); + session_add_self_custom_tx_evt (&tc->connection, 1); tc->flags |= TCP_CONN_SNDACK; } if (tc->pending_dupacks < 255) @@ -1210,51 +1210,13 @@ tcp_program_dupack (tcp_worker_ctx_t * wrk, tcp_connection_t * tc) } void -tcp_send_acks (tcp_worker_ctx_t * wrk) +tcp_program_fastretransmit (tcp_connection_t * tc) { - u32 thread_index, *pending_acks; - tcp_connection_t *tc; - int i, j, n_acks; - - if (!vec_len (wrk->pending_acks)) - return; - - thread_index = wrk->vm->thread_index; - pending_acks = wrk->pending_acks; - for (i = 0; i < vec_len (pending_acks); i++) + if (!(tc->flags & TCP_CONN_FRXT_PENDING)) { - tc = tcp_connection_get (pending_acks[i], thread_index); - tc->flags &= ~TCP_CONN_SNDACK; - if (!tc->pending_dupacks) - { - tcp_send_ack (tc); - continue; - } - - /* If we're supposed to send dupacks but have no ooo data - * send only one ack */ - if (!vec_len (tc->snd_sacks)) - { - tcp_send_ack (tc); - continue; - } - - /* Start with first sack block */ - tc->snd_sack_pos = 0; - - /* Generate enough dupacks to cover all sack blocks. Do not generate - * more sacks than the number of packets received. But do generate at - * least 3, i.e., the number needed to signal congestion, if needed. */ - n_acks = vec_len (tc->snd_sacks) / TCP_OPTS_MAX_SACK_BLOCKS; - n_acks = clib_min (n_acks, tc->pending_dupacks); - n_acks = clib_max (n_acks, clib_min (tc->pending_dupacks, 3)); - for (j = 0; j < n_acks; j++) - tcp_send_ack (tc); - - tc->pending_dupacks = 0; - tc->snd_sack_pos = 0; + session_add_self_custom_tx_evt (&tc->connection, 0); + tc->flags |= TCP_CONN_FRXT_PENDING; } - _vec_len (wrk->pending_acks) = 0; } /** @@ -1281,7 +1243,6 @@ tcp_timer_delack_handler (u32 index) void tcp_send_window_update_ack (tcp_connection_t * tc) { - tcp_worker_ctx_t *wrk = tcp_get_worker (tc->c_thread_index); u32 win; if (tcp_zero_rwnd_sent (tc)) @@ -1290,7 +1251,7 @@ tcp_send_window_update_ack (tcp_connection_t * tc) if (win > 0) { tcp_zero_rwnd_sent_off (tc); - tcp_program_ack (wrk, tc); + tcp_program_ack (tc); } } } @@ -1853,7 +1814,7 @@ tcp_fast_retransmit_sack (tcp_worker_ctx_t * wrk, tcp_connection_t * tc, snd_space = tcp_available_cc_snd_space (tc); if (snd_space < tc->snd_mss) { - tcp_program_fastretransmit (wrk, tc); + tcp_program_fastretransmit (tc); return 0; } @@ -1877,7 +1838,7 @@ tcp_fast_retransmit_sack (tcp_worker_ctx_t * wrk, tcp_connection_t * tc, snd_space / tc->snd_mss); n_segs_now = tcp_fast_retransmit_unsent (wrk, tc, burst_size); if (max_deq > n_segs_now * tc->snd_mss) - tcp_program_fastretransmit (wrk, tc); + tcp_program_fastretransmit (tc); n_segs += n_segs_now; goto done; } @@ -1929,7 +1890,7 @@ tcp_fast_retransmit_sack (tcp_worker_ctx_t * wrk, tcp_connection_t * tc, } if (hole) - tcp_program_fastretransmit (wrk, tc); + tcp_program_fastretransmit (tc); done: return n_segs; @@ -1990,7 +1951,7 @@ send_unsent: burst_size = clib_min (burst_size - n_segs, snd_space / tc->snd_mss); n_segs_now = tcp_fast_retransmit_unsent (wrk, tc, burst_size); if (max_deq > n_segs_now * tc->snd_mss) - tcp_program_fastretransmit (wrk, tc); + tcp_program_fastretransmit (tc); n_segs += n_segs_now; } @@ -2011,6 +1972,108 @@ tcp_fast_retransmit (tcp_worker_ctx_t * wrk, tcp_connection_t * tc, else return tcp_fast_retransmit_no_sack (wrk, tc, burst_size); } + +static int +tcp_send_acks (tcp_connection_t * tc, u32 max_burst_size) +{ + int j, n_acks; + + if (!tc->pending_dupacks) + { + tcp_send_ack (tc); + return 1; + } + + /* If we're supposed to send dupacks but have no ooo data + * send only one ack */ + if (!vec_len (tc->snd_sacks)) + { + tcp_send_ack (tc); + return 1; + } + + /* Start with first sack block */ + tc->snd_sack_pos = 0; + + /* Generate enough dupacks to cover all sack blocks. Do not generate + * more sacks than the number of packets received. But do generate at + * least 3, i.e., the number needed to signal congestion, if needed. */ + n_acks = vec_len (tc->snd_sacks) / TCP_OPTS_MAX_SACK_BLOCKS; + n_acks = clib_min (n_acks, tc->pending_dupacks); + n_acks = clib_max (n_acks, clib_min (tc->pending_dupacks, 3)); + for (j = 0; j < clib_min (n_acks, max_burst_size); j++) + tcp_send_ack (tc); + + if (n_acks < max_burst_size) + { + tc->pending_dupacks = 0; + tc->snd_sack_pos = 0; + return n_acks; + } + else + { + TCP_DBG ("constrained by burst size"); + tc->pending_dupacks = n_acks - max_burst_size; + tcp_program_dupack (tc); + return max_burst_size; + } +} + +static int +tcp_do_fastretransmit (tcp_connection_t * tc, u32 max_burst_size) +{ + u32 n_segs = 0, burst_size, sent_bytes, burst_bytes; + tcp_worker_ctx_t *wrk; + + wrk = tcp_get_worker (tc->c_thread_index); + burst_bytes = transport_connection_tx_pacer_burst (&tc->connection, + wrk->vm-> + clib_time.last_cpu_time); + burst_size = clib_min (max_burst_size, burst_bytes / tc->snd_mss); + if (!burst_size) + { + tcp_program_fastretransmit (tc); + return 0; + } + + n_segs = tcp_fast_retransmit (wrk, tc, burst_size); + sent_bytes = clib_min (n_segs * tc->snd_mss, burst_bytes); + transport_connection_tx_pacer_update_bytes (&tc->connection, sent_bytes); + return n_segs; +} + +int +tcp_session_custom_tx (void *conn, u32 max_burst_size) +{ + tcp_connection_t *tc = (tcp_connection_t *) conn; + u32 n_segs = 0; + + if (tcp_in_fastrecovery (tc) && (tc->flags & TCP_CONN_FRXT_PENDING)) + { + tc->flags &= ~TCP_CONN_FRXT_PENDING; + n_segs = tcp_do_fastretransmit (tc, max_burst_size); + max_burst_size -= n_segs; + } + + if (!(tc->flags & TCP_CONN_SNDACK)) + return n_segs; + + tc->flags &= ~TCP_CONN_SNDACK; + + /* We have retransmitted packets and no dupack */ + if (n_segs && !tc->pending_dupacks) + return n_segs; + + if (!max_burst_size) + { + tcp_program_ack (tc); + return max_burst_size; + } + + n_segs += tcp_send_acks (tc, max_burst_size); + + return n_segs; +} #endif /* CLIB_MARCH_VARIANT */ static void diff --git a/src/vnet/tls/tls.c b/src/vnet/tls/tls.c index 4a9ec4e7eb6..f7780feaa85 100644 --- a/src/vnet/tls/tls.c +++ b/src/vnet/tls/tls.c @@ -660,7 +660,7 @@ tls_listener_get (u32 listener_index) } int -tls_custom_tx_callback (void *session) +tls_custom_tx_callback (void *session, u32 max_burst_size) { session_t *app_session = (session_t *) session; tls_ctx_t *ctx; -- cgit 1.2.3-korg