From c9cb8f6d9c6e1f51dfdf3e9f4ba0c9f38d1ca6d6 Mon Sep 17 00:00:00 2001 From: Nathan Skrzypczak Date: Tue, 3 Dec 2019 15:08:27 +0100 Subject: quic: fix stream tx_fifo race condition Type: fix There is a race condition in when receiving TX from a client application : As egress_emit writes as much data as possible to the stream, if during egress_emit the app writes to the fifo, the data will be directly passed to quicly. Then TX callback happens and triggers a scheduler update telling quilcy the stream has data to send. When the next egress_emit is called and no more data has come, we have nothing to write, we return len = 0 to quicly which breaks an assert if a loss happens later on. Change-Id: I47e00a14dfc9068b5dac7b5c090a89124aea004f Signed-off-by: Nathan Skrzypczak --- src/plugins/quic/quic.c | 75 +++++++++++++++++++++++++++++++------------------ src/plugins/quic/quic.h | 13 +++++++++ 2 files changed, 61 insertions(+), 27 deletions(-) (limited to 'src/plugins/quic') diff --git a/src/plugins/quic/quic.c b/src/plugins/quic/quic.c index d1f188443d6..f51881e508a 100644 --- a/src/plugins/quic/quic.c +++ b/src/plugins/quic/quic.c @@ -169,7 +169,7 @@ quic_ctx_free (quic_ctx_t * ctx) { QUIC_DBG (2, "Free ctx %u %x", ctx->c_thread_index, ctx->c_c_index); u32 thread_index = ctx->c_thread_index; - ASSERT (ctx->timer_handle == QUIC_TIMER_HANDLE_INVALID); + QUIC_ASSERT (ctx->timer_handle == QUIC_TIMER_HANDLE_INVALID); if (CLIB_DEBUG) clib_memset (ctx, 0xfb, sizeof (*ctx)); pool_put (quic_main.ctx_pool[thread_index], ctx); @@ -295,14 +295,14 @@ quic_ack_rx_data (session_t * stream_session) sctx = quic_ctx_get (stream_session->connection_index, stream_session->thread_index); - ASSERT (quic_ctx_is_stream (sctx)); + QUIC_ASSERT (quic_ctx_is_stream (sctx)); stream = sctx->stream; stream_data = (quic_stream_data_t *) stream->data; f = stream_session->rx_fifo; max_deq = svm_fifo_max_dequeue (f); - ASSERT (stream_data->app_rx_data_len >= max_deq); + QUIC_ASSERT (stream_data->app_rx_data_len >= max_deq); quicly_stream_sync_recvbuf (stream, stream_data->app_rx_data_len - max_deq); QUIC_DBG (3, "Acking %u bytes", stream_data->app_rx_data_len - max_deq); stream_data->app_rx_data_len = max_deq; @@ -330,7 +330,7 @@ quic_connection_delete (quic_ctx_t * ctx) QUIC_DBG (2, "Deleting connection %u", ctx->c_c_index); - ASSERT (!quic_ctx_is_stream (ctx)); + QUIC_ASSERT (!quic_ctx_is_stream (ctx)); quic_stop_ctx_timer (ctx); /* Delete the connection from the connection map */ @@ -428,14 +428,14 @@ quic_send_datagram (session_t * udp_session, quicly_datagram_t * packet) /* Read dest address from quicly-provided sockaddr */ if (hdr.is_ip4) { - ASSERT (packet->dest.sa.sa_family == AF_INET); + QUIC_ASSERT (packet->dest.sa.sa_family == AF_INET); struct sockaddr_in *sa4 = (struct sockaddr_in *) &packet->dest.sa; hdr.rmt_port = sa4->sin_port; hdr.rmt_ip.ip4.as_u32 = sa4->sin_addr.s_addr; } else { - ASSERT (packet->dest.sa.sa_family == AF_INET6); + QUIC_ASSERT (packet->dest.sa.sa_family == AF_INET6); struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *) &packet->dest.sa; hdr.rmt_port = sa6->sin6_port; clib_memcpy (&hdr.rmt_ip.ip6, &sa6->sin6_addr, 16); @@ -474,7 +474,7 @@ quic_send_packets (quic_ctx_t * ctx) if (quic_ctx_is_stream (ctx)) ctx = quic_ctx_get (ctx->quic_connection_ctx_id, ctx->c_thread_index); - ASSERT (!quic_ctx_is_stream (ctx)); + QUIC_ASSERT (!quic_ctx_is_stream (ctx)); udp_session = session_get_from_handle_if_valid (ctx->udp_session_handle); if (!udp_session) @@ -590,13 +590,12 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src, size_t len) { QUIC_DBG (3, "received data: %lu bytes, offset %lu", len, off); - u32 max_enq; + u32 max_enq, rlen, rv; quic_ctx_t *sctx; session_t *stream_session; app_worker_t *app_wrk; svm_fifo_t *f; quic_stream_data_t *stream_data; - int rlen, rv; stream_data = (quic_stream_data_t *) stream->data; sctx = quic_ctx_get (stream_data->ctx_id, stream_data->thread_index); @@ -640,7 +639,7 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src, stream_session->app_wrk_index, stream_session->thread_index, f, len, rlen, off, max_enq); stream_data->app_rx_data_len += rlen; - ASSERT (rlen >= len); + QUIC_ASSERT (rlen >= len); app_wrk = app_worker_get_if_valid (stream_session->app_wrk_index); if (PREDICT_TRUE (app_wrk != 0)) { @@ -656,7 +655,7 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src, rlen = svm_fifo_enqueue_with_offset (f, off - stream_data->app_rx_data_len, len, (u8 *) src); - ASSERT (rlen == 0); + QUIC_ASSERT (rlen == 0); } return 0; } @@ -664,16 +663,22 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src, void quic_fifo_egress_shift (quicly_stream_t * stream, size_t delta) { + quic_stream_data_t *stream_data; session_t *stream_session; svm_fifo_t *f; - int rv; + u32 rv; + stream_data = (quic_stream_data_t *) stream->data; stream_session = get_stream_session_from_stream (stream); f = stream_session->tx_fifo; + QUIC_ASSERT (stream_data->app_tx_data_len >= delta); + stream_data->app_tx_data_len -= delta; rv = svm_fifo_dequeue_drop (f, delta); - ASSERT (rv == delta); - quicly_stream_sync_sendbuf (stream, 0); + QUIC_ASSERT (rv == delta); + + rv = quicly_stream_sync_sendbuf (stream, 0); + QUIC_ASSERT (!rv); } int @@ -681,16 +686,18 @@ quic_fifo_egress_emit (quicly_stream_t * stream, size_t off, void *dst, size_t * len, int *wrote_all) { u32 deq_max, first_deq, max_rd_chunk, rem_offset; + quic_stream_data_t *stream_data; session_t *stream_session; svm_fifo_t *f; + stream_data = (quic_stream_data_t *) stream->data; stream_session = get_stream_session_from_stream (stream); f = stream_session->tx_fifo; QUIC_DBG (3, "Emitting %u, offset %u", *len, off); deq_max = svm_fifo_max_dequeue_cons (f); - ASSERT (off <= deq_max); + QUIC_ASSERT (off <= deq_max); if (off + *len < deq_max) { *wrote_all = 0; @@ -699,8 +706,11 @@ quic_fifo_egress_emit (quicly_stream_t * stream, size_t off, void *dst, { *wrote_all = 1; *len = deq_max - off; - QUIC_DBG (3, "Wrote ALL, %u", *len); } + QUIC_ASSERT (*len > 0); + + if (off + *len > stream_data->app_tx_data_len) + stream_data->app_tx_data_len = off + *len; /* TODO, use something like : return svm_fifo_peek (f, off, *len, dst); */ max_rd_chunk = svm_fifo_max_read_chunk (f); @@ -780,6 +790,7 @@ quic_on_stream_open (quicly_stream_open_t * self, quicly_stream_t * stream) stream_data->ctx_id = sctx_id; stream_data->thread_index = sctx->c_thread_index; stream_data->app_rx_data_len = 0; + stream_data->app_tx_data_len = 0; sctx->c_s_index = stream_session->session_index; stream_session->session_state = SESSION_STATE_CREATED; @@ -1024,6 +1035,7 @@ quic_connect_stream (session_t * quic_session, u32 opaque) stream_data->ctx_id = sctx->c_c_index; stream_data->thread_index = sctx->c_thread_index; stream_data->app_rx_data_len = 0; + stream_data->app_tx_data_len = 0; stream_session->session_state = SESSION_STATE_READY; /* For now we only reset streams. Cleanup will be triggered by timers */ @@ -1235,7 +1247,7 @@ quic_stop_listen (u32 lctx_index) QUIC_DBG (2, "Called quic_stop_listen"); quic_ctx_t *lctx; lctx = quic_ctx_get (lctx_index, 0); - ASSERT (quic_ctx_is_listener (lctx)); + QUIC_ASSERT (quic_ctx_is_listener (lctx)); vnet_unlisten_args_t a = { .handle = lctx->udp_session_handle, .app_index = quic_main.app_index, @@ -1470,7 +1482,7 @@ quic_transfer_connection (u32 ctx_index, u32 dest_thread) QUIC_DBG (2, "Transferring conn %u to thread %u", ctx_index, dest_thread); temp_ctx = clib_mem_alloc (sizeof (quic_ctx_t)); - ASSERT (temp_ctx); + QUIC_ASSERT (temp_ctx != NULL); ctx = quic_ctx_get (ctx_index, thread_index); clib_memcpy (temp_ctx, ctx, sizeof (quic_ctx_t)); @@ -1590,9 +1602,9 @@ quic_udp_session_migrate_callback (session_t * s, session_handle_t new_sh) quic_ctx_t *ctx; QUIC_ERR ("Session %x migrated to %lx", s->session_index, new_sh); - ASSERT (vlib_get_thread_index () == s->thread_index); + QUIC_ASSERT (vlib_get_thread_index () == s->thread_index); ctx = quic_ctx_get (s->opaque, s->thread_index); - ASSERT (ctx->udp_session_handle == session_handle (s)); + QUIC_ASSERT (ctx->udp_session_handle == session_handle (s)); ctx->udp_session_handle = new_sh; #if QUIC_DEBUG >= 1 @@ -1674,8 +1686,10 @@ static int quic_custom_tx_callback (void *s, u32 max_burst_size) { session_t *stream_session = (session_t *) s; + quic_stream_data_t *stream_data; quicly_stream_t *stream; quic_ctx_t *ctx; + u32 max_deq; int rv; if (PREDICT_FALSE @@ -1690,9 +1704,6 @@ quic_custom_tx_callback (void *s, u32 max_burst_size) QUIC_DBG (3, "Stream TX event"); quic_ack_rx_data (stream_session); - if (!svm_fifo_max_dequeue (stream_session->tx_fifo)) - return 0; - stream = ctx->stream; if (!quicly_sendstate_is_open (&stream->sendstate)) { @@ -1700,8 +1711,18 @@ quic_custom_tx_callback (void *s, u32 max_burst_size) return -1; } - if ((rv = quicly_stream_sync_sendbuf (stream, 1)) != 0) - return rv; + stream_data = (quic_stream_data_t *) stream->data; + max_deq = svm_fifo_max_dequeue (stream_session->tx_fifo); + QUIC_ASSERT (max_deq >= stream_data->app_tx_data_len); + if (max_deq == stream_data->app_tx_data_len) + { + QUIC_DBG (3, "TX but no data %d / %d", max_deq, + stream_data->app_tx_data_len); + return 0; + } + stream_data->app_tx_data_len = max_deq; + rv = quicly_stream_sync_sendbuf (stream, 1); + QUIC_ASSERT (!rv); tx_end: quic_send_packets (ctx); @@ -1878,8 +1899,8 @@ quic_process_one_rx_packet (u64 udp_session_handle, svm_fifo_t * f, ret = svm_fifo_peek (f, fifo_offset, SESSION_CONN_HDR_LEN, (u8 *) & pctx->ph); - ASSERT (ret == SESSION_CONN_HDR_LEN); - ASSERT (pctx->ph.data_offset == 0); + QUIC_ASSERT (ret == SESSION_CONN_HDR_LEN); + QUIC_ASSERT (pctx->ph.data_offset == 0); full_len = pctx->ph.data_length + SESSION_CONN_HDR_LEN; if (full_len > cur_deq) { diff --git a/src/plugins/quic/quic.h b/src/plugins/quic/quic.h index dfcb0e6f17a..9433a895a8b 100644 --- a/src/plugins/quic/quic.h +++ b/src/plugins/quic/quic.h @@ -69,11 +69,23 @@ #define QUIC_DBG(_lvl, _fmt, _args...) #endif +#if CLIB_ASSERT_ENABLE +#define QUIC_ASSERT(truth) ASSERT (truth) +#else +#define QUIC_ASSERT(truth) \ + do { \ + if (PREDICT_FALSE (! (truth))) \ + QUIC_ERR ("ASSERT(%s) failed", # truth); \ + } while (0) +#endif + #define QUIC_ERR(_fmt, _args...) \ do { \ clib_warning ("QUIC-ERR: " _fmt, ##_args); \ } while (0) + + extern vlib_node_registration_t quic_input_node; typedef enum @@ -167,6 +179,7 @@ typedef struct quic_stream_data_ u32 ctx_id; u32 thread_index; u32 app_rx_data_len; /**< bytes received, to be read by external app */ + u32 app_tx_data_len; /**< bytes sent */ } quic_stream_data_t; typedef struct quic_worker_ctx_ -- cgit 1.2.3-korg