From ace51525552375b4716c01a6f9ba5261fd64f940 Mon Sep 17 00:00:00 2001 From: Nathan Skrzypczak Date: Mon, 8 Jul 2019 18:18:27 +0200 Subject: quic: echo thread can handle multiple sessions Type: feature Change-Id: Ibb60d5b46aafe109a81a8604712a917f6e246eaf Signed-off-by: Nathan Skrzypczak --- src/plugins/hs_apps/sapi/quic_echo.c | 395 +++++++++++++++++++++++------------ src/plugins/quic/quic.c | 12 +- 2 files changed, 278 insertions(+), 129 deletions(-) diff --git a/src/plugins/hs_apps/sapi/quic_echo.c b/src/plugins/hs_apps/sapi/quic_echo.c index 3cbb663646a..3be513afa89 100644 --- a/src/plugins/hs_apps/sapi/quic_echo.c +++ b/src/plugins/hs_apps/sapi/quic_echo.c @@ -58,13 +58,6 @@ clib_warning ("ECHO-ERROR: "_fmt, ##_args); \ } -typedef enum echo_session_flag_ -{ - SESSION_FLAG_NOFLAG = 0, - SESSION_FLAG_SHOULD_CLOSE = 1, - SESSION_FLAG_SHOULD_FREE = 2, -} echo_session_flag_t; - typedef struct { CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); @@ -78,8 +71,8 @@ typedef struct volatile u64 bytes_to_receive; f64 start; u32 listener_index; /* listener index in echo session pool */ + u32 idle_cycles; /* consecutive enq/deq with no data */ volatile u64 accepted_session_count; /* sessions we accepted */ - volatile echo_session_flag_t flags; } echo_session_t; typedef enum @@ -90,6 +83,14 @@ typedef enum ECHO_INVALID_DATA_SOURCE } data_source_t; +enum echo_close_f_t +{ + ECHO_CLOSE_F_INVALID = 0, + ECHO_CLOSE_F_PASSIVE, /* wait for close msg */ + ECHO_CLOSE_F_ACTIVE, /* send close msg */ + ECHO_CLOSE_F_NONE, /* don't bother sending close msg */ +}; + enum quic_session_type_t { QUIC_SESSION_TYPE_QUIC, @@ -97,6 +98,14 @@ enum quic_session_type_t QUIC_SESSION_TYPE_LISTEN, }; +enum quic_session_state_t +{ + QUIC_SESSION_STATE_INITIAL, + QUIC_SESSION_STATE_AWAIT_CLOSING, /* Data transfer is done, wait for close evt */ + QUIC_SESSION_STATE_AWAIT_DATA, /* Peer closed, wait for outstanding data */ + QUIC_SESSION_STATE_CLOSING, /* told vpp to close */ + QUIC_SESSION_STATE_CLOSED, /* closed in vpp */ +}; typedef enum { @@ -194,7 +203,8 @@ typedef struct u32 rx_buf_size; u32 tx_buf_size; data_source_t data_source; - u8 send_disconnects; /* actively send disconnect */ + u8 send_quic_disconnects; /* actively send disconnect */ + u8 send_stream_disconnects; /* actively send disconnect */ u8 *appns_id; u64 appns_flags; @@ -206,6 +216,7 @@ typedef struct u32 n_stream_clients; /* Target Number of STREAM sessions per QUIC session */ volatile u32 n_quic_clients_connected; /* Number of connected QUIC sessions */ volatile u32 n_clients_connected; /* Number of STREAM sessions connected */ + u32 n_rx_threads; /* Number of data threads */ u64 tx_total; u64 rx_total; @@ -370,6 +381,21 @@ format_api_error (u8 * s, va_list * args) return s; } +static uword +unformat_close (unformat_input_t * input, va_list * args) +{ + u8 *a = va_arg (*args, u8 *); + if (unformat (input, "Y")) + *a = ECHO_CLOSE_F_ACTIVE; + else if (unformat (input, "N")) + *a = ECHO_CLOSE_F_NONE; + else if (unformat (input, "W")) + *a = ECHO_CLOSE_F_PASSIVE; + else + return 0; + return 1; +} + static uword unformat_data (unformat_input_t * input, va_list * args) { @@ -458,6 +484,18 @@ init_error_string_table (echo_main_t * em) * */ +static echo_session_t * +echo_session_alloc (echo_main_t * em) +{ + echo_session_t *session; + pool_get (em->sessions, session); + clib_memset (session, 0, sizeof (*session)); + session->session_index = session - em->sessions; + session->listener_index = SESSION_INVALID_INDEX; + session->session_state = QUIC_SESSION_STATE_INITIAL; + 
return session; +} + /* * * Session API Calls @@ -754,7 +792,7 @@ echo_free_sessions (echo_main_t * em) /* *INDENT-OFF* */ pool_foreach (s, em->sessions, ({ - if (s->flags & SESSION_FLAG_SHOULD_FREE) + if (s->session_state == QUIC_SESSION_STATE_CLOSED) vec_add1 (session_indexes, s->session_index);} )); /* *INDENT-ON* */ @@ -771,17 +809,19 @@ echo_free_sessions (echo_main_t * em) static void echo_cleanup_session (echo_main_t * em, echo_session_t * s) { + u64 c; echo_session_t *ls; - u64 accepted_session_count; if (s->listener_index != SESSION_INVALID_INDEX) { ls = pool_elt_at_index (em->sessions, s->listener_index); - accepted_session_count = - clib_atomic_sub_fetch (&ls->accepted_session_count, 1); - if (accepted_session_count == 0 - && ls->session_type != QUIC_SESSION_TYPE_LISTEN - && em->send_disconnects) - echo_disconnect_session (em, ls); + c = clib_atomic_sub_fetch (&ls->accepted_session_count, 1); + if (c == 0 && ls->session_type == QUIC_SESSION_TYPE_QUIC) + { + if (em->send_quic_disconnects == ECHO_CLOSE_F_ACTIVE) + echo_disconnect_session (em, ls); + else if (em->send_quic_disconnects == ECHO_CLOSE_F_NONE) + echo_cleanup_session (em, ls); + } } if (s->session_type == QUIC_SESSION_TYPE_QUIC) clib_atomic_sub_fetch (&em->n_quic_clients_connected, 1); @@ -791,9 +831,21 @@ echo_cleanup_session (echo_main_t * em, echo_session_t * s) DBG ("Cleanup sessions (still %uQ %uS)", em->n_quic_clients_connected, em->n_clients_connected); hash_unset (em->session_index_by_vpp_handles, s->vpp_session_handle); + s->session_state = QUIC_SESSION_STATE_CLOSED; +} - /* Mark session as to be freed */ - s->flags |= SESSION_FLAG_SHOULD_FREE; +static void +echo_initiate_session_close (echo_main_t * em, echo_session_t * s, u8 active) +{ + if (s->session_type == QUIC_SESSION_TYPE_STREAM) + { + if (!active && s->bytes_to_receive) + s->session_state = QUIC_SESSION_STATE_AWAIT_DATA; + else + s->session_state = QUIC_SESSION_STATE_CLOSING; + } + else + echo_cleanup_session (em, s); /* We can clean Q/Lsessions right away */ } static void @@ -831,7 +883,6 @@ recv_data_chunk (echo_main_t * em, echo_session_t * s, u8 * rx_buf) if (em->test_return_packets) test_recv_bytes (em, s, rx_buf, n_read); - ASSERT (s->bytes_to_receive >= n_read); s->bytes_received += n_read; s->bytes_to_receive -= n_read; return n_read; @@ -865,68 +916,131 @@ mirror_data_chunk (echo_main_t * em, echo_session_t * s, u8 * tx_buf, u64 len) /* * Rx/Tx polling thread per connection */ +static void +echo_thread_handle_data (echo_main_t * em, echo_session_t * s, u8 * rx_buf) +{ + int n_read, n_sent; + + n_read = recv_data_chunk (em, s, rx_buf); + if (em->data_source == ECHO_TEST_DATA_SOURCE) + n_sent = + send_data_chunk (s, em->connect_test_data, + s->bytes_sent % em->tx_buf_size, em->tx_buf_size); + else if (em->data_source == ECHO_RX_DATA_SOURCE) + n_sent = mirror_data_chunk (em, s, rx_buf, n_read); + else + n_sent = 0; + if (!s->bytes_to_send && !s->bytes_to_receive) + { + /* Session is done, need to close */ + if (s->session_state == QUIC_SESSION_STATE_AWAIT_DATA) + s->session_state = QUIC_SESSION_STATE_CLOSING; + else + { + s->session_state = QUIC_SESSION_STATE_AWAIT_CLOSING; + if (em->send_stream_disconnects == ECHO_CLOSE_F_ACTIVE) + echo_disconnect_session (em, s); + else if (em->send_stream_disconnects == ECHO_CLOSE_F_NONE) + s->session_state = QUIC_SESSION_STATE_CLOSING; + } + return; + } + + /* check idle clients */ + if (n_sent || n_read) + s->idle_cycles = 0; + else if (s->idle_cycles++ == 1e7) + { + s->idle_cycles = 0; + DBG ("Idle client 
TX:%dB RX:%dB", s->bytes_to_send, + s->bytes_to_receive); + DBG ("Idle FIFOs TX:%dB RX:%dB", svm_fifo_max_dequeue (s->tx_fifo), + svm_fifo_max_dequeue (s->rx_fifo)); + } +} + +static void +echo_thread_handle_closing (echo_main_t * em, echo_session_t * s) +{ + + DBG ("[%lu/%lu] -> S(%x) -> [%lu/%lu]", + s->bytes_received, s->bytes_received + s->bytes_to_receive, + s->session_index, s->bytes_sent, s->bytes_sent + s->bytes_to_send); + clib_atomic_fetch_add (&em->tx_total, s->bytes_sent); + clib_atomic_fetch_add (&em->rx_total, s->bytes_received); + + if (PREDICT_FALSE (em->rx_total == + em->n_clients * em->n_stream_clients * + em->bytes_to_receive)) + quic_echo_notify_event (em, ECHO_EVT_LAST_BYTE); + echo_cleanup_session (em, s); +} + static void * echo_thread_fn (void *arg) { echo_main_t *em = &echo_main; + u32 thread_n_sessions = (u64) arg & 0xFFFFFFFF; + u32 session_first_idx = (u64) arg >> 32; + + u32 i = 0; + u32 n_closed_sessions = 0; + u32 session_index; u8 *rx_buf = 0; - u32 session_index = *(u32 *) arg; echo_session_t *s; - int n_read, n_sent, idle_loop = 0; vec_validate (rx_buf, em->rx_buf_size); while (!em->time_to_stop && em->state != STATE_READY) ; - s = pool_elt_at_index (em->sessions, session_index); - while (!em->time_to_stop) + for (i = 0; !em->time_to_stop; i++) { - n_read = recv_data_chunk (em, s, rx_buf); - if (em->data_source == ECHO_TEST_DATA_SOURCE) - n_sent = - send_data_chunk (s, em->connect_test_data, - s->bytes_sent % em->tx_buf_size, em->tx_buf_size); - else if (em->data_source == ECHO_RX_DATA_SOURCE) - n_sent = mirror_data_chunk (em, s, rx_buf, n_read); - else - n_sent = 0; - if (!s->bytes_to_send && !s->bytes_to_receive) - break; - if (s->flags & SESSION_FLAG_SHOULD_CLOSE) + if (i % thread_n_sessions == 0) + n_closed_sessions = 0; + session_index = + em->thread_args[session_first_idx + i % thread_n_sessions]; + s = pool_elt_at_index (em->sessions, session_index); + if (s->session_state == QUIC_SESSION_STATE_INITIAL + || s->session_state == QUIC_SESSION_STATE_AWAIT_DATA) + echo_thread_handle_data (em, s, rx_buf); + else if (s->session_state == QUIC_SESSION_STATE_CLOSING) + echo_thread_handle_closing (em, s); + else if (s->session_state == QUIC_SESSION_STATE_CLOSED) + n_closed_sessions++; + if (n_closed_sessions == thread_n_sessions) break; - - /* check idle clients */ - if (!n_sent & !n_read) - idle_loop++; - else - idle_loop = 0; - if (idle_loop == 1e7) - { - DBG ("Idle client TX:%dB RX:%dB", s->bytes_to_send, - s->bytes_to_receive); - DBG ("Idle FIFOs TX:%dB RX:%dB", svm_fifo_max_dequeue (s->tx_fifo), - svm_fifo_max_dequeue (s->rx_fifo)); - } } - DBG ("[%lu/%lu] -> S(%x) -> [%lu/%lu]", - s->bytes_received, s->bytes_received + s->bytes_to_receive, - session_index, s->bytes_sent, s->bytes_sent + s->bytes_to_send); - em->tx_total += s->bytes_sent; - em->rx_total += s->bytes_received; - clib_warning ("%d/%d", em->rx_total, - em->n_clients * em->n_stream_clients * em->bytes_to_receive); - if (em->rx_total == - em->n_clients * em->n_stream_clients * em->bytes_to_receive) - quic_echo_notify_event (em, ECHO_EVT_LAST_BYTE); - if (em->send_disconnects) - echo_disconnect_session (em, s); - else + pthread_exit (0); +} + +static int +echo_start_rx_thread (u32 session_index) +{ + /* Each thread owns n consecutive sessions of the total N */ + + echo_main_t *em = &echo_main; + u32 N = em->n_clients * em->n_stream_clients; + u32 nc, n, first_idx, thread_sessions; + + n = (N + em->n_rx_threads - 1) / em->n_rx_threads; + nc = em->n_clients_connected; + + ASSERT (nc + 1 <= N); + 
em->thread_args[nc] = session_index; + + if ((nc + 1) % n == 0 || nc + 1 == N) { - while (!(s->flags & SESSION_FLAG_SHOULD_CLOSE) & !em->time_to_stop) - ; - echo_cleanup_session (em, s); + first_idx = n * (nc / n); + thread_sessions = (nc + 1) % n == 0 ? n : (nc + 1) % n; + DBG ("Start thread %u [%u -> %u]", nc / n, first_idx, + first_idx + thread_sessions - 1); + return pthread_create (&em->client_thread_handles[nc / n], + NULL /*attr */ , echo_thread_fn, + (void *) ((u64) first_idx << 32 | (u64) + thread_sessions)); } - pthread_exit (0); + + return 0; } static void @@ -934,8 +1048,6 @@ session_bound_handler (session_bound_msg_t * mp) { echo_main_t *em = &echo_main; echo_session_t *listen_session; - u32 session_index; - if (mp->retval) { ECHO_FAIL ("bind failed: %U", format_api_error, @@ -948,14 +1060,13 @@ session_bound_handler (session_bound_msg_t * mp) clib_net_to_host_u16 (mp->lcl_port)); /* Allocate local session and set it up */ - pool_get (em->sessions, listen_session); + listen_session = echo_session_alloc (em); listen_session->session_type = QUIC_SESSION_TYPE_LISTEN; listen_session->accepted_session_count = 0; - session_index = listen_session - em->sessions; - listen_session->session_index = session_index; - hash_set (em->session_index_by_vpp_handles, mp->handle, session_index); + hash_set (em->session_index_by_vpp_handles, mp->handle, + listen_session->session_index); em->state = STATE_LISTEN; - em->listen_session_index = session_index; + em->listen_session_index = listen_session->session_index; } static void @@ -966,11 +1077,9 @@ session_accepted_handler (session_accepted_msg_t * mp) svm_fifo_t *rx_fifo, *tx_fifo; echo_main_t *em = &echo_main; echo_session_t *session, *listen_session; - u32 session_index; uword *p; /* Allocate local session and set it up */ - pool_get (em->sessions, session); - session_index = session - em->sessions; + session = echo_session_alloc (em); if (wait_for_segment_allocation (mp->segment_handle)) { @@ -979,13 +1088,12 @@ session_accepted_handler (session_accepted_msg_t * mp) } rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *); - rx_fifo->client_session_index = session_index; + rx_fifo->client_session_index = session->session_index; tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *); - tx_fifo->client_session_index = session_index; + tx_fifo->client_session_index = session->session_index; session->rx_fifo = rx_fifo; session->tx_fifo = tx_fifo; - session->session_index = session_index; session->vpp_session_handle = mp->handle; session->start = clib_time_now (&em->clib_time); session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address, @@ -1003,9 +1111,9 @@ session_accepted_handler (session_accepted_msg_t * mp) clib_atomic_fetch_add (&listen_session->accepted_session_count, 1); /* Add it to lookup table */ - DBG ("Accepted session 0x%lx, Listener 0x%lx idx %lu", mp->handle, - mp->listener_handle, session_index); - hash_set (em->session_index_by_vpp_handles, mp->handle, session_index); + DBG ("Accepted session 0x%lx -> 0x%lx", mp->handle, mp->listener_handle); + hash_set (em->session_index_by_vpp_handles, mp->handle, + session->session_index); app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt, SESSION_CTRL_EVT_ACCEPTED_REPLY); @@ -1018,26 +1126,27 @@ session_accepted_handler (session_accepted_msg_t * mp) { session->session_type = QUIC_SESSION_TYPE_QUIC; if (em->cb_vft.quic_accepted_cb) - em->cb_vft.quic_accepted_cb (mp, session_index); + em->cb_vft.quic_accepted_cb (mp, session->session_index); clib_atomic_fetch_add 
(&em->n_quic_clients_connected, 1); } else if (em->i_am_master) { session->session_type = QUIC_SESSION_TYPE_STREAM; if (em->cb_vft.server_stream_accepted_cb) - em->cb_vft.server_stream_accepted_cb (mp, session_index); + em->cb_vft.server_stream_accepted_cb (mp, session->session_index); clib_atomic_fetch_add (&em->n_clients_connected, 1); } else { session->session_type = QUIC_SESSION_TYPE_STREAM; if (em->cb_vft.client_stream_accepted_cb) - em->cb_vft.client_stream_accepted_cb (mp, session_index); + em->cb_vft.client_stream_accepted_cb (mp, session->session_index); clib_atomic_fetch_add (&em->n_clients_connected, 1); } if (em->n_clients_connected == em->n_clients * em->n_stream_clients) { + DBG ("App is ready"); em->state = STATE_READY; quic_echo_notify_event (em, ECHO_EVT_LAST_SCONNECTED); } @@ -1050,7 +1159,6 @@ session_connected_handler (session_connected_msg_t * mp) { echo_main_t *em = &echo_main; echo_session_t *session, *listen_session; - u32 session_index; u32 listener_index = htonl (mp->context); svm_fifo_t *rx_fifo, *tx_fifo; @@ -1061,10 +1169,7 @@ session_connected_handler (session_connected_msg_t * mp) return; } - pool_get (em->sessions, session); - clib_memset (session, 0, sizeof (*session)); - session_index = session - em->sessions; - + session = echo_session_alloc (em); if (wait_for_segment_allocation (mp->segment_handle)) { ECHO_FAIL ("wait_for_segment_allocation errored"); @@ -1072,53 +1177,57 @@ session_connected_handler (session_connected_msg_t * mp) } rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *); - rx_fifo->client_session_index = session_index; + rx_fifo->client_session_index = session->session_index; tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *); - tx_fifo->client_session_index = session_index; + tx_fifo->client_session_index = session->session_index; session->rx_fifo = rx_fifo; session->tx_fifo = tx_fifo; session->vpp_session_handle = mp->handle; - session->session_index = session_index; session->start = clib_time_now (&em->clib_time); session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *); - session->listener_index = listener_index; session->accepted_session_count = 0; if (listener_index != SESSION_INVALID_INDEX) { listen_session = pool_elt_at_index (em->sessions, listener_index); clib_atomic_fetch_add (&listen_session->accepted_session_count, 1); + session->listener_index = listen_session->session_index; } - DBG ("Connected session 0x%lx", mp->handle, session_index); - hash_set (em->session_index_by_vpp_handles, mp->handle, session_index); + DBG ("Connected session 0x%lx -> 0x%lx", mp->handle, + listener_index != + SESSION_INVALID_INDEX ? 
listen_session->vpp_session_handle : (u64) ~ + 0); + hash_set (em->session_index_by_vpp_handles, mp->handle, + session->session_index); if (listener_index == SESSION_INVALID_INDEX) { session->session_type = QUIC_SESSION_TYPE_QUIC; if (em->cb_vft.quic_connected_cb) - em->cb_vft.quic_connected_cb (mp, session_index); + em->cb_vft.quic_connected_cb (mp, session->session_index); clib_atomic_fetch_add (&em->n_quic_clients_connected, 1); } else if (em->i_am_master) { session->session_type = QUIC_SESSION_TYPE_STREAM; if (em->cb_vft.server_stream_connected_cb) - em->cb_vft.server_stream_connected_cb (mp, session_index); + em->cb_vft.server_stream_connected_cb (mp, session->session_index); clib_atomic_fetch_add (&em->n_clients_connected, 1); } else { session->session_type = QUIC_SESSION_TYPE_STREAM; if (em->cb_vft.client_stream_connected_cb) - em->cb_vft.client_stream_connected_cb (mp, session_index); + em->cb_vft.client_stream_connected_cb (mp, session->session_index); clib_atomic_fetch_add (&em->n_clients_connected, 1); } if (em->n_clients_connected == em->n_clients * em->n_stream_clients) { + DBG ("App is ready"); em->state = STATE_READY; quic_echo_notify_event (em, ECHO_EVT_LAST_SCONNECTED); } @@ -1164,16 +1273,7 @@ echo_on_connected_send (session_connected_msg_t * mp, u32 session_index) session->bytes_to_send = em->bytes_to_send; session->bytes_to_receive = em->bytes_to_receive; - DBG ("Stream session 0x%lx connected", session->vpp_session_handle); - - /* - * Start RX thread - */ - em->thread_args[em->n_clients_connected] = session_index; - rv = pthread_create (&em->client_thread_handles[em->n_clients_connected], - NULL /*attr */ , echo_thread_fn, - (void *) &em->thread_args[em->n_clients_connected]); - if (rv) + if ((rv = echo_start_rx_thread (session_index))) { ECHO_FAIL ("pthread_create returned %d", rv); return; @@ -1198,16 +1298,7 @@ echo_on_accept_recv (session_accepted_msg_t * mp, u32 session_index) session->bytes_to_send = em->bytes_to_send; session->bytes_to_receive = em->bytes_to_receive; - DBG ("Stream session 0x%lx accepted", mp->handle); - - /* - * Start RX thread - */ - em->thread_args[em->n_clients_connected] = session_index; - rv = pthread_create (&em->client_thread_handles[em->n_clients_connected], - NULL /*attr */ , echo_thread_fn, - (void *) &em->thread_args[em->n_clients_connected]); - if (rv) + if ((rv = echo_start_rx_thread (session_index))) { ECHO_FAIL ("pthread_create returned %d", rv); return; @@ -1317,11 +1408,7 @@ session_disconnected_handler (session_disconnected_msg_t * mp) rmp->context = mp->context; app_send_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt); - if (s->session_type == QUIC_SESSION_TYPE_STREAM) - s->flags |= SESSION_FLAG_SHOULD_CLOSE; /* tell thread to close session */ - else - echo_cleanup_session (em, s); /* We can clean Qsessions right away */ - + echo_initiate_session_close (em, s, 0 /* active */ ); if (s->session_type == QUIC_SESSION_TYPE_STREAM) session_print_stats (em, s); } @@ -1636,7 +1723,7 @@ vl_api_unbind_uri_reply_t_handler (vl_api_unbind_uri_reply_t * mp) } em->state = STATE_DISCONNECTED; listen_session = pool_elt_at_index (em->sessions, em->listen_session_index); - echo_cleanup_session (em, listen_session); + echo_initiate_session_close (em, listen_session, 1 /* active */ ); } static void @@ -1662,7 +1749,7 @@ vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t * } s = pool_elt_at_index (em->sessions, p[0]); - echo_cleanup_session (em, s); + echo_initiate_session_close (em, s, 1 /* active */ ); } static void @@ -1681,6 
+1768,29 @@ static void ECHO_FAIL ("failed to add tls key"); } +static void +vl_api_connect_uri_reply_t_handler (vl_api_connect_uri_reply_t * mp) +{ + echo_session_t *session; + echo_main_t *em = &echo_main; + u8 *uri; + if (!mp->retval) + return; + /* retry connect */ + if (mp->context == SESSION_INVALID_INDEX) + { + DBG ("Retrying connect %s", em->uri); + echo_send_connect (em, em->uri, SESSION_INVALID_INDEX); + } + else + { + session = pool_elt_at_index (em->sessions, mp->context); + uri = format (0, "QUIC://session/%lu", session->vpp_session_handle); + DBG ("Retrying connect %s", uri); + echo_send_connect (em, uri, mp->context); + } +} + #define foreach_quic_echo_msg \ _(BIND_URI_REPLY, bind_uri_reply) \ _(UNBIND_URI_REPLY, unbind_uri_reply) \ @@ -1691,6 +1801,7 @@ _(MAP_ANOTHER_SEGMENT, map_another_segment) \ _(UNMAP_SEGMENT, unmap_segment) \ _(APPLICATION_TLS_CERT_ADD_REPLY, application_tls_cert_add_reply) \ _(APPLICATION_TLS_KEY_ADD_REPLY, application_tls_key_add_reply) \ +_(CONNECT_URI_REPLY, connect_uri_reply) \ void quic_echo_api_hookup (echo_main_t * em) @@ -1732,8 +1843,11 @@ print_usage_and_exit (void) " chroot prefix PATH Use PATH as memory root path\n" " quic-setup OPT OPT=serverstream : Client open N connections. On each one server opens M streams\n" " by default : Client open N connections. On each one client opens M streams\n" + " sclose=[Y|N|W] When a stream is done (RX & TX), send close[Y] wait for close[W] or pass[N]\n" + " qclose=[Y|N|W] When a connection is done (RX & TX), send close[Y] wait for close[W] or pass[N]\n" "\n" " nclients N[/M] Open N QUIC connections, each one with M streams (M defaults to 1)\n" + " nthreads N Use N busy loop threads for data [in addition to main & msg queue]\n" " TX=1337[Kb|Mb|GB] Send 1337 [K|M|G]bytes, use TX=RX to reflect the data\n" " RX=1337[Kb|Mb|GB] Expect 1337 [K|M|G]bytes\n" "\n" @@ -1752,6 +1866,7 @@ quic_echo_process_opts (int argc, char **argv) u32 tmp; u8 *chroot_prefix; u8 *uri = 0; + u8 default_f_active; unformat_init_command_line (a, argv); while (unformat_check_input (a) != UNFORMAT_END_OF_INPUT) @@ -1786,6 +1901,8 @@ quic_echo_process_opts (int argc, char **argv) ; else if (unformat (a, "nclients %d", &em->n_clients)) ; + else if (unformat (a, "nthreads %d", &em->n_rx_threads)) + ; else if (unformat (a, "appns %_%v%_", &em->appns_id)) ; else if (unformat (a, "all-scope")) @@ -1805,6 +1922,14 @@ quic_echo_process_opts (int argc, char **argv) ; else if (unformat (a, "RX=%U", unformat_data, &em->bytes_to_receive)) ; + else + if (unformat + (a, "sclose=%U", unformat_close, &em->send_stream_disconnects)) + ; + else + if (unformat + (a, "qclose=%U", unformat_close, &em->send_quic_disconnects)) + ; else if (unformat (a, "time %U:%U", echo_unformat_timing_event, &em->timing_start_event, echo_unformat_timing_event, &em->timing_end_event)) @@ -1813,6 +1938,9 @@ quic_echo_process_opts (int argc, char **argv) print_usage_and_exit (); } + /* setting default for unset values + * + * bytes_to_send / bytes_to_receive & data_source */ if (em->bytes_to_receive == (u64) ~ 0) em->bytes_to_receive = 64 << 10; /* default */ if (em->bytes_to_send == (u64) ~ 0) @@ -1828,7 +1956,17 @@ quic_echo_process_opts (int argc, char **argv) if (em->data_source == ECHO_RX_DATA_SOURCE) em->bytes_to_send = em->bytes_to_receive; - em->send_disconnects = !em->i_am_master; + /* disconnect flags */ + if (em->i_am_master) + default_f_active = + em->bytes_to_send == 0 ? 
ECHO_CLOSE_F_ACTIVE : ECHO_CLOSE_F_PASSIVE; + else + default_f_active = + em->bytes_to_receive == 0 ? ECHO_CLOSE_F_PASSIVE : ECHO_CLOSE_F_ACTIVE; + if (em->send_stream_disconnects == ECHO_CLOSE_F_INVALID) + em->send_stream_disconnects = default_f_active; + if (em->send_quic_disconnects == ECHO_CLOSE_F_INVALID) + em->send_quic_disconnects = default_f_active; } int @@ -1852,6 +1990,7 @@ main (int argc, char **argv) em->max_test_msg = 50; em->time_to_stop = 0; em->i_am_master = 1; + em->n_rx_threads = 4; em->test_return_packets = RETURN_PACKETS_NOTEST; em->timing_start_event = ECHO_EVT_FIRST_QCONNECT; em->timing_end_event = ECHO_EVT_LAST_BYTE; @@ -1865,7 +2004,7 @@ main (int argc, char **argv) quic_echo_process_opts (argc, argv); n_clients = em->n_clients * em->n_stream_clients; - vec_validate (em->client_thread_handles, n_clients - 1); + vec_validate (em->client_thread_handles, em->n_rx_threads - 1); vec_validate (em->thread_args, n_clients - 1); clib_time_init (&em->clib_time); init_error_string_table (em); diff --git a/src/plugins/quic/quic.c b/src/plugins/quic/quic.c index 8732b7dac72..f38a7ca30a6 100644 --- a/src/plugins/quic/quic.c +++ b/src/plugins/quic/quic.c @@ -306,6 +306,14 @@ quic_ctx_get (u32 ctx_index, u32 thread_index) return pool_elt_at_index (quic_main.ctx_pool[thread_index], ctx_index); } +static quic_ctx_t * +quic_ctx_get_if_valid (u32 ctx_index, u32 thread_index) +{ + if (pool_is_free_index (quic_main.ctx_pool[thread_index], ctx_index)) + return 0; + return pool_elt_at_index (quic_main.ctx_pool[thread_index], ctx_index); +} + static quic_ctx_t * quic_get_conn_ctx (quicly_conn_t * conn) { @@ -1494,7 +1502,9 @@ quic_connect (transport_endpoint_cfg_t * tep) static void quic_proto_on_close (u32 ctx_index, u32 thread_index) { - quic_ctx_t *ctx = quic_ctx_get (ctx_index, thread_index); + quic_ctx_t *ctx = quic_ctx_get_if_valid (ctx_index, thread_index); + if (!ctx) + return; #if QUIC_DEBUG >= 2 session_t *stream_session = session_get (ctx->c_s_index, ctx->c_thread_index); -- cgit 1.2.3-korg
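
Note (not part of the patch): with this change each RX thread now owns a contiguous slice of the
em->thread_args[] session array instead of a single session, and echo_start_rx_thread() passes that
slice to echo_thread_fn() by packing two u32 values (first index, session count) into the single
void * argument of pthread_create(). The standalone sketch below illustrates only that packing
scheme under the same assumptions the patch makes (64-bit pointers); worker_fn, the example values
and the printf are illustrative and do not exist in the patch.

    /* build with: cc -pthread pack_demo.c */
    #include <pthread.h>
    #include <stdint.h>
    #include <stdio.h>

    static void *
    worker_fn (void *arg)
    {
      /* Same unpacking as echo_thread_fn(): low 32 bits carry the number of
       * sessions this thread owns, high 32 bits carry the first slot index. */
      uint32_t n_sessions = (uint64_t) arg & 0xFFFFFFFF;
      uint32_t first_idx = (uint64_t) arg >> 32;
      printf ("worker handles session slots [%u, %u]\n",
	      first_idx, first_idx + n_sessions - 1);
      return 0;
    }

    int
    main (void)
    {
      pthread_t t;
      uint32_t first_idx = 8, n_sessions = 4;	/* example values only */
      /* Pack both u32 values into one u64 and hand it over as the thread
       * argument, so no per-thread argument storage has to be allocated. */
      pthread_create (&t, NULL, worker_fn,
		      (void *) ((uint64_t) first_idx << 32 |
				(uint64_t) n_sessions));
      pthread_join (t, NULL);
      return 0;
    }

This also explains why the thread start is deferred until the slice is full: echo_start_rx_thread()
only calls pthread_create() once (nc + 1) % n == 0 or the last session connects, so a thread always
receives a complete, consecutive block of session indices.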