summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/vcl/vppcom.c17
-rw-r--r--src/vnet/session/session.h8
-rw-r--r--src/vnet/session/session_node.c15
-rw-r--r--src/vnet/session/transport_interface.h1
-rw-r--r--src/vnet/tcp/tcp.c11
-rw-r--r--src/vnet/tcp/tcp.h3
-rw-r--r--src/vnet/tcp/tcp_input.c6
-rw-r--r--src/vnet/tcp/tcp_output.c7
8 files changed, 61 insertions, 7 deletions
diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c
index 0eaab6cbd6b..9bb6a9880d1 100644
--- a/src/vcl/vppcom.c
+++ b/src/vcl/vppcom.c
@@ -1665,8 +1665,9 @@ vcl_is_tx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
return (e->event_type == SESSION_IO_EVT_CT_RX);
}
-int
-vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
+static inline int
+vppcom_session_write_inline (uint32_t session_handle, void *buf, size_t n,
+ u8 is_flush)
{
vcl_worker_t *wrk = vcl_worker_get_current ();
int rv, n_write, is_nonblocking;
@@ -1733,6 +1734,9 @@ vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
ASSERT (FIFO_EVENT_APP_TX + 1 == SESSION_IO_EVT_CT_TX);
et = FIFO_EVENT_APP_TX + vcl_session_is_ct (s);
+ if (is_flush && !vcl_session_is_ct (s))
+ et = SESSION_IO_EVT_TX_FLUSH;
+
if (s->is_dgram)
n_write = app_send_dgram_raw (tx_fifo, &s->transport,
s->vpp_evt_q, buf, n, et, SVM_Q_WAIT);
@@ -1748,6 +1752,13 @@ vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
return n_write;
}
+int
+vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
+{
+ return vppcom_session_write_inline (session_handle, buf, n,
+ 0 /* is_flush */ );
+}
+
static vcl_session_t *
vcl_ct_session_get_from_fifo (vcl_worker_t * wrk, svm_fifo_t * f, u8 type)
{
@@ -3345,7 +3356,7 @@ vppcom_session_sendto (uint32_t session_handle, void *buffer,
getpid (), flags, flags);
}
- return (vppcom_session_write (session_handle, buffer, buflen));
+ return (vppcom_session_write_inline (session_handle, buffer, buflen, 1));
}
int
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index d5f040edc59..be2490fbe95 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -36,6 +36,7 @@ typedef enum
SESSION_IO_EVT_CT_RX,
FIFO_EVENT_APP_TX,
SESSION_IO_EVT_CT_TX,
+ SESSION_IO_EVT_TX_FLUSH,
FIFO_EVENT_DISCONNECT,
FIFO_EVENT_BUILTIN_RX,
FIFO_EVENT_BUILTIN_TX,
@@ -519,6 +520,13 @@ transport_max_rx_enqueue (transport_connection_t * tc)
}
always_inline u32
+transport_max_tx_dequeue (transport_connection_t * tc)
+{
+ stream_session_t *s = session_get (tc->s_index, tc->thread_index);
+ return svm_fifo_max_dequeue (s->server_tx_fifo);
+}
+
+always_inline u32
transport_rx_fifo_size (transport_connection_t * tc)
{
stream_session_t *s = session_get (tc->s_index, tc->thread_index);
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index 14576685a76..f4e0eaa993e 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -572,9 +572,17 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
ctx->transport_vft = transport_protocol_get_vft (tp);
ctx->tc = session_tx_get_transport (ctx, peek_data);
ctx->snd_mss = ctx->transport_vft->send_mss (ctx->tc);
- ctx->snd_space =
- transport_connection_snd_space (ctx->tc, vm->clib_time.last_cpu_time,
- ctx->snd_mss);
+
+ if (PREDICT_FALSE (e->event_type == SESSION_IO_EVT_TX_FLUSH))
+ {
+ if (ctx->transport_vft->flush_data)
+ ctx->transport_vft->flush_data (ctx->tc);
+ }
+
+ ctx->snd_space = transport_connection_snd_space (ctx->tc,
+ vm->
+ clib_time.last_cpu_time,
+ ctx->snd_mss);
if (ctx->snd_space == 0 || ctx->snd_mss == 0)
{
vec_add1 (wrk->pending_event_vector, *e);
@@ -828,6 +836,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
e = &fifo_events[i];
switch (e->event_type)
{
+ case SESSION_IO_EVT_TX_FLUSH:
case FIFO_EVENT_APP_TX:
/* Don't try to send more that one frame per dispatch cycle */
if (n_tx_packets == VLIB_FRAME_SIZE)
diff --git a/src/vnet/session/transport_interface.h b/src/vnet/session/transport_interface.h
index 3bfed415874..10579c45c64 100644
--- a/src/vnet/session/transport_interface.h
+++ b/src/vnet/session/transport_interface.h
@@ -61,6 +61,7 @@ typedef struct _transport_proto_vft
u32 (*send_space) (transport_connection_t * tc);
u32 (*tx_fifo_offset) (transport_connection_t * tc);
void (*update_time) (f64 time_now, u8 thread_index);
+ void (*flush_data) (transport_connection_t *tconn);
/*
* Connection retrieval
diff --git a/src/vnet/tcp/tcp.c b/src/vnet/tcp/tcp.c
index f703d634b54..6d6a880eda8 100644
--- a/src/vnet/tcp/tcp.c
+++ b/src/vnet/tcp/tcp.c
@@ -1123,6 +1123,16 @@ tcp_session_push_header (transport_connection_t * tconn, vlib_buffer_t * b)
return tcp_push_header (tc, b);
}
+static void
+tcp_session_flush_data (transport_connection_t * tconn)
+{
+ tcp_connection_t *tc = (tcp_connection_t *) tconn;
+ if (tc->flags & TCP_CONN_PSH_PENDING)
+ return;
+ tc->flags |= TCP_CONN_PSH_PENDING;
+ tc->psh_seq = tc->snd_una_max + transport_max_tx_dequeue (tconn) - 1;
+}
+
/* *INDENT-OFF* */
const static transport_proto_vft_t tcp_proto = {
.enable = vnet_tcp_enable_disable,
@@ -1139,6 +1149,7 @@ const static transport_proto_vft_t tcp_proto = {
.send_space = tcp_session_send_space,
.update_time = tcp_update_time,
.tx_fifo_offset = tcp_session_tx_fifo_offset,
+ .flush_data = tcp_session_flush_data,
.format_connection = format_tcp_session,
.format_listener = format_tcp_listener_session,
.format_half_open = format_tcp_half_open_session,
diff --git a/src/vnet/tcp/tcp.h b/src/vnet/tcp/tcp.h
index 5b235b65d74..46b03ac14b4 100644
--- a/src/vnet/tcp/tcp.h
+++ b/src/vnet/tcp/tcp.h
@@ -124,6 +124,7 @@ extern timer_expiration_handler tcp_timer_retransmit_syn_handler;
_(FRXT_PENDING, "Fast-retransmit pending") \
_(FRXT_FIRST, "Fast-retransmit first again") \
_(DEQ_PENDING, "Pending dequeue acked") \
+ _(PSH_PENDING, "Pending psh packet") \
typedef enum _tcp_connection_flag_bits
{
@@ -334,6 +335,8 @@ typedef struct _tcp_connection
u32 last_fib_check; /**< Last time we checked fib route for peer */
u32 sw_if_index; /**< Interface for the connection */
u32 tx_fifo_size; /**< Tx fifo size. Used to constrain cwnd */
+
+ u32 psh_seq; /**< Add psh header for seg that includes this */
} tcp_connection_t;
/* *INDENT-OFF* */
diff --git a/src/vnet/tcp/tcp_input.c b/src/vnet/tcp/tcp_input.c
index dff18029155..f04fa5d8901 100644
--- a/src/vnet/tcp/tcp_input.c
+++ b/src/vnet/tcp/tcp_input.c
@@ -543,6 +543,12 @@ tcp_handle_postponed_dequeues (tcp_worker_ctx_t * wrk)
tc->burst_acked = 0;
tcp_validate_txf_size (tc, tc->snd_una_max - tc->snd_una);
+ if (PREDICT_FALSE (tc->flags & TCP_CONN_PSH_PENDING))
+ {
+ if (seq_leq (tc->psh_seq, tc->snd_una))
+ tc->flags &= ~TCP_CONN_PSH_PENDING;
+ }
+
/* If everything has been acked, stop retransmit timer
* otherwise update. */
tcp_retransmit_timer_update (tc);
diff --git a/src/vnet/tcp/tcp_output.c b/src/vnet/tcp/tcp_output.c
index 74fc15fe6cc..7cee34995e3 100644
--- a/src/vnet/tcp/tcp_output.c
+++ b/src/vnet/tcp/tcp_output.c
@@ -1169,7 +1169,12 @@ tcp_push_hdr_i (tcp_connection_t * tc, vlib_buffer_t * b,
advertise_wnd = tcp_window_to_advertise (tc, next_state);
flags = tcp_make_state_flags (tc, next_state);
-
+ if (PREDICT_FALSE (tc->flags & TCP_CONN_PSH_PENDING))
+ {
+ if (seq_geq (tc->psh_seq, tc->snd_nxt)
+ && seq_lt (tc->psh_seq, tc->snd_nxt + data_len))
+ flags |= TCP_FLAG_PSH;
+ }
th = vlib_buffer_push_tcp (b, tc->c_lcl_port, tc->c_rmt_port, tc->snd_nxt,
tc->rcv_nxt, tcp_hdr_opts_len, flags,
advertise_wnd);