aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorAloys Augustin <aloaugus@cisco.com>2019-05-14 14:13:51 +0200
committerDave Wallace <dwallacelf@gmail.com>2019-05-14 19:52:29 +0000
commitba123e152bc16197654c8ac85550a6b1f7b8a791 (patch)
tree22df80a7d7f2e1c2232a8dde30d6e09a01b91594 /src
parentf9d4ab42724b260d5c242f7291d05f74cd725d7d (diff)
QUIC multi thread update
* Add support for multiple threads * Replace quicly buffers with fifos * Fix cleanup of sessions * Update quicly release version Change-Id: I551f936bbec05a15703f043ee85c8e1ba0ab9723 Signed-off-by: Nathan Skrzypczak <nathan.skrzypczak@gmail.com>
Diffstat (limited to 'src')
-rw-r--r--src/plugins/quic/quic.c538
-rw-r--r--src/plugins/quic/quic.h14
-rw-r--r--src/tests/vnet/session/quic_echo.c54
-rw-r--r--src/vnet/session/application_interface.c2
4 files changed, 329 insertions, 279 deletions
diff --git a/src/plugins/quic/quic.c b/src/plugins/quic/quic.c
index b77a1d37d8e..4630689df64 100644
--- a/src/plugins/quic/quic.c
+++ b/src/plugins/quic/quic.c
@@ -26,20 +26,20 @@
#include <quic/quic.h>
-#include <quicly/streambuf.h>
+#include <quicly/defaults.h>
#include <picotls/openssl.h>
#include <picotls/pembase64.h>
static quic_main_t quic_main;
static void quic_update_timer (quic_ctx_t * ctx);
-static void quic_connection_closed (u32 conn_index);
+static void quic_connection_closed (u32 conn_index, u32 thread_index);
static void quic_disconnect (u32 ctx_index, u32 thread_index);
static int quic_connect_new_stream (session_endpoint_cfg_t * sep);
static int quic_connect_new_connection (session_endpoint_cfg_t * sep);
-static int64_t quic_get_time (quicly_now_cb * self);
-static quicly_now_cb quicly_vpp_now_cb = { quic_get_time };
+static int64_t quic_get_time (quicly_now_t * self);
+static quicly_now_t quicly_vpp_now_cb = { quic_get_time };
static void quic_transfer_connection (u32 ctx_index, u32 dest_thread);
@@ -57,12 +57,12 @@ static void quic_transfer_connection (u32 ctx_index, u32 dest_thread);
#define QUICLY_PACKET_TYPE_HANDSHAKE (QUICLY_LONG_HEADER_BIT | QUICLY_QUIC_BIT | 0x20)
#define QUICLY_PACKET_TYPE_RETRY (QUICLY_LONG_HEADER_BIT | QUICLY_QUIC_BIT | 0x30)
#define QUICLY_PACKET_TYPE_BITMASK 0xf0
+#define QUIC_FIFO_SIZE (64 << 10)
static u32
-quic_ctx_alloc ()
+quic_ctx_alloc (u32 thread_index)
{
- u8 thread_index = vlib_get_thread_index ();
quic_main_t *qm = &quic_main;
quic_ctx_t *ctx;
@@ -70,6 +70,8 @@ quic_ctx_alloc ()
memset (ctx, 0, sizeof (quic_ctx_t));
ctx->c_thread_index = thread_index;
+ QUIC_DBG (1, "Allocated quic_ctx %u on thread %u",
+ ctx - qm->ctx_pool[thread_index], thread_index);
return ctx - qm->ctx_pool[thread_index];
}
@@ -84,16 +86,24 @@ quic_ctx_free (quic_ctx_t * ctx)
}
static quic_ctx_t *
-quic_ctx_get (u32 ctx_index)
+quic_ctx_get (u32 ctx_index, u32 thread_index)
{
- return pool_elt_at_index (quic_main.ctx_pool[vlib_get_thread_index ()],
- ctx_index);
+ return pool_elt_at_index (quic_main.ctx_pool[thread_index], ctx_index);
}
static quic_ctx_t *
-quic_ctx_get_w_thread (u32 ctx_index, u32 thread_index)
+quic_get_conn_ctx (quicly_conn_t * conn)
{
- return pool_elt_at_index (quic_main.ctx_pool[thread_index], ctx_index);
+ u64 conn_data;
+ conn_data = (u64) * quicly_get_data (conn);
+ return quic_ctx_get (conn_data & UINT32_MAX, conn_data >> 32);
+}
+
+static void
+quic_store_conn_ctx (quicly_conn_t * conn, quic_ctx_t * ctx)
+{
+ *quicly_get_data (conn) =
+ (void *) (((u64) ctx->c_thread_index) << 32 | (u64) ctx->c_c_index);
}
static void
@@ -165,13 +175,13 @@ quic_send_datagram (session_t * udp_session, quicly_datagram_t * packet)
ret = svm_fifo_enqueue (f, sizeof (hdr), (u8 *) & hdr);
if (ret != sizeof (hdr))
{
- QUIC_DBG (2, "Not enough space to enqueue header");
+ QUIC_DBG (1, "Not enough space to enqueue header");
return 1;
}
ret = svm_fifo_enqueue (f, len, packet->data.base);
if (ret != len)
{
- QUIC_DBG (2, "Not enough space to enqueue payload");
+ QUIC_DBG (1, "Not enough space to enqueue payload");
return 1;
}
return 0;
@@ -196,9 +206,15 @@ quic_send_packets (quic_ctx_t * ctx)
session_t *udp_session;
quicly_conn_t *conn;
size_t num_packets, i, max_packets;
+ quicly_context_t *quicly_context;
+ app_worker_t *app_wrk;
+ application_t *app;
+
/* We have sctx, get qctx */
if (ctx->c_quic_ctx_id.is_stream)
- ctx = quic_ctx_get (ctx->c_quic_ctx_id.quic_connection_ctx_id);
+ ctx =
+ quic_ctx_get (ctx->c_quic_ctx_id.quic_connection_ctx_id,
+ ctx->c_thread_index);
ASSERT (!ctx->c_quic_ctx_id.is_stream);
@@ -213,6 +229,17 @@ quic_send_packets (quic_ctx_t * ctx)
if (quic_sendable_packet_count (udp_session) < 2)
goto stop_sending;
+ app_wrk = app_worker_get_if_valid (ctx->c_quic_ctx_id.parent_app_wrk_id);
+ if (!app_wrk)
+ {
+ clib_warning ("Tried to send packets on non existing app worker %u",
+ ctx->c_quic_ctx_id.parent_app_wrk_id);
+ quic_connection_closed (ctx->c_c_index, ctx->c_thread_index);
+ return 1;
+ }
+ app = application_get (app_wrk->app_index);
+
+ quicly_context = (quicly_context_t *) app->quicly_ctx;
do
{
max_packets = quic_sendable_packet_count (udp_session);
@@ -227,10 +254,9 @@ quic_send_packets (quic_ctx_t * ctx)
if (quic_send_datagram (udp_session, packets[i]))
goto quicly_error;
- quicly_default_free_packet_cb.cb
- (&quicly_default_free_packet_cb, packets[i]);
+ quicly_context->packet_allocator->
+ free_packet (quicly_context->packet_allocator, packets[i]);
}
-
}
while (num_packets > 0 && num_packets == max_packets);
@@ -243,7 +269,7 @@ stop_sending:
quicly_error:
QUIC_DBG (1, "Error sending packets closing connection");
- quic_connection_closed (ctx->c_c_index);
+ quic_connection_closed (ctx->c_c_index, ctx->c_thread_index);
return 1;
}
@@ -253,15 +279,18 @@ quicly_error:
*****************************************************************************/
static void
-quic_on_stream_destroy (quicly_stream_t * stream)
+quic_on_stream_destroy (quicly_stream_t * stream, int err)
{
- quic_stream_data_t *stream_data = stream->data;
+ quic_stream_data_t *stream_data = (quic_stream_data_t *) stream->data;
u32 sctx_id = stream_data->ctx_id;
- quic_ctx_t *sctx = quic_ctx_get (sctx_id);
+ session_t *stream_session;
+ quic_ctx_t *sctx = quic_ctx_get (sctx_id, stream_data->thread_index);
QUIC_DBG (2, "Stream %ld (ctx %u) destroyed", stream->stream_id, sctx_id);
+ stream_session = session_get (sctx->c_s_index, sctx->c_thread_index);
+ stream_session->session_state = SESSION_STATE_CLOSED;
session_transport_delete_notify (&sctx->connection);
quic_ctx_free (sctx);
- quicly_streambuf_destroy (stream);
+ free (stream->data);
}
static int
@@ -278,97 +307,125 @@ quic_on_receive_reset (quicly_stream_t * stream, int error_code)
return 0;
}
+static session_t *
+get_stream_session_from_stream (quicly_stream_t * stream)
+{
+ quic_ctx_t *ctx;
+ quic_stream_data_t *stream_data;
+
+ stream_data = (quic_stream_data_t *) stream->data;
+ ctx = quic_ctx_get (stream_data->ctx_id, stream_data->thread_index);
+ return session_get (ctx->c_s_index, stream_data->thread_index);
+}
+
static int
quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
size_t len)
{
-/* QUIC_DBG (2, "received data: %lu bytes", len); */
- u32 max_enq, ctx_id, ret;
+ QUIC_DBG (3, "received data: %lu bytes, offset %lu", len, off);
+ u32 max_enq;
quic_ctx_t *sctx;
session_t *stream_session;
- svm_fifo_t *rx_fifo;
app_worker_t *app_wrk;
- int rv;
- ptls_iovec_t input;
+ svm_fifo_t *f;
+ quic_stream_data_t *stream_data;
+ int rlen;
- if ((rv = quicly_streambuf_ingress_receive (stream, off, src, len)) != 0)
- {
- clib_warning ("quicly_streambuf_ingress_receive Failed");
- return rv;
- }
+ stream_data = (quic_stream_data_t *) stream->data;
+ sctx = quic_ctx_get (stream_data->ctx_id, stream_data->thread_index);
+ stream_session = session_get (sctx->c_s_index, stream_data->thread_index);
+ f = stream_session->rx_fifo;
- if ((input = quicly_streambuf_ingress_get (stream)).len == 0)
+ max_enq = svm_fifo_max_enqueue_prod (f);
+ QUIC_DBG (3, "Enqueuing %u at off %u in %u space", len, off, max_enq);
+ if (off + len > max_enq)
{
- QUIC_DBG (2, "No data, ignoring");
+ /* TODO : can we find a better solution, listening on RX fifo evts ? */
+ QUIC_DBG (3, "Ingoring packet, RX fifo is full");
return QUICLY_ERROR_PACKET_IGNORED;
}
-
- QUIC_DBG (1, "Got %lu bytes in RX", input.len);
-
- ctx_id = ((quic_stream_data_t *) stream->data)->ctx_id;
- sctx = quic_ctx_get (ctx_id);
- stream_session = session_get (sctx->c_s_index, vlib_get_thread_index ());
- rx_fifo = stream_session->rx_fifo;
- max_enq = svm_fifo_max_enqueue (rx_fifo);
-
- if (!max_enq)
+ if (off == 0)
{
- QUIC_DBG (2, "RX FIFO full on stream receive");
- rv = QUICLY_ERROR_PACKET_IGNORED;
- goto notify_app;
+ rlen = svm_fifo_enqueue (f, len, (u8 *) src);
+ ASSERT (rlen >= len);
+
+ quicly_stream_sync_recvbuf (stream, rlen);
+ app_wrk = app_worker_get_if_valid (stream_session->app_wrk_index);
+ if (PREDICT_TRUE (app_wrk != 0))
+ app_worker_lock_and_send_event (app_wrk, stream_session,
+ SESSION_IO_EVT_RX);
}
- len = clib_min (max_enq, input.len);
- ret = svm_fifo_enqueue (rx_fifo, len, input.base);
- if (ret != len)
+ else
{
- QUIC_DBG (2, "Not enough data to dequeue");
- return 1;
+ rlen = svm_fifo_enqueue_with_offset (f, off, len, (u8 *) src);
+ ASSERT (rlen == 0);
}
- quicly_streambuf_ingress_shift (stream, len);
- rv = 0;
-
-notify_app:
- app_wrk = app_worker_get_if_valid (stream_session->app_wrk_index);
- if (PREDICT_TRUE (app_wrk != 0))
- app_worker_lock_and_send_event (app_wrk, stream_session,
- SESSION_IO_EVT_RX);
- return rv;
+ return 0;
}
void
-quic_streambuf_egress_shift (quicly_stream_t * stream, size_t delta)
+quic_fifo_egress_shift (quicly_stream_t * stream, size_t delta)
{
- quicly_streambuf_egress_shift (stream, delta);
+ session_t *stream_session;
+ svm_fifo_t *f;
+
+ stream_session = get_stream_session_from_stream (stream);
+ f = stream_session->tx_fifo;
+
+ ASSERT (svm_fifo_dequeue_drop (f, delta) == delta);
+ quicly_stream_sync_sendbuf (stream, 0);
}
int
-quic_streambuf_egress_emit (quicly_stream_t * stream, size_t off, void *dst,
- size_t * len, int *wrote_all)
+quic_fifo_egress_emit (quicly_stream_t * stream, size_t off, void *dst,
+ size_t * len, int *wrote_all)
{
- quicly_streambuf_t *sbuf = stream->data;
+ session_t *stream_session;
+ svm_fifo_t *f;
+ u32 deq_max, first_deq, max_rd_chunk, rem_offset;
+
+ stream_session = get_stream_session_from_stream (stream);
+ f = stream_session->tx_fifo;
- assert (off < sbuf->egress.buf.off);
+ QUIC_DBG (3, "Emitting %u, offset %u", *len, off);
- if (off + *len < sbuf->egress.buf.off)
+ deq_max = svm_fifo_max_dequeue_cons (f);
+ ASSERT (off <= deq_max);
+ if (off + *len < deq_max)
{
*wrote_all = 0;
}
else
{
- *len = sbuf->egress.buf.off - off;
+ QUIC_DBG (3, "Wrote ALL");
*wrote_all = 1;
+ *len = deq_max - off;
}
- QUIC_DBG (1, "EGRESS %lu bytes in TX", *len);
- memcpy (dst, sbuf->egress.buf.base + off, *len);
+ /* TODO, use something like : return svm_fifo_peek (f, off, *len, dst); */
+ max_rd_chunk = svm_fifo_max_read_chunk (f);
+
+ first_deq = 0;
+ if (off < max_rd_chunk)
+ {
+ first_deq = clib_min (*len, max_rd_chunk - off);
+ clib_memcpy_fast (dst, svm_fifo_head (f) + off, first_deq);
+ }
+
+ if (max_rd_chunk < off + *len)
+ {
+ rem_offset = max_rd_chunk < off ? off - max_rd_chunk : 0;
+ clib_memcpy_fast (dst + first_deq, f->head_chunk->data + rem_offset,
+ *len - first_deq);
+ }
return 0;
}
static const quicly_stream_callbacks_t quic_stream_callbacks = {
.on_destroy = quic_on_stream_destroy,
- .on_send_shift = quic_streambuf_egress_shift,
- .on_send_emit = quic_streambuf_egress_emit,
+ .on_send_shift = quic_fifo_egress_shift,
+ .on_send_emit = quic_fifo_egress_emit,
.on_send_stop = quic_on_stop_sending,
.on_receive = quic_on_receive,
.on_receive_reset = quic_on_receive_reset
@@ -382,20 +439,19 @@ quic_accept_stream (void *s)
quic_stream_data_t *stream_data;
app_worker_t *app_wrk;
quic_ctx_t *qctx, *sctx;
- u32 qctx_id, sctx_id;
+ u32 sctx_id;
quic_main_t *qm = &quic_main;
int rv;
- sctx_id = quic_ctx_alloc ();
+ sctx_id = quic_ctx_alloc (vlib_get_thread_index ());
- qctx_id = (u64) * quicly_get_data (stream->conn);
- qctx = quic_ctx_get (qctx_id);
+ qctx = quic_get_conn_ctx (stream->conn);
stream_session = session_alloc (qctx->c_thread_index);
- QUIC_DBG (1, "Created stream_session, id %u ctx %u",
- stream_session->session_index, sctx_id);
-
- sctx = quic_ctx_get (sctx_id);
+ QUIC_DBG (2, "Allocated stream_session, id %u, thread %u ctx %u",
+ stream_session->session_index, stream_session->thread_index,
+ sctx_id);
+ sctx = quic_ctx_get (sctx_id, qctx->c_thread_index);
sctx->c_quic_ctx_id.parent_app_wrk_id =
qctx->c_quic_ctx_id.parent_app_wrk_id;
sctx->c_quic_ctx_id.parent_app_id = qctx->c_quic_ctx_id.parent_app_id;
@@ -404,10 +460,10 @@ quic_accept_stream (void *s)
sctx->c_quic_ctx_id.is_stream = 1;
sctx->c_s_index = stream_session->session_index;
sctx->c_quic_ctx_id.stream = stream;
- sctx->c_quic_ctx_id.stream_session_handle = session_handle (stream_session);
stream_data = (quic_stream_data_t *) stream->data;
stream_data->ctx_id = sctx_id;
+ stream_data->thread_index = sctx->c_thread_index;
sctx->c_s_index = stream_session->session_index;
stream_session->session_state = SESSION_STATE_CREATED;
@@ -442,15 +498,10 @@ quic_accept_stream (void *s)
}
static int
-quic_on_stream_open (quicly_stream_open_cb * self, quicly_stream_t * stream)
+quic_on_stream_open (quicly_stream_open_t * self, quicly_stream_t * stream)
{
QUIC_DBG (2, "on_stream_open called");
- int ret;
- if ((ret =
- quicly_streambuf_create (stream, sizeof (quic_stream_data_t))) != 0)
- {
- return ret;
- }
+ stream->data = malloc (sizeof (quic_stream_data_t));
stream->callbacks = &quic_stream_callbacks;
/* Notify accept on parent qsession, but only if this is not a locally
* initiated stream */
@@ -461,20 +512,19 @@ quic_on_stream_open (quicly_stream_open_cb * self, quicly_stream_t * stream)
return 0;
}
-static quicly_stream_open_cb on_stream_open = { &quic_on_stream_open };
+static quicly_stream_open_t on_stream_open = { &quic_on_stream_open };
static void
-quic_on_conn_close (quicly_closed_by_peer_cb * self, quicly_conn_t * conn,
+quic_on_conn_close (quicly_closed_by_peer_t * self, quicly_conn_t * conn,
int code, uint64_t frame_type,
const char *reason, size_t reason_len)
{
QUIC_DBG (2, "connection closed, reason: %.*s", reason, reason_len);
- u32 ctx_index = (u64) * quicly_get_data (conn);
- quic_ctx_t *ctx = quic_ctx_get (ctx_index);
+ quic_ctx_t *ctx = quic_get_conn_ctx (conn);
session_transport_closing_notify (&ctx->connection);
}
-static quicly_closed_by_peer_cb on_closed_by_peer = { &quic_on_conn_close };
+static quicly_closed_by_peer_t on_closed_by_peer = { &quic_on_conn_close };
/*****************************************************************************
@@ -763,7 +813,7 @@ quic_make_connection_key (clib_bihash_kv_16_8_t * kv,
}
static void
-quic_connection_closed (u32 ctx_index)
+quic_connection_closed (u32 ctx_index, u32 thread_index)
{
/* TODO : free fifos */
QUIC_DBG (2, "QUIC connection closed");
@@ -772,25 +822,26 @@ quic_connection_closed (u32 ctx_index)
quicly_conn_t *conn;
quic_ctx_t *ctx;
- ctx = quic_ctx_get (ctx_index);
+ ctx = quic_ctx_get (ctx_index, thread_index);
ASSERT (!ctx->c_quic_ctx_id.is_stream);
/* TODO if connection is not established, just delete the session? */
- /* TODO: close all streams? or is the streams closed cb called by quicly? */
-
/* Stop the timer */
if (ctx->timer_handle != QUIC_TIMER_HANDLE_INVALID)
{
- tw = &quic_main.wrk_ctx[vlib_get_thread_index ()].timer_wheel;
+ tw = &quic_main.wrk_ctx[thread_index].timer_wheel;
tw_timer_stop_1t_3w_1024sl_ov (tw, ctx->timer_handle);
}
/* Delete the connection from the connection map */
conn = ctx->c_quic_ctx_id.conn;
quic_make_connection_key (&kv, quicly_get_master_id (conn));
+ QUIC_DBG (2, "Deleting conn with id %lu %lu", kv.key[0], kv.key[1]);
clib_bihash_add_del_16_8 (&quic_main.connection_hash, &kv, 0 /* is_add */ );
+ // session_close (session_get_from_handle (ctx->c_quic_ctx_id.udp_session_handle));
+ quic_disconnect_transport (ctx);
session_transport_delete_notify (&ctx->connection);
/* Do not try to send anything anymore */
quicly_free (ctx->c_quic_ctx_id.conn);
@@ -801,19 +852,25 @@ quic_connection_closed (u32 ctx_index)
static void
allocate_quicly_ctx (application_t * app, u8 is_client)
{
- QUIC_DBG (2, "Called allocate_quicly_ctx");
struct
{
quicly_context_t _;
char cid_key[17];
} *ctx_data;
quicly_context_t *quicly_ctx;
- char *cid_key;
+ ptls_iovec_t key_vec;
+ QUIC_DBG (2, "Called allocate_quicly_ctx");
+
+ if (app->quicly_ctx)
+ {
+ QUIC_DBG (1, "Trying to reallocate quicly_ctx");
+ return;
+ }
ctx_data = malloc (sizeof (*ctx_data));
quicly_ctx = &ctx_data->_;
app->quicly_ctx = (u64 *) quicly_ctx;
- memcpy (quicly_ctx, &quicly_default_context, sizeof (quicly_context_t));
+ memcpy (quicly_ctx, &quicly_spec_context, sizeof (quicly_context_t));
quicly_ctx->max_packet_size = QUIC_MAX_PACKET_SIZE;
quicly_ctx->tls = &quic_tlsctx;
@@ -824,34 +881,25 @@ allocate_quicly_ctx (application_t * app, u8 is_client)
quicly_amend_ptls_context (quicly_ctx->tls);
quicly_ctx->event_log.mask = 0; /* logs */
- quicly_ctx->event_log.cb = quicly_new_default_event_log_cb (stderr);
+ quicly_ctx->event_log.cb = quicly_new_default_event_logger (stderr);
quicly_ctx->transport_params.max_data = QUIC_INT_MAX;
quicly_ctx->transport_params.max_streams_uni = QUIC_INT_MAX;
quicly_ctx->transport_params.max_streams_bidi = QUIC_INT_MAX;
- quicly_ctx->transport_params.max_stream_data.bidi_local = QUIC_INT_MAX;
- quicly_ctx->transport_params.max_stream_data.bidi_remote = QUIC_INT_MAX;
+ quicly_ctx->transport_params.max_stream_data.bidi_local = (QUIC_FIFO_SIZE - 1); /* max_enq is SIZE - 1 */
+ quicly_ctx->transport_params.max_stream_data.bidi_remote = (QUIC_FIFO_SIZE - 1); /* max_enq is SIZE - 1 */
quicly_ctx->transport_params.max_stream_data.uni = QUIC_INT_MAX;
- if (!is_client)
+ quicly_ctx->tls->random_bytes (ctx_data->cid_key, 16);
+ ctx_data->cid_key[16] = 0;
+ key_vec = ptls_iovec_init (ctx_data->cid_key, strlen (ctx_data->cid_key));
+ quicly_ctx->cid_encryptor =
+ quicly_new_default_cid_encryptor (&ptls_openssl_bfecb,
+ &ptls_openssl_sha256, key_vec);
+ if (!is_client && app->tls_key != NULL && app->tls_cert != NULL)
{
load_bio_private_key (quicly_ctx->tls, (char *) app->tls_key);
load_bio_certificate_chain (quicly_ctx->tls, (char *) app->tls_cert);
- cid_key = ctx_data->cid_key;
- quicly_ctx->tls->random_bytes (cid_key, 16);
- cid_key[16] = 0;
- quicly_ctx->encrypt_cid =
- quicly_new_default_encrypt_cid_cb (&ptls_openssl_bfecb,
- &ptls_openssl_sha256,
- ptls_iovec_init (cid_key,
- strlen
- (cid_key)));
- quicly_ctx->decrypt_cid =
- quicly_new_default_decrypt_cid_cb (&ptls_openssl_bfecb,
- &ptls_openssl_sha256,
- ptls_iovec_init (cid_key,
- strlen
- (cid_key)));
}
}
@@ -867,7 +915,7 @@ quic_get_thread_time (u8 thread_index)
}
static int64_t
-quic_get_time (quicly_now_cb * self)
+quic_get_time (quicly_now_t * self)
{
u8 thread_index = vlib_get_thread_index ();
return quic_get_thread_time (thread_index);
@@ -897,9 +945,9 @@ static void
quic_timer_expired (u32 conn_index)
{
quic_ctx_t *ctx;
- QUIC_DBG (5, "Timer expired for conn %u at %ld", conn_index,
+ QUIC_DBG (4, "Timer expired for conn %u at %ld", conn_index,
quic_get_time (NULL));
- ctx = quic_ctx_get (conn_index);
+ ctx = quic_ctx_get (conn_index, vlib_get_thread_index ());
ctx->timer_handle = QUIC_TIMER_HANDLE_INVALID;
quic_send_packets (ctx);
}
@@ -934,14 +982,14 @@ quic_update_timer (quic_ctx_t * ctx)
tw = &quic_main.wrk_ctx[vlib_get_thread_index ()].timer_wheel;
- QUIC_DBG (5, "Timer set to %ld (int %ld) for ctx %u", next_timeout,
+ QUIC_DBG (4, "Timer set to %ld (int %ld) for ctx %u", next_timeout,
next_interval, ctx->c_c_index);
if (ctx->timer_handle == QUIC_TIMER_HANDLE_INVALID)
{
if (next_timeout == INT64_MAX)
{
- QUIC_DBG (5, "timer for ctx %u already stopped", ctx->c_c_index);
+ QUIC_DBG (4, "timer for ctx %u already stopped", ctx->c_c_index);
return;
}
ctx->timer_handle =
@@ -953,7 +1001,7 @@ quic_update_timer (quic_ctx_t * ctx)
{
tw_timer_stop_1t_3w_1024sl_ov (tw, ctx->timer_handle);
ctx->timer_handle = QUIC_TIMER_HANDLE_INVALID;
- QUIC_DBG (5, "Stopping timer for ctx %u", ctx->c_c_index);
+ QUIC_DBG (4, "Stopping timer for ctx %u", ctx->c_c_index);
}
else
tw_timer_update_1t_3w_1024sl_ov (tw, ctx->timer_handle,
@@ -1038,9 +1086,10 @@ quic_connect_new_stream (session_endpoint_cfg_t * sep)
return -1;
}
- sctx_index = quic_ctx_alloc (); /* Allocate before we get pointers */
- sctx = quic_ctx_get (sctx_index);
- qctx = quic_ctx_get (quic_session->connection_index);
+ sctx_index = quic_ctx_alloc (quic_session->thread_index); /* Allocate before we get pointers */
+ sctx = quic_ctx_get (sctx_index, quic_session->thread_index);
+ qctx =
+ quic_ctx_get (quic_session->connection_index, quic_session->thread_index);
if (qctx->c_quic_ctx_id.is_stream)
{
QUIC_DBG (1, "session is a stream");
@@ -1060,7 +1109,7 @@ quic_connect_new_stream (session_endpoint_cfg_t * sep)
if (!conn || !quicly_connection_is_ready (conn))
return -1;
- if ((rv = quicly_open_stream (conn, &stream, 0)))
+ if ((rv = quicly_open_stream (conn, &stream, 0 /* uni */ )))
{
QUIC_DBG (2, "Stream open failed with %d", rv);
return -1;
@@ -1070,8 +1119,9 @@ quic_connect_new_stream (session_endpoint_cfg_t * sep)
QUIC_DBG (2, "Opened stream %d, creating session", stream->stream_id);
stream_session = session_alloc (qctx->c_thread_index);
- QUIC_DBG (1, "Created stream_session, id %u ctx %u",
- stream_session->session_index, sctx_index);
+ QUIC_DBG (2, "Allocated stream_session, id %u, thread %u ctx %u",
+ stream_session->session_index, stream_session->thread_index,
+ sctx_index);
stream_session->flags |= SESSION_F_QUIC_STREAM;
stream_session->app_wrk_index = app_wrk->wrk_index;
stream_session->connection_index = sctx_index;
@@ -1081,7 +1131,6 @@ quic_connect_new_stream (session_endpoint_cfg_t * sep)
qctx->c_quic_ctx_id.udp_is_ip4);
sctx->c_s_index = stream_session->session_index;
- sctx->c_quic_ctx_id.stream_session_handle = session_handle (stream_session);
if (app_worker_init_connected (app_wrk, stream_session))
{
@@ -1105,6 +1154,7 @@ quic_connect_new_stream (session_endpoint_cfg_t * sep)
session_handle (stream_session));
stream_data = (quic_stream_data_t *) stream->data;
stream_data->ctx_id = sctx->c_c_index;
+ stream_data->thread_index = sctx->c_thread_index;
return 0;
}
@@ -1119,8 +1169,8 @@ quic_connect_new_connection (session_endpoint_cfg_t * sep)
u32 ctx_index;
int error;
- ctx_index = quic_ctx_alloc ();
- ctx = quic_ctx_get (ctx_index);
+ ctx_index = quic_ctx_alloc (vlib_get_thread_index ());
+ ctx = quic_ctx_get (ctx_index, vlib_get_thread_index ());
ctx->c_quic_ctx_id.parent_app_wrk_id = sep->app_wrk_index;
ctx->c_s_index = QUIC_SESSION_INVALID;
ctx->c_c_index = ctx_index;
@@ -1164,26 +1214,24 @@ quic_disconnect (u32 ctx_index, u32 thread_index)
QUIC_DBG (2, "Called quic_disconnect");
quic_ctx_t *ctx;
- ctx = quic_ctx_get (ctx_index);
+ ctx = quic_ctx_get (ctx_index, thread_index);
if (ctx->c_quic_ctx_id.is_stream)
{
- QUIC_DBG (1, "Closing stream %x", ctx_index);
+ QUIC_DBG (2, "Closing stream %x, session %x", ctx_index,
+ ctx->c_s_index);
quicly_stream_t *stream = ctx->c_quic_ctx_id.stream;
quicly_reset_stream (stream, 0x30000);
}
else
{
- QUIC_DBG (1, "Closing connection %x", ctx_index);
+ QUIC_DBG (2, "Closing connection %x, session %x", ctx_index,
+ ctx->c_s_index);
quicly_conn_t *conn = ctx->c_quic_ctx_id.conn;
/* Start connection closing. Keep sending packets until quicly_send
returns QUICLY_ERROR_FREE_CONNECTION */
quicly_close (conn, 0, "");
/* This also causes all streams to be closed (and the cb called) */
- if (quic_send_packets (ctx))
- {
- QUIC_DBG (2, "closing connection in disconnect");
- quic_connection_closed (ctx->c_c_index);
- }
+ quic_send_packets (ctx);
}
}
@@ -1194,7 +1242,7 @@ quic_start_listen (u32 quic_listen_session_index, transport_endpoint_t * tep)
quic_main_t *qm = &quic_main;
session_handle_t udp_handle;
session_endpoint_cfg_t *sep;
- session_t *udp_listen_session, *quic_listen_session;
+ session_t *udp_listen_session;
app_worker_t *app_wrk;
application_t *app;
quic_ctx_t *lctx;
@@ -1219,24 +1267,21 @@ quic_start_listen (u32 quic_listen_session_index, transport_endpoint_t * tep)
if (vnet_listen (args))
return -1;
- lctx_index = quic_ctx_alloc (); /* listener */
+ lctx_index = quic_ctx_alloc (0); /* listener */
udp_handle = args->handle;
app_listener = app_listener_get_w_handle (udp_handle);
udp_listen_session = app_listener_get_session (app_listener);
udp_listen_session->opaque = lctx_index;
- quic_listen_session = listen_session_get (quic_listen_session_index);
-
- lctx = quic_ctx_get (lctx_index); /* listener */
+ lctx = quic_ctx_get (lctx_index, 0); /* listener */
lctx->is_listener = 1;
lctx->c_quic_ctx_id.parent_app_wrk_id = sep->app_wrk_index;
lctx->c_quic_ctx_id.parent_app_id = app_wrk->app_index;
lctx->c_quic_ctx_id.udp_session_handle = udp_handle;
- lctx->c_quic_ctx_id.quic_session_handle =
- listen_session_get_handle (quic_listen_session);
lctx->c_quic_ctx_id.udp_is_ip4 = sep->is_ip4;
+ lctx->c_s_index = quic_listen_session_index;
- QUIC_DBG (1, "Started listening %d", lctx_index);
+ QUIC_DBG (2, "Started listening %d", lctx_index);
return lctx_index;
}
@@ -1246,7 +1291,7 @@ quic_stop_listen (u32 lctx_index)
QUIC_DBG (2, "Called quic_stop_listen");
quic_ctx_t *lctx;
- lctx = quic_ctx_get (lctx_index); /* listener */
+ lctx = quic_ctx_get (lctx_index, 0); /* listener */
vnet_unlisten_args_t a = {
.handle = lctx->c_quic_ctx_id.udp_session_handle,
.app_index = quic_main.app_index,
@@ -1266,7 +1311,7 @@ quic_connection_get (u32 ctx_index, u32 thread_index)
{
QUIC_DBG (2, "Called quic_connection_get");
quic_ctx_t *ctx;
- ctx = quic_ctx_get_w_thread (ctx_index, thread_index);
+ ctx = quic_ctx_get (ctx_index, thread_index);
return &ctx->connection;
}
@@ -1275,7 +1320,7 @@ quic_listener_get (u32 listener_index)
{
QUIC_DBG (2, "Called quic_listener_get");
quic_ctx_t *ctx;
- ctx = quic_ctx_get (listener_index);
+ ctx = quic_ctx_get (listener_index, 0);
return &ctx->connection;
}
@@ -1290,7 +1335,7 @@ static u8 *
format_quic_half_open (u8 * s, va_list * args)
{
u32 qc_index = va_arg (*args, u32);
- quic_ctx_t *ctx = quic_ctx_get (qc_index);
+ quic_ctx_t *ctx = quic_ctx_get (qc_index, vlib_get_thread_index ());
s = format (s, "[QUIC] half-open app %u", ctx->c_quic_ctx_id.parent_app_id);
return s;
}
@@ -1339,6 +1384,7 @@ quic_notify_app_connected (quic_ctx_t * ctx)
session_t *quic_session;
app_worker_t *app_wrk;
u32 ctx_id = ctx->c_c_index;
+ u32 thread_index = ctx->c_thread_index;
quic_main_t *qm = &quic_main;
app_wrk = app_worker_get_if_valid (ctx->c_quic_ctx_id.parent_app_wrk_id);
@@ -1348,9 +1394,10 @@ quic_notify_app_connected (quic_ctx_t * ctx)
return -1;
}
- quic_session = session_alloc (ctx->c_thread_index);
+ quic_session = session_alloc (thread_index);
- QUIC_DBG (1, "Created quic_session, id %u", quic_session->session_index);
+ QUIC_DBG (2, "Allocated quic_session, id %u, thread %u",
+ quic_session->session_index, quic_session->thread_index);
ctx->c_s_index = quic_session->session_index;
quic_session->app_wrk_index = ctx->c_quic_ctx_id.parent_app_wrk_id;
quic_session->connection_index = ctx->c_c_index;
@@ -1362,7 +1409,7 @@ quic_notify_app_connected (quic_ctx_t * ctx)
if (app_worker_init_connected (app_wrk, quic_session))
{
QUIC_DBG (1, "failed to app_worker_init_connected");
- quic_disconnect (ctx->c_c_index, vlib_get_thread_index ());
+ quic_disconnect (ctx_id, thread_index);
return app_worker_connect_notify (app_wrk, NULL, ctx->client_opaque);
}
@@ -1370,13 +1417,12 @@ quic_notify_app_connected (quic_ctx_t * ctx)
if (app_worker_connect_notify (app_wrk, quic_session, ctx->client_opaque))
{
QUIC_DBG (1, "failed to notify app");
- quic_disconnect (ctx->c_c_index, vlib_get_thread_index ());
+ quic_disconnect (ctx_id, thread_index);
return -1;
}
/* If the app opens a stream in its callback it may invalidate ctx */
- ctx = quic_ctx_get (ctx_id);
- ctx->c_quic_ctx_id.quic_session_handle = session_handle (quic_session);
+ ctx = quic_ctx_get (ctx_id, thread_index);
quic_session->session_state = SESSION_STATE_LISTENING;
session_lookup_add_connection (&ctx->connection,
session_handle (quic_session));
@@ -1401,9 +1447,10 @@ quic_session_connected_callback (u32 quic_app_index, u32 ctx_index,
quicly_conn_t *conn;
application_t *app;
quic_ctx_t *ctx;
+ u32 thread_index = vlib_get_thread_index ();
int ret;
- ctx = quic_ctx_get (ctx_index);
+ ctx = quic_ctx_get (ctx_index, thread_index);
if (is_fail)
{
u32 api_context;
@@ -1427,11 +1474,11 @@ quic_session_connected_callback (u32 quic_app_index, u32 ctx_index,
}
app = application_get (app_wrk->app_index);
- ctx->c_thread_index = vlib_get_thread_index ();
+ ctx->c_thread_index = thread_index;
ctx->c_c_index = ctx_index;
- QUIC_DBG (1, "Quic connect returned %u. New ctx [%u]%x",
- is_fail, vlib_get_thread_index (), (ctx) ? ctx_index : ~0);
+ QUIC_DBG (2, "Quic connect returned %u. New ctx [%u]%x",
+ is_fail, thread_index, (ctx) ? ctx_index : ~0);
ctx->c_quic_ctx_id.udp_session_handle = session_handle (udp_session);
udp_session->opaque = ctx->c_quic_ctx_id.parent_app_id;
@@ -1449,20 +1496,20 @@ quic_session_connected_callback (u32 quic_app_index, u32 ctx_index,
&quic_main.next_cid, &quic_main.hs_properties, NULL);
++quic_main.next_cid.master_id;
/* Save context handle in quicly connection */
- *quicly_get_data (ctx->c_quic_ctx_id.conn) = (void *) (u64) ctx_index;
+ quic_store_conn_ctx (ctx->c_quic_ctx_id.conn, ctx);
assert (ret == 0);
/* Register connection in connections map */
conn = ctx->c_quic_ctx_id.conn;
quic_make_connection_key (&kv, quicly_get_master_id (conn));
- kv.value = ((u64) vlib_get_thread_index ()) << 32 | (u64) ctx_index;
- QUIC_DBG (1, "Registering conn with id %lu %lu", kv.key[0], kv.key[1]);
+ kv.value = ((u64) thread_index) << 32 | (u64) ctx_index;
+ QUIC_DBG (2, "Registering conn with id %lu %lu", kv.key[0], kv.key[1]);
clib_bihash_add_del_16_8 (&quic_main.connection_hash, &kv, 1 /* is_add */ );
quic_send_packets (ctx);
/* UDP stack quirk? preemptively transfer connection if that happens */
- if (udp_session->thread_index != vlib_get_thread_index ())
+ if (udp_session->thread_index != thread_index)
quic_transfer_connection (ctx_index, udp_session->thread_index);
return ret;
@@ -1477,10 +1524,10 @@ quic_receive_connection (void *arg)
quicly_conn_t *conn;
temp_ctx = arg;
- new_ctx_id = quic_ctx_alloc ();
- new_ctx = quic_ctx_get (new_ctx_id);
+ new_ctx_id = quic_ctx_alloc (thread_index);
+ new_ctx = quic_ctx_get (new_ctx_id, thread_index);
- QUIC_DBG (3, "Received conn %u (now %u)", temp_ctx->c_thread_index,
+ QUIC_DBG (2, "Received conn %u (now %u)", temp_ctx->c_thread_index,
new_ctx_id);
@@ -1491,9 +1538,10 @@ quic_receive_connection (void *arg)
new_ctx->c_c_index = new_ctx_id;
conn = new_ctx->c_quic_ctx_id.conn;
+ quic_store_conn_ctx (conn, new_ctx);
quic_make_connection_key (&kv, quicly_get_master_id (conn));
kv.value = ((u64) thread_index) << 32 | (u64) new_ctx_id;
- QUIC_DBG (1, "Registering conn with id %lu %lu", kv.key[0], kv.key[1]);
+ QUIC_DBG (2, "Registering conn with id %lu %lu", kv.key[0], kv.key[1]);
clib_bihash_add_del_16_8 (&quic_main.connection_hash, &kv, 1 /* is_add */ );
new_ctx->timer_handle = QUIC_TIMER_HANDLE_INVALID;
quic_update_timer (new_ctx);
@@ -1508,12 +1556,13 @@ quic_transfer_connection (u32 ctx_index, u32 dest_thread)
quic_ctx_t *ctx, *temp_ctx;
clib_bihash_kv_16_8_t kv;
quicly_conn_t *conn;
+ u32 thread_index = vlib_get_thread_index ();
- QUIC_DBG (3, "Transferring conn %u to thread %u", ctx_index, dest_thread);
+ QUIC_DBG (2, "Transferring conn %u to thread %u", ctx_index, dest_thread);
temp_ctx = malloc (sizeof (quic_ctx_t));
ASSERT (temp_ctx);
- ctx = quic_ctx_get (ctx_index);
+ ctx = quic_ctx_get (ctx_index, thread_index);
memcpy (temp_ctx, ctx, sizeof (quic_ctx_t));
@@ -1523,7 +1572,7 @@ quic_transfer_connection (u32 ctx_index, u32 dest_thread)
clib_bihash_add_del_16_8 (&quic_main.connection_hash, &kv, 0 /* is_add */ );
if (ctx->timer_handle != QUIC_TIMER_HANDLE_INVALID)
{
- tw = &quic_main.wrk_ctx[vlib_get_thread_index ()].timer_wheel;
+ tw = &quic_main.wrk_ctx[thread_index].timer_wheel;
tw_timer_stop_1t_3w_1024sl_ov (tw, ctx->timer_handle);
}
quic_ctx_free (ctx);
@@ -1553,7 +1602,7 @@ static void
quic_move_connection_to_thread (u32 ctx_index, u32 owner_thread,
u32 to_thread)
{
- QUIC_DBG (3, "Requesting transfer of conn %u from thread %u", ctx_index,
+ QUIC_DBG (2, "Requesting transfer of conn %u from thread %u", ctx_index,
owner_thread);
u64 arg = ((u64) ctx_index) << 32 | to_thread;
session_send_rpc_evt_to_thread (owner_thread, quic_transfer_connection_rpc,
@@ -1576,23 +1625,24 @@ int
quic_session_accepted_callback (session_t * udp_session)
{
/* New UDP connection, try to accept it */
- QUIC_DBG (1, "UDP session accepted");
+ QUIC_DBG (2, "UDP session accepted");
u32 ctx_index;
u32 *pool_index;
quic_ctx_t *ctx, *lctx;
session_t *udp_listen_session;
+ u32 thread_index = vlib_get_thread_index ();
udp_listen_session = listen_session_get (udp_session->listener_index);
- ctx_index = quic_ctx_alloc ();
- ctx = quic_ctx_get (ctx_index);
+ ctx_index = quic_ctx_alloc (thread_index);
+ ctx = quic_ctx_get (ctx_index, thread_index);
ctx->c_thread_index = udp_session->thread_index;
ctx->c_c_index = ctx_index;
ctx->c_s_index = QUIC_SESSION_INVALID;
ctx->c_quic_ctx_id.udp_session_handle = session_handle (udp_session);
ctx->c_quic_ctx_id.listener_ctx_id = udp_listen_session->opaque;
- lctx = quic_ctx_get_w_thread (udp_listen_session->opaque,
- udp_listen_session->thread_index);
+ lctx = quic_ctx_get (udp_listen_session->opaque,
+ udp_listen_session->thread_index);
ctx->c_quic_ctx_id.udp_is_ip4 = lctx->c_quic_ctx_id.udp_is_ip4;
ctx->c_quic_ctx_id.parent_app_id = lctx->c_quic_ctx_id.parent_app_id;
ctx->c_quic_ctx_id.parent_app_wrk_id =
@@ -1633,52 +1683,31 @@ static int
quic_custom_tx_callback (void *s)
{
session_t *stream_session = (session_t *) s;
- quic_ctx_t *ctx;
- svm_fifo_t *f;
quicly_stream_t *stream;
- u32 deq_max, rv;
- u8 *data;
+ quic_ctx_t *ctx;
+ int rv;
+ svm_fifo_unset_event (stream_session->tx_fifo);
if (PREDICT_FALSE
(stream_session->session_state >= SESSION_STATE_TRANSPORT_CLOSING))
return 0;
- ctx = quic_ctx_get (stream_session->connection_index);
+ ctx =
+ quic_ctx_get (stream_session->connection_index,
+ stream_session->thread_index);
if (PREDICT_FALSE (!ctx->c_quic_ctx_id.is_stream))
{
goto tx_end; /* Most probably a reschedule */
}
- if (!ctx->c_quic_ctx_id.stream->sendstate.is_open)
+ stream = ctx->c_quic_ctx_id.stream;
+ if (!quicly_sendstate_is_open (&stream->sendstate))
{
- QUIC_DBG (3, "Warning: tried to send on closed stream");
+ QUIC_DBG (1, "Warning: tried to send on closed stream");
return -1;
}
- f = stream_session->tx_fifo;
- deq_max = svm_fifo_max_dequeue (f);
- if (!deq_max)
- goto tx_end;
-
- data = malloc (deq_max);
- rv = svm_fifo_peek (f, 0, deq_max, data);
- if (rv != deq_max)
- {
- QUIC_DBG (2, "Not enough data dequeued in TX");
- return 1;
- }
- stream = ctx->c_quic_ctx_id.stream;
-
- rv = quicly_streambuf_egress_write (stream, data, deq_max);
- free (data);
- if (rv)
+ if ((rv = quicly_stream_sync_sendbuf (stream, 1)) != 0)
return rv;
- rv = svm_fifo_dequeue_drop (f, deq_max);
- if (rv != deq_max)
- {
- QUIC_DBG (2, "Not enough data dropped in TX");
- return 1;
- }
- QUIC_DBG (2, "Sent %u bytes", deq_max);
tx_end:
quic_send_packets (ctx);
@@ -1704,26 +1733,27 @@ quic_find_packet_ctx (u32 * ctx_thread, u32 * ctx_index,
h = &quic_main.connection_hash;
quic_make_connection_key (&kv, &packet->cid.dest.plaintext);
+ QUIC_DBG (3, "Searching conn with id %lu %lu", kv.key[0], kv.key[1]);
if (clib_bihash_search_16_8 (h, &kv, &kv) == 0)
{
u32 index = kv.value & UINT32_MAX;
- u32 thread_id = kv.value >> 32;
+ u8 thread_id = kv.value >> 32;
/* Check if this connection belongs to this thread, otherwise
* ask for it to be moved */
if (thread_id != caller_thread_index)
{
- QUIC_DBG (3, "Connection is on wrong thread");
+ QUIC_DBG (2, "Connection is on wrong thread");
/* Cannot make full check with quicly_is_destination... */
*ctx_index = index;
*ctx_thread = thread_id;
return -1;
}
- ctx_ = quic_ctx_get (index);
+ ctx_ = quic_ctx_get (index, vlib_get_thread_index ());
conn_ = ctx_->c_quic_ctx_id.conn;
if (conn_ && quicly_is_destination (conn_, sa, salen, packet))
{
- QUIC_DBG (4, "Connection found");
+ QUIC_DBG (3, "Connection found");
*ctx_index = index;
*ctx_thread = thread_id;
return 0;
@@ -1739,14 +1769,16 @@ quic_receive (quic_ctx_t * ctx, quicly_conn_t * conn,
{
int rv;
u32 ctx_id = ctx->c_c_index;
+ u32 thread_index = ctx->c_thread_index;
+ /* TODO : QUICLY_ERROR_PACKET_IGNORED sould be handled */
rv = quicly_receive (conn, &packet);
- if (rv) /* TOOD : QUICLY_ERROR_PACKET_IGNORED sould be handled */
+ if (rv)
{
- QUIC_DBG (1, "Quicly receive ignored packet code : %u", rv);
+ QUIC_DBG (2, "Quicly receive ignored packet code : %u", rv);
return 0;
}
/* ctx pointer may change if a new stream is opened */
- ctx = quic_ctx_get (ctx_id);
+ ctx = quic_ctx_get (ctx_id, thread_index);
/* Conn may be set to null if the connection is terminated */
if (ctx->c_quic_ctx_id.conn && ctx->conn_state == QUIC_CONN_STATE_HANDSHAKE)
{
@@ -1756,7 +1788,7 @@ quic_receive (quic_ctx_t * ctx, quicly_conn_t * conn,
if (quicly_is_client (conn))
{
quic_notify_app_connected (ctx);
- ctx = quic_ctx_get (ctx_id);
+ ctx = quic_ctx_get (ctx_id, thread_index);
}
}
}
@@ -1766,27 +1798,27 @@ quic_receive (quic_ctx_t * ctx, quicly_conn_t * conn,
static int
quic_create_quic_session (quic_ctx_t * ctx)
{
- session_t *quic_session, *quic_listen_session;
+ session_t *quic_session;
app_worker_t *app_wrk;
quic_ctx_t *lctx;
+ quic_main_t *qm = &quic_main;
int rv;
quic_session = session_alloc (ctx->c_thread_index);
- QUIC_DBG (1, "Created quic session, id %u ctx %u",
- quic_session->session_index, ctx->c_c_index);
+ QUIC_DBG (2, "Allocated quic_session, id %u, thread %u ctx %u",
+ quic_session->session_index, quic_session->thread_index,
+ ctx->c_c_index);
quic_session->session_state = SESSION_STATE_LISTENING;
ctx->c_s_index = quic_session->session_index;
- lctx = quic_ctx_get (ctx->c_quic_ctx_id.listener_ctx_id);
+ lctx = quic_ctx_get (ctx->c_quic_ctx_id.listener_ctx_id, 0);
- quic_listen_session =
- listen_session_get_from_handle (lctx->c_quic_ctx_id.quic_session_handle);
quic_session->app_wrk_index = lctx->c_quic_ctx_id.parent_app_wrk_id;
quic_session->connection_index = ctx->c_c_index;
quic_session->session_type =
session_type_from_proto_and_ip (TRANSPORT_PROTO_QUIC,
ctx->c_quic_ctx_id.udp_is_ip4);
- quic_session->listener_index = quic_listen_session->session_index;
+ quic_session->listener_index = qm->fake_app_listener_index;
quic_session->app_index = quic_main.app_index;
/* TODO: don't alloc fifos when we don't transfer data on this session
@@ -1797,7 +1829,6 @@ quic_create_quic_session (quic_ctx_t * ctx)
session_free (quic_session);
return rv;
}
- ctx->c_quic_ctx_id.quic_session_handle = session_handle (quic_session);
session_lookup_add_connection (&ctx->connection,
session_handle (quic_session));
app_wrk = app_worker_get (quic_session->app_wrk_index);
@@ -1819,6 +1850,7 @@ quic_create_connection (quicly_context_t * quicly_ctx,
clib_bihash_kv_16_8_t kv;
quic_ctx_t *ctx;
quicly_conn_t *conn;
+ u32 thread_index = vlib_get_thread_index ();
int rv;
/* new connection, accept and create context if packet is valid
@@ -1829,16 +1861,16 @@ quic_create_connection (quicly_context_t * quicly_ctx,
{
/* Invalid packet, pass */
assert (conn == NULL);
- QUIC_DBG (2, "Accept failed with %d", rv);
+ QUIC_DBG (1, "Accept failed with %d", rv);
/* TODO: cleanup created quic ctx and UDP session */
return 0;
}
assert (conn != NULL);
++quic_main.next_cid.master_id;
- ctx = quic_ctx_get (ctx_index);
+ ctx = quic_ctx_get (ctx_index, thread_index);
/* Save ctx handle in quicly connection */
- *quicly_get_data (conn) = (void *) (u64) ctx_index;
+ quic_store_conn_ctx (conn, ctx);
ctx->c_quic_ctx_id.conn = conn;
ctx->conn_state = QUIC_CONN_STATE_HANDSHAKE;
@@ -1846,9 +1878,9 @@ quic_create_connection (quicly_context_t * quicly_ctx,
/* Register connection in connections map */
quic_make_connection_key (&kv, quicly_get_master_id (conn));
- kv.value = ((u64) vlib_get_thread_index ()) << 32 | (u64) ctx_index;
+ kv.value = ((u64) thread_index) << 32 | (u64) ctx_index;
clib_bihash_add_del_16_8 (&quic_main.connection_hash, &kv, 1 /* is_add */ );
- QUIC_DBG (1, "Registering conn with id %lu %lu", kv.key[0], kv.key[1]);
+ QUIC_DBG (2, "Registering conn with id %lu %lu", kv.key[0], kv.key[1]);
return quic_send_packets (ctx);
}
@@ -1871,6 +1903,8 @@ quic_reset_connection (quicly_context_t * quicly_ctx, u64 udp_session_handle,
{
dgram = quicly_send_stateless_reset (quicly_ctx, sa, salen,
&packet.cid.dest.plaintext);
+ if (dgram == NULL)
+ return 1;
udp_session = session_get_from_handle (udp_session_handle);
return quic_send_datagram (udp_session, dgram); /* TODO : set event on fifo */
}
@@ -1890,7 +1924,8 @@ quic_app_rx_callback (session_t * udp_session)
struct sockaddr_in6 sa6;
struct sockaddr *sa = (struct sockaddr *) &sa6;
socklen_t salen;
- u32 max_deq, len, full_len, ctx_index, ctx_thread = UINT32_MAX, ret;
+ u32 max_deq, len, full_len, ctx_index = UINT32_MAX, ctx_thread =
+ UINT32_MAX, ret;
u8 *data;
int err;
u32 *opening_ctx_pool, *ctx_index_ptr;
@@ -1919,19 +1954,19 @@ quic_app_rx_callback (session_t * udp_session)
ret = svm_fifo_peek (f, 0, SESSION_CONN_HDR_LEN, (u8 *) & ph);
if (ret != SESSION_CONN_HDR_LEN)
{
- QUIC_DBG (2, "Not enough data for header in RX");
+ QUIC_DBG (1, "Not enough data for header in RX");
return 1;
}
if (ph.data_length < ph.data_offset)
{
- QUIC_DBG (2, "Not enough data vs offset in RX");
+ QUIC_DBG (1, "Not enough data vs offset in RX");
return 1;
}
len = ph.data_length - ph.data_offset;
full_len = ph.data_length + ph.data_offset + SESSION_CONN_HDR_LEN;
if (full_len > max_deq)
{
- QUIC_DBG (2, "Not enough data in fifo RX");
+ QUIC_DBG (1, "Not enough data in fifo RX");
return 1;
}
@@ -1943,9 +1978,9 @@ quic_app_rx_callback (session_t * udp_session)
ph.data_length, data);
if (ret != ph.data_length)
{
- QUIC_DBG (2, "Not enough data peeked in RX");
- return 1;
+ QUIC_DBG (1, "Not enough data peeked in RX");
free (data);
+ return 1;
}
plen =
@@ -1965,7 +2000,7 @@ quic_app_rx_callback (session_t * udp_session)
&packet, thread_index);
if (err == 0)
{
- ctx = quic_ctx_get_w_thread (ctx_index, thread_index);
+ ctx = quic_ctx_get (ctx_index, thread_index);
quic_receive (ctx, ctx->c_quic_ctx_id.conn, packet);
}
else if (ctx_thread != UINT32_MAX)
@@ -1984,7 +2019,7 @@ quic_app_rx_callback (session_t * udp_session)
/* *INDENT-OFF* */
pool_foreach (ctx_index_ptr, opening_ctx_pool,
({
- ctx = quic_ctx_get_w_thread (*ctx_index_ptr, thread_index);
+ ctx = quic_ctx_get (*ctx_index_ptr, thread_index);
if (ctx->c_quic_ctx_id.udp_session_handle == udp_session_handle)
{
/* Right ctx found, create conn & remove from pool */
@@ -2019,7 +2054,6 @@ quic_common_get_transport_endpoint (quic_ctx_t * ctx,
transport_endpoint_t * tep, u8 is_lcl)
{
session_t *udp_session;
- QUIC_DBG (2, "Called quic_get_transport_endpoint");
if (ctx->c_quic_ctx_id.is_stream)
{
tep->is_ip4 = 255; /* well this is ugly */
@@ -2037,7 +2071,16 @@ quic_get_transport_listener_endpoint (u32 listener_index,
transport_endpoint_t * tep, u8 is_lcl)
{
quic_ctx_t *ctx;
- ctx = quic_ctx_get (listener_index);
+ app_listener_t *app_listener;
+ session_t *udp_listen_session;
+ ctx = quic_ctx_get (listener_index, vlib_get_thread_index ());
+ if (ctx->is_listener)
+ {
+ app_listener =
+ app_listener_get_w_handle (ctx->c_quic_ctx_id.udp_session_handle);
+ udp_listen_session = app_listener_get_session (app_listener);
+ return session_get_endpoint (udp_listen_session, tep, is_lcl);
+ }
quic_common_get_transport_endpoint (ctx, tep, is_lcl);
}
@@ -2046,7 +2089,7 @@ quic_get_transport_endpoint (u32 ctx_index, u32 thread_index,
transport_endpoint_t * tep, u8 is_lcl)
{
quic_ctx_t *ctx;
- ctx = quic_ctx_get_w_thread (ctx_index, thread_index);
+ ctx = quic_ctx_get (ctx_index, thread_index);
quic_common_get_transport_endpoint (ctx, tep, is_lcl);
}
@@ -2087,14 +2130,13 @@ static const transport_proto_vft_t quic_proto = {
static clib_error_t *
quic_init (vlib_main_t * vm)
{
- QUIC_DBG (2, "Called quic_init");
u32 add_segment_size = (4096ULL << 20) - 1, segment_size = 512 << 20;
vlib_thread_main_t *vtm = vlib_get_thread_main ();
tw_timer_wheel_1t_3w_1024sl_ov_t *tw;
vnet_app_attach_args_t _a, *a = &_a;
u64 options[APP_OPTIONS_N_OPTIONS];
quic_main_t *qm = &quic_main;
- u32 fifo_size = 64 << 10;
+ u32 fifo_size = QUIC_FIFO_SIZE;
u32 num_threads, i;
application_t *app;
diff --git a/src/plugins/quic/quic.h b/src/plugins/quic/quic.h
index 512ffd95eeb..759dc9f0a4f 100644
--- a/src/plugins/quic/quic.h
+++ b/src/plugins/quic/quic.h
@@ -23,9 +23,15 @@
#include <vppinfra/bihash_16_8.h>
#include <quicly.h>
-#include <quicly/streambuf.h>
-#define QUIC_DEBUG 0
+/* QUIC log levels
+ * 1 - errors
+ * 2 - connection/stream events
+ * 3 - packet events
+ * 4 - timer events
+ **/
+
+#define QUIC_DEBUG 2
#define QUIC_DEBUG_LEVEL_CLIENT 0
#define QUIC_DEBUG_LEVEL_SERVER 0
@@ -53,14 +59,12 @@ typedef CLIB_PACKED (struct quic_ctx_id_
u32 parent_app_id;
union {
CLIB_PACKED (struct {
- session_handle_t quic_session_handle; /* TODO: remove */
session_handle_t udp_session_handle;
quicly_conn_t *conn;
u32 listener_ctx_id;
u8 udp_is_ip4;
});
CLIB_PACKED (struct {
- session_handle_t stream_session_handle; /* TODO: remove */
quicly_stream_t *stream;
u32 quic_connection_ctx_id;
});
@@ -89,8 +93,8 @@ typedef struct quic_ctx_
typedef struct quic_stream_data_
{
- quicly_streambuf_t streambuf;
u32 ctx_id;
+ u32 thread_index;
} quic_stream_data_t;
typedef struct quic_worker_ctx_
diff --git a/src/tests/vnet/session/quic_echo.c b/src/tests/vnet/session/quic_echo.c
index af6666193f6..a0bd9690246 100644
--- a/src/tests/vnet/session/quic_echo.c
+++ b/src/tests/vnet/session/quic_echo.c
@@ -38,7 +38,7 @@
#include <vpp/api/vpe_all_api_h.h>
#undef vl_printfun
-#define QUIC_ECHO_DBG 0
+#define QUIC_ECHO_DBG 1
#define DBG(_fmt, _args...) \
if (QUIC_ECHO_DBG) \
clib_warning (_fmt, ##_args)
@@ -93,8 +93,8 @@ typedef struct
uword *session_index_by_vpp_handles;
/* Hash table for shared segment_names */
- uword *shared_segment_names;
- clib_spinlock_t segment_names_lock;
+ uword *shared_segment_handles;
+ clib_spinlock_t segment_handles_lock;
/* intermediate rx buffer */
u8 *rx_buf;
@@ -211,18 +211,18 @@ wait_for_segment_allocation (u64 segment_handle)
f64 timeout;
timeout = clib_time_now (&em->clib_time) + TIMEOUT;
uword *segment_present;
- DBG ("ASKING for %lu", segment_handle);
+ DBG ("Waiting for segment %lx...", segment_handle);
while (clib_time_now (&em->clib_time) < timeout)
{
- clib_spinlock_lock (&em->segment_names_lock);
- segment_present = hash_get (em->shared_segment_names, segment_handle);
- clib_spinlock_unlock (&em->segment_names_lock);
+ clib_spinlock_lock (&em->segment_handles_lock);
+ segment_present = hash_get (em->shared_segment_handles, segment_handle);
+ clib_spinlock_unlock (&em->segment_handles_lock);
if (segment_present != 0)
return 0;
if (em->time_to_stop == 1)
return 0;
}
- DBG ("timeout waiting for segment_allocation %lu", segment_handle);
+ DBG ("timeout waiting for segment_allocation %lx", segment_handle);
return -1;
}
@@ -422,10 +422,10 @@ vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t *
-1))
goto failed;
}
- DBG ("SETTING for %lu", segment_handle);
- clib_spinlock_lock (&em->segment_names_lock);
- hash_set (em->shared_segment_names, segment_handle, 1);
- clib_spinlock_unlock (&em->segment_names_lock);
+ clib_spinlock_lock (&em->segment_handles_lock);
+ hash_set (em->shared_segment_handles, segment_handle, 1);
+ clib_spinlock_unlock (&em->segment_handles_lock);
+ DBG ("Mapped new segment %lx", segment_handle);
em->state = STATE_ATTACHED;
return;
@@ -524,6 +524,8 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
echo_main_t *em = &echo_main;
int rv;
int *fds = 0;
+ u64 segment_handle;
+ segment_handle = clib_net_to_host_u64 (mp->segment_handle);
if (mp->fd_flags & SESSION_FD_F_MEMFD_SEGMENT)
{
@@ -534,11 +536,11 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
clib_warning
("svm_fifo_segment_attach ('%s') failed on SSVM_SEGMENT_MEMFD",
mp->segment_name);
- DBG ("SETTING for %lu", mp->segment_name);
- clib_spinlock_lock (&em->segment_names_lock);
- hash_set (em->shared_segment_names, mp->segment_name, 1);
- clib_spinlock_unlock (&em->segment_names_lock);
+ clib_spinlock_lock (&em->segment_handles_lock);
+ hash_set (em->shared_segment_handles, segment_handle, 1);
+ clib_spinlock_unlock (&em->segment_handles_lock);
vec_free (fds);
+ DBG ("Mapped new segment %lx", segment_handle);
return;
}
@@ -553,11 +555,11 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
mp->segment_name);
return;
}
+ clib_spinlock_lock (&em->segment_handles_lock);
+ hash_set (em->shared_segment_handles, mp->segment_name, 1);
+ clib_spinlock_unlock (&em->segment_handles_lock);
clib_warning ("Mapped new segment '%s' size %d", mp->segment_name,
mp->segment_size);
- clib_spinlock_lock (&em->segment_names_lock);
- hash_set (em->shared_segment_names, mp->segment_name, 1);
- clib_spinlock_unlock (&em->segment_names_lock);
}
static void
@@ -703,6 +705,7 @@ client_send_disconnect (echo_main_t * em, echo_session_t * s)
dmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION);
dmp->client_index = em->my_client_index;
dmp->handle = s->vpp_session_handle;
+ DBG ("Sending Session disonnect handle %lu", dmp->handle);
vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & dmp);
}
@@ -766,7 +769,6 @@ session_accepted_handler (session_accepted_msg_t * mp)
/* Allocate local session and set it up */
pool_get (em->sessions, session);
session_index = session - em->sessions;
- DBG ("Setting session_index %lu", session_index);
if (wait_for_segment_allocation (segment_handle))
{
@@ -786,6 +788,7 @@ session_accepted_handler (session_accepted_msg_t * mp)
svm_msg_q_t *);
/* Add it to lookup table */
+ DBG ("Accepted session handle %lx, idx %lu", mp->handle, session_index);
hash_set (em->session_index_by_vpp_handles, mp->handle, session_index);
/*
@@ -873,6 +876,7 @@ session_connected_handler (session_connected_msg_t * mp)
session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
svm_msg_q_t *);
+ DBG ("Connected session handle %lx, idx %lu", mp->handle, session_index);
hash_set (em->session_index_by_vpp_handles, mp->handle, session_index);
if (mp->context == QUIC_SESSION_TYPE_QUIC)
@@ -908,8 +912,7 @@ session_disconnected_handler (session_disconnected_msg_t * mp)
echo_session_t *session = 0;
uword *p;
int rv = 0;
- DBG ("Got a SESSION_CTRL_EVT_DISCONNECTED for session %lu", mp->handle);
-
+ DBG ("Disonnected session handle %lx", mp->handle);
p = hash_get (em->session_index_by_vpp_handles, mp->handle);
if (!p)
{
@@ -943,6 +946,7 @@ session_reset_handler (session_reset_msg_t * mp)
uword *p;
int rv = 0;
+ DBG ("Reset session handle %lx", mp->handle);
p = hash_get (em->session_index_by_vpp_handles, mp->handle);
if (p)
@@ -1379,7 +1383,7 @@ vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t *
{
echo_main_t *em = &echo_main;
uword *p;
- DBG ("Got disonnected reply for session %lu", mp->handle);
+ DBG ("Got disonnected reply for session handle %lu", mp->handle);
if (mp->retval)
{
@@ -1460,8 +1464,8 @@ main (int argc, char **argv)
clib_memset (em, 0, sizeof (*em));
em->session_index_by_vpp_handles = hash_create (0, sizeof (uword));
- em->shared_segment_names = hash_create (0, sizeof (uword));
- clib_spinlock_init (&em->segment_names_lock);
+ em->shared_segment_handles = hash_create (0, sizeof (uword));
+ clib_spinlock_init (&em->segment_handles_lock);
em->my_pid = getpid ();
em->socket_name = 0;
em->use_sock_api = 1;
diff --git a/src/vnet/session/application_interface.c b/src/vnet/session/application_interface.c
index 2bd3ceb2785..2283b0f6f94 100644
--- a/src/vnet/session/application_interface.c
+++ b/src/vnet/session/application_interface.c
@@ -78,7 +78,7 @@ unformat_vnet_uri (unformat_input_t * input, va_list * args)
sep->is_ip4 = 0;
return 1;
}
- else if (unformat (input, "%U://session/%u", unformat_transport_proto,
+ else if (unformat (input, "%U://session/%lu", unformat_transport_proto,
&transport_proto, &sep->transport_opts))
{
sep->transport_proto = transport_proto;