aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorNathan Skrzypczak <nathan.skrzypczak@gmail.com>2019-12-03 15:08:27 +0100
committerDave Wallace <dwallacelf@gmail.com>2019-12-04 18:08:51 +0000
commitc9cb8f6d9c6e1f51dfdf3e9f4ba0c9f38d1ca6d6 (patch)
tree88a0ebd269caeceb3dc1a7df4867d527455269ef /src
parentd9577b4aa3a325e46c09796835d22122bec9e3d0 (diff)
quic: fix stream tx_fifo race condition
Type: fix There is a race condition in when receiving TX from a client application : As egress_emit writes as much data as possible to the stream, if during egress_emit the app writes to the fifo, the data will be directly passed to quicly. Then TX callback happens and triggers a scheduler update telling quilcy the stream has data to send. When the next egress_emit is called and no more data has come, we have nothing to write, we return len = 0 to quicly which breaks an assert if a loss happens later on. Change-Id: I47e00a14dfc9068b5dac7b5c090a89124aea004f Signed-off-by: Nathan Skrzypczak <nathan.skrzypczak@gmail.com>
Diffstat (limited to 'src')
-rw-r--r--src/plugins/quic/quic.c75
-rw-r--r--src/plugins/quic/quic.h13
2 files changed, 61 insertions, 27 deletions
diff --git a/src/plugins/quic/quic.c b/src/plugins/quic/quic.c
index d1f188443d6..f51881e508a 100644
--- a/src/plugins/quic/quic.c
+++ b/src/plugins/quic/quic.c
@@ -169,7 +169,7 @@ quic_ctx_free (quic_ctx_t * ctx)
{
QUIC_DBG (2, "Free ctx %u %x", ctx->c_thread_index, ctx->c_c_index);
u32 thread_index = ctx->c_thread_index;
- ASSERT (ctx->timer_handle == QUIC_TIMER_HANDLE_INVALID);
+ QUIC_ASSERT (ctx->timer_handle == QUIC_TIMER_HANDLE_INVALID);
if (CLIB_DEBUG)
clib_memset (ctx, 0xfb, sizeof (*ctx));
pool_put (quic_main.ctx_pool[thread_index], ctx);
@@ -295,14 +295,14 @@ quic_ack_rx_data (session_t * stream_session)
sctx = quic_ctx_get (stream_session->connection_index,
stream_session->thread_index);
- ASSERT (quic_ctx_is_stream (sctx));
+ QUIC_ASSERT (quic_ctx_is_stream (sctx));
stream = sctx->stream;
stream_data = (quic_stream_data_t *) stream->data;
f = stream_session->rx_fifo;
max_deq = svm_fifo_max_dequeue (f);
- ASSERT (stream_data->app_rx_data_len >= max_deq);
+ QUIC_ASSERT (stream_data->app_rx_data_len >= max_deq);
quicly_stream_sync_recvbuf (stream, stream_data->app_rx_data_len - max_deq);
QUIC_DBG (3, "Acking %u bytes", stream_data->app_rx_data_len - max_deq);
stream_data->app_rx_data_len = max_deq;
@@ -330,7 +330,7 @@ quic_connection_delete (quic_ctx_t * ctx)
QUIC_DBG (2, "Deleting connection %u", ctx->c_c_index);
- ASSERT (!quic_ctx_is_stream (ctx));
+ QUIC_ASSERT (!quic_ctx_is_stream (ctx));
quic_stop_ctx_timer (ctx);
/* Delete the connection from the connection map */
@@ -428,14 +428,14 @@ quic_send_datagram (session_t * udp_session, quicly_datagram_t * packet)
/* Read dest address from quicly-provided sockaddr */
if (hdr.is_ip4)
{
- ASSERT (packet->dest.sa.sa_family == AF_INET);
+ QUIC_ASSERT (packet->dest.sa.sa_family == AF_INET);
struct sockaddr_in *sa4 = (struct sockaddr_in *) &packet->dest.sa;
hdr.rmt_port = sa4->sin_port;
hdr.rmt_ip.ip4.as_u32 = sa4->sin_addr.s_addr;
}
else
{
- ASSERT (packet->dest.sa.sa_family == AF_INET6);
+ QUIC_ASSERT (packet->dest.sa.sa_family == AF_INET6);
struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *) &packet->dest.sa;
hdr.rmt_port = sa6->sin6_port;
clib_memcpy (&hdr.rmt_ip.ip6, &sa6->sin6_addr, 16);
@@ -474,7 +474,7 @@ quic_send_packets (quic_ctx_t * ctx)
if (quic_ctx_is_stream (ctx))
ctx = quic_ctx_get (ctx->quic_connection_ctx_id, ctx->c_thread_index);
- ASSERT (!quic_ctx_is_stream (ctx));
+ QUIC_ASSERT (!quic_ctx_is_stream (ctx));
udp_session = session_get_from_handle_if_valid (ctx->udp_session_handle);
if (!udp_session)
@@ -590,13 +590,12 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
size_t len)
{
QUIC_DBG (3, "received data: %lu bytes, offset %lu", len, off);
- u32 max_enq;
+ u32 max_enq, rlen, rv;
quic_ctx_t *sctx;
session_t *stream_session;
app_worker_t *app_wrk;
svm_fifo_t *f;
quic_stream_data_t *stream_data;
- int rlen, rv;
stream_data = (quic_stream_data_t *) stream->data;
sctx = quic_ctx_get (stream_data->ctx_id, stream_data->thread_index);
@@ -640,7 +639,7 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
stream_session->app_wrk_index,
stream_session->thread_index, f, len, rlen, off, max_enq);
stream_data->app_rx_data_len += rlen;
- ASSERT (rlen >= len);
+ QUIC_ASSERT (rlen >= len);
app_wrk = app_worker_get_if_valid (stream_session->app_wrk_index);
if (PREDICT_TRUE (app_wrk != 0))
{
@@ -656,7 +655,7 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
rlen = svm_fifo_enqueue_with_offset (f,
off - stream_data->app_rx_data_len,
len, (u8 *) src);
- ASSERT (rlen == 0);
+ QUIC_ASSERT (rlen == 0);
}
return 0;
}
@@ -664,16 +663,22 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
void
quic_fifo_egress_shift (quicly_stream_t * stream, size_t delta)
{
+ quic_stream_data_t *stream_data;
session_t *stream_session;
svm_fifo_t *f;
- int rv;
+ u32 rv;
+ stream_data = (quic_stream_data_t *) stream->data;
stream_session = get_stream_session_from_stream (stream);
f = stream_session->tx_fifo;
+ QUIC_ASSERT (stream_data->app_tx_data_len >= delta);
+ stream_data->app_tx_data_len -= delta;
rv = svm_fifo_dequeue_drop (f, delta);
- ASSERT (rv == delta);
- quicly_stream_sync_sendbuf (stream, 0);
+ QUIC_ASSERT (rv == delta);
+
+ rv = quicly_stream_sync_sendbuf (stream, 0);
+ QUIC_ASSERT (!rv);
}
int
@@ -681,16 +686,18 @@ quic_fifo_egress_emit (quicly_stream_t * stream, size_t off, void *dst,
size_t * len, int *wrote_all)
{
u32 deq_max, first_deq, max_rd_chunk, rem_offset;
+ quic_stream_data_t *stream_data;
session_t *stream_session;
svm_fifo_t *f;
+ stream_data = (quic_stream_data_t *) stream->data;
stream_session = get_stream_session_from_stream (stream);
f = stream_session->tx_fifo;
QUIC_DBG (3, "Emitting %u, offset %u", *len, off);
deq_max = svm_fifo_max_dequeue_cons (f);
- ASSERT (off <= deq_max);
+ QUIC_ASSERT (off <= deq_max);
if (off + *len < deq_max)
{
*wrote_all = 0;
@@ -699,8 +706,11 @@ quic_fifo_egress_emit (quicly_stream_t * stream, size_t off, void *dst,
{
*wrote_all = 1;
*len = deq_max - off;
- QUIC_DBG (3, "Wrote ALL, %u", *len);
}
+ QUIC_ASSERT (*len > 0);
+
+ if (off + *len > stream_data->app_tx_data_len)
+ stream_data->app_tx_data_len = off + *len;
/* TODO, use something like : return svm_fifo_peek (f, off, *len, dst); */
max_rd_chunk = svm_fifo_max_read_chunk (f);
@@ -780,6 +790,7 @@ quic_on_stream_open (quicly_stream_open_t * self, quicly_stream_t * stream)
stream_data->ctx_id = sctx_id;
stream_data->thread_index = sctx->c_thread_index;
stream_data->app_rx_data_len = 0;
+ stream_data->app_tx_data_len = 0;
sctx->c_s_index = stream_session->session_index;
stream_session->session_state = SESSION_STATE_CREATED;
@@ -1024,6 +1035,7 @@ quic_connect_stream (session_t * quic_session, u32 opaque)
stream_data->ctx_id = sctx->c_c_index;
stream_data->thread_index = sctx->c_thread_index;
stream_data->app_rx_data_len = 0;
+ stream_data->app_tx_data_len = 0;
stream_session->session_state = SESSION_STATE_READY;
/* For now we only reset streams. Cleanup will be triggered by timers */
@@ -1235,7 +1247,7 @@ quic_stop_listen (u32 lctx_index)
QUIC_DBG (2, "Called quic_stop_listen");
quic_ctx_t *lctx;
lctx = quic_ctx_get (lctx_index, 0);
- ASSERT (quic_ctx_is_listener (lctx));
+ QUIC_ASSERT (quic_ctx_is_listener (lctx));
vnet_unlisten_args_t a = {
.handle = lctx->udp_session_handle,
.app_index = quic_main.app_index,
@@ -1470,7 +1482,7 @@ quic_transfer_connection (u32 ctx_index, u32 dest_thread)
QUIC_DBG (2, "Transferring conn %u to thread %u", ctx_index, dest_thread);
temp_ctx = clib_mem_alloc (sizeof (quic_ctx_t));
- ASSERT (temp_ctx);
+ QUIC_ASSERT (temp_ctx != NULL);
ctx = quic_ctx_get (ctx_index, thread_index);
clib_memcpy (temp_ctx, ctx, sizeof (quic_ctx_t));
@@ -1590,9 +1602,9 @@ quic_udp_session_migrate_callback (session_t * s, session_handle_t new_sh)
quic_ctx_t *ctx;
QUIC_ERR ("Session %x migrated to %lx", s->session_index, new_sh);
- ASSERT (vlib_get_thread_index () == s->thread_index);
+ QUIC_ASSERT (vlib_get_thread_index () == s->thread_index);
ctx = quic_ctx_get (s->opaque, s->thread_index);
- ASSERT (ctx->udp_session_handle == session_handle (s));
+ QUIC_ASSERT (ctx->udp_session_handle == session_handle (s));
ctx->udp_session_handle = new_sh;
#if QUIC_DEBUG >= 1
@@ -1674,8 +1686,10 @@ static int
quic_custom_tx_callback (void *s, u32 max_burst_size)
{
session_t *stream_session = (session_t *) s;
+ quic_stream_data_t *stream_data;
quicly_stream_t *stream;
quic_ctx_t *ctx;
+ u32 max_deq;
int rv;
if (PREDICT_FALSE
@@ -1690,9 +1704,6 @@ quic_custom_tx_callback (void *s, u32 max_burst_size)
QUIC_DBG (3, "Stream TX event");
quic_ack_rx_data (stream_session);
- if (!svm_fifo_max_dequeue (stream_session->tx_fifo))
- return 0;
-
stream = ctx->stream;
if (!quicly_sendstate_is_open (&stream->sendstate))
{
@@ -1700,8 +1711,18 @@ quic_custom_tx_callback (void *s, u32 max_burst_size)
return -1;
}
- if ((rv = quicly_stream_sync_sendbuf (stream, 1)) != 0)
- return rv;
+ stream_data = (quic_stream_data_t *) stream->data;
+ max_deq = svm_fifo_max_dequeue (stream_session->tx_fifo);
+ QUIC_ASSERT (max_deq >= stream_data->app_tx_data_len);
+ if (max_deq == stream_data->app_tx_data_len)
+ {
+ QUIC_DBG (3, "TX but no data %d / %d", max_deq,
+ stream_data->app_tx_data_len);
+ return 0;
+ }
+ stream_data->app_tx_data_len = max_deq;
+ rv = quicly_stream_sync_sendbuf (stream, 1);
+ QUIC_ASSERT (!rv);
tx_end:
quic_send_packets (ctx);
@@ -1878,8 +1899,8 @@ quic_process_one_rx_packet (u64 udp_session_handle, svm_fifo_t * f,
ret = svm_fifo_peek (f, fifo_offset,
SESSION_CONN_HDR_LEN, (u8 *) & pctx->ph);
- ASSERT (ret == SESSION_CONN_HDR_LEN);
- ASSERT (pctx->ph.data_offset == 0);
+ QUIC_ASSERT (ret == SESSION_CONN_HDR_LEN);
+ QUIC_ASSERT (pctx->ph.data_offset == 0);
full_len = pctx->ph.data_length + SESSION_CONN_HDR_LEN;
if (full_len > cur_deq)
{
diff --git a/src/plugins/quic/quic.h b/src/plugins/quic/quic.h
index dfcb0e6f17a..9433a895a8b 100644
--- a/src/plugins/quic/quic.h
+++ b/src/plugins/quic/quic.h
@@ -69,11 +69,23 @@
#define QUIC_DBG(_lvl, _fmt, _args...)
#endif
+#if CLIB_ASSERT_ENABLE
+#define QUIC_ASSERT(truth) ASSERT (truth)
+#else
+#define QUIC_ASSERT(truth) \
+ do { \
+ if (PREDICT_FALSE (! (truth))) \
+ QUIC_ERR ("ASSERT(%s) failed", # truth); \
+ } while (0)
+#endif
+
#define QUIC_ERR(_fmt, _args...) \
do { \
clib_warning ("QUIC-ERR: " _fmt, ##_args); \
} while (0)
+
+
extern vlib_node_registration_t quic_input_node;
typedef enum
@@ -167,6 +179,7 @@ typedef struct quic_stream_data_
u32 ctx_id;
u32 thread_index;
u32 app_rx_data_len; /**< bytes received, to be read by external app */
+ u32 app_tx_data_len; /**< bytes sent */
} quic_stream_data_t;
typedef struct quic_worker_ctx_