From 46eb1950a13b7b01afcc83cb3d8ce59012dfee46 Mon Sep 17 00:00:00 2001 From: Nathan Skrzypczak Date: Mon, 22 Jul 2019 16:30:40 +0200 Subject: quic: Improve quic echo connect threading Type: fix Change-Id: I64f90139ad70e722f1ecbc4e0c6c1e723ec0f054 Signed-off-by: Nathan Skrzypczak --- src/plugins/hs_apps/sapi/echo_common.c | 158 +++++ src/plugins/hs_apps/sapi/quic_echo.c | 1183 +++++++++++++------------------- src/plugins/hs_apps/sapi/quic_echo.h | 246 +++++++ src/plugins/quic/quic.c | 15 +- 4 files changed, 870 insertions(+), 732 deletions(-) create mode 100644 src/plugins/hs_apps/sapi/echo_common.c create mode 100644 src/plugins/hs_apps/sapi/quic_echo.h diff --git a/src/plugins/hs_apps/sapi/echo_common.c b/src/plugins/hs_apps/sapi/echo_common.c new file mode 100644 index 00000000000..a5c28145ba8 --- /dev/null +++ b/src/plugins/hs_apps/sapi/echo_common.c @@ -0,0 +1,158 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include + +typedef struct +{ + /* VNET_API_ERROR_FOO -> "Foo" hash table */ + uword *error_string_by_error_number; +} echo_common_main_t; + +echo_common_main_t echo_common_main; + +u8 * +format_ip4_address (u8 * s, va_list * args) +{ + u8 *a = va_arg (*args, u8 *); + return format (s, "%d.%d.%d.%d", a[0], a[1], a[2], a[3]); +} + +u8 * +format_ip6_address (u8 * s, va_list * args) +{ + ip6_address_t *a = va_arg (*args, ip6_address_t *); + u32 i, i_max_n_zero, max_n_zeros, i_first_zero, n_zeros, last_double_colon; + + i_max_n_zero = ARRAY_LEN (a->as_u16); + max_n_zeros = 0; + i_first_zero = i_max_n_zero; + n_zeros = 0; + for (i = 0; i < ARRAY_LEN (a->as_u16); i++) + { + u32 is_zero = a->as_u16[i] == 0; + if (is_zero && i_first_zero >= ARRAY_LEN (a->as_u16)) + { + i_first_zero = i; + n_zeros = 0; + } + n_zeros += is_zero; + if ((!is_zero && n_zeros > max_n_zeros) + || (i + 1 >= ARRAY_LEN (a->as_u16) && n_zeros > max_n_zeros)) + { + i_max_n_zero = i_first_zero; + max_n_zeros = n_zeros; + i_first_zero = ARRAY_LEN (a->as_u16); + n_zeros = 0; + } + } + + last_double_colon = 0; + for (i = 0; i < ARRAY_LEN (a->as_u16); i++) + { + if (i == i_max_n_zero && max_n_zeros > 1) + { + s = format (s, "::"); + i += max_n_zeros - 1; + last_double_colon = 1; + } + else + { + s = format (s, "%s%x", + (last_double_colon || i == 0) ? "" : ":", + clib_net_to_host_u16 (a->as_u16[i])); + last_double_colon = 0; + } + } + + return s; +} + +/* Format an IP46 address. */ +u8 * +format_ip46_address (u8 * s, va_list * args) +{ + ip46_address_t *ip46 = va_arg (*args, ip46_address_t *); + ip46_type_t type = va_arg (*args, ip46_type_t); + int is_ip4 = 1; + + switch (type) + { + case IP46_TYPE_ANY: + is_ip4 = ip46_address_is_ip4 (ip46); + break; + case IP46_TYPE_IP4: + is_ip4 = 1; + break; + case IP46_TYPE_IP6: + is_ip4 = 0; + break; + } + + return is_ip4 ? + format (s, "%U", format_ip4_address, &ip46->ip4) : + format (s, "%U", format_ip6_address, &ip46->ip6); +} + +static uword +unformat_data (unformat_input_t * input, va_list * args) +{ + u64 _a; + u64 *a = va_arg (*args, u64 *); + if (unformat (input, "%lluGb", &_a)) + *a = _a << 30; + else if (unformat (input, "%lluMb", &_a)) + *a = _a << 20; + else if (unformat (input, "%lluKb", &_a)) + *a = _a << 10; + else if (unformat (input, "%llu", a)) + ; + else + return 0; + return 1; +} + +static u8 * +format_api_error (u8 * s, va_list * args) +{ + echo_common_main_t *ecm = &echo_common_main; + i32 error = va_arg (*args, u32); + uword *p; + + p = hash_get (ecm->error_string_by_error_number, -error); + + if (p) + s = format (s, "%s", p[0]); + else + s = format (s, "%d", error); + return s; +} + +static void +init_error_string_table () +{ + echo_common_main_t *ecm = &echo_common_main; + ecm->error_string_by_error_number = hash_create (0, sizeof (uword)); + +#define _(n,v,s) hash_set (ecm->error_string_by_error_number, -v, s); + foreach_vnet_api_error; +#undef _ + + hash_set (ecm->error_string_by_error_number, 99, "Misc"); +} + diff --git a/src/plugins/hs_apps/sapi/quic_echo.c b/src/plugins/hs_apps/sapi/quic_echo.c index 17a9e113a00..312b703d7e1 100644 --- a/src/plugins/hs_apps/sapi/quic_echo.c +++ b/src/plugins/hs_apps/sapi/quic_echo.c @@ -16,337 +16,21 @@ #include #include -#include #include #include #include - -#define vl_typedefs /* define message structures */ -#include -#undef vl_typedefs - -/* declare message handlers for each api */ - -#define vl_endianfun /* define message structures */ -#include -#undef vl_endianfun - -/* instantiate all the print functions we know about */ -#define vl_print(handle, ...) -#define vl_printfun -#include -#undef vl_printfun - -#define CHECK(expected, result, _fmt, _args...) \ - if (expected != result) \ - ECHO_FAIL ("expected %d, got %d : " _fmt, expected, result, ##_args); - -#define ECHO_FAIL(_fmt,_args...) \ - { \ - echo_main_t *em = &echo_main; \ - em->has_failed = 1; \ - em->time_to_stop = 1; \ - if (em->log_lvl > 1) \ - clib_warning ("ECHO-ERROR: "_fmt, ##_args); \ - } - -#define ECHO_LOG(lvl, _fmt,_args...) \ - { \ - echo_main_t *em = &echo_main; \ - if (em->log_lvl > lvl) \ - clib_warning (_fmt, ##_args); \ - } - -typedef struct -{ - CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); -#define _(type, name) type name; - foreach_app_session_field -#undef _ - u64 vpp_session_handle; - u64 bytes_sent; - u64 bytes_to_send; - volatile u64 bytes_received; - volatile u64 bytes_to_receive; - f64 start; - u32 listener_index; /* listener index in echo session pool */ - u32 idle_cycles; /* consecutive enq/deq with no data */ - volatile u64 accepted_session_count; /* sessions we accepted */ -} echo_session_t; - -typedef enum -{ - ECHO_NO_DATA_SOURCE, - ECHO_TEST_DATA_SOURCE, - ECHO_RX_DATA_SOURCE, - ECHO_INVALID_DATA_SOURCE -} data_source_t; - -enum echo_close_f_t -{ - ECHO_CLOSE_F_INVALID = 0, - ECHO_CLOSE_F_PASSIVE, /* wait for close msg */ - ECHO_CLOSE_F_ACTIVE, /* send close msg */ - ECHO_CLOSE_F_NONE, /* don't bother sending close msg */ -}; - -enum quic_session_type_t -{ - QUIC_SESSION_TYPE_QUIC, - QUIC_SESSION_TYPE_STREAM, - QUIC_SESSION_TYPE_LISTEN, -}; - -enum quic_session_state_t -{ - QUIC_SESSION_STATE_INITIAL, - QUIC_SESSION_STATE_AWAIT_CLOSING, /* Data transfer is done, wait for close evt */ - QUIC_SESSION_STATE_AWAIT_DATA, /* Peer closed, wait for outstanding data */ - QUIC_SESSION_STATE_CLOSING, /* told vpp to close */ - QUIC_SESSION_STATE_CLOSED, /* closed in vpp */ -}; - -typedef enum -{ - STATE_START, - STATE_ATTACHED, - STATE_LISTEN, - STATE_READY, - STATE_DISCONNECTED, - STATE_DETACHED -} connection_state_t; - -typedef enum echo_test_evt_ -{ - ECHO_EVT_START = 1, /* app starts */ - ECHO_EVT_FIRST_QCONNECT = (1 << 1), /* First connect Quic session sent */ - ECHO_EVT_LAST_QCONNECTED = (1 << 2), /* All Quic session are connected */ - ECHO_EVT_FIRST_SCONNECT = (1 << 3), /* First connect Stream session sent */ - ECHO_EVT_LAST_SCONNECTED = (1 << 4), /* All Stream session are connected */ - ECHO_EVT_LAST_BYTE = (1 << 5), /* Last byte received */ - ECHO_EVT_EXIT = (1 << 6), /* app exits */ -} echo_test_evt_t; - -typedef struct _quic_echo_cb_vft -{ - void (*quic_connected_cb) (session_connected_msg_t * mp, u32 session_index); - void (*client_stream_connected_cb) (session_connected_msg_t * mp, - u32 session_index); - void (*server_stream_connected_cb) (session_connected_msg_t * mp, - u32 session_index); - void (*quic_accepted_cb) (session_accepted_msg_t * mp, u32 session_index); - void (*client_stream_accepted_cb) (session_accepted_msg_t * mp, - u32 session_index); - void (*server_stream_accepted_cb) (session_accepted_msg_t * mp, - u32 session_index); -} quic_echo_cb_vft_t; - - -typedef enum -{ - RETURN_PACKETS_NOTEST, - RETURN_PACKETS_LOG_WRONG, - RETURN_PACKETS_ASSERT, -} test_return_packets_t; - -typedef struct -{ - /* vpe input queue */ - svm_queue_t *vl_input_queue; - - /* API client handle */ - u32 my_client_index; - - /* The URI we're playing with */ - u8 *uri; - - /* Session pool */ - echo_session_t *sessions; - - /* Hash table for disconnect processing */ - uword *session_index_by_vpp_handles; - /* Index of vpp listener session */ - u32 listen_session_index; - - /* Hash table for shared segment_names */ - uword *shared_segment_handles; - clib_spinlock_t segment_handles_lock; - - int i_am_master; - - /* Our event queue */ - svm_msg_q_t *our_event_queue; - - u8 *socket_name; - - pid_t my_pid; - - /* For deadman timers */ - clib_time_t clib_time; - - /* State of the connection, shared between msg RX thread and main thread */ - volatile connection_state_t state; - - /* Signal variables */ - volatile u8 time_to_stop; - u8 has_failed; - - /* VNET_API_ERROR_FOO -> "Foo" hash table */ - uword *error_string_by_error_number; - - u8 *connect_test_data; - u8 test_return_packets; - u64 bytes_to_send; - u64 bytes_to_receive; - u32 fifo_size; - u32 rx_buf_size; - u32 tx_buf_size; - data_source_t data_source; - u8 send_quic_disconnects; /* actively send disconnect */ - u8 send_stream_disconnects; /* actively send disconnect */ - u8 output_json; - u8 log_lvl; - - u8 *appns_id; - u64 appns_flags; - u64 appns_secret; - - pthread_t *client_thread_handles; - u32 *thread_args; - u32 n_clients; /* Target number of QUIC sessions */ - u32 n_stream_clients; /* Target Number of STREAM sessions per QUIC session */ - volatile u32 n_quic_clients_connected; /* Number of connected QUIC sessions */ - volatile u32 n_clients_connected; /* Number of STREAM sessions connected */ - u32 n_rx_threads; /* Number of data threads */ - - u64 tx_total; - u64 rx_total; - - /* Event based timing : start & end depend on CLI specified events */ - u8 events_sent; - f64 start_time; - f64 end_time; - u8 timing_start_event; - u8 timing_end_event; - - /* cb vft for QUIC scenarios */ - quic_echo_cb_vft_t cb_vft; - - /** Flag that decides if socket, instead of svm, api is used to connect to - * vpp. If sock api is used, shm binary api is subsequently bootstrapped - * and all other messages are exchanged using shm IPC. */ - u8 use_sock_api; - - /* Limit the number of incorrect data messages */ - int max_test_msg; - - fifo_segment_main_t segment_main; -} echo_main_t; +#include +#include echo_main_t echo_main; -#if CLIB_DEBUG > 0 -#define NITER 10000 -#else -#define NITER 4000000 -#endif - -#if CLIB_DEBUG > 0 -#define TIMEOUT 10.0 -#else -#define TIMEOUT 10.0 -#endif - /* * * Format functions * */ -u8 * -format_ip4_address (u8 * s, va_list * args) -{ - u8 *a = va_arg (*args, u8 *); - return format (s, "%d.%d.%d.%d", a[0], a[1], a[2], a[3]); -} - -u8 * -format_ip6_address (u8 * s, va_list * args) -{ - ip6_address_t *a = va_arg (*args, ip6_address_t *); - u32 i, i_max_n_zero, max_n_zeros, i_first_zero, n_zeros, last_double_colon; - - i_max_n_zero = ARRAY_LEN (a->as_u16); - max_n_zeros = 0; - i_first_zero = i_max_n_zero; - n_zeros = 0; - for (i = 0; i < ARRAY_LEN (a->as_u16); i++) - { - u32 is_zero = a->as_u16[i] == 0; - if (is_zero && i_first_zero >= ARRAY_LEN (a->as_u16)) - { - i_first_zero = i; - n_zeros = 0; - } - n_zeros += is_zero; - if ((!is_zero && n_zeros > max_n_zeros) - || (i + 1 >= ARRAY_LEN (a->as_u16) && n_zeros > max_n_zeros)) - { - i_max_n_zero = i_first_zero; - max_n_zeros = n_zeros; - i_first_zero = ARRAY_LEN (a->as_u16); - n_zeros = 0; - } - } - - last_double_colon = 0; - for (i = 0; i < ARRAY_LEN (a->as_u16); i++) - { - if (i == i_max_n_zero && max_n_zeros > 1) - { - s = format (s, "::"); - i += max_n_zeros - 1; - last_double_colon = 1; - } - else - { - s = format (s, "%s%x", - (last_double_colon || i == 0) ? "" : ":", - clib_net_to_host_u16 (a->as_u16[i])); - last_double_colon = 0; - } - } - - return s; -} - -/* Format an IP46 address. */ -u8 * -format_ip46_address (u8 * s, va_list * args) -{ - ip46_address_t *ip46 = va_arg (*args, ip46_address_t *); - ip46_type_t type = va_arg (*args, ip46_type_t); - int is_ip4 = 1; - - switch (type) - { - case IP46_TYPE_ANY: - is_ip4 = ip46_address_is_ip4 (ip46); - break; - case IP46_TYPE_IP4: - is_ip4 = 1; - break; - case IP46_TYPE_IP6: - is_ip4 = 0; - break; - } - - return is_ip4 ? - format (s, "%U", format_ip4_address, &ip46->ip4) : - format (s, "%U", format_ip6_address, &ip46->ip6); -} - u8 * format_quic_echo_state (u8 * s, va_list * args) { @@ -359,6 +43,8 @@ format_quic_echo_state (u8 * s, va_list * args) return format (s, "STATE_LISTEN"); if (state == STATE_READY) return format (s, "STATE_READY"); + if (state == STATE_DATA_DONE) + return format (s, "STATE_DATA_DONE"); if (state == STATE_DISCONNECTED) return format (s, "STATE_DISCONNECTED"); if (state == STATE_DETACHED) @@ -367,22 +53,6 @@ format_quic_echo_state (u8 * s, va_list * args) return format (s, "unknown state"); } -static u8 * -format_api_error (u8 * s, va_list * args) -{ - echo_main_t *em = &echo_main; - i32 error = va_arg (*args, u32); - uword *p; - - p = hash_get (em->error_string_by_error_number, -error); - - if (p) - s = format (s, "%s", p[0]); - else - s = format (s, "%d", error); - return s; -} - static uword unformat_close (unformat_input_t * input, va_list * args) { @@ -398,35 +68,10 @@ unformat_close (unformat_input_t * input, va_list * args) return 1; } -static uword -unformat_data (unformat_input_t * input, va_list * args) -{ - u64 _a; - u64 *a = va_arg (*args, u64 *); - if (unformat (input, "%lluGb", &_a)) - { - *a = _a << 30; - return 1; - } - else if (unformat (input, "%lluMb", &_a)) - { - *a = _a << 20; - return 1; - } - else if (unformat (input, "%lluKb", &_a)) - { - *a = _a << 10; - return 1; - } - else if (unformat (input, "%llu", a)) - return 1; - return 0; -} - static uword echo_unformat_timing_event (unformat_input_t * input, va_list * args) { - echo_test_evt_t *a = va_arg (*args, echo_test_evt_t *); + u8 *a = va_arg (*args, u8 *); if (unformat (input, "start")) *a = ECHO_EVT_START; else if (unformat (input, "qconnected")) @@ -468,34 +113,107 @@ echo_format_timing_event (u8 * s, va_list * args) return format (s, "unknown timing event"); } -static void -init_error_string_table (echo_main_t * em) -{ - em->error_string_by_error_number = hash_create (0, sizeof (uword)); - -#define _(n,v,s) hash_set (em->error_string_by_error_number, -v, s); - foreach_vnet_api_error; -#undef _ - - hash_set (em->error_string_by_error_number, 99, "Misc"); -} - /* * * End of format functions * */ -static echo_session_t * -echo_session_alloc (echo_main_t * em) +static void +echo_session_prealloc (echo_main_t * em) { + /* We need to prealloc to avoid vec resize in threads */ echo_session_t *session; - pool_get (em->sessions, session); - clib_memset (session, 0, sizeof (*session)); - session->session_index = session - em->sessions; - session->listener_index = SESSION_INVALID_INDEX; - session->session_state = QUIC_SESSION_STATE_INITIAL; - return session; + int n_sessions = em->n_clients * (em->n_stream_clients + 1) + + em->i_am_master; + int i; + for (i = 0; i < n_sessions; i++) + { + pool_get (em->sessions, session); + clib_memset (session, 0, sizeof (*session)); + session->session_index = session - em->sessions; + session->listener_index = SESSION_INVALID_INDEX; + session->session_state = QUIC_SESSION_STATE_INITIAL; + } +} + +static echo_session_t * +echo_session_new (echo_main_t * em) +{ + /* thread safe new prealloced session */ + return pool_elt_at_index (em->sessions, + clib_atomic_fetch_add (&em->nxt_available_sidx, + 1)); +} + + +static int +echo_send_rpc (echo_main_t * em, void *fp, void *arg, u32 opaque) +{ + svm_msg_q_msg_t msg; + echo_rpc_msg_t *evt; + if (PREDICT_FALSE (svm_msg_q_lock (em->rpc_msq_queue))) + { + ECHO_LOG (1, "RPC lock failed"); + return -1; + } + if (PREDICT_FALSE (svm_msg_q_ring_is_full (em->rpc_msq_queue, 0))) + { + svm_msg_q_unlock (em->rpc_msq_queue); + ECHO_LOG (1, "RPC ring is full"); + return -2; + } + msg = svm_msg_q_alloc_msg_w_ring (em->rpc_msq_queue, 0); + if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (&msg))) + { + ECHO_LOG (1, "RPC msg is invalid"); + svm_msg_q_unlock (em->rpc_msq_queue); + return -2; + } + evt = (echo_rpc_msg_t *) svm_msg_q_msg_data (em->rpc_msq_queue, &msg); + evt->arg = arg; + evt->opaque = opaque; + evt->fp = fp; + + svm_msg_q_add_and_unlock (em->rpc_msq_queue, &msg); + return 0; +} + +static inline void +echo_segment_handle_add_del (echo_main_t * em, u64 segment_handle, u8 add) +{ + clib_spinlock_lock (&em->segment_handles_lock); + if (add) + hash_set (em->shared_segment_handles, segment_handle, 1); + else + hash_unset (em->shared_segment_handles, segment_handle); + clib_spinlock_unlock (&em->segment_handles_lock); +} + +static inline void +echo_session_handle_add_del (echo_main_t * em, u64 handle, u32 sid) +{ + clib_spinlock_lock (&em->sid_vpp_handles_lock); + if (sid == SESSION_INVALID_INDEX) + hash_unset (em->session_index_by_vpp_handles, handle); + else + hash_set (em->session_index_by_vpp_handles, handle, sid); + clib_spinlock_unlock (&em->sid_vpp_handles_lock); +} + +static inline echo_session_t * +echo_get_session_from_handle (echo_main_t * em, u64 handle) +{ + uword *p; + clib_spinlock_lock (&em->sid_vpp_handles_lock); + p = hash_get (em->session_index_by_vpp_handles, handle); + clib_spinlock_unlock (&em->sid_vpp_handles_lock); + if (!p) + { + ECHO_FAIL ("unknown handle 0x%lx", handle); + return 0; + } + return pool_elt_at_index (em->sessions, p[0]); } /* @@ -596,12 +314,12 @@ server_send_unbind (echo_main_t * em) } static void -echo_send_connect (echo_main_t * em, u8 * uri, u32 opaque) +echo_send_connect (u8 * uri, u32 opaque) { + echo_main_t *em = &echo_main; vl_api_connect_uri_t *cmp; cmp = vl_msg_api_alloc (sizeof (*cmp)); clib_memset (cmp, 0, sizeof (*cmp)); - cmp->_vl_msg_id = ntohs (VL_API_CONNECT_URI); cmp->client_index = em->my_client_index; cmp->context = ntohl (opaque); @@ -610,8 +328,9 @@ echo_send_connect (echo_main_t * em, u8 * uri, u32 opaque) } static void -echo_disconnect_session (echo_main_t * em, echo_session_t * s) +echo_disconnect_session (echo_session_t * s, u32 opaque) { + echo_main_t *em = &echo_main; vl_api_disconnect_session_t *dmp; dmp = vl_msg_api_alloc (sizeof (*dmp)); clib_memset (dmp, 0, sizeof (*dmp)); @@ -653,37 +372,39 @@ wait_for_segment_allocation (u64 segment_handle) static void quic_echo_notify_event (echo_main_t * em, echo_test_evt_t e) { - if (em->events_sent & e) + if (em->timing.events_sent & e) return; - if (em->timing_start_event == e) - em->start_time = clib_time_now (&em->clib_time); - else if (em->timing_end_event == e) - em->end_time = clib_time_now (&em->clib_time); - em->events_sent |= e; + if (em->timing.start_event == e) + em->timing.start_time = clib_time_now (&em->clib_time); + else if (em->timing.end_event == e) + em->timing.end_time = clib_time_now (&em->clib_time); + em->timing.events_sent |= e; } static void echo_assert_test_suceeded (echo_main_t * em) { - CHECK (em->rx_total, - em->n_stream_clients * em->n_clients * em->bytes_to_receive, - "Not enough data received"); - CHECK (em->tx_total, - em->n_stream_clients * em->n_clients * em->bytes_to_send, - "Not enough data sent"); + CHECK (em->n_stream_clients * em->n_clients * em->bytes_to_receive, + em->stats.rx_total, "Not enough data received"); + CHECK (em->n_stream_clients * em->n_clients * em->bytes_to_send, + em->stats.tx_total, "Not enough data sent"); + clib_spinlock_lock (&em->sid_vpp_handles_lock); CHECK (0, hash_elts (em->session_index_by_vpp_handles), "Some sessions are still open"); + clib_spinlock_unlock (&em->sid_vpp_handles_lock); } always_inline void echo_session_dequeue_notify (echo_session_t * s) { int rv; - rv = app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo->master_session_index, - SESSION_IO_EVT_RX, SVM_Q_WAIT); - svm_fifo_clear_deq_ntf (s->rx_fifo); - if (rv) + if (!svm_fifo_set_event (s->rx_fifo)) + return; + if ((rv = + app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo->master_session_index, + SESSION_IO_EVT_RX, SVM_Q_WAIT))) ECHO_FAIL ("app_send_io_evt_to_vpp errored %d", rv); + svm_fifo_clear_deq_ntf (s->rx_fifo); } static int @@ -709,17 +430,8 @@ ssvm_segment_attach (char *name, ssvm_segment_type_t type, int fd) static void stop_signal (int signum) { - echo_main_t *um = &echo_main; - um->time_to_stop = 1; -} - -static clib_error_t * -setup_signal_handlers (void) -{ - signal (SIGINT, stop_signal); - signal (SIGQUIT, stop_signal); - signal (SIGTERM, stop_signal); - return 0; + echo_main_t *em = &echo_main; + em->time_to_stop = 1; } int @@ -778,21 +490,31 @@ echo_event_didnt_happen (u8 e) static void print_global_json_stats (echo_main_t * em) { - if (!(em->events_sent & em->timing_start_event)) - return echo_event_didnt_happen (em->timing_start_event); - if (!(em->events_sent & em->timing_end_event)) - return echo_event_didnt_happen (em->timing_end_event); - f64 deltat = em->end_time - em->start_time; + if (!(em->timing.events_sent & em->timing.start_event)) + return echo_event_didnt_happen (em->timing.start_event); + if (!(em->timing.events_sent & em->timing.end_event)) + return echo_event_didnt_happen (em->timing.end_event); + f64 deltat = em->timing.end_time - em->timing.start_time; u8 *start_evt = - format (0, "%U", echo_format_timing_event, em->timing_start_event); + format (0, "%U", echo_format_timing_event, em->timing.start_event); u8 *end_evt = - format (0, "%U", echo_format_timing_event, em->timing_end_event); + format (0, "%U", echo_format_timing_event, em->timing.end_event); fformat (stdout, "{\n"); fformat (stdout, "\"time\": \"%.9f\",\n", deltat); fformat (stdout, "\"start_evt\": \"%s\",\n", start_evt); fformat (stdout, "\"end_evt\": \"%s\",\n", end_evt); - fformat (stdout, "\"rx_data\": %lld,\n", em->rx_total); - fformat (stdout, "\"tx_rx\": %lld,\n", em->tx_total); + fformat (stdout, "\"rx_data\": %lld,\n", em->stats.rx_total); + fformat (stdout, "\"tx_rx\": %lld,\n", em->stats.tx_total); + fformat (stdout, "\"closing\": {\n"); + fformat (stdout, " \"reset\": { \"q\": %d, \"s\": %d },\n", + em->stats.reset_count.q, em->stats.reset_count.s); + fformat (stdout, " \"close\": { \"q\": %d, \"s\": %d },\n", + em->stats.close_count.q, em->stats.close_count.s); + fformat (stdout, " \"active\": { \"q\": %d, \"s\": %d },\n", + em->stats.active_count.q, em->stats.active_count.s); + fformat (stdout, " \"clean\": { \"q\": %d, \"s\": %d }\n", + em->stats.clean_count.q, em->stats.clean_count.s); + fformat (stdout, "}\n"); fformat (stdout, "}\n"); } @@ -800,26 +522,36 @@ static void print_global_stats (echo_main_t * em) { u8 *s; - if (!(em->events_sent & em->timing_start_event)) - return echo_event_didnt_happen (em->timing_start_event); - if (!(em->events_sent & em->timing_end_event)) - return echo_event_didnt_happen (em->timing_end_event); - f64 deltat = em->end_time - em->start_time; + if (!(em->timing.events_sent & em->timing.start_event)) + return echo_event_didnt_happen (em->timing.start_event); + if (!(em->timing.events_sent & em->timing.end_event)) + return echo_event_didnt_happen (em->timing.end_event); + f64 deltat = em->timing.end_time - em->timing.start_time; s = format (0, "%U:%U", - echo_format_timing_event, em->timing_start_event, - echo_format_timing_event, em->timing_end_event); + echo_format_timing_event, em->timing.start_event, + echo_format_timing_event, em->timing.end_event); fformat (stdout, "Timing %s\n", s); fformat (stdout, "-------- TX --------\n"); fformat (stdout, "%lld bytes (%lld mbytes, %lld gbytes) in %.6f seconds\n", - em->tx_total, em->tx_total / (1ULL << 20), - em->tx_total / (1ULL << 30), deltat); - fformat (stdout, "%.4f Gbit/second\n", (em->tx_total * 8.0) / deltat / 1e9); + em->stats.tx_total, em->stats.tx_total / (1ULL << 20), + em->stats.tx_total / (1ULL << 30), deltat); + fformat (stdout, "%.4f Gbit/second\n", + (em->stats.tx_total * 8.0) / deltat / 1e9); fformat (stdout, "-------- RX --------\n"); fformat (stdout, "%lld bytes (%lld mbytes, %lld gbytes) in %.6f seconds\n", - em->rx_total, em->rx_total / (1ULL << 20), - em->rx_total / (1ULL << 30), deltat); - fformat (stdout, "%.4f Gbit/second\n", (em->rx_total * 8.0) / deltat / 1e9); + em->stats.rx_total, em->stats.rx_total / (1ULL << 20), + em->stats.rx_total / (1ULL << 30), deltat); + fformat (stdout, "%.4f Gbit/second\n", + (em->stats.rx_total * 8.0) / deltat / 1e9); fformat (stdout, "--------------------\n"); + fformat (stdout, "Received close on %dQ %dS\n", em->stats.close_count.q, + em->stats.close_count.s); + fformat (stdout, "Received reset on %dQ %dS\n", em->stats.reset_count.q, + em->stats.reset_count.s); + fformat (stdout, "Sent close on %dQ %dS\n", em->stats.active_count.q, + em->stats.active_count.s); + fformat (stdout, "Discarded %dQ %dS\n", em->stats.clean_count.q, + em->stats.clean_count.s); } static void @@ -840,7 +572,8 @@ echo_free_sessions (echo_main_t * em) { /* Free session */ s = pool_elt_at_index (em->sessions, *session_index); - ECHO_LOG (1, "Freeing session 0x%lx", s->vpp_session_handle); + echo_session_handle_add_del (em, s->vpp_session_handle, + SESSION_INVALID_INDEX); pool_put (em->sessions, s); clib_memset (s, 0xfe, sizeof (*s)); } @@ -849,45 +582,36 @@ echo_free_sessions (echo_main_t * em) static void echo_cleanup_session (echo_main_t * em, echo_session_t * s) { - u64 c; echo_session_t *ls; - if (s->listener_index != SESSION_INVALID_INDEX) + ASSERT (s->session_state < QUIC_SESSION_STATE_CLOSED); + if (s->session_type == QUIC_SESSION_TYPE_QUIC) + { + clib_atomic_sub_fetch (&em->n_quic_clients_connected, 1); + } + else if (s->session_type == QUIC_SESSION_TYPE_STREAM) { ls = pool_elt_at_index (em->sessions, s->listener_index); - c = clib_atomic_sub_fetch (&ls->accepted_session_count, 1); - if (c == 0 && ls->session_type == QUIC_SESSION_TYPE_QUIC) + ASSERT (ls->session_type == QUIC_SESSION_TYPE_QUIC); + if (!clib_atomic_sub_fetch (&ls->accepted_session_count, 1)) { if (em->send_quic_disconnects == ECHO_CLOSE_F_ACTIVE) - echo_disconnect_session (em, ls); + { + echo_send_rpc (em, echo_disconnect_session, (void *) ls, 0); + em->stats.active_count.q++; + } else if (em->send_quic_disconnects == ECHO_CLOSE_F_NONE) - echo_cleanup_session (em, ls); + { + echo_cleanup_session (em, ls); + em->stats.clean_count.q++; + } } + clib_atomic_sub_fetch (&em->n_clients_connected, 1); } - if (s->session_type == QUIC_SESSION_TYPE_QUIC) - clib_atomic_sub_fetch (&em->n_quic_clients_connected, 1); - else if (s->session_type == QUIC_SESSION_TYPE_STREAM) - clib_atomic_sub_fetch (&em->n_clients_connected, 1); - ECHO_LOG (1, "Cleanup sessions (still %uQ %uS)", em->n_quic_clients_connected, em->n_clients_connected); - hash_unset (em->session_index_by_vpp_handles, s->vpp_session_handle); s->session_state = QUIC_SESSION_STATE_CLOSED; } -static void -echo_initiate_session_close (echo_main_t * em, echo_session_t * s, u8 active) -{ - if (s->session_type == QUIC_SESSION_TYPE_STREAM) - { - if (!active && s->bytes_to_receive) - s->session_state = QUIC_SESSION_STATE_AWAIT_DATA; - else - s->session_state = QUIC_SESSION_STATE_CLOSING; - } - else - echo_cleanup_session (em, s); /* We can clean Q/Lsessions right away */ -} - static void echo_initiate_qsession_close_no_stream (echo_main_t * em) { @@ -898,16 +622,23 @@ echo_initiate_qsession_close_no_stream (echo_main_t * em) /* *INDENT-OFF* */ pool_foreach (s, em->sessions, ({ - if (s->session_type == QUIC_SESSION_TYPE_QUIC && s->accepted_session_count == 0) + if (s->session_type == QUIC_SESSION_TYPE_QUIC) { ECHO_LOG (1,"ACTIVE close 0x%lx", s->vpp_session_handle); if (em->send_quic_disconnects == ECHO_CLOSE_F_ACTIVE) - echo_disconnect_session (em, s); + { + echo_send_rpc (em, echo_disconnect_session, (void *) s, 0); + em->stats.active_count.q++; + } else if (em->send_quic_disconnects == ECHO_CLOSE_F_NONE) - echo_cleanup_session (em, s); + { + echo_cleanup_session (em, s); + em->stats.clean_count.q++; + } } })); /* *INDENT-ON* */ + em->state = STATE_DATA_DONE; } static void @@ -975,23 +706,54 @@ mirror_data_chunk (echo_main_t * em, echo_session_t * s, u8 * tx_buf, u64 len) return n_sent; } +static void +echo_update_count_on_session_close (echo_main_t * em, echo_session_t * s) +{ + + ECHO_LOG (1, "[%lu/%lu] -> S(%x) -> [%lu/%lu]", + s->bytes_received, s->bytes_received + s->bytes_to_receive, + s->session_index, s->bytes_sent, + s->bytes_sent + s->bytes_to_send); + clib_atomic_fetch_add (&em->stats.tx_total, s->bytes_sent); + clib_atomic_fetch_add (&em->stats.rx_total, s->bytes_received); + + if (PREDICT_FALSE (em->stats.rx_total == + em->n_clients * em->n_stream_clients * + em->bytes_to_receive)) + quic_echo_notify_event (em, ECHO_EVT_LAST_BYTE); +} + +static inline void +echo_check_closed_listener (echo_main_t * em, echo_session_t * s) +{ + echo_session_t *ls; + /* if parent has died, terminate gracefully */ + ls = pool_elt_at_index (em->sessions, s->listener_index); + if (ls->session_state < QUIC_SESSION_STATE_CLOSING) + return; + ECHO_LOG (2, "Session 0%lx died, close child 0x%lx", ls->vpp_session_handle, + s->vpp_session_handle); + clib_atomic_sub_fetch (&em->n_clients_connected, 1); + em->stats.clean_count.s++; + echo_update_count_on_session_close (em, s); + s->session_state = QUIC_SESSION_STATE_CLOSED; +} + /* * Rx/Tx polling thread per connection */ static void -echo_thread_handle_data (echo_main_t * em, echo_session_t * s, u8 * rx_buf) +echo_handle_data (echo_main_t * em, echo_session_t * s, u8 * rx_buf) { - int n_read, n_sent; + int n_read, n_sent = 0; n_read = recv_data_chunk (em, s, rx_buf); if (em->data_source == ECHO_TEST_DATA_SOURCE) - n_sent = - send_data_chunk (s, em->connect_test_data, - s->bytes_sent % em->tx_buf_size, em->tx_buf_size); + n_sent = send_data_chunk (s, em->connect_test_data, + s->bytes_sent % em->tx_buf_size, + em->tx_buf_size); else if (em->data_source == ECHO_RX_DATA_SOURCE) n_sent = mirror_data_chunk (em, s, rx_buf, n_read); - else - n_sent = 0; if (!s->bytes_to_send && !s->bytes_to_receive) { /* Session is done, need to close */ @@ -1001,16 +763,22 @@ echo_thread_handle_data (echo_main_t * em, echo_session_t * s, u8 * rx_buf) { s->session_state = QUIC_SESSION_STATE_AWAIT_CLOSING; if (em->send_stream_disconnects == ECHO_CLOSE_F_ACTIVE) - echo_disconnect_session (em, s); + { + echo_send_rpc (em, echo_disconnect_session, (void *) s, 0); + em->stats.close_count.s++; + } else if (em->send_stream_disconnects == ECHO_CLOSE_F_NONE) - s->session_state = QUIC_SESSION_STATE_CLOSING; + { + s->session_state = QUIC_SESSION_STATE_CLOSING; + em->stats.clean_count.s++; + } } return; } + /* Check for idle clients */ if (em->log_lvl > 1) { - /* check idle clients */ if (n_sent || n_read) s->idle_cycles = 0; else if (s->idle_cycles++ == 1e7) @@ -1018,37 +786,24 @@ echo_thread_handle_data (echo_main_t * em, echo_session_t * s, u8 * rx_buf) s->idle_cycles = 0; ECHO_LOG (1, "Idle client TX:%dB RX:%dB", s->bytes_to_send, s->bytes_to_receive); - ECHO_LOG (1, "Idle FIFOs TX:%dB RX:%dB", + ECHO_LOG (1, "Idle FIFOs TX:%dB RX:%dB", svm_fifo_max_dequeue (s->tx_fifo), svm_fifo_max_dequeue (s->rx_fifo)); + ECHO_LOG (1, "Session 0x%lx state %u", s->vpp_session_handle, + s->session_state); } } } -static void -echo_thread_handle_closing (echo_main_t * em, echo_session_t * s) -{ - - ECHO_LOG (1, "[%lu/%lu] -> S(%x) -> [%lu/%lu]", - s->bytes_received, s->bytes_received + s->bytes_to_receive, - s->session_index, s->bytes_sent, - s->bytes_sent + s->bytes_to_send); - clib_atomic_fetch_add (&em->tx_total, s->bytes_sent); - clib_atomic_fetch_add (&em->rx_total, s->bytes_received); - - if (PREDICT_FALSE (em->rx_total == - em->n_clients * em->n_stream_clients * - em->bytes_to_receive)) - quic_echo_notify_event (em, ECHO_EVT_LAST_BYTE); - echo_cleanup_session (em, s); -} - static void * -echo_thread_fn (void *arg) +echo_data_thread_fn (void *arg) { + clib_mem_set_thread_index (); echo_main_t *em = &echo_main; - u32 thread_n_sessions = (u64) arg & 0xFFFFFFFF; - u32 session_first_idx = (u64) arg >> 32; + u32 N = em->n_clients * em->n_stream_clients; + u32 n = (N + em->n_rx_threads - 1) / em->n_rx_threads; + u32 idx = (u64) arg; + u32 thread_n_sessions = clib_min (n, N - n * idx); u32 i = 0; u32 n_closed_sessions = 0; @@ -1057,59 +812,37 @@ echo_thread_fn (void *arg) echo_session_t *s; vec_validate (rx_buf, em->rx_buf_size); - while (!em->time_to_stop && em->state != STATE_READY) - ; - - for (i = 0; !em->time_to_stop; i++) + for (i = 0; !em->time_to_stop; i = (i + 1) % thread_n_sessions) { - if (i % thread_n_sessions == 0) - n_closed_sessions = 0; - session_index = - em->thread_args[session_first_idx + i % thread_n_sessions]; + n_closed_sessions = i == 0 ? 0 : n_closed_sessions; + session_index = em->data_thread_args[n * idx + i]; + if (session_index == SESSION_INVALID_INDEX) + continue; s = pool_elt_at_index (em->sessions, session_index); - if (s->session_state == QUIC_SESSION_STATE_INITIAL - || s->session_state == QUIC_SESSION_STATE_AWAIT_DATA) - echo_thread_handle_data (em, s, rx_buf); - else if (s->session_state == QUIC_SESSION_STATE_CLOSING) - echo_thread_handle_closing (em, s); - else if (s->session_state == QUIC_SESSION_STATE_CLOSED) - n_closed_sessions++; + switch (s->session_state) + { + case QUIC_SESSION_STATE_READY: + case QUIC_SESSION_STATE_AWAIT_DATA: + echo_handle_data (em, s, rx_buf); + echo_check_closed_listener (em, s); + break; + case QUIC_SESSION_STATE_AWAIT_CLOSING: + echo_check_closed_listener (em, s); + break; + case QUIC_SESSION_STATE_CLOSING: + echo_update_count_on_session_close (em, s); + echo_cleanup_session (em, s); + break; + case QUIC_SESSION_STATE_CLOSED: + n_closed_sessions++; + break; + } if (n_closed_sessions == thread_n_sessions) break; } pthread_exit (0); } -static int -echo_start_rx_thread (u32 session_index) -{ - /* Each thread owns n consecutive sessions of the total N */ - - echo_main_t *em = &echo_main; - u32 N = em->n_clients * em->n_stream_clients; - u32 nc, n, first_idx, thread_sessions; - - n = (N + em->n_rx_threads - 1) / em->n_rx_threads; - nc = em->n_clients_connected; - - ASSERT (nc + 1 <= N); - em->thread_args[nc] = session_index; - - if ((nc + 1) % n == 0 || nc + 1 == N) - { - first_idx = n * (nc / n); - thread_sessions = (nc + 1) % n == 0 ? n : (nc + 1) % n; - ECHO_LOG (1, "Start thread %u [%u -> %u]", nc / n, first_idx, - first_idx + thread_sessions - 1); - return pthread_create (&em->client_thread_handles[nc / n], - NULL /*attr */ , echo_thread_fn, - (void *) ((u64) first_idx << 32 | (u64) - thread_sessions)); - } - - return 0; -} - static void session_bound_handler (session_bound_msg_t * mp) { @@ -1126,11 +859,9 @@ session_bound_handler (session_bound_msg_t * mp) clib_net_to_host_u16 (mp->lcl_port)); /* Allocate local session and set it up */ - listen_session = echo_session_alloc (em); + listen_session = echo_session_new (em); listen_session->session_type = QUIC_SESSION_TYPE_LISTEN; - listen_session->accepted_session_count = 0; - hash_set (em->session_index_by_vpp_handles, mp->handle, - listen_session->session_index); + echo_session_handle_add_del (em, mp->handle, listen_session->session_index); em->state = STATE_LISTEN; em->listen_session_index = listen_session->session_index; } @@ -1142,10 +873,9 @@ session_accepted_handler (session_accepted_msg_t * mp) session_accepted_reply_msg_t *rmp; svm_fifo_t *rx_fifo, *tx_fifo; echo_main_t *em = &echo_main; - echo_session_t *session, *listen_session; - uword *p; + echo_session_t *session, *ls; /* Allocate local session and set it up */ - session = echo_session_alloc (em); + session = echo_session_new (em); if (wait_for_segment_allocation (mp->segment_handle)) { @@ -1164,23 +894,14 @@ session_accepted_handler (session_accepted_msg_t * mp) session->start = clib_time_now (&em->clib_time); session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *); - - session->accepted_session_count = 0; - p = hash_get (em->session_index_by_vpp_handles, mp->listener_handle); - if (!p) - { - ECHO_FAIL ("unknown handle 0x%lx", mp->listener_handle); - return; - } - session->listener_index = p[0]; - listen_session = pool_elt_at_index (em->sessions, p[0]); - clib_atomic_fetch_add (&listen_session->accepted_session_count, 1); + if (!(ls = echo_get_session_from_handle (em, mp->listener_handle))) + return; + session->listener_index = ls->session_index; /* Add it to lookup table */ ECHO_LOG (1, "Accepted session 0x%lx -> 0x%lx", mp->handle, mp->listener_handle); - hash_set (em->session_index_by_vpp_handles, mp->handle, - session->session_index); + echo_session_handle_add_del (em, mp->handle, session->session_index); app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt, SESSION_CTRL_EVT_ACCEPTED_REPLY); @@ -1189,46 +910,37 @@ session_accepted_handler (session_accepted_msg_t * mp) rmp->context = mp->context; app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt); - if (listen_session->session_type == QUIC_SESSION_TYPE_LISTEN) + if (ls->session_type == QUIC_SESSION_TYPE_LISTEN) { quic_echo_notify_event (em, ECHO_EVT_FIRST_QCONNECT); session->session_type = QUIC_SESSION_TYPE_QUIC; + session->accepted_session_count = 0; if (em->cb_vft.quic_accepted_cb) em->cb_vft.quic_accepted_cb (mp, session->session_index); clib_atomic_fetch_add (&em->n_quic_clients_connected, 1); } - else if (em->i_am_master) - { - session->session_type = QUIC_SESSION_TYPE_STREAM; - quic_echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT); - if (em->cb_vft.server_stream_accepted_cb) - em->cb_vft.server_stream_accepted_cb (mp, session->session_index); - clib_atomic_fetch_add (&em->n_clients_connected, 1); - } else { session->session_type = QUIC_SESSION_TYPE_STREAM; quic_echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT); - if (em->cb_vft.client_stream_accepted_cb) + clib_atomic_fetch_add (&ls->accepted_session_count, 1); + if (em->i_am_master && em->cb_vft.server_stream_accepted_cb) + em->cb_vft.server_stream_accepted_cb (mp, session->session_index); + if (!em->i_am_master && em->cb_vft.client_stream_accepted_cb) em->cb_vft.client_stream_accepted_cb (mp, session->session_index); clib_atomic_fetch_add (&em->n_clients_connected, 1); } if (em->n_clients_connected == em->n_clients * em->n_stream_clients && em->n_clients_connected != 0) - { - ECHO_LOG (1, "App is ready"); - em->state = STATE_READY; - quic_echo_notify_event (em, ECHO_EVT_LAST_SCONNECTED); - } - if (em->n_quic_clients_connected == em->n_clients) + quic_echo_notify_event (em, ECHO_EVT_LAST_SCONNECTED); + if (em->n_quic_clients_connected == em->n_clients + && em->state < STATE_READY) { quic_echo_notify_event (em, ECHO_EVT_LAST_QCONNECTED); + em->state = STATE_READY; if (em->n_stream_clients == 0) - { - em->state = STATE_READY; - echo_initiate_qsession_close_no_stream (em); - } + echo_initiate_qsession_close_no_stream (em); } } @@ -1247,7 +959,7 @@ session_connected_handler (session_connected_msg_t * mp) return; } - session = echo_session_alloc (em); + session = echo_session_new (em); if (wait_for_segment_allocation (mp->segment_handle)) { ECHO_FAIL ("wait_for_segment_allocation errored"); @@ -1266,58 +978,42 @@ session_connected_handler (session_connected_msg_t * mp) session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *); - session->accepted_session_count = 0; - if (listener_index != SESSION_INVALID_INDEX) - { - listen_session = pool_elt_at_index (em->sessions, listener_index); - clib_atomic_fetch_add (&listen_session->accepted_session_count, 1); - session->listener_index = listen_session->session_index; - } - - ECHO_LOG (1, "Connected session 0x%lx -> 0x%lx", mp->handle, - listener_index != - SESSION_INVALID_INDEX ? listen_session->vpp_session_handle : (u64) - ~ 0); - hash_set (em->session_index_by_vpp_handles, mp->handle, - session->session_index); + echo_session_handle_add_del (em, mp->handle, session->session_index); if (listener_index == SESSION_INVALID_INDEX) { + ECHO_LOG (1, "Connected session 0x%lx -> URI", mp->handle); session->session_type = QUIC_SESSION_TYPE_QUIC; + session->accepted_session_count = 0; if (em->cb_vft.quic_connected_cb) em->cb_vft.quic_connected_cb (mp, session->session_index); clib_atomic_fetch_add (&em->n_quic_clients_connected, 1); } - else if (em->i_am_master) - { - session->session_type = QUIC_SESSION_TYPE_STREAM; - if (em->cb_vft.server_stream_connected_cb) - em->cb_vft.server_stream_connected_cb (mp, session->session_index); - clib_atomic_fetch_add (&em->n_clients_connected, 1); - } else { + listen_session = pool_elt_at_index (em->sessions, listener_index); + session->listener_index = listener_index; + ECHO_LOG (1, "Connected session 0x%lx -> 0x%lx", mp->handle, + listen_session->vpp_session_handle); session->session_type = QUIC_SESSION_TYPE_STREAM; - if (em->cb_vft.client_stream_connected_cb) + clib_atomic_fetch_add (&listen_session->accepted_session_count, 1); + if (em->i_am_master && em->cb_vft.server_stream_connected_cb) + em->cb_vft.server_stream_connected_cb (mp, session->session_index); + if (!em->i_am_master && em->cb_vft.client_stream_connected_cb) em->cb_vft.client_stream_connected_cb (mp, session->session_index); clib_atomic_fetch_add (&em->n_clients_connected, 1); } if (em->n_clients_connected == em->n_clients * em->n_stream_clients && em->n_clients_connected != 0) - { - ECHO_LOG (1, "App is ready"); - em->state = STATE_READY; - quic_echo_notify_event (em, ECHO_EVT_LAST_SCONNECTED); - } - if (em->n_quic_clients_connected == em->n_clients) + quic_echo_notify_event (em, ECHO_EVT_LAST_SCONNECTED); + if (em->n_quic_clients_connected == em->n_clients + && em->state < STATE_READY) { quic_echo_notify_event (em, ECHO_EVT_LAST_QCONNECTED); + em->state = STATE_READY; if (em->n_stream_clients == 0) - { - em->state = STATE_READY; - echo_initiate_qsession_close_no_stream (em); - } + echo_initiate_qsession_close_no_stream (em); } } @@ -1337,7 +1033,7 @@ echo_on_connected_connect (session_connected_msg_t * mp, u32 session_index) quic_echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT); for (i = 0; i < em->n_stream_clients; i++) - echo_send_connect (em, uri, session_index); + echo_send_rpc (em, echo_send_connect, (void *) uri, session_index); ECHO_LOG (0, "Qsession 0x%llx connected to %U:%d", mp->handle, format_ip46_address, &mp->lcl.ip, @@ -1348,18 +1044,13 @@ static void echo_on_connected_send (session_connected_msg_t * mp, u32 session_index) { echo_main_t *em = &echo_main; - int rv; echo_session_t *session; session = pool_elt_at_index (em->sessions, session_index); session->bytes_to_send = em->bytes_to_send; session->bytes_to_receive = em->bytes_to_receive; - - if ((rv = echo_start_rx_thread (session_index))) - { - ECHO_FAIL ("pthread_create returned %d", rv); - return; - } + session->session_state = QUIC_SESSION_STATE_READY; + em->data_thread_args[em->n_clients_connected] = session->session_index; } static void @@ -1373,18 +1064,13 @@ static void echo_on_accept_recv (session_accepted_msg_t * mp, u32 session_index) { echo_main_t *em = &echo_main; - int rv; echo_session_t *session; session = pool_elt_at_index (em->sessions, session_index); session->bytes_to_send = em->bytes_to_send; session->bytes_to_receive = em->bytes_to_receive; - - if ((rv = echo_start_rx_thread (session_index))) - { - ECHO_FAIL ("pthread_create returned %d", rv); - return; - } + em->data_thread_args[em->n_clients_connected] = session->session_index; + session->session_state = QUIC_SESSION_STATE_READY; } static void @@ -1397,7 +1083,7 @@ echo_on_accept_connect (session_accepted_msg_t * mp, u32 session_index) quic_echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT); for (i = 0; i < em->n_stream_clients; i++) - echo_send_connect (em, uri, session_index); + echo_send_rpc (em, echo_send_connect, (void *) uri, session_index); } static void @@ -1467,17 +1153,11 @@ session_disconnected_handler (session_disconnected_msg_t * mp) session_disconnected_reply_msg_t *rmp; echo_main_t *em = &echo_main; echo_session_t *s; - uword *p; int rv = 0; ECHO_LOG (1, "passive close session 0x%lx", mp->handle); - p = hash_get (em->session_index_by_vpp_handles, mp->handle); - if (!p) - { - ECHO_FAIL ("couldn't find session key %llx", mp->handle); - return; - } + if (!(s = echo_get_session_from_handle (em, mp->handle))) + return; - s = pool_elt_at_index (em->sessions, p[0]); app_alloc_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt, SESSION_CTRL_EVT_DISCONNECTED_REPLY); rmp = (session_disconnected_reply_msg_t *) app_evt->evt->data; @@ -1486,9 +1166,20 @@ session_disconnected_handler (session_disconnected_msg_t * mp) rmp->context = mp->context; app_send_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt); - echo_initiate_session_close (em, s, 0 /* active */ ); if (s->session_type == QUIC_SESSION_TYPE_STREAM) - session_print_stats (em, s); + { + session_print_stats (em, s); + if (s->bytes_to_receive || s->bytes_to_send) + s->session_state = QUIC_SESSION_STATE_AWAIT_DATA; + else + s->session_state = QUIC_SESSION_STATE_CLOSING; + em->stats.close_count.s++; + } + else + { + echo_cleanup_session (em, s); /* We can clean Q/Lsessions right away */ + em->stats.close_count.q++; + } } static void @@ -1497,28 +1188,29 @@ session_reset_handler (session_reset_msg_t * mp) app_session_evt_t _app_evt, *app_evt = &_app_evt; echo_main_t *em = &echo_main; session_reset_reply_msg_t *rmp; - echo_session_t *session = 0; - uword *p; + echo_session_t *s = 0; int rv = 0; ECHO_LOG (1, "Reset session 0x%lx", mp->handle); - p = hash_get (em->session_index_by_vpp_handles, mp->handle); - - if (!p) + if (!(s = echo_get_session_from_handle (em, mp->handle))) + return; + if (s->session_type == QUIC_SESSION_TYPE_STREAM) { - ECHO_FAIL ("couldn't find session key %llx", mp->handle); - return; + em->stats.reset_count.s++; + s->session_state = QUIC_SESSION_STATE_CLOSING; + } + else + { + em->stats.reset_count.q++; + echo_cleanup_session (em, s); /* We can clean Q/Lsessions right away */ } - session = pool_elt_at_index (em->sessions, p[0]); - /* Cleanup later */ - em->time_to_stop = 1; - app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt, + app_alloc_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt, SESSION_CTRL_EVT_RESET_REPLY); rmp = (session_reset_reply_msg_t *) app_evt->evt->data; rmp->retval = rv; rmp->handle = mp->handle; - app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt); + app_send_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt); } static void @@ -1552,47 +1244,68 @@ static int wait_for_state_change (echo_main_t * em, connection_state_t state, f64 timeout) { - svm_msg_q_msg_t msg; - session_event_t *e; f64 end_time = clib_time_now (&em->clib_time) + timeout; - while (!timeout || clib_time_now (&em->clib_time) < end_time) { if (em->state == state) return 0; - if (em->time_to_stop == 1) - return 0; - if (!em->our_event_queue || em->state < STATE_ATTACHED) - continue; - - if (svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_NOWAIT, 0)) - continue; - e = svm_msg_q_msg_data (em->our_event_queue, &msg); - handle_mq_event (e); - svm_msg_q_free_msg (em->our_event_queue, &msg); + if (em->time_to_stop) + return 1; } ECHO_LOG (1, "timeout waiting for %U", format_quic_echo_state, state); return -1; } static void -echo_event_loop (echo_main_t * em) +echo_process_rpcs (echo_main_t * em) { + echo_rpc_msg_t *rpc; svm_msg_q_msg_t msg; + while (em->state < STATE_DATA_DONE && !em->time_to_stop) + { + if (svm_msg_q_sub (em->rpc_msq_queue, &msg, SVM_Q_TIMEDWAIT, 1)) + continue; + rpc = svm_msg_q_msg_data (em->rpc_msq_queue, &msg); + ((echo_rpc_t) rpc->fp) (rpc->arg, rpc->opaque); + svm_msg_q_free_msg (em->rpc_msq_queue, &msg); + } +} + +static void * +echo_mq_thread_fn (void *arg) +{ + clib_mem_set_thread_index (); + echo_main_t *em = &echo_main; session_event_t *e; + svm_msg_q_msg_t msg; + int rv; + wait_for_state_change (em, STATE_ATTACHED, 0); + if (em->state < STATE_ATTACHED || !em->our_event_queue) + { + ECHO_FAIL ("Application failed to attach"); + pthread_exit (0); + } ECHO_LOG (1, "Waiting for data on %u clients", em->n_clients_connected); - while (em->n_clients_connected | em->n_quic_clients_connected) + while (1) { - int rc = svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_TIMEDWAIT, 1); - if (PREDICT_FALSE (rc == ETIMEDOUT && em->time_to_stop)) + if (!(rv = svm_msg_q_sub (em->our_event_queue, + &msg, SVM_Q_TIMEDWAIT, 1))) + { + e = svm_msg_q_msg_data (em->our_event_queue, &msg); + handle_mq_event (e); + svm_msg_q_free_msg (em->our_event_queue, &msg); + } + if (rv == ETIMEDOUT + && (em->time_to_stop || em->state == STATE_DETACHED)) break; - if (rc == ETIMEDOUT) - continue; - e = svm_msg_q_msg_data (em->our_event_queue, &msg); - handle_mq_event (e); - svm_msg_q_free_msg (em->our_event_queue, &msg); + if (!em->n_clients_connected && !em->n_quic_clients_connected && + em->state == STATE_READY) + { + em->state = STATE_DATA_DONE; + } } + pthread_exit (0); } static void @@ -1601,28 +1314,19 @@ clients_run (echo_main_t * em) u64 i; quic_echo_notify_event (em, ECHO_EVT_FIRST_QCONNECT); for (i = 0; i < em->n_clients; i++) - echo_send_connect (em, em->uri, SESSION_INVALID_INDEX); - - if (wait_for_state_change (em, STATE_READY, TIMEOUT)) - { - ECHO_FAIL ("Timeout waiting for state ready"); - return; - } - - echo_event_loop (em); + echo_send_connect (em->uri, SESSION_INVALID_INDEX); + wait_for_state_change (em, STATE_READY, 0); + ECHO_LOG (1, "App is ready"); + echo_process_rpcs (em); } static void server_run (echo_main_t * em) { server_send_listen (em); - if (wait_for_state_change (em, STATE_READY, 0)) - { - ECHO_FAIL ("Timeout waiting for state ready"); - return; - } - echo_event_loop (em); - + wait_for_state_change (em, STATE_READY, 0); + ECHO_LOG (1, "App is ready"); + echo_process_rpcs (em); /* Cleanup */ server_send_unbind (em); if (wait_for_state_change (em, STATE_DISCONNECTED, TIMEOUT)) @@ -1638,7 +1342,6 @@ server_run (echo_main_t * em) * */ - static void vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t * mp) @@ -1706,9 +1409,7 @@ vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t * return; } } - clib_spinlock_lock (&em->segment_handles_lock); - hash_set (em->shared_segment_handles, segment_handle, 1); - clib_spinlock_unlock (&em->segment_handles_lock); + echo_segment_handle_add_del (em, segment_handle, 1 /* add */ ); ECHO_LOG (1, "Mapped segment 0x%lx", segment_handle); em->state = STATE_ATTACHED; @@ -1737,10 +1438,7 @@ vl_api_unmap_segment_t_handler (vl_api_unmap_segment_t * mp) { echo_main_t *em = &echo_main; u64 segment_handle = clib_net_to_host_u64 (mp->segment_handle); - - clib_spinlock_lock (&em->segment_handles_lock); - hash_unset (em->shared_segment_handles, segment_handle); - clib_spinlock_unlock (&em->segment_handles_lock); + echo_segment_handle_add_del (em, segment_handle, 0 /* add */ ); ECHO_LOG (1, "Unmaped segment 0x%lx", segment_handle); } @@ -1783,9 +1481,7 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp) goto failed; } } - clib_spinlock_lock (&em->segment_handles_lock); - hash_set (em->shared_segment_handles, segment_handle, 1); - clib_spinlock_unlock (&em->segment_handles_lock); + echo_segment_handle_add_del (em, segment_handle, 1 /* add */ ); ECHO_LOG (1, "Mapped segment 0x%lx", segment_handle); return; @@ -1821,7 +1517,7 @@ vl_api_unbind_uri_reply_t_handler (vl_api_unbind_uri_reply_t * mp) } em->state = STATE_DISCONNECTED; listen_session = pool_elt_at_index (em->sessions, em->listen_session_index); - echo_initiate_session_close (em, listen_session, 1 /* active */ ); + echo_cleanup_session (em, listen_session); } static void @@ -1830,7 +1526,6 @@ vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t * { echo_main_t *em = &echo_main; echo_session_t *s; - uword *p; if (mp->retval) { @@ -1839,15 +1534,12 @@ vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t * } ECHO_LOG (1, "Got disonnected reply for session 0x%lx", mp->handle); - p = hash_get (em->session_index_by_vpp_handles, mp->handle); - if (!p) - { - ECHO_FAIL ("couldn't find session key %llx", mp->handle); - return; - } - - s = pool_elt_at_index (em->sessions, p[0]); - echo_initiate_session_close (em, s, 1 /* active */ ); + if (!(s = echo_get_session_from_handle (em, mp->handle))) + return; + if (s->session_type == QUIC_SESSION_TYPE_STREAM) + s->session_state = QUIC_SESSION_STATE_CLOSING; + else + echo_cleanup_session (em, s); /* We can clean Q/Lsessions right away */ } static void @@ -1878,15 +1570,17 @@ vl_api_connect_uri_reply_t_handler (vl_api_connect_uri_reply_t * mp) if (mp->context == SESSION_INVALID_INDEX) { ECHO_LOG (1, "Retrying connect %s", em->uri); - echo_send_connect (em, em->uri, SESSION_INVALID_INDEX); + echo_send_rpc (em, echo_send_connect, (void *) em->uri, + SESSION_INVALID_INDEX); } else { session = pool_elt_at_index (em->sessions, mp->context); uri = format (0, "QUIC://session/%lu", session->vpp_session_handle); ECHO_LOG (1, "Retrying connect %s", uri); - echo_send_connect (em, uri, mp->context); + echo_send_rpc (em, echo_send_connect, (void *) uri, mp->context); } + } #define foreach_quic_echo_msg \ @@ -1957,6 +1651,7 @@ print_usage_and_exit (void) " exit - Exiting of the app\n" " json Output global stats in json\n" " log=N Set the log level to [0: no output, 1:errors, 2:log]\n" + " max-connects=N Don't do more than N parallel connect_uri\n" "\n" " nclients N[/M] Open N QUIC connections, each one with M streams (M defaults to 1)\n" " nthreads N Use N busy loop threads for data [in addition to main & msg queue]\n" @@ -2047,8 +1742,8 @@ quic_echo_process_opts (int argc, char **argv) (a, "qclose=%U", unformat_close, &em->send_quic_disconnects)) ; else if (unformat (a, "time %U:%U", - echo_unformat_timing_event, &em->timing_start_event, - echo_unformat_timing_event, &em->timing_end_event)) + echo_unformat_timing_event, &em->timing.start_event, + echo_unformat_timing_event, &em->timing.end_event)) ; else print_usage_and_exit (); @@ -2091,13 +1786,16 @@ main (int argc, char **argv) echo_main_t *em = &echo_main; fifo_segment_main_t *sm = &em->segment_main; char *app_name; - u32 n_clients, i; + u64 n_clients, i; + svm_msg_q_cfg_t _cfg, *cfg = &_cfg; + u32 rpc_queue_size = 64 << 10; clib_mem_init_thread_safe (0, 256 << 20); clib_memset (em, 0, sizeof (*em)); em->session_index_by_vpp_handles = hash_create (0, sizeof (uword)); + clib_spinlock_init (&em->sid_vpp_handles_lock); em->shared_segment_handles = hash_create (0, sizeof (uword)); - em->my_pid = getpid (); + clib_spinlock_init (&em->segment_handles_lock); em->socket_name = format (0, "%s%c", API_SOCKET_FILE, 0); em->use_sock_api = 1; em->fifo_size = 64 << 10; @@ -2108,8 +1806,8 @@ main (int argc, char **argv) em->i_am_master = 1; em->n_rx_threads = 4; em->test_return_packets = RETURN_PACKETS_NOTEST; - em->timing_start_event = ECHO_EVT_FIRST_QCONNECT; - em->timing_end_event = ECHO_EVT_LAST_BYTE; + em->timing.start_event = ECHO_EVT_FIRST_QCONNECT; + em->timing.end_event = ECHO_EVT_LAST_BYTE; em->bytes_to_receive = ~0; /* defaulted when we know if server/client */ em->bytes_to_send = ~0; /* defaulted when we know if server/client */ em->rx_buf_size = 1 << 20; @@ -2120,35 +1818,63 @@ main (int argc, char **argv) quic_echo_process_opts (argc, argv); n_clients = em->n_clients * em->n_stream_clients; - vec_validate (em->client_thread_handles, em->n_rx_threads - 1); - vec_validate (em->thread_args, n_clients - 1); + vec_validate (em->data_thread_handles, em->n_rx_threads - 1); + vec_validate (em->data_thread_args, n_clients - 1); + for (i = 0; i < n_clients; i++) + em->data_thread_args[i] = SESSION_INVALID_INDEX; clib_time_init (&em->clib_time); - init_error_string_table (em); + init_error_string_table (); fifo_segment_main_init (sm, HIGH_SEGMENT_BASEVA, 20); - clib_spinlock_init (&em->segment_handles_lock); vec_validate (em->connect_test_data, em->tx_buf_size); for (i = 0; i < em->tx_buf_size; i++) em->connect_test_data[i] = i & 0xff; - setup_signal_handlers (); + /* *INDENT-OFF* */ + svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = { + {rpc_queue_size, sizeof (echo_rpc_msg_t), 0}, + }; + /* *INDENT-ON* */ + cfg->consumer_pid = getpid (); + cfg->n_rings = 1; + cfg->q_nitems = rpc_queue_size; + cfg->ring_cfgs = rc; + em->rpc_msq_queue = svm_msg_q_alloc (cfg); + + signal (SIGINT, stop_signal); + signal (SIGQUIT, stop_signal); + signal (SIGTERM, stop_signal); quic_echo_api_hookup (em); app_name = em->i_am_master ? "quic_echo_server" : "quic_echo_client"; if (connect_to_vpp (app_name) < 0) { svm_region_exit (); - ECHO_FAIL ("ECHO-ERROR: Couldn't connect to vpe, exiting...\n"); + ECHO_FAIL ("Couldn't connect to vpe, exiting...\n"); exit (1); } + echo_session_prealloc (em); quic_echo_notify_event (em, ECHO_EVT_START); application_send_attach (em); if (wait_for_state_change (em, STATE_ATTACHED, TIMEOUT)) { - ECHO_FAIL ("ECHO-ERROR: Couldn't attach to vpp, exiting...\n"); + ECHO_FAIL ("Couldn't attach to vpp, did you run ?\n"); + exit (1); + } + if (pthread_create (&em->mq_thread_handle, + NULL /*attr */ , echo_mq_thread_fn, 0)) + { + ECHO_FAIL ("pthread create errored\n"); exit (1); } + for (i = 0; i < em->n_rx_threads; i++) + if (pthread_create (&em->data_thread_handles[i], + NULL /*attr */ , echo_data_thread_fn, (void *) i)) + { + ECHO_FAIL ("pthread create errored\n"); + exit (1); + } if (em->i_am_master) server_run (em); else @@ -2166,6 +1892,13 @@ main (int argc, char **argv) ECHO_FAIL ("ECHO-ERROR: Couldn't detach from vpp, exiting...\n"); exit (1); } + int *rv; + pthread_join (em->mq_thread_handle, (void **) &rv); + if (rv) + { + ECHO_FAIL ("mq pthread errored %d", rv); + exit (1); + } if (em->use_sock_api) vl_socket_client_disconnect (); else diff --git a/src/plugins/hs_apps/sapi/quic_echo.h b/src/plugins/hs_apps/sapi/quic_echo.h new file mode 100644 index 00000000000..aec56d077f0 --- /dev/null +++ b/src/plugins/hs_apps/sapi/quic_echo.h @@ -0,0 +1,246 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include + +#define vl_typedefs /* define message structures */ +#include +#undef vl_typedefs + +/* declare message handlers for each api */ + +#define vl_endianfun /* define message structures */ +#include +#undef vl_endianfun + +/* instantiate all the print functions we know about */ +#define vl_print(handle, ...) +#define vl_printfun +#include +#undef vl_printfun + +#define NITER 4000000 +#define TIMEOUT 10.0 + +#define CHECK(expected, result, _fmt, _args...) \ + if (expected != result) \ + ECHO_FAIL ("expected %d, got %d : " _fmt, expected, result, ##_args); + +#define ECHO_FAIL(_fmt,_args...) \ + { \ + echo_main_t *em = &echo_main; \ + em->has_failed = 1; \ + em->time_to_stop = 1; \ + if (em->log_lvl > 0) \ + clib_warning ("ECHO-ERROR: "_fmt, ##_args); \ + } + +#define ECHO_LOG(lvl, _fmt,_args...) \ + { \ + echo_main_t *em = &echo_main; \ + if (em->log_lvl > lvl) \ + clib_warning (_fmt, ##_args); \ + } + +typedef struct +{ + CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); +#define _(type, name) type name; + foreach_app_session_field +#undef _ + u64 vpp_session_handle; + u64 bytes_sent; + u64 bytes_to_send; + volatile u64 bytes_received; + volatile u64 bytes_to_receive; + f64 start; + u32 listener_index; /* listener index in echo session pool */ + u32 idle_cycles; /* consecutive enq/deq with no data */ + volatile u64 accepted_session_count; /* sessions we accepted */ +} echo_session_t; + +typedef enum +{ + ECHO_NO_DATA_SOURCE, + ECHO_TEST_DATA_SOURCE, + ECHO_RX_DATA_SOURCE, + ECHO_INVALID_DATA_SOURCE +} data_source_t; + +enum echo_close_f_t +{ + ECHO_CLOSE_F_INVALID = 0, + ECHO_CLOSE_F_PASSIVE, /* wait for close msg */ + ECHO_CLOSE_F_ACTIVE, /* send close msg */ + ECHO_CLOSE_F_NONE, /* don't bother sending close msg */ +}; + +enum quic_session_type_t +{ + QUIC_SESSION_TYPE_QUIC, + QUIC_SESSION_TYPE_STREAM, + QUIC_SESSION_TYPE_LISTEN, +}; + +enum quic_session_state_t +{ + QUIC_SESSION_STATE_INITIAL, + QUIC_SESSION_STATE_READY, + QUIC_SESSION_STATE_AWAIT_CLOSING, /* Data transfer is done, wait for close evt */ + QUIC_SESSION_STATE_AWAIT_DATA, /* Peer closed, wait for outstanding data */ + QUIC_SESSION_STATE_CLOSING, /* told vpp to close */ + QUIC_SESSION_STATE_CLOSED, /* closed in vpp */ +}; + +typedef enum +{ + STATE_START, + STATE_ATTACHED, + STATE_LISTEN, + STATE_READY, + STATE_DATA_DONE, + STATE_DISCONNECTED, + STATE_DETACHED +} connection_state_t; + +typedef enum echo_test_evt_ +{ + ECHO_EVT_START = 1, /* app starts */ + ECHO_EVT_FIRST_QCONNECT = (1 << 1), /* First connect Quic session sent */ + ECHO_EVT_LAST_QCONNECTED = (1 << 2), /* All Quic session are connected */ + ECHO_EVT_FIRST_SCONNECT = (1 << 3), /* First connect Stream session sent */ + ECHO_EVT_LAST_SCONNECTED = (1 << 4), /* All Stream session are connected */ + ECHO_EVT_LAST_BYTE = (1 << 5), /* Last byte received */ + ECHO_EVT_EXIT = (1 << 6), /* app exits */ +} echo_test_evt_t; + +typedef struct _quic_echo_cb_vft +{ + void (*quic_connected_cb) (session_connected_msg_t * mp, u32 session_index); + void (*client_stream_connected_cb) (session_connected_msg_t * mp, + u32 session_index); + void (*server_stream_connected_cb) (session_connected_msg_t * mp, + u32 session_index); + void (*quic_accepted_cb) (session_accepted_msg_t * mp, u32 session_index); + void (*client_stream_accepted_cb) (session_accepted_msg_t * mp, + u32 session_index); + void (*server_stream_accepted_cb) (session_accepted_msg_t * mp, + u32 session_index); +} quic_echo_cb_vft_t; + + +typedef enum +{ + RETURN_PACKETS_NOTEST, + RETURN_PACKETS_LOG_WRONG, + RETURN_PACKETS_ASSERT, +} test_return_packets_t; + +typedef struct teardown_stat_ +{ + u32 q; /* quic sessions */ + u32 s; /* stream sessions */ +} teardown_stat_t; + +typedef struct +{ + svm_queue_t *vl_input_queue; /* vpe input queue */ + u32 my_client_index; /* API client handle */ + u8 *uri; /* The URI we're playing with */ + echo_session_t *sessions; /* Session pool */ + svm_msg_q_t *our_event_queue; /* Our event queue */ + clib_time_t clib_time; /* For deadman timers */ + u8 *socket_name; + int i_am_master; + u32 listen_session_index; /* Index of vpp listener session */ + + uword *session_index_by_vpp_handles; /* Hash table : quic_echo s_id -> vpp s_handle */ + clib_spinlock_t sid_vpp_handles_lock; /* Hash table lock */ + + uword *shared_segment_handles; /* Hash table : segment_names -> 1*/ + clib_spinlock_t segment_handles_lock; /* Hash table lock */ + quic_echo_cb_vft_t cb_vft; /* cb vft for QUIC scenarios */ + svm_msg_q_t *rpc_msq_queue; /* MQ between quic_echo threads */ + fifo_segment_main_t segment_main; + + /* State of the connection, shared between msg RX thread and main thread */ + volatile connection_state_t state; + volatile u8 time_to_stop; /* Signal variables */ + u8 has_failed; /* stores the exit code */ + + /** Flag that decides if socket, instead of svm, api is used to connect to + * vpp. If sock api is used, shm binary api is subsequently bootstrapped + * and all other messages are exchanged using shm IPC. */ + u8 use_sock_api; + + u8 *connect_test_data; + u8 test_return_packets; + u64 bytes_to_send; /* target per stream */ + u64 bytes_to_receive; /* target per stream */ + u32 fifo_size; + u32 rx_buf_size; + u32 tx_buf_size; + data_source_t data_source; /* Use no/dummy/mirrored data */ + u8 send_quic_disconnects; /* actively send disconnect */ + u8 send_stream_disconnects; /* actively send disconnect */ + u8 output_json; /* Output stats as JSON */ + u8 log_lvl; /* Verbosity of the logging */ + int max_test_msg; /* Limit the number of incorrect data messages */ + + u8 *appns_id; + u64 appns_flags; + u64 appns_secret; + + pthread_t *data_thread_handles; /* vec of data thread handles */ + pthread_t mq_thread_handle; /* Message queue thread handle */ + u32 *data_thread_args; + + u32 n_clients; /* Target number of QUIC sessions */ + u32 n_stream_clients; /* Target Number of STREAM sessions per QUIC session */ + volatile u32 n_quic_clients_connected; /* Number of connected QUIC sessions */ + volatile u32 n_clients_connected; /* Number of STREAM sessions connected */ + u32 n_rx_threads; /* Number of data threads */ + volatile u32 nxt_available_sidx; /* next unused prealloced session_index */ + + struct { + u64 tx_total; + u64 rx_total; + teardown_stat_t reset_count; /* received reset from vpp */ + teardown_stat_t close_count; /* received close from vpp */ + teardown_stat_t active_count; /* sent close to vpp */ + teardown_stat_t clean_count; /* cleaned up stale session */ + } stats; + + struct /* Event based timing : start & end depend on CLI specified events */ + { + f64 start_time; + f64 end_time; + u8 events_sent; + u8 start_event; + u8 end_event; + } timing; +} echo_main_t; + +typedef void (*echo_rpc_t) (void *arg, u32 opaque); + +typedef struct +{ + void *fp; + void *arg; + u32 opaque; +} echo_rpc_msg_t; diff --git a/src/plugins/quic/quic.c b/src/plugins/quic/quic.c index af482d9dd36..43ac87990da 100644 --- a/src/plugins/quic/quic.c +++ b/src/plugins/quic/quic.c @@ -44,7 +44,7 @@ quic_ctx_alloc (u32 thread_index) memset (ctx, 0, sizeof (quic_ctx_t)); ctx->c_thread_index = thread_index; - QUIC_DBG (1, "Allocated quic_ctx %u on thread %u", + QUIC_DBG (3, "Allocated quic_ctx %u on thread %u", ctx - qm->ctx_pool[thread_index], thread_index); return ctx - qm->ctx_pool[thread_index]; } @@ -634,8 +634,7 @@ quic_accept_stream (void *s) SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL | SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY); - rv = app_worker_accept_notify (app_wrk, stream_session); - if (rv) + if ((rv = app_worker_accept_notify (app_wrk, stream_session))) { QUIC_DBG (1, "failed to notify accept worker app"); session_free_w_fifos (stream_session); @@ -1130,6 +1129,7 @@ quic_proto_on_close (u32 ctx_index, u32 thread_index) quicly_stream_t *stream = ctx->stream; quicly_reset_stream (stream, QUIC_APP_ERROR_CLOSE_NOTIFY); quic_send_packets (ctx); + return; } switch (ctx->conn_state) @@ -1355,6 +1355,7 @@ quic_on_client_connected (quic_ctx_t * ctx) app_worker_t *app_wrk; u32 ctx_id = ctx->c_c_index; u32 thread_index = ctx->c_thread_index; + int rv; app_wrk = app_worker_get_if_valid (ctx->parent_app_wrk_id); if (!app_wrk) @@ -1381,9 +1382,10 @@ quic_on_client_connected (quic_ctx_t * ctx) } quic_session->session_state = SESSION_STATE_CONNECTING; - if (app_worker_connect_notify (app_wrk, quic_session, ctx->client_opaque)) + if ((rv = app_worker_connect_notify (app_wrk, quic_session, + ctx->client_opaque))) { - QUIC_DBG (1, "failed to notify app"); + QUIC_DBG (1, "failed to notify app %d", rv); quic_proto_on_close (ctx_id, thread_index); return -1; } @@ -1824,8 +1826,7 @@ quic_create_quic_session (quic_ctx_t * ctx) return rv; } app_wrk = app_worker_get (quic_session->app_wrk_index); - rv = app_worker_accept_notify (app_wrk, quic_session); - if (rv) + if ((rv = app_worker_accept_notify (app_wrk, quic_session))) { QUIC_DBG (1, "failed to notify accept worker app"); return rv; -- cgit 1.2.3-korg