From e82a7ade8a9667f1e49067bf59010f60beda5452 Mon Sep 17 00:00:00 2001 From: Nathan Skrzypczak Date: Wed, 22 May 2019 18:41:50 +0200 Subject: quic : Use TX event for app read notification Type: feature Change-Id: I1846cdeb35f079249f66a0351aa244c540923a43 Signed-off-by: Nathan Skrzypczak --- src/plugins/quic/quic.c | 158 +++++++++++++++++++++++++++++++----------------- 1 file changed, 102 insertions(+), 56 deletions(-) (limited to 'src/plugins/quic/quic.c') diff --git a/src/plugins/quic/quic.c b/src/plugins/quic/quic.c index f099d074211..93c01628e80 100644 --- a/src/plugins/quic/quic.c +++ b/src/plugins/quic/quic.c @@ -184,18 +184,10 @@ quic_send_datagram (session_t * udp_session, quicly_datagram_t * packet) tc = session_get_transport (udp_session); max_enqueue = svm_fifo_max_enqueue (f); - if (max_enqueue <= sizeof (session_dgram_hdr_t)) - { - QUIC_DBG (1, "Not enough space to enqueue header"); - return QUIC_ERROR_FULL_FIFO; - } - - max_enqueue -= sizeof (session_dgram_hdr_t); - - if (max_enqueue < len) + if (max_enqueue < SESSION_CONN_HDR_LEN + len) { QUIC_DBG (1, "Too much data to send, max_enqueue %u, len %u", - max_enqueue, len); + max_enqueue, len + SESSION_CONN_HDR_LEN); return QUIC_ERROR_FULL_FIFO; } @@ -243,10 +235,9 @@ static int quic_sendable_packet_count (session_t * udp_session) { u32 max_enqueue; + u32 packet_size = QUIC_MAX_PACKET_SIZE + SESSION_CONN_HDR_LEN; max_enqueue = svm_fifo_max_enqueue (udp_session->tx_fifo); - return clib_min (max_enqueue / - (QUIC_MAX_PACKET_SIZE + sizeof (session_dgram_hdr_t)), - QUIC_SEND_PACKET_VEC_SIZE); + return clib_min (max_enqueue / packet_size, QUIC_SEND_PACKET_VEC_SIZE); } static int @@ -259,7 +250,7 @@ quic_send_packets (quic_ctx_t * ctx) quicly_context_t *quicly_context; app_worker_t *app_wrk; application_t *app; - int err; + int err = 0; /* We have sctx, get qctx */ if (ctx->c_quic_ctx_id.is_stream) @@ -270,7 +261,10 @@ quic_send_packets (quic_ctx_t * ctx) ASSERT (!ctx->c_quic_ctx_id.is_stream); udp_session = - session_get_from_handle (ctx->c_quic_ctx_id.udp_session_handle); + session_get_from_handle_if_valid (ctx->c_quic_ctx_id.udp_session_handle); + if (!udp_session) + goto quicly_error; + conn = ctx->c_quic_ctx_id.conn; if (!conn) @@ -311,16 +305,21 @@ quic_send_packets (quic_ctx_t * ctx) } while (num_packets > 0 && num_packets == max_packets); +stop_sending: if (svm_fifo_set_event (udp_session->tx_fifo)) - session_send_io_evt_to_thread (udp_session->tx_fifo, SESSION_IO_EVT_TX); + if ((err = + session_send_io_evt_to_thread (udp_session->tx_fifo, + SESSION_IO_EVT_TX))) + clib_warning ("Event enqueue errored %d", err); -stop_sending: + QUIC_DBG (3, "%u[TX] %u[RX]", svm_fifo_max_dequeue (udp_session->tx_fifo), + svm_fifo_max_dequeue (udp_session->rx_fifo)); quic_update_timer (ctx); return 0; quicly_error: - if ((err != QUICLY_ERROR_PACKET_IGNORED) & (err != - QUICLY_ERROR_FREE_CONNECTION)) + if (err && err != QUICLY_ERROR_PACKET_IGNORED + && err != QUICLY_ERROR_FREE_CONNECTION) clib_warning ("Quic error '%s'.", quic_format_err (err)); quic_connection_closed (ctx->c_c_index, ctx->c_thread_index); return 1; @@ -393,6 +392,31 @@ get_stream_session_from_stream (quicly_stream_t * stream) return session_get (ctx->c_s_index, stream_data->thread_index); } +static void +quic_ack_rx_data (session_t * stream_session) +{ + u32 max_deq; + quic_ctx_t *sctx; + svm_fifo_t *f; + quicly_stream_t *stream; + quic_stream_data_t *stream_data; + + sctx = + quic_ctx_get (stream_session->connection_index, + stream_session->thread_index); + ASSERT (sctx->c_quic_ctx_id.is_stream); + stream = sctx->c_quic_ctx_id.stream; + stream_data = (quic_stream_data_t *) stream->data; + + f = stream_session->rx_fifo; + max_deq = svm_fifo_max_dequeue (f); + + ASSERT (stream_data->app_rx_data_len >= max_deq); + quicly_stream_sync_recvbuf (stream, stream_data->app_rx_data_len - max_deq); + QUIC_DBG (3, "Acking %u bytes", stream_data->app_rx_data_len - max_deq); + stream_data->app_rx_data_len = max_deq; +} + static int quic_on_receive (quicly_stream_t * stream, size_t off, const void *src, size_t len) @@ -413,26 +437,28 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src, max_enq = svm_fifo_max_enqueue_prod (f); QUIC_DBG (3, "Enqueuing %u at off %u in %u space", len, off, max_enq); - if (off + len > max_enq) + if (off - stream_data->app_rx_data_len + len > max_enq) { - /* TODO : can we find a better solution, listening on RX fifo evts ? */ - QUIC_DBG (3, "Ingoring packet, RX fifo is full"); - return QUICLY_ERROR_PACKET_IGNORED; + QUIC_DBG (1, "Error RX fifo is full"); + return 1; } - if (off == 0) + if (off == stream_data->app_rx_data_len) { + /* Streams live on the same thread so (f, stream_data) should stay consistent */ rlen = svm_fifo_enqueue (f, len, (u8 *) src); + stream_data->app_rx_data_len += rlen; ASSERT (rlen >= len); - - quicly_stream_sync_recvbuf (stream, rlen); app_wrk = app_worker_get_if_valid (stream_session->app_wrk_index); if (PREDICT_TRUE (app_wrk != 0)) app_worker_lock_and_send_event (app_wrk, stream_session, SESSION_IO_EVT_RX); + quic_ack_rx_data (stream_session); } else { - rlen = svm_fifo_enqueue_with_offset (f, off, len, (u8 *) src); + rlen = + svm_fifo_enqueue_with_offset (f, off - stream_data->app_rx_data_len, + len, (u8 *) src); ASSERT (rlen == 0); } return 0; @@ -443,11 +469,13 @@ quic_fifo_egress_shift (quicly_stream_t * stream, size_t delta) { session_t *stream_session; svm_fifo_t *f; + int rv; stream_session = get_stream_session_from_stream (stream); f = stream_session->tx_fifo; - ASSERT (svm_fifo_dequeue_drop (f, delta) == delta); + rv = svm_fifo_dequeue_drop (f, delta); + ASSERT (rv == delta); quicly_stream_sync_sendbuf (stream, 0); } @@ -472,9 +500,9 @@ quic_fifo_egress_emit (quicly_stream_t * stream, size_t off, void *dst, } else { - QUIC_DBG (3, "Wrote ALL"); *wrote_all = 1; *len = deq_max - off; + QUIC_DBG (3, "Wrote ALL, %u", *len); } /* TODO, use something like : return svm_fifo_peek (f, off, *len, dst); */ @@ -538,6 +566,7 @@ quic_accept_stream (void *s) stream_data = (quic_stream_data_t *) stream->data; stream_data->ctx_id = sctx_id; stream_data->thread_index = sctx->c_thread_index; + stream_data->app_rx_data_len = 0; sctx->c_s_index = stream_session->session_index; stream_session->session_state = SESSION_STATE_CREATED; @@ -557,6 +586,9 @@ quic_accept_stream (void *s) quicly_reset_stream (stream, QUIC_APP_ALLOCATION_ERROR); return; } + svm_fifo_add_want_tx_ntf (stream_session->rx_fifo, + SVM_FIFO_WANT_TX_NOTIF_IF_FULL | + SVM_FIFO_WANT_TX_NOTIF_IF_EMPTY); rv = app_worker_accept_notify (app_wrk, stream_session); if (rv) @@ -1211,6 +1243,10 @@ quic_connect_new_stream (session_endpoint_cfg_t * sep) return app_worker_connect_notify (app_wrk, NULL, sep->opaque); } + svm_fifo_add_want_tx_ntf (stream_session->rx_fifo, + SVM_FIFO_WANT_TX_NOTIF_IF_FULL | + SVM_FIFO_WANT_TX_NOTIF_IF_EMPTY); + stream_session->session_state = SESSION_STATE_READY; if (app_worker_connect_notify (app_wrk, stream_session, sep->opaque)) { @@ -1225,6 +1261,7 @@ quic_connect_new_stream (session_endpoint_cfg_t * sep) stream_data = (quic_stream_data_t *) stream->data; stream_data->ctx_id = sctx->c_c_index; stream_data->thread_index = sctx->c_thread_index; + stream_data->app_rx_data_len = 0; return 0; } @@ -1798,6 +1835,17 @@ quic_del_segment_callback (u32 client_index, u64 seg_handle) return 0; } + +static int +quic_custom_app_rx_callback (transport_connection_t * tc) +{ + session_t *stream_session = session_get (tc->s_index, tc->thread_index); + QUIC_DBG (2, "Received app READ notification"); + quic_ack_rx_data (stream_session); + svm_fifo_reset_tx_ntf (stream_session->rx_fifo); + return 0; +} + static int quic_custom_tx_callback (void *s) { @@ -1806,7 +1854,6 @@ quic_custom_tx_callback (void *s) quic_ctx_t *ctx; int rv; - svm_fifo_unset_event (stream_session->tx_fifo); if (PREDICT_FALSE (stream_session->session_state >= SESSION_STATE_TRANSPORT_CLOSING)) return 0; @@ -1818,6 +1865,11 @@ quic_custom_tx_callback (void *s) goto tx_end; /* Most probably a reschedule */ } + QUIC_DBG (3, "Stream TX event"); + quic_ack_rx_data (stream_session); + if (!svm_fifo_max_dequeue (stream_session->tx_fifo)) + return 0; + stream = ctx->c_quic_ctx_id.stream; if (!quicly_sendstate_is_open (&stream->sendstate)) { @@ -2013,6 +2065,7 @@ quic_reset_connection (quicly_context_t * quicly_ctx, u64 udp_session_handle, * reset, then the next CID is highly likely to contain a non-authenticating * CID, ... */ QUIC_DBG (2, "Sending stateless reset"); + int rv; quicly_datagram_t *dgram; session_t *udp_session; if (packet.cid.dest.plaintext.node_id == 0 @@ -2023,7 +2076,11 @@ quic_reset_connection (quicly_context_t * quicly_ctx, u64 udp_session_handle, if (dgram == NULL) return 1; udp_session = session_get_from_handle (udp_session_handle); - return quic_send_datagram (udp_session, dgram); /* TODO : set event on fifo */ + rv = quic_send_datagram (udp_session, dgram); + if (svm_fifo_set_event (udp_session->tx_fifo)) + session_send_io_evt_to_thread (udp_session->tx_fifo, + SESSION_IO_EVT_TX); + return rv; } return 0; } @@ -2041,8 +2098,7 @@ quic_app_rx_callback (session_t * udp_session) struct sockaddr_in6 sa6; struct sockaddr *sa = (struct sockaddr *) &sa6; socklen_t salen; - u32 max_deq, len, full_len, ctx_index = UINT32_MAX, ctx_thread = - UINT32_MAX, ret; + u32 max_deq, full_len, ctx_index = UINT32_MAX, ctx_thread = UINT32_MAX, ret; u8 *data; int err; u32 *opening_ctx_pool, *ctx_index_ptr; @@ -2050,7 +2106,6 @@ quic_app_rx_callback (session_t * udp_session) u64 udp_session_handle = session_handle (udp_session); int rv = 0; u32 thread_index = vlib_get_thread_index (); - app = application_get_if_valid (app_index); if (!app) { @@ -2063,24 +2118,23 @@ quic_app_rx_callback (session_t * udp_session) { udp_session = session_get_from_handle (udp_session_handle); /* session alloc might have happened */ f = udp_session->rx_fifo; - svm_fifo_unset_event (f); max_deq = svm_fifo_max_dequeue (f); - if (max_deq < sizeof (session_dgram_hdr_t)) + if (max_deq == 0) return 0; - ret = svm_fifo_peek (f, 0, SESSION_CONN_HDR_LEN, (u8 *) & ph); - if (ret != SESSION_CONN_HDR_LEN) + if (max_deq < SESSION_CONN_HDR_LEN) { - QUIC_DBG (1, "Not enough data for header in RX"); + QUIC_DBG (1, "Not enough data for even a header in RX"); return 1; } - if (ph.data_length < ph.data_offset) + ret = svm_fifo_peek (f, 0, SESSION_CONN_HDR_LEN, (u8 *) & ph); + if (ret != SESSION_CONN_HDR_LEN) { - QUIC_DBG (1, "Not enough data vs offset in RX"); + QUIC_DBG (1, "Not enough data for header in RX"); return 1; } - len = ph.data_length - ph.data_offset; - full_len = ph.data_length + ph.data_offset + SESSION_CONN_HDR_LEN; + ASSERT (ph.data_offset == 0); + full_len = ph.data_length + SESSION_CONN_HDR_LEN; if (full_len > max_deq) { QUIC_DBG (1, "Not enough data in fifo RX"); @@ -2090,9 +2144,7 @@ quic_app_rx_callback (session_t * udp_session) /* Quicly can read len bytes from the fifo at offset: * ph.data_offset + SESSION_CONN_HDR_LEN */ data = malloc (ph.data_length); - ret = - svm_fifo_peek (f, ph.data_offset + SESSION_CONN_HDR_LEN, - ph.data_length, data); + ret = svm_fifo_peek (f, SESSION_CONN_HDR_LEN, ph.data_length, data); if (ret != ph.data_length) { QUIC_DBG (1, "Not enough data peeked in RX"); @@ -2100,15 +2152,10 @@ quic_app_rx_callback (session_t * udp_session) return 1; } - plen = - quicly_decode_packet ((quicly_context_t *) app->quicly_ctx, &packet, - data, len); - rv = 0; quic_build_sockaddr (sa, &salen, &ph.rmt_ip, ph.rmt_port, ph.is_ip4); - plen = - quicly_decode_packet ((quicly_context_t *) app->quicly_ctx, &packet, - data, len); + plen = quicly_decode_packet ((quicly_context_t *) app->quicly_ctx, + &packet, data, ph.data_length); if (plen != SIZE_MAX) { @@ -2157,9 +2204,7 @@ quic_app_rx_callback (session_t * udp_session) } } ctx_search_done: - svm_fifo_dequeue_drop (f, - ph.data_length + ph.data_offset + - SESSION_CONN_HDR_LEN); + svm_fifo_dequeue_drop (f, full_len); free (data); } while (1); @@ -2233,6 +2278,7 @@ static const transport_proto_vft_t quic_proto = { .get_connection = quic_connection_get, .get_listener = quic_listener_get, .update_time = quic_update_time, + .app_rx_evt = quic_custom_app_rx_callback, .custom_tx = quic_custom_tx_callback, .format_connection = format_quic_connection, .format_half_open = format_quic_half_open, -- cgit 1.2.3-korg