summaryrefslogtreecommitdiffstats
path: root/src/plugins/quic
diff options
context:
space:
mode:
authorNathan Skrzypczak <nathan.skrzypczak@gmail.com>2019-05-22 18:41:50 +0200
committerFlorin Coras <florin.coras@gmail.com>2019-06-28 19:28:17 +0000
commite82a7ade8a9667f1e49067bf59010f60beda5452 (patch)
tree619ead31dfa9a20e1fcd0043e79da5fb564b9a9f /src/plugins/quic
parentf73d4c2084c9cb6df4a1f8582acef523e4ba0cb2 (diff)
quic : Use TX event for app read notification
Type: feature Change-Id: I1846cdeb35f079249f66a0351aa244c540923a43 Signed-off-by: Nathan Skrzypczak <nathan.skrzypczak@gmail.com>
Diffstat (limited to 'src/plugins/quic')
-rw-r--r--src/plugins/quic/quic.c158
-rw-r--r--src/plugins/quic/quic.h3
2 files changed, 103 insertions, 58 deletions
diff --git a/src/plugins/quic/quic.c b/src/plugins/quic/quic.c
index f099d074211..93c01628e80 100644
--- a/src/plugins/quic/quic.c
+++ b/src/plugins/quic/quic.c
@@ -184,18 +184,10 @@ quic_send_datagram (session_t * udp_session, quicly_datagram_t * packet)
tc = session_get_transport (udp_session);
max_enqueue = svm_fifo_max_enqueue (f);
- if (max_enqueue <= sizeof (session_dgram_hdr_t))
- {
- QUIC_DBG (1, "Not enough space to enqueue header");
- return QUIC_ERROR_FULL_FIFO;
- }
-
- max_enqueue -= sizeof (session_dgram_hdr_t);
-
- if (max_enqueue < len)
+ if (max_enqueue < SESSION_CONN_HDR_LEN + len)
{
QUIC_DBG (1, "Too much data to send, max_enqueue %u, len %u",
- max_enqueue, len);
+ max_enqueue, len + SESSION_CONN_HDR_LEN);
return QUIC_ERROR_FULL_FIFO;
}
@@ -243,10 +235,9 @@ static int
quic_sendable_packet_count (session_t * udp_session)
{
u32 max_enqueue;
+ u32 packet_size = QUIC_MAX_PACKET_SIZE + SESSION_CONN_HDR_LEN;
max_enqueue = svm_fifo_max_enqueue (udp_session->tx_fifo);
- return clib_min (max_enqueue /
- (QUIC_MAX_PACKET_SIZE + sizeof (session_dgram_hdr_t)),
- QUIC_SEND_PACKET_VEC_SIZE);
+ return clib_min (max_enqueue / packet_size, QUIC_SEND_PACKET_VEC_SIZE);
}
static int
@@ -259,7 +250,7 @@ quic_send_packets (quic_ctx_t * ctx)
quicly_context_t *quicly_context;
app_worker_t *app_wrk;
application_t *app;
- int err;
+ int err = 0;
/* We have sctx, get qctx */
if (ctx->c_quic_ctx_id.is_stream)
@@ -270,7 +261,10 @@ quic_send_packets (quic_ctx_t * ctx)
ASSERT (!ctx->c_quic_ctx_id.is_stream);
udp_session =
- session_get_from_handle (ctx->c_quic_ctx_id.udp_session_handle);
+ session_get_from_handle_if_valid (ctx->c_quic_ctx_id.udp_session_handle);
+ if (!udp_session)
+ goto quicly_error;
+
conn = ctx->c_quic_ctx_id.conn;
if (!conn)
@@ -311,16 +305,21 @@ quic_send_packets (quic_ctx_t * ctx)
}
while (num_packets > 0 && num_packets == max_packets);
+stop_sending:
if (svm_fifo_set_event (udp_session->tx_fifo))
- session_send_io_evt_to_thread (udp_session->tx_fifo, SESSION_IO_EVT_TX);
+ if ((err =
+ session_send_io_evt_to_thread (udp_session->tx_fifo,
+ SESSION_IO_EVT_TX)))
+ clib_warning ("Event enqueue errored %d", err);
-stop_sending:
+ QUIC_DBG (3, "%u[TX] %u[RX]", svm_fifo_max_dequeue (udp_session->tx_fifo),
+ svm_fifo_max_dequeue (udp_session->rx_fifo));
quic_update_timer (ctx);
return 0;
quicly_error:
- if ((err != QUICLY_ERROR_PACKET_IGNORED) & (err !=
- QUICLY_ERROR_FREE_CONNECTION))
+ if (err && err != QUICLY_ERROR_PACKET_IGNORED
+ && err != QUICLY_ERROR_FREE_CONNECTION)
clib_warning ("Quic error '%s'.", quic_format_err (err));
quic_connection_closed (ctx->c_c_index, ctx->c_thread_index);
return 1;
@@ -393,6 +392,31 @@ get_stream_session_from_stream (quicly_stream_t * stream)
return session_get (ctx->c_s_index, stream_data->thread_index);
}
+static void
+quic_ack_rx_data (session_t * stream_session)
+{
+ u32 max_deq;
+ quic_ctx_t *sctx;
+ svm_fifo_t *f;
+ quicly_stream_t *stream;
+ quic_stream_data_t *stream_data;
+
+ sctx =
+ quic_ctx_get (stream_session->connection_index,
+ stream_session->thread_index);
+ ASSERT (sctx->c_quic_ctx_id.is_stream);
+ stream = sctx->c_quic_ctx_id.stream;
+ stream_data = (quic_stream_data_t *) stream->data;
+
+ f = stream_session->rx_fifo;
+ max_deq = svm_fifo_max_dequeue (f);
+
+ ASSERT (stream_data->app_rx_data_len >= max_deq);
+ quicly_stream_sync_recvbuf (stream, stream_data->app_rx_data_len - max_deq);
+ QUIC_DBG (3, "Acking %u bytes", stream_data->app_rx_data_len - max_deq);
+ stream_data->app_rx_data_len = max_deq;
+}
+
static int
quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
size_t len)
@@ -413,26 +437,28 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
max_enq = svm_fifo_max_enqueue_prod (f);
QUIC_DBG (3, "Enqueuing %u at off %u in %u space", len, off, max_enq);
- if (off + len > max_enq)
+ if (off - stream_data->app_rx_data_len + len > max_enq)
{
- /* TODO : can we find a better solution, listening on RX fifo evts ? */
- QUIC_DBG (3, "Ingoring packet, RX fifo is full");
- return QUICLY_ERROR_PACKET_IGNORED;
+ QUIC_DBG (1, "Error RX fifo is full");
+ return 1;
}
- if (off == 0)
+ if (off == stream_data->app_rx_data_len)
{
+ /* Streams live on the same thread so (f, stream_data) should stay consistent */
rlen = svm_fifo_enqueue (f, len, (u8 *) src);
+ stream_data->app_rx_data_len += rlen;
ASSERT (rlen >= len);
-
- quicly_stream_sync_recvbuf (stream, rlen);
app_wrk = app_worker_get_if_valid (stream_session->app_wrk_index);
if (PREDICT_TRUE (app_wrk != 0))
app_worker_lock_and_send_event (app_wrk, stream_session,
SESSION_IO_EVT_RX);
+ quic_ack_rx_data (stream_session);
}
else
{
- rlen = svm_fifo_enqueue_with_offset (f, off, len, (u8 *) src);
+ rlen =
+ svm_fifo_enqueue_with_offset (f, off - stream_data->app_rx_data_len,
+ len, (u8 *) src);
ASSERT (rlen == 0);
}
return 0;
@@ -443,11 +469,13 @@ quic_fifo_egress_shift (quicly_stream_t * stream, size_t delta)
{
session_t *stream_session;
svm_fifo_t *f;
+ int rv;
stream_session = get_stream_session_from_stream (stream);
f = stream_session->tx_fifo;
- ASSERT (svm_fifo_dequeue_drop (f, delta) == delta);
+ rv = svm_fifo_dequeue_drop (f, delta);
+ ASSERT (rv == delta);
quicly_stream_sync_sendbuf (stream, 0);
}
@@ -472,9 +500,9 @@ quic_fifo_egress_emit (quicly_stream_t * stream, size_t off, void *dst,
}
else
{
- QUIC_DBG (3, "Wrote ALL");
*wrote_all = 1;
*len = deq_max - off;
+ QUIC_DBG (3, "Wrote ALL, %u", *len);
}
/* TODO, use something like : return svm_fifo_peek (f, off, *len, dst); */
@@ -538,6 +566,7 @@ quic_accept_stream (void *s)
stream_data = (quic_stream_data_t *) stream->data;
stream_data->ctx_id = sctx_id;
stream_data->thread_index = sctx->c_thread_index;
+ stream_data->app_rx_data_len = 0;
sctx->c_s_index = stream_session->session_index;
stream_session->session_state = SESSION_STATE_CREATED;
@@ -557,6 +586,9 @@ quic_accept_stream (void *s)
quicly_reset_stream (stream, QUIC_APP_ALLOCATION_ERROR);
return;
}
+ svm_fifo_add_want_tx_ntf (stream_session->rx_fifo,
+ SVM_FIFO_WANT_TX_NOTIF_IF_FULL |
+ SVM_FIFO_WANT_TX_NOTIF_IF_EMPTY);
rv = app_worker_accept_notify (app_wrk, stream_session);
if (rv)
@@ -1211,6 +1243,10 @@ quic_connect_new_stream (session_endpoint_cfg_t * sep)
return app_worker_connect_notify (app_wrk, NULL, sep->opaque);
}
+ svm_fifo_add_want_tx_ntf (stream_session->rx_fifo,
+ SVM_FIFO_WANT_TX_NOTIF_IF_FULL |
+ SVM_FIFO_WANT_TX_NOTIF_IF_EMPTY);
+
stream_session->session_state = SESSION_STATE_READY;
if (app_worker_connect_notify (app_wrk, stream_session, sep->opaque))
{
@@ -1225,6 +1261,7 @@ quic_connect_new_stream (session_endpoint_cfg_t * sep)
stream_data = (quic_stream_data_t *) stream->data;
stream_data->ctx_id = sctx->c_c_index;
stream_data->thread_index = sctx->c_thread_index;
+ stream_data->app_rx_data_len = 0;
return 0;
}
@@ -1798,6 +1835,17 @@ quic_del_segment_callback (u32 client_index, u64 seg_handle)
return 0;
}
+
+static int
+quic_custom_app_rx_callback (transport_connection_t * tc)
+{
+ session_t *stream_session = session_get (tc->s_index, tc->thread_index);
+ QUIC_DBG (2, "Received app READ notification");
+ quic_ack_rx_data (stream_session);
+ svm_fifo_reset_tx_ntf (stream_session->rx_fifo);
+ return 0;
+}
+
static int
quic_custom_tx_callback (void *s)
{
@@ -1806,7 +1854,6 @@ quic_custom_tx_callback (void *s)
quic_ctx_t *ctx;
int rv;
- svm_fifo_unset_event (stream_session->tx_fifo);
if (PREDICT_FALSE
(stream_session->session_state >= SESSION_STATE_TRANSPORT_CLOSING))
return 0;
@@ -1818,6 +1865,11 @@ quic_custom_tx_callback (void *s)
goto tx_end; /* Most probably a reschedule */
}
+ QUIC_DBG (3, "Stream TX event");
+ quic_ack_rx_data (stream_session);
+ if (!svm_fifo_max_dequeue (stream_session->tx_fifo))
+ return 0;
+
stream = ctx->c_quic_ctx_id.stream;
if (!quicly_sendstate_is_open (&stream->sendstate))
{
@@ -2013,6 +2065,7 @@ quic_reset_connection (quicly_context_t * quicly_ctx, u64 udp_session_handle,
* reset, then the next CID is highly likely to contain a non-authenticating
* CID, ... */
QUIC_DBG (2, "Sending stateless reset");
+ int rv;
quicly_datagram_t *dgram;
session_t *udp_session;
if (packet.cid.dest.plaintext.node_id == 0
@@ -2023,7 +2076,11 @@ quic_reset_connection (quicly_context_t * quicly_ctx, u64 udp_session_handle,
if (dgram == NULL)
return 1;
udp_session = session_get_from_handle (udp_session_handle);
- return quic_send_datagram (udp_session, dgram); /* TODO : set event on fifo */
+ rv = quic_send_datagram (udp_session, dgram);
+ if (svm_fifo_set_event (udp_session->tx_fifo))
+ session_send_io_evt_to_thread (udp_session->tx_fifo,
+ SESSION_IO_EVT_TX);
+ return rv;
}
return 0;
}
@@ -2041,8 +2098,7 @@ quic_app_rx_callback (session_t * udp_session)
struct sockaddr_in6 sa6;
struct sockaddr *sa = (struct sockaddr *) &sa6;
socklen_t salen;
- u32 max_deq, len, full_len, ctx_index = UINT32_MAX, ctx_thread =
- UINT32_MAX, ret;
+ u32 max_deq, full_len, ctx_index = UINT32_MAX, ctx_thread = UINT32_MAX, ret;
u8 *data;
int err;
u32 *opening_ctx_pool, *ctx_index_ptr;
@@ -2050,7 +2106,6 @@ quic_app_rx_callback (session_t * udp_session)
u64 udp_session_handle = session_handle (udp_session);
int rv = 0;
u32 thread_index = vlib_get_thread_index ();
-
app = application_get_if_valid (app_index);
if (!app)
{
@@ -2063,24 +2118,23 @@ quic_app_rx_callback (session_t * udp_session)
{
udp_session = session_get_from_handle (udp_session_handle); /* session alloc might have happened */
f = udp_session->rx_fifo;
- svm_fifo_unset_event (f);
max_deq = svm_fifo_max_dequeue (f);
- if (max_deq < sizeof (session_dgram_hdr_t))
+ if (max_deq == 0)
return 0;
- ret = svm_fifo_peek (f, 0, SESSION_CONN_HDR_LEN, (u8 *) & ph);
- if (ret != SESSION_CONN_HDR_LEN)
+ if (max_deq < SESSION_CONN_HDR_LEN)
{
- QUIC_DBG (1, "Not enough data for header in RX");
+ QUIC_DBG (1, "Not enough data for even a header in RX");
return 1;
}
- if (ph.data_length < ph.data_offset)
+ ret = svm_fifo_peek (f, 0, SESSION_CONN_HDR_LEN, (u8 *) & ph);
+ if (ret != SESSION_CONN_HDR_LEN)
{
- QUIC_DBG (1, "Not enough data vs offset in RX");
+ QUIC_DBG (1, "Not enough data for header in RX");
return 1;
}
- len = ph.data_length - ph.data_offset;
- full_len = ph.data_length + ph.data_offset + SESSION_CONN_HDR_LEN;
+ ASSERT (ph.data_offset == 0);
+ full_len = ph.data_length + SESSION_CONN_HDR_LEN;
if (full_len > max_deq)
{
QUIC_DBG (1, "Not enough data in fifo RX");
@@ -2090,9 +2144,7 @@ quic_app_rx_callback (session_t * udp_session)
/* Quicly can read len bytes from the fifo at offset:
* ph.data_offset + SESSION_CONN_HDR_LEN */
data = malloc (ph.data_length);
- ret =
- svm_fifo_peek (f, ph.data_offset + SESSION_CONN_HDR_LEN,
- ph.data_length, data);
+ ret = svm_fifo_peek (f, SESSION_CONN_HDR_LEN, ph.data_length, data);
if (ret != ph.data_length)
{
QUIC_DBG (1, "Not enough data peeked in RX");
@@ -2100,15 +2152,10 @@ quic_app_rx_callback (session_t * udp_session)
return 1;
}
- plen =
- quicly_decode_packet ((quicly_context_t *) app->quicly_ctx, &packet,
- data, len);
-
rv = 0;
quic_build_sockaddr (sa, &salen, &ph.rmt_ip, ph.rmt_port, ph.is_ip4);
- plen =
- quicly_decode_packet ((quicly_context_t *) app->quicly_ctx, &packet,
- data, len);
+ plen = quicly_decode_packet ((quicly_context_t *) app->quicly_ctx,
+ &packet, data, ph.data_length);
if (plen != SIZE_MAX)
{
@@ -2157,9 +2204,7 @@ quic_app_rx_callback (session_t * udp_session)
}
}
ctx_search_done:
- svm_fifo_dequeue_drop (f,
- ph.data_length + ph.data_offset +
- SESSION_CONN_HDR_LEN);
+ svm_fifo_dequeue_drop (f, full_len);
free (data);
}
while (1);
@@ -2233,6 +2278,7 @@ static const transport_proto_vft_t quic_proto = {
.get_connection = quic_connection_get,
.get_listener = quic_listener_get,
.update_time = quic_update_time,
+ .app_rx_evt = quic_custom_app_rx_callback,
.custom_tx = quic_custom_tx_callback,
.format_connection = format_quic_connection,
.format_half_open = format_quic_half_open,
diff --git a/src/plugins/quic/quic.h b/src/plugins/quic/quic.h
index 3ba0455d733..3ecb04f0c63 100644
--- a/src/plugins/quic/quic.h
+++ b/src/plugins/quic/quic.h
@@ -32,8 +32,6 @@
**/
#define QUIC_DEBUG 0
-#define QUIC_DEBUG_LEVEL_CLIENT 0
-#define QUIC_DEBUG_LEVEL_SERVER 0
#define QUIC_DEFAULT_CA_CERT_PATH "/etc/ssl/certs/ca-certificates.crt"
@@ -95,6 +93,7 @@ typedef struct quic_stream_data_
{
u32 ctx_id;
u32 thread_index;
+ u32 app_rx_data_len; /* bytes received, to be read by external app */
} quic_stream_data_t;
typedef struct quic_worker_ctx_