summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNathan Skrzypczak <nathan.skrzypczak@gmail.com>2019-07-08 18:18:27 +0200
committerDave Wallace <dwallacelf@gmail.com>2019-07-09 17:07:22 +0000
commitace51525552375b4716c01a6f9ba5261fd64f940 (patch)
tree6c4b7b1e240a819f8989ecd47c0e71ec43463d3a
parent50f4a417147ae4aae6ad8cddb0c7709420c712f0 (diff)
quic: echo thread can handle multiple sessions
Type: feature Change-Id: Ibb60d5b46aafe109a81a8604712a917f6e246eaf Signed-off-by: Nathan Skrzypczak <nathan.skrzypczak@gmail.com>
-rw-r--r--src/plugins/hs_apps/sapi/quic_echo.c395
-rw-r--r--src/plugins/quic/quic.c12
2 files changed, 278 insertions, 129 deletions
diff --git a/src/plugins/hs_apps/sapi/quic_echo.c b/src/plugins/hs_apps/sapi/quic_echo.c
index 3cbb663646a..3be513afa89 100644
--- a/src/plugins/hs_apps/sapi/quic_echo.c
+++ b/src/plugins/hs_apps/sapi/quic_echo.c
@@ -58,13 +58,6 @@
clib_warning ("ECHO-ERROR: "_fmt, ##_args); \
}
-typedef enum echo_session_flag_
-{
- SESSION_FLAG_NOFLAG = 0,
- SESSION_FLAG_SHOULD_CLOSE = 1,
- SESSION_FLAG_SHOULD_FREE = 2,
-} echo_session_flag_t;
-
typedef struct
{
CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
@@ -78,8 +71,8 @@ typedef struct
volatile u64 bytes_to_receive;
f64 start;
u32 listener_index; /* listener index in echo session pool */
+ u32 idle_cycles; /* consecutive enq/deq with no data */
volatile u64 accepted_session_count; /* sessions we accepted */
- volatile echo_session_flag_t flags;
} echo_session_t;
typedef enum
@@ -90,6 +83,14 @@ typedef enum
ECHO_INVALID_DATA_SOURCE
} data_source_t;
+enum echo_close_f_t
+{
+ ECHO_CLOSE_F_INVALID = 0,
+ ECHO_CLOSE_F_PASSIVE, /* wait for close msg */
+ ECHO_CLOSE_F_ACTIVE, /* send close msg */
+ ECHO_CLOSE_F_NONE, /* don't bother sending close msg */
+};
+
enum quic_session_type_t
{
QUIC_SESSION_TYPE_QUIC,
@@ -97,6 +98,14 @@ enum quic_session_type_t
QUIC_SESSION_TYPE_LISTEN,
};
+enum quic_session_state_t
+{
+ QUIC_SESSION_STATE_INITIAL,
+ QUIC_SESSION_STATE_AWAIT_CLOSING, /* Data transfer is done, wait for close evt */
+ QUIC_SESSION_STATE_AWAIT_DATA, /* Peer closed, wait for outstanding data */
+ QUIC_SESSION_STATE_CLOSING, /* told vpp to close */
+ QUIC_SESSION_STATE_CLOSED, /* closed in vpp */
+};
typedef enum
{
@@ -194,7 +203,8 @@ typedef struct
u32 rx_buf_size;
u32 tx_buf_size;
data_source_t data_source;
- u8 send_disconnects; /* actively send disconnect */
+ u8 send_quic_disconnects; /* actively send disconnect */
+ u8 send_stream_disconnects; /* actively send disconnect */
u8 *appns_id;
u64 appns_flags;
@@ -206,6 +216,7 @@ typedef struct
u32 n_stream_clients; /* Target Number of STREAM sessions per QUIC session */
volatile u32 n_quic_clients_connected; /* Number of connected QUIC sessions */
volatile u32 n_clients_connected; /* Number of STREAM sessions connected */
+ u32 n_rx_threads; /* Number of data threads */
u64 tx_total;
u64 rx_total;
@@ -371,6 +382,21 @@ format_api_error (u8 * s, va_list * args)
}
static uword
+unformat_close (unformat_input_t * input, va_list * args)
+{
+ u8 *a = va_arg (*args, u8 *);
+ if (unformat (input, "Y"))
+ *a = ECHO_CLOSE_F_ACTIVE;
+ else if (unformat (input, "N"))
+ *a = ECHO_CLOSE_F_NONE;
+ else if (unformat (input, "W"))
+ *a = ECHO_CLOSE_F_PASSIVE;
+ else
+ return 0;
+ return 1;
+}
+
+static uword
unformat_data (unformat_input_t * input, va_list * args)
{
u64 _a;
@@ -458,6 +484,18 @@ init_error_string_table (echo_main_t * em)
*
*/
+static echo_session_t *
+echo_session_alloc (echo_main_t * em)
+{
+ echo_session_t *session;
+ pool_get (em->sessions, session);
+ clib_memset (session, 0, sizeof (*session));
+ session->session_index = session - em->sessions;
+ session->listener_index = SESSION_INVALID_INDEX;
+ session->session_state = QUIC_SESSION_STATE_INITIAL;
+ return session;
+}
+
/*
*
* Session API Calls
@@ -754,7 +792,7 @@ echo_free_sessions (echo_main_t * em)
/* *INDENT-OFF* */
pool_foreach (s, em->sessions,
({
- if (s->flags & SESSION_FLAG_SHOULD_FREE)
+ if (s->session_state == QUIC_SESSION_STATE_CLOSED)
vec_add1 (session_indexes, s->session_index);}
));
/* *INDENT-ON* */
@@ -771,17 +809,19 @@ echo_free_sessions (echo_main_t * em)
static void
echo_cleanup_session (echo_main_t * em, echo_session_t * s)
{
+ u64 c;
echo_session_t *ls;
- u64 accepted_session_count;
if (s->listener_index != SESSION_INVALID_INDEX)
{
ls = pool_elt_at_index (em->sessions, s->listener_index);
- accepted_session_count =
- clib_atomic_sub_fetch (&ls->accepted_session_count, 1);
- if (accepted_session_count == 0
- && ls->session_type != QUIC_SESSION_TYPE_LISTEN
- && em->send_disconnects)
- echo_disconnect_session (em, ls);
+ c = clib_atomic_sub_fetch (&ls->accepted_session_count, 1);
+ if (c == 0 && ls->session_type == QUIC_SESSION_TYPE_QUIC)
+ {
+ if (em->send_quic_disconnects == ECHO_CLOSE_F_ACTIVE)
+ echo_disconnect_session (em, ls);
+ else if (em->send_quic_disconnects == ECHO_CLOSE_F_NONE)
+ echo_cleanup_session (em, ls);
+ }
}
if (s->session_type == QUIC_SESSION_TYPE_QUIC)
clib_atomic_sub_fetch (&em->n_quic_clients_connected, 1);
@@ -791,9 +831,21 @@ echo_cleanup_session (echo_main_t * em, echo_session_t * s)
DBG ("Cleanup sessions (still %uQ %uS)", em->n_quic_clients_connected,
em->n_clients_connected);
hash_unset (em->session_index_by_vpp_handles, s->vpp_session_handle);
+ s->session_state = QUIC_SESSION_STATE_CLOSED;
+}
- /* Mark session as to be freed */
- s->flags |= SESSION_FLAG_SHOULD_FREE;
+static void
+echo_initiate_session_close (echo_main_t * em, echo_session_t * s, u8 active)
+{
+ if (s->session_type == QUIC_SESSION_TYPE_STREAM)
+ {
+ if (!active && s->bytes_to_receive)
+ s->session_state = QUIC_SESSION_STATE_AWAIT_DATA;
+ else
+ s->session_state = QUIC_SESSION_STATE_CLOSING;
+ }
+ else
+ echo_cleanup_session (em, s); /* We can clean Q/Lsessions right away */
}
static void
@@ -831,7 +883,6 @@ recv_data_chunk (echo_main_t * em, echo_session_t * s, u8 * rx_buf)
if (em->test_return_packets)
test_recv_bytes (em, s, rx_buf, n_read);
- ASSERT (s->bytes_to_receive >= n_read);
s->bytes_received += n_read;
s->bytes_to_receive -= n_read;
return n_read;
@@ -865,68 +916,131 @@ mirror_data_chunk (echo_main_t * em, echo_session_t * s, u8 * tx_buf, u64 len)
/*
* Rx/Tx polling thread per connection
*/
+static void
+echo_thread_handle_data (echo_main_t * em, echo_session_t * s, u8 * rx_buf)
+{
+ int n_read, n_sent;
+
+ n_read = recv_data_chunk (em, s, rx_buf);
+ if (em->data_source == ECHO_TEST_DATA_SOURCE)
+ n_sent =
+ send_data_chunk (s, em->connect_test_data,
+ s->bytes_sent % em->tx_buf_size, em->tx_buf_size);
+ else if (em->data_source == ECHO_RX_DATA_SOURCE)
+ n_sent = mirror_data_chunk (em, s, rx_buf, n_read);
+ else
+ n_sent = 0;
+ if (!s->bytes_to_send && !s->bytes_to_receive)
+ {
+ /* Session is done, need to close */
+ if (s->session_state == QUIC_SESSION_STATE_AWAIT_DATA)
+ s->session_state = QUIC_SESSION_STATE_CLOSING;
+ else
+ {
+ s->session_state = QUIC_SESSION_STATE_AWAIT_CLOSING;
+ if (em->send_stream_disconnects == ECHO_CLOSE_F_ACTIVE)
+ echo_disconnect_session (em, s);
+ else if (em->send_stream_disconnects == ECHO_CLOSE_F_NONE)
+ s->session_state = QUIC_SESSION_STATE_CLOSING;
+ }
+ return;
+ }
+
+ /* check idle clients */
+ if (n_sent || n_read)
+ s->idle_cycles = 0;
+ else if (s->idle_cycles++ == 1e7)
+ {
+ s->idle_cycles = 0;
+ DBG ("Idle client TX:%dB RX:%dB", s->bytes_to_send,
+ s->bytes_to_receive);
+ DBG ("Idle FIFOs TX:%dB RX:%dB", svm_fifo_max_dequeue (s->tx_fifo),
+ svm_fifo_max_dequeue (s->rx_fifo));
+ }
+}
+
+static void
+echo_thread_handle_closing (echo_main_t * em, echo_session_t * s)
+{
+
+ DBG ("[%lu/%lu] -> S(%x) -> [%lu/%lu]",
+ s->bytes_received, s->bytes_received + s->bytes_to_receive,
+ s->session_index, s->bytes_sent, s->bytes_sent + s->bytes_to_send);
+ clib_atomic_fetch_add (&em->tx_total, s->bytes_sent);
+ clib_atomic_fetch_add (&em->rx_total, s->bytes_received);
+
+ if (PREDICT_FALSE (em->rx_total ==
+ em->n_clients * em->n_stream_clients *
+ em->bytes_to_receive))
+ quic_echo_notify_event (em, ECHO_EVT_LAST_BYTE);
+ echo_cleanup_session (em, s);
+}
+
static void *
echo_thread_fn (void *arg)
{
echo_main_t *em = &echo_main;
+ u32 thread_n_sessions = (u64) arg & 0xFFFFFFFF;
+ u32 session_first_idx = (u64) arg >> 32;
+
+ u32 i = 0;
+ u32 n_closed_sessions = 0;
+ u32 session_index;
u8 *rx_buf = 0;
- u32 session_index = *(u32 *) arg;
echo_session_t *s;
- int n_read, n_sent, idle_loop = 0;
vec_validate (rx_buf, em->rx_buf_size);
while (!em->time_to_stop && em->state != STATE_READY)
;
- s = pool_elt_at_index (em->sessions, session_index);
- while (!em->time_to_stop)
+ for (i = 0; !em->time_to_stop; i++)
{
- n_read = recv_data_chunk (em, s, rx_buf);
- if (em->data_source == ECHO_TEST_DATA_SOURCE)
- n_sent =
- send_data_chunk (s, em->connect_test_data,
- s->bytes_sent % em->tx_buf_size, em->tx_buf_size);
- else if (em->data_source == ECHO_RX_DATA_SOURCE)
- n_sent = mirror_data_chunk (em, s, rx_buf, n_read);
- else
- n_sent = 0;
- if (!s->bytes_to_send && !s->bytes_to_receive)
- break;
- if (s->flags & SESSION_FLAG_SHOULD_CLOSE)
+ if (i % thread_n_sessions == 0)
+ n_closed_sessions = 0;
+ session_index =
+ em->thread_args[session_first_idx + i % thread_n_sessions];
+ s = pool_elt_at_index (em->sessions, session_index);
+ if (s->session_state == QUIC_SESSION_STATE_INITIAL
+ || s->session_state == QUIC_SESSION_STATE_AWAIT_DATA)
+ echo_thread_handle_data (em, s, rx_buf);
+ else if (s->session_state == QUIC_SESSION_STATE_CLOSING)
+ echo_thread_handle_closing (em, s);
+ else if (s->session_state == QUIC_SESSION_STATE_CLOSED)
+ n_closed_sessions++;
+ if (n_closed_sessions == thread_n_sessions)
break;
-
- /* check idle clients */
- if (!n_sent & !n_read)
- idle_loop++;
- else
- idle_loop = 0;
- if (idle_loop == 1e7)
- {
- DBG ("Idle client TX:%dB RX:%dB", s->bytes_to_send,
- s->bytes_to_receive);
- DBG ("Idle FIFOs TX:%dB RX:%dB", svm_fifo_max_dequeue (s->tx_fifo),
- svm_fifo_max_dequeue (s->rx_fifo));
- }
}
- DBG ("[%lu/%lu] -> S(%x) -> [%lu/%lu]",
- s->bytes_received, s->bytes_received + s->bytes_to_receive,
- session_index, s->bytes_sent, s->bytes_sent + s->bytes_to_send);
- em->tx_total += s->bytes_sent;
- em->rx_total += s->bytes_received;
- clib_warning ("%d/%d", em->rx_total,
- em->n_clients * em->n_stream_clients * em->bytes_to_receive);
- if (em->rx_total ==
- em->n_clients * em->n_stream_clients * em->bytes_to_receive)
- quic_echo_notify_event (em, ECHO_EVT_LAST_BYTE);
- if (em->send_disconnects)
- echo_disconnect_session (em, s);
- else
+ pthread_exit (0);
+}
+
+static int
+echo_start_rx_thread (u32 session_index)
+{
+ /* Each thread owns n consecutive sessions of the total N */
+
+ echo_main_t *em = &echo_main;
+ u32 N = em->n_clients * em->n_stream_clients;
+ u32 nc, n, first_idx, thread_sessions;
+
+ n = (N + em->n_rx_threads - 1) / em->n_rx_threads;
+ nc = em->n_clients_connected;
+
+ ASSERT (nc + 1 <= N);
+ em->thread_args[nc] = session_index;
+
+ if ((nc + 1) % n == 0 || nc + 1 == N)
{
- while (!(s->flags & SESSION_FLAG_SHOULD_CLOSE) & !em->time_to_stop)
- ;
- echo_cleanup_session (em, s);
+ first_idx = n * (nc / n);
+ thread_sessions = (nc + 1) % n == 0 ? n : (nc + 1) % n;
+ DBG ("Start thread %u [%u -> %u]", nc / n, first_idx,
+ first_idx + thread_sessions - 1);
+ return pthread_create (&em->client_thread_handles[nc / n],
+ NULL /*attr */ , echo_thread_fn,
+ (void *) ((u64) first_idx << 32 | (u64)
+ thread_sessions));
}
- pthread_exit (0);
+
+ return 0;
}
static void
@@ -934,8 +1048,6 @@ session_bound_handler (session_bound_msg_t * mp)
{
echo_main_t *em = &echo_main;
echo_session_t *listen_session;
- u32 session_index;
-
if (mp->retval)
{
ECHO_FAIL ("bind failed: %U", format_api_error,
@@ -948,14 +1060,13 @@ session_bound_handler (session_bound_msg_t * mp)
clib_net_to_host_u16 (mp->lcl_port));
/* Allocate local session and set it up */
- pool_get (em->sessions, listen_session);
+ listen_session = echo_session_alloc (em);
listen_session->session_type = QUIC_SESSION_TYPE_LISTEN;
listen_session->accepted_session_count = 0;
- session_index = listen_session - em->sessions;
- listen_session->session_index = session_index;
- hash_set (em->session_index_by_vpp_handles, mp->handle, session_index);
+ hash_set (em->session_index_by_vpp_handles, mp->handle,
+ listen_session->session_index);
em->state = STATE_LISTEN;
- em->listen_session_index = session_index;
+ em->listen_session_index = listen_session->session_index;
}
static void
@@ -966,11 +1077,9 @@ session_accepted_handler (session_accepted_msg_t * mp)
svm_fifo_t *rx_fifo, *tx_fifo;
echo_main_t *em = &echo_main;
echo_session_t *session, *listen_session;
- u32 session_index;
uword *p;
/* Allocate local session and set it up */
- pool_get (em->sessions, session);
- session_index = session - em->sessions;
+ session = echo_session_alloc (em);
if (wait_for_segment_allocation (mp->segment_handle))
{
@@ -979,13 +1088,12 @@ session_accepted_handler (session_accepted_msg_t * mp)
}
rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
- rx_fifo->client_session_index = session_index;
+ rx_fifo->client_session_index = session->session_index;
tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
- tx_fifo->client_session_index = session_index;
+ tx_fifo->client_session_index = session->session_index;
session->rx_fifo = rx_fifo;
session->tx_fifo = tx_fifo;
- session->session_index = session_index;
session->vpp_session_handle = mp->handle;
session->start = clib_time_now (&em->clib_time);
session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
@@ -1003,9 +1111,9 @@ session_accepted_handler (session_accepted_msg_t * mp)
clib_atomic_fetch_add (&listen_session->accepted_session_count, 1);
/* Add it to lookup table */
- DBG ("Accepted session 0x%lx, Listener 0x%lx idx %lu", mp->handle,
- mp->listener_handle, session_index);
- hash_set (em->session_index_by_vpp_handles, mp->handle, session_index);
+ DBG ("Accepted session 0x%lx -> 0x%lx", mp->handle, mp->listener_handle);
+ hash_set (em->session_index_by_vpp_handles, mp->handle,
+ session->session_index);
app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
SESSION_CTRL_EVT_ACCEPTED_REPLY);
@@ -1018,26 +1126,27 @@ session_accepted_handler (session_accepted_msg_t * mp)
{
session->session_type = QUIC_SESSION_TYPE_QUIC;
if (em->cb_vft.quic_accepted_cb)
- em->cb_vft.quic_accepted_cb (mp, session_index);
+ em->cb_vft.quic_accepted_cb (mp, session->session_index);
clib_atomic_fetch_add (&em->n_quic_clients_connected, 1);
}
else if (em->i_am_master)
{
session->session_type = QUIC_SESSION_TYPE_STREAM;
if (em->cb_vft.server_stream_accepted_cb)
- em->cb_vft.server_stream_accepted_cb (mp, session_index);
+ em->cb_vft.server_stream_accepted_cb (mp, session->session_index);
clib_atomic_fetch_add (&em->n_clients_connected, 1);
}
else
{
session->session_type = QUIC_SESSION_TYPE_STREAM;
if (em->cb_vft.client_stream_accepted_cb)
- em->cb_vft.client_stream_accepted_cb (mp, session_index);
+ em->cb_vft.client_stream_accepted_cb (mp, session->session_index);
clib_atomic_fetch_add (&em->n_clients_connected, 1);
}
if (em->n_clients_connected == em->n_clients * em->n_stream_clients)
{
+ DBG ("App is ready");
em->state = STATE_READY;
quic_echo_notify_event (em, ECHO_EVT_LAST_SCONNECTED);
}
@@ -1050,7 +1159,6 @@ session_connected_handler (session_connected_msg_t * mp)
{
echo_main_t *em = &echo_main;
echo_session_t *session, *listen_session;
- u32 session_index;
u32 listener_index = htonl (mp->context);
svm_fifo_t *rx_fifo, *tx_fifo;
@@ -1061,10 +1169,7 @@ session_connected_handler (session_connected_msg_t * mp)
return;
}
- pool_get (em->sessions, session);
- clib_memset (session, 0, sizeof (*session));
- session_index = session - em->sessions;
-
+ session = echo_session_alloc (em);
if (wait_for_segment_allocation (mp->segment_handle))
{
ECHO_FAIL ("wait_for_segment_allocation errored");
@@ -1072,53 +1177,57 @@ session_connected_handler (session_connected_msg_t * mp)
}
rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
- rx_fifo->client_session_index = session_index;
+ rx_fifo->client_session_index = session->session_index;
tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
- tx_fifo->client_session_index = session_index;
+ tx_fifo->client_session_index = session->session_index;
session->rx_fifo = rx_fifo;
session->tx_fifo = tx_fifo;
session->vpp_session_handle = mp->handle;
- session->session_index = session_index;
session->start = clib_time_now (&em->clib_time);
session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
svm_msg_q_t *);
- session->listener_index = listener_index;
session->accepted_session_count = 0;
if (listener_index != SESSION_INVALID_INDEX)
{
listen_session = pool_elt_at_index (em->sessions, listener_index);
clib_atomic_fetch_add (&listen_session->accepted_session_count, 1);
+ session->listener_index = listen_session->session_index;
}
- DBG ("Connected session 0x%lx", mp->handle, session_index);
- hash_set (em->session_index_by_vpp_handles, mp->handle, session_index);
+ DBG ("Connected session 0x%lx -> 0x%lx", mp->handle,
+ listener_index !=
+ SESSION_INVALID_INDEX ? listen_session->vpp_session_handle : (u64) ~
+ 0);
+ hash_set (em->session_index_by_vpp_handles, mp->handle,
+ session->session_index);
if (listener_index == SESSION_INVALID_INDEX)
{
session->session_type = QUIC_SESSION_TYPE_QUIC;
if (em->cb_vft.quic_connected_cb)
- em->cb_vft.quic_connected_cb (mp, session_index);
+ em->cb_vft.quic_connected_cb (mp, session->session_index);
clib_atomic_fetch_add (&em->n_quic_clients_connected, 1);
}
else if (em->i_am_master)
{
session->session_type = QUIC_SESSION_TYPE_STREAM;
if (em->cb_vft.server_stream_connected_cb)
- em->cb_vft.server_stream_connected_cb (mp, session_index);
+ em->cb_vft.server_stream_connected_cb (mp, session->session_index);
clib_atomic_fetch_add (&em->n_clients_connected, 1);
}
else
{
session->session_type = QUIC_SESSION_TYPE_STREAM;
if (em->cb_vft.client_stream_connected_cb)
- em->cb_vft.client_stream_connected_cb (mp, session_index);
+ em->cb_vft.client_stream_connected_cb (mp, session->session_index);
clib_atomic_fetch_add (&em->n_clients_connected, 1);
}
if (em->n_clients_connected == em->n_clients * em->n_stream_clients)
{
+ DBG ("App is ready");
em->state = STATE_READY;
quic_echo_notify_event (em, ECHO_EVT_LAST_SCONNECTED);
}
@@ -1164,16 +1273,7 @@ echo_on_connected_send (session_connected_msg_t * mp, u32 session_index)
session->bytes_to_send = em->bytes_to_send;
session->bytes_to_receive = em->bytes_to_receive;
- DBG ("Stream session 0x%lx connected", session->vpp_session_handle);
-
- /*
- * Start RX thread
- */
- em->thread_args[em->n_clients_connected] = session_index;
- rv = pthread_create (&em->client_thread_handles[em->n_clients_connected],
- NULL /*attr */ , echo_thread_fn,
- (void *) &em->thread_args[em->n_clients_connected]);
- if (rv)
+ if ((rv = echo_start_rx_thread (session_index)))
{
ECHO_FAIL ("pthread_create returned %d", rv);
return;
@@ -1198,16 +1298,7 @@ echo_on_accept_recv (session_accepted_msg_t * mp, u32 session_index)
session->bytes_to_send = em->bytes_to_send;
session->bytes_to_receive = em->bytes_to_receive;
- DBG ("Stream session 0x%lx accepted", mp->handle);
-
- /*
- * Start RX thread
- */
- em->thread_args[em->n_clients_connected] = session_index;
- rv = pthread_create (&em->client_thread_handles[em->n_clients_connected],
- NULL /*attr */ , echo_thread_fn,
- (void *) &em->thread_args[em->n_clients_connected]);
- if (rv)
+ if ((rv = echo_start_rx_thread (session_index)))
{
ECHO_FAIL ("pthread_create returned %d", rv);
return;
@@ -1317,11 +1408,7 @@ session_disconnected_handler (session_disconnected_msg_t * mp)
rmp->context = mp->context;
app_send_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt);
- if (s->session_type == QUIC_SESSION_TYPE_STREAM)
- s->flags |= SESSION_FLAG_SHOULD_CLOSE; /* tell thread to close session */
- else
- echo_cleanup_session (em, s); /* We can clean Qsessions right away */
-
+ echo_initiate_session_close (em, s, 0 /* active */ );
if (s->session_type == QUIC_SESSION_TYPE_STREAM)
session_print_stats (em, s);
}
@@ -1636,7 +1723,7 @@ vl_api_unbind_uri_reply_t_handler (vl_api_unbind_uri_reply_t * mp)
}
em->state = STATE_DISCONNECTED;
listen_session = pool_elt_at_index (em->sessions, em->listen_session_index);
- echo_cleanup_session (em, listen_session);
+ echo_initiate_session_close (em, listen_session, 1 /* active */ );
}
static void
@@ -1662,7 +1749,7 @@ vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t *
}
s = pool_elt_at_index (em->sessions, p[0]);
- echo_cleanup_session (em, s);
+ echo_initiate_session_close (em, s, 1 /* active */ );
}
static void
@@ -1681,6 +1768,29 @@ static void
ECHO_FAIL ("failed to add tls key");
}
+static void
+vl_api_connect_uri_reply_t_handler (vl_api_connect_uri_reply_t * mp)
+{
+ echo_session_t *session;
+ echo_main_t *em = &echo_main;
+ u8 *uri;
+ if (!mp->retval)
+ return;
+ /* retry connect */
+ if (mp->context == SESSION_INVALID_INDEX)
+ {
+ DBG ("Retrying connect %s", em->uri);
+ echo_send_connect (em, em->uri, SESSION_INVALID_INDEX);
+ }
+ else
+ {
+ session = pool_elt_at_index (em->sessions, mp->context);
+ uri = format (0, "QUIC://session/%lu", session->vpp_session_handle);
+ DBG ("Retrying connect %s", uri);
+ echo_send_connect (em, uri, mp->context);
+ }
+}
+
#define foreach_quic_echo_msg \
_(BIND_URI_REPLY, bind_uri_reply) \
_(UNBIND_URI_REPLY, unbind_uri_reply) \
@@ -1691,6 +1801,7 @@ _(MAP_ANOTHER_SEGMENT, map_another_segment) \
_(UNMAP_SEGMENT, unmap_segment) \
_(APPLICATION_TLS_CERT_ADD_REPLY, application_tls_cert_add_reply) \
_(APPLICATION_TLS_KEY_ADD_REPLY, application_tls_key_add_reply) \
+_(CONNECT_URI_REPLY, connect_uri_reply) \
void
quic_echo_api_hookup (echo_main_t * em)
@@ -1732,8 +1843,11 @@ print_usage_and_exit (void)
" chroot prefix PATH Use PATH as memory root path\n"
" quic-setup OPT OPT=serverstream : Client open N connections. On each one server opens M streams\n"
" by default : Client open N connections. On each one client opens M streams\n"
+ " sclose=[Y|N|W] When a stream is done (RX & TX), send close[Y] wait for close[W] or pass[N]\n"
+ " qclose=[Y|N|W] When a connection is done (RX & TX), send close[Y] wait for close[W] or pass[N]\n"
"\n"
" nclients N[/M] Open N QUIC connections, each one with M streams (M defaults to 1)\n"
+ " nthreads N Use N busy loop threads for data [in addition to main & msg queue]\n"
" TX=1337[Kb|Mb|GB] Send 1337 [K|M|G]bytes, use TX=RX to reflect the data\n"
" RX=1337[Kb|Mb|GB] Expect 1337 [K|M|G]bytes\n"
"\n"
@@ -1752,6 +1866,7 @@ quic_echo_process_opts (int argc, char **argv)
u32 tmp;
u8 *chroot_prefix;
u8 *uri = 0;
+ u8 default_f_active;
unformat_init_command_line (a, argv);
while (unformat_check_input (a) != UNFORMAT_END_OF_INPUT)
@@ -1786,6 +1901,8 @@ quic_echo_process_opts (int argc, char **argv)
;
else if (unformat (a, "nclients %d", &em->n_clients))
;
+ else if (unformat (a, "nthreads %d", &em->n_rx_threads))
+ ;
else if (unformat (a, "appns %_%v%_", &em->appns_id))
;
else if (unformat (a, "all-scope"))
@@ -1805,6 +1922,14 @@ quic_echo_process_opts (int argc, char **argv)
;
else if (unformat (a, "RX=%U", unformat_data, &em->bytes_to_receive))
;
+ else
+ if (unformat
+ (a, "sclose=%U", unformat_close, &em->send_stream_disconnects))
+ ;
+ else
+ if (unformat
+ (a, "qclose=%U", unformat_close, &em->send_quic_disconnects))
+ ;
else if (unformat (a, "time %U:%U",
echo_unformat_timing_event, &em->timing_start_event,
echo_unformat_timing_event, &em->timing_end_event))
@@ -1813,6 +1938,9 @@ quic_echo_process_opts (int argc, char **argv)
print_usage_and_exit ();
}
+ /* setting default for unset values
+ *
+ * bytes_to_send / bytes_to_receive & data_source */
if (em->bytes_to_receive == (u64) ~ 0)
em->bytes_to_receive = 64 << 10; /* default */
if (em->bytes_to_send == (u64) ~ 0)
@@ -1828,7 +1956,17 @@ quic_echo_process_opts (int argc, char **argv)
if (em->data_source == ECHO_RX_DATA_SOURCE)
em->bytes_to_send = em->bytes_to_receive;
- em->send_disconnects = !em->i_am_master;
+ /* disconnect flags */
+ if (em->i_am_master)
+ default_f_active =
+ em->bytes_to_send == 0 ? ECHO_CLOSE_F_ACTIVE : ECHO_CLOSE_F_PASSIVE;
+ else
+ default_f_active =
+ em->bytes_to_receive == 0 ? ECHO_CLOSE_F_PASSIVE : ECHO_CLOSE_F_ACTIVE;
+ if (em->send_stream_disconnects == ECHO_CLOSE_F_INVALID)
+ em->send_stream_disconnects = default_f_active;
+ if (em->send_quic_disconnects == ECHO_CLOSE_F_INVALID)
+ em->send_quic_disconnects = default_f_active;
}
int
@@ -1852,6 +1990,7 @@ main (int argc, char **argv)
em->max_test_msg = 50;
em->time_to_stop = 0;
em->i_am_master = 1;
+ em->n_rx_threads = 4;
em->test_return_packets = RETURN_PACKETS_NOTEST;
em->timing_start_event = ECHO_EVT_FIRST_QCONNECT;
em->timing_end_event = ECHO_EVT_LAST_BYTE;
@@ -1865,7 +2004,7 @@ main (int argc, char **argv)
quic_echo_process_opts (argc, argv);
n_clients = em->n_clients * em->n_stream_clients;
- vec_validate (em->client_thread_handles, n_clients - 1);
+ vec_validate (em->client_thread_handles, em->n_rx_threads - 1);
vec_validate (em->thread_args, n_clients - 1);
clib_time_init (&em->clib_time);
init_error_string_table (em);
diff --git a/src/plugins/quic/quic.c b/src/plugins/quic/quic.c
index 8732b7dac72..f38a7ca30a6 100644
--- a/src/plugins/quic/quic.c
+++ b/src/plugins/quic/quic.c
@@ -307,6 +307,14 @@ quic_ctx_get (u32 ctx_index, u32 thread_index)
}
static quic_ctx_t *
+quic_ctx_get_if_valid (u32 ctx_index, u32 thread_index)
+{
+ if (pool_is_free_index (quic_main.ctx_pool[thread_index], ctx_index))
+ return 0;
+ return pool_elt_at_index (quic_main.ctx_pool[thread_index], ctx_index);
+}
+
+static quic_ctx_t *
quic_get_conn_ctx (quicly_conn_t * conn)
{
u64 conn_data;
@@ -1494,7 +1502,9 @@ quic_connect (transport_endpoint_cfg_t * tep)
static void
quic_proto_on_close (u32 ctx_index, u32 thread_index)
{
- quic_ctx_t *ctx = quic_ctx_get (ctx_index, thread_index);
+ quic_ctx_t *ctx = quic_ctx_get_if_valid (ctx_index, thread_index);
+ if (!ctx)
+ return;
#if QUIC_DEBUG >= 2
session_t *stream_session =
session_get (ctx->c_s_index, ctx->c_thread_index);