| author | Nathan Skrzypczak <nathan.skrzypczak@gmail.com> | 2019-05-22 18:41:50 +0200 |
|---|---|---|
| committer | Florin Coras <florin.coras@gmail.com> | 2019-06-28 19:28:17 +0000 |
| commit | e82a7ade8a9667f1e49067bf59010f60beda5452 (patch) | |
| tree | 619ead31dfa9a20e1fcd0043e79da5fb564b9a9f | |
| parent | f73d4c2084c9cb6df4a1f8582acef523e4ba0cb2 (diff) | |
quic: Use TX event for app read notification
Type: feature
Change-Id: I1846cdeb35f079249f66a0351aa244c540923a43
Signed-off-by: Nathan Skrzypczak <nathan.skrzypczak@gmail.com>
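The mechanism behind the subject line, in brief: after draining its rx fifo, the receiving app acknowledges the consumed bytes by posting an io event back to VPP whenever the fifo's notify threshold is armed, so the QUIC layer can grow quicly's receive window. From the app's perspective this is an RX event on its rx fifo; that same fifo is the session's tx fifo on the VPP side, hence, presumably, the "TX event" in the subject. A condensed sketch of the consumer-side pattern, assembled from the echo-app changes in the diff below; all helper names are taken from the patch itself, while the function name and `buf_len` are illustrative:

```c
/* Sketch only: drain the rx fifo and, if the producer asked to be
 * notified, post a SESSION_IO_EVT_RX event back to vpp so the QUIC
 * layer can ack the consumed bytes. Condensed from recv_data_chunk()
 * and echo_session_dequeue_notify() in this patch. */
static int
echo_recv_and_notify (echo_session_t * s, u8 * rx_buf, u32 buf_len)
{
  int n_read = app_recv_stream ((app_session_t *) s, rx_buf, buf_len);
  if (n_read <= 0)
    return 0;
  if (svm_fifo_needs_tx_ntf (s->rx_fifo, n_read))
    {
      /* Same call the patch wraps in echo_session_dequeue_notify () */
      int rv = app_send_io_evt_to_vpp (s->vpp_evt_q,
                                       s->rx_fifo->master_session_index,
                                       SESSION_IO_EVT_RX, SVM_Q_WAIT);
      svm_fifo_clear_tx_ntf (s->rx_fifo);
      if (rv)
        return -1;
    }
  return n_read;
}
```

On the VPP side (see the quic.c hunks), the new `quic_ack_rx_data` consumes this event: it compares the bytes quicly has delivered (`app_rx_data_len`) with what still sits undequeued in the fifo, and releases the difference via `quicly_stream_sync_recvbuf`.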
| -rw-r--r-- | src/plugins/hs_apps/sapi/quic_echo.c | 1532 |
| -rw-r--r-- | src/plugins/quic/quic.c | 158 |
| -rw-r--r-- | src/plugins/quic/quic.h | 3 |
| -rw-r--r-- | src/svm/svm_fifo.h | 10 |
| -rw-r--r-- | test/test_quic.py | 26 |
5 files changed, 926 insertions, 803 deletions
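One more self-contained illustration before the diff: the echo app's size arguments (`TX=`, `RX=`, replacing the old `send`/`recv` options) are parsed by the renamed `unformat_data` helper, which maps the `Kb`/`Mb`/`Gb` suffixes to shifts of 10, 20 and 30 bits. A dependency-free sketch of the same arithmetic in plain C; this is not the vppinfra unformat machinery, just its logic, and it is case-insensitive for convenience where the patch's `%lluKb` patterns are not:

```c
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>

/* Parse "1337", "64Kb", "10Mb" or "2Gb" into a byte count, mirroring
 * the Kb<<10 / Mb<<20 / Gb<<30 shifts used by unformat_data below. */
static int
parse_data_size (const char *s, uint64_t * out)
{
  char *end;
  uint64_t n = strtoull (s, &end, 10);
  if (end == s)
    return -1;                  /* no leading number */
  if (!strcasecmp (end, "Gb"))
    n <<= 30;
  else if (!strcasecmp (end, "Mb"))
    n <<= 20;
  else if (!strcasecmp (end, "Kb"))
    n <<= 10;
  else if (*end != '\0')
    return -1;                  /* unknown suffix */
  *out = n;
  return 0;
}

int
main (void)
{
  uint64_t bytes;
  if (parse_data_size ("64Kb", &bytes) == 0)
    printf ("64Kb = %llu bytes\n", (unsigned long long) bytes);  /* 65536 */
  return 0;
}
```

With the defaults the patch documents (`server nclients 1/1 RX=64Kb TX=RX`, `client nclients 1/1 RX=64Kb TX=64Kb`), that works out to 65536 bytes in each direction, and `TX=RX` switches the sender to mirroring received data instead of generated test data.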
diff --git a/src/plugins/hs_apps/sapi/quic_echo.c b/src/plugins/hs_apps/sapi/quic_echo.c index a62af642b2f..3c9ddcc102d 100644 --- a/src/plugins/hs_apps/sapi/quic_echo.c +++ b/src/plugins/hs_apps/sapi/quic_echo.c @@ -38,11 +38,33 @@ #include <vpp/api/vpe_all_api_h.h> #undef vl_printfun +#define SESSION_INVALID_INDEX ((u32)~0) +#define SESSION_INVALID_HANDLE ((u64)~0) + #define QUIC_ECHO_DBG 0 #define DBG(_fmt, _args...) \ if (QUIC_ECHO_DBG) \ clib_warning (_fmt, ##_args) +#define CHECK(expected, result, _fmt, _args...) \ + if (expected != result) \ + ECHO_FAIL ("expected %d, got %d : " _fmt, expected, result, ##_args); + +#define ECHO_FAIL(_fmt,_args...) \ + { \ + echo_main_t *em = &echo_main; \ + em->has_failed = 1; \ + em->time_to_stop = 1; \ + clib_warning ("ECHO-ERROR: "_fmt, ##_args); \ + } + +typedef enum +{ + SESSION_FLAG_NOFLAG = 0, + SESSION_FLAG_SHOULD_CLOSE = 1, + SESSION_FLAG_SHOULD_FREE = 2, +} echo_session_flag_t; + typedef struct { CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); @@ -55,16 +77,34 @@ typedef struct volatile u64 bytes_received; volatile u64 bytes_to_receive; f64 start; + u32 listener_index; /* listener index in echo session pool */ + volatile u64 accepted_session_count; /* sessions we accepted */ + volatile echo_session_flag_t flags; } echo_session_t; typedef enum { + ECHO_NO_DATA_SOURCE, + ECHO_TEST_DATA_SOURCE, + ECHO_RX_DATA_SOURCE, + ECHO_INVALID_DATA_SOURCE +} data_source_t; + +enum quic_session_type_t +{ + QUIC_SESSION_TYPE_QUIC, + QUIC_SESSION_TYPE_STREAM, + QUIC_SESSION_TYPE_LISTEN, +}; + + +typedef enum +{ STATE_START, STATE_ATTACHED, STATE_LISTEN, STATE_READY, - STATE_DISCONNECTING, - STATE_FAILED, + STATE_DISCONNECTED, STATE_DETACHED } connection_state_t; @@ -79,13 +119,6 @@ typedef enum ECHO_EVT_EXIT, /* app exits */ } echo_test_evt_t; -enum quic_session_type_t -{ - QUIC_SESSION_TYPE_QUIC = 0, - QUIC_SESSION_TYPE_STREAM = 1, - QUIC_SESSION_TYPE_LISTEN = INT32_MAX, -}; - typedef struct _quic_echo_cb_vft { void (*quic_connected_cb) (session_connected_msg_t * mp, u32 session_index); @@ -124,21 +157,15 @@ typedef struct /* Hash table for disconnect processing */ uword *session_index_by_vpp_handles; - /* Handle of vpp listener session */ - u64 listener_handle; + /* Index of vpp listener session */ + u32 listen_session_index; /* Hash table for shared segment_names */ uword *shared_segment_handles; clib_spinlock_t segment_handles_lock; - /* intermediate rx buffer */ - u8 *rx_buf; - int i_am_master; - /* drop all packets */ - int no_return; - /* Our event queue */ svm_msg_q_t *our_event_queue; @@ -153,23 +180,28 @@ typedef struct volatile connection_state_t state; /* Signal variables */ - volatile int time_to_stop; + volatile u8 time_to_stop; + u8 has_failed; /* VNET_API_ERROR_FOO -> "Foo" hash table */ uword *error_string_by_error_number; u8 *connect_test_data; - pthread_t *client_thread_handles; - u32 *thread_args; u8 test_return_packets; u64 bytes_to_send; u64 bytes_to_receive; u32 fifo_size; + u32 rx_buf_size; + u32 tx_buf_size; + data_source_t data_source; + u8 send_disconnects; /* actively send disconnect */ u8 *appns_id; u64 appns_flags; u64 appns_secret; + pthread_t *client_thread_handles; + u32 *thread_args; u32 n_clients; /* Target number of QUIC sessions */ u32 n_stream_clients; /* Target Number of STREAM sessions per QUIC session */ volatile u32 n_quic_clients_connected; /* Number of connected QUIC sessions */ @@ -213,6 +245,95 @@ echo_main_t echo_main; #define TIMEOUT 10.0 #endif +/* + * + * Format functions + * + */ + +u8 * +format_ip4_address (u8 * 
s, va_list * args) +{ + u8 *a = va_arg (*args, u8 *); + return format (s, "%d.%d.%d.%d", a[0], a[1], a[2], a[3]); +} + +u8 * +format_ip6_address (u8 * s, va_list * args) +{ + ip6_address_t *a = va_arg (*args, ip6_address_t *); + u32 i, i_max_n_zero, max_n_zeros, i_first_zero, n_zeros, last_double_colon; + + i_max_n_zero = ARRAY_LEN (a->as_u16); + max_n_zeros = 0; + i_first_zero = i_max_n_zero; + n_zeros = 0; + for (i = 0; i < ARRAY_LEN (a->as_u16); i++) + { + u32 is_zero = a->as_u16[i] == 0; + if (is_zero && i_first_zero >= ARRAY_LEN (a->as_u16)) + { + i_first_zero = i; + n_zeros = 0; + } + n_zeros += is_zero; + if ((!is_zero && n_zeros > max_n_zeros) + || (i + 1 >= ARRAY_LEN (a->as_u16) && n_zeros > max_n_zeros)) + { + i_max_n_zero = i_first_zero; + max_n_zeros = n_zeros; + i_first_zero = ARRAY_LEN (a->as_u16); + n_zeros = 0; + } + } + + last_double_colon = 0; + for (i = 0; i < ARRAY_LEN (a->as_u16); i++) + { + if (i == i_max_n_zero && max_n_zeros > 1) + { + s = format (s, "::"); + i += max_n_zeros - 1; + last_double_colon = 1; + } + else + { + s = format (s, "%s%x", + (last_double_colon || i == 0) ? "" : ":", + clib_net_to_host_u16 (a->as_u16[i])); + last_double_colon = 0; + } + } + + return s; +} + +/* Format an IP46 address. */ +u8 * +format_ip46_address (u8 * s, va_list * args) +{ + ip46_address_t *ip46 = va_arg (*args, ip46_address_t *); + ip46_type_t type = va_arg (*args, ip46_type_t); + int is_ip4 = 1; + + switch (type) + { + case IP46_TYPE_ANY: + is_ip4 = ip46_address_is_ip4 (ip46); + break; + case IP46_TYPE_IP4: + is_ip4 = 1; + break; + case IP46_TYPE_IP6: + is_ip4 = 0; + break; + } + + return is_ip4 ? + format (s, "%U", format_ip4_address, &ip46->ip4) : + format (s, "%U", format_ip6_address, &ip46->ip6); +} + u8 * format_quic_echo_state (u8 * s, va_list * args) { @@ -225,10 +346,8 @@ format_quic_echo_state (u8 * s, va_list * args) return format (s, "STATE_LISTEN"); if (state == STATE_READY) return format (s, "STATE_READY"); - if (state == STATE_DISCONNECTING) - return format (s, "STATE_DISCONNECTING"); - if (state == STATE_FAILED) - return format (s, "STATE_FAILED"); + if (state == STATE_DISCONNECTED) + return format (s, "STATE_DISCONNECTED"); if (state == STATE_DETACHED) return format (s, "STATE_DETACHED"); else @@ -251,13 +370,29 @@ format_api_error (u8 * s, va_list * args) return s; } -static void -quic_echo_notify_event (echo_main_t * em, echo_test_evt_t e) +static uword +unformat_data (unformat_input_t * input, va_list * args) { - if (em->timing_start_event == e) - em->start_time = clib_time_now (&em->clib_time); - else if (em->timing_end_event == e) - em->end_time = clib_time_now (&em->clib_time); + u64 _a; + u64 *a = va_arg (*args, u64 *); + if (unformat (input, "%lluGb", &_a)) + { + *a = _a << 30; + return 1; + } + else if (unformat (input, "%lluMb", &_a)) + { + *a = _a << 20; + return 1; + } + else if (unformat (input, "%lluKb", &_a)) + { + *a = _a << 10; + return 1; + } + else if (unformat (input, "%llu", a)) + return 1; + return 0; } static uword @@ -317,88 +452,17 @@ init_error_string_table (echo_main_t * em) hash_set (em->error_string_by_error_number, 99, "Misc"); } -static void handle_mq_event (echo_main_t * em, session_event_t * e, - int handle_rx); -static void echo_handle_rx (echo_main_t * em, session_event_t * e); - -static int -wait_for_segment_allocation (u64 segment_handle) -{ - echo_main_t *em = &echo_main; - f64 timeout; - timeout = clib_time_now (&em->clib_time) + TIMEOUT; - uword *segment_present; - DBG ("Waiting for segment %lx...", segment_handle); - 
while (clib_time_now (&em->clib_time) < timeout) - { - clib_spinlock_lock (&em->segment_handles_lock); - segment_present = hash_get (em->shared_segment_handles, segment_handle); - clib_spinlock_unlock (&em->segment_handles_lock); - if (segment_present != 0) - return 0; - if (em->time_to_stop == 1) - return 0; - } - DBG ("timeout waiting for segment_allocation %lx", segment_handle); - return -1; -} - -static int -wait_for_disconnected_sessions (echo_main_t * em) -{ - f64 timeout; - timeout = clib_time_now (&em->clib_time) + TIMEOUT; - while (clib_time_now (&em->clib_time) < timeout) - { - if (hash_elts (em->session_index_by_vpp_handles) == 0) - return 0; - } - DBG ("timeout waiting for disconnected_sessions"); - return -1; -} - -static int -wait_for_state_change (echo_main_t * em, connection_state_t state, - f64 timeout) -{ - svm_msg_q_msg_t msg; - session_event_t *e; - f64 end_time = clib_time_now (&em->clib_time) + timeout; - - while (!timeout || clib_time_now (&em->clib_time) < end_time) - { - if (em->state == state) - return 0; - if (em->state == STATE_FAILED) - return -1; - if (em->time_to_stop == 1) - return 0; - if (!em->our_event_queue || em->state < STATE_ATTACHED) - continue; - - if (svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_NOWAIT, 0)) - continue; - e = svm_msg_q_msg_data (em->our_event_queue, &msg); - handle_mq_event (em, e, 0 /* handle_rx */ ); - svm_msg_q_free_msg (em->our_event_queue, &msg); - } - clib_warning ("timeout waiting for state %U", format_quic_echo_state, - state); - return -1; -} +/* + * + * End of format functions + * + */ -static void -notify_rx_data_to_vpp (echo_session_t * s) -{ - svm_fifo_t *f = s->tx_fifo; - return; /* FOR NOW */ - if (svm_fifo_set_event (f)) - { - DBG ("did send event"); - app_send_io_evt_to_vpp (s->vpp_evt_q, f->master_session_index, - SESSION_IO_EVT_TX, 0 /* noblock */ ); - } -} +/* + * + * Session API Calls + * + */ void application_send_attach (echo_main_t * em) @@ -450,13 +514,6 @@ application_send_attach (echo_main_t * em) vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & key_mp); } -static int -application_attach (echo_main_t * em) -{ - application_send_attach (em); - return wait_for_state_change (em, STATE_ATTACHED, TIMEOUT); -} - void application_detach (echo_main_t * em) { @@ -468,105 +525,142 @@ application_detach (echo_main_t * em) bmp->client_index = em->my_client_index; bmp->context = ntohl (0xfeedface); vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & bmp); - - DBG ("%s", "Sent detach"); } -static int -ssvm_segment_attach (char *name, ssvm_segment_type_t type, int fd) +static void +server_send_listen (echo_main_t * em) { - fifo_segment_create_args_t _a, *a = &_a; - fifo_segment_main_t *sm = &echo_main.segment_main; - int rv; - - clib_memset (a, 0, sizeof (*a)); - a->segment_name = (char *) name; - a->segment_type = type; - - if (type == SSVM_SEGMENT_MEMFD) - a->memfd_fd = fd; + vl_api_bind_uri_t *bmp; + bmp = vl_msg_api_alloc (sizeof (*bmp)); + clib_memset (bmp, 0, sizeof (*bmp)); - if ((rv = fifo_segment_attach (sm, a))) - { - clib_warning ("svm_fifo_segment_attach ('%s') failed", name); - return rv; - } - vec_reset_length (a->new_segment_indices); - return 0; + bmp->_vl_msg_id = ntohs (VL_API_BIND_URI); + bmp->client_index = em->my_client_index; + bmp->context = ntohl (0xfeedface); + memcpy (bmp->uri, em->uri, vec_len (em->uri)); + vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & bmp); } static void -vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t * - mp) +server_send_unbind 
(echo_main_t * em) { - echo_main_t *em = &echo_main; - int *fds = 0; - u32 n_fds = 0; - u64 segment_handle; - segment_handle = clib_net_to_host_u64 (mp->segment_handle); - DBG ("Attached returned app %u", htons (mp->app_index)); - - if (mp->retval) - { - clib_warning ("attach failed: %U", format_api_error, - clib_net_to_host_u32 (mp->retval)); - goto failed; - } + vl_api_unbind_uri_t *ump; - if (mp->segment_name_length == 0) - { - clib_warning ("segment_name_length zero"); - goto failed; - } + ump = vl_msg_api_alloc (sizeof (*ump)); + clib_memset (ump, 0, sizeof (*ump)); - ASSERT (mp->app_event_queue_address); - em->our_event_queue = uword_to_pointer (mp->app_event_queue_address, - svm_msg_q_t *); + ump->_vl_msg_id = ntohs (VL_API_UNBIND_URI); + ump->client_index = em->my_client_index; + memcpy (ump->uri, em->uri, vec_len (em->uri)); + vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & ump); +} - if (mp->n_fds) - { - vec_validate (fds, mp->n_fds); - vl_socket_client_recv_fd_msg (fds, mp->n_fds, 5); +static void +echo_send_connect (echo_main_t * em, u8 * uri, u32 opaque) +{ + vl_api_connect_uri_t *cmp; + cmp = vl_msg_api_alloc (sizeof (*cmp)); + clib_memset (cmp, 0, sizeof (*cmp)); - if (mp->fd_flags & SESSION_FD_F_VPP_MQ_SEGMENT) - if (ssvm_segment_attach (0, SSVM_SEGMENT_MEMFD, fds[n_fds++])) - goto failed; + cmp->_vl_msg_id = ntohs (VL_API_CONNECT_URI); + cmp->client_index = em->my_client_index; + cmp->context = ntohl (opaque); + memcpy (cmp->uri, uri, vec_len (uri)); + vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & cmp); +} - if (mp->fd_flags & SESSION_FD_F_MEMFD_SEGMENT) - if (ssvm_segment_attach ((char *) mp->segment_name, - SSVM_SEGMENT_MEMFD, fds[n_fds++])) - goto failed; +static void +echo_disconnect_session (echo_main_t * em, echo_session_t * s) +{ + vl_api_disconnect_session_t *dmp; + dmp = vl_msg_api_alloc (sizeof (*dmp)); + clib_memset (dmp, 0, sizeof (*dmp)); + dmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION); + dmp->client_index = em->my_client_index; + dmp->handle = s->vpp_session_handle; + DBG ("Disconnect session 0x%lx", dmp->handle); + vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & dmp); +} - if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD) - svm_msg_q_set_consumer_eventfd (em->our_event_queue, fds[n_fds++]); +/* + * + * End Session API Calls + * + */ - vec_free (fds); - } - else +static int +wait_for_segment_allocation (u64 segment_handle) +{ + echo_main_t *em = &echo_main; + f64 timeout; + timeout = clib_time_now (&em->clib_time) + TIMEOUT; + uword *segment_present; + DBG ("Waiting for segment 0x%lx...", segment_handle); + while (clib_time_now (&em->clib_time) < timeout) { - if (ssvm_segment_attach ((char *) mp->segment_name, SSVM_SEGMENT_SHM, - -1)) - goto failed; + clib_spinlock_lock (&em->segment_handles_lock); + segment_present = hash_get (em->shared_segment_handles, segment_handle); + clib_spinlock_unlock (&em->segment_handles_lock); + if (segment_present != 0) + return 0; + if (em->time_to_stop == 1) + return 0; } - clib_spinlock_lock (&em->segment_handles_lock); - hash_set (em->shared_segment_handles, segment_handle, 1); - clib_spinlock_unlock (&em->segment_handles_lock); - DBG ("Mapped new segment %lx", segment_handle); + DBG ("timeout wait_for_segment_allocation (0x%lx)", segment_handle); + return -1; +} - em->state = STATE_ATTACHED; - return; -failed: - em->state = STATE_FAILED; - return; +static void +quic_echo_notify_event (echo_main_t * em, echo_test_evt_t e) +{ + if (em->timing_start_event == e) + em->start_time = clib_time_now (&em->clib_time); + 
else if (em->timing_end_event == e) + em->end_time = clib_time_now (&em->clib_time); } static void -vl_api_application_detach_reply_t_handler (vl_api_application_detach_reply_t * - mp) +echo_assert_test_suceeded (echo_main_t * em) +{ + CHECK (em->rx_total, + em->n_stream_clients * em->n_clients * em->bytes_to_receive, + "Not enough data received"); + CHECK (em->tx_total, + em->n_stream_clients * em->n_clients * em->bytes_to_send, + "Not enough data sent"); + CHECK (0, hash_elts (em->session_index_by_vpp_handles), + "Some sessions are still open"); +} + +always_inline void +echo_session_dequeue_notify (echo_session_t * s) { - if (mp->retval) - clib_warning ("detach returned with err: %d", mp->retval); - echo_main.state = STATE_DETACHED; + int rv; + rv = app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo->master_session_index, + SESSION_IO_EVT_RX, SVM_Q_WAIT); + svm_fifo_clear_tx_ntf (s->rx_fifo); + if (rv) + ECHO_FAIL ("app_send_io_evt_to_vpp errored %d", rv); +} + +static int +ssvm_segment_attach (char *name, ssvm_segment_type_t type, int fd) +{ + fifo_segment_create_args_t _a, *a = &_a; + fifo_segment_main_t *sm = &echo_main.segment_main; + int rv; + + clib_memset (a, 0, sizeof (*a)); + a->segment_name = (char *) name; + a->segment_type = type; + + if (type == SSVM_SEGMENT_MEMFD) + a->memfd_fd = fd; + + if ((rv = fifo_segment_attach (sm, a))) + return rv; + vec_reset_length (a->new_segment_indices); + return 0; } static void @@ -619,67 +713,12 @@ connect_to_vpp (char *name) return 0; } -void -disconnect_from_vpp (echo_main_t * em) -{ - if (em->use_sock_api) - vl_socket_client_disconnect (); - else - vl_client_disconnect_from_vlib (); -} - -static void -vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp) -{ - fifo_segment_main_t *sm = &echo_main.segment_main; - fifo_segment_create_args_t _a, *a = &_a; - echo_main_t *em = &echo_main; - int rv; - int *fds = 0; - u64 segment_handle; - segment_handle = clib_net_to_host_u64 (mp->segment_handle); - - if (mp->fd_flags & SESSION_FD_F_MEMFD_SEGMENT) - { - vec_validate (fds, 1); - vl_socket_client_recv_fd_msg (fds, 1, 5); - if (ssvm_segment_attach - ((char *) mp->segment_name, SSVM_SEGMENT_MEMFD, fds[0])) - clib_warning - ("svm_fifo_segment_attach ('%s') failed on SSVM_SEGMENT_MEMFD", - mp->segment_name); - clib_spinlock_lock (&em->segment_handles_lock); - hash_set (em->shared_segment_handles, segment_handle, 1); - clib_spinlock_unlock (&em->segment_handles_lock); - vec_free (fds); - DBG ("Mapped new segment %lx", segment_handle); - return; - } - - clib_memset (a, 0, sizeof (*a)); - a->segment_name = (char *) mp->segment_name; - a->segment_size = mp->segment_size; - /* Attach to the segment vpp created */ - rv = fifo_segment_attach (sm, a); - if (rv) - { - clib_warning ("svm_fifo_segment_attach ('%s') failed", - mp->segment_name); - return; - } - clib_spinlock_lock (&em->segment_handles_lock); - hash_set (em->shared_segment_handles, mp->segment_name, 1); - clib_spinlock_unlock (&em->segment_handles_lock); - clib_warning ("Mapped new segment '%s' size %d", mp->segment_name, - mp->segment_size); -} - static void session_print_stats (echo_main_t * em, echo_session_t * session) { f64 deltat = clib_time_now (&em->clib_time) - session->start; - fformat (stdout, "Session %x done in %.6fs RX[%.4f] TX[%.4f] Gbit/s\n", - session->session_index, deltat, + fformat (stdout, "Session 0x%x done in %.6fs RX[%.4f] TX[%.4f] Gbit/s\n", + session->vpp_session_handle, deltat, (session->bytes_received * 8.0) / deltat / 1e9, (session->bytes_sent * 8.0) 
/ deltat / 1e9); } @@ -705,92 +744,136 @@ print_global_stats (echo_main_t * em) fformat (stdout, "--------------------\n"); } +static void +echo_free_sessions (echo_main_t * em) +{ + /* Free marked sessions */ + echo_session_t *s; + u32 *session_indexes = 0, *session_index; + + /* *INDENT-OFF* */ + pool_foreach (s, em->sessions, + ({ + if (s->flags & SESSION_FLAG_SHOULD_FREE) + vec_add1 (session_indexes, s->session_index);} + )); + /* *INDENT-ON* */ + vec_foreach (session_index, session_indexes) + { + /* Free session */ + s = pool_elt_at_index (em->sessions, *session_index); + DBG ("Freeing session 0x%lx", s->vpp_session_handle); + pool_put (em->sessions, s); + clib_memset (s, 0xfe, sizeof (*s)); + } +} + +static void +echo_cleanup_session (echo_main_t * em, echo_session_t * s) +{ + echo_session_t *ls; + u64 accepted_session_count; + if (s->listener_index != SESSION_INVALID_INDEX) + { + ls = pool_elt_at_index (em->sessions, s->listener_index); + accepted_session_count = + clib_atomic_sub_fetch (&ls->accepted_session_count, 1); + if (accepted_session_count == 0 + && ls->session_type != QUIC_SESSION_TYPE_LISTEN + && em->send_disconnects) + echo_disconnect_session (em, ls); + } + if (s->session_type == QUIC_SESSION_TYPE_QUIC) + clib_atomic_sub_fetch (&em->n_quic_clients_connected, 1); + else if (s->session_type == QUIC_SESSION_TYPE_STREAM) + clib_atomic_sub_fetch (&em->n_clients_connected, 1); + + DBG ("Cleanup sessions (still %uQ %uS)", em->n_quic_clients_connected, + em->n_clients_connected); + hash_unset (em->session_index_by_vpp_handles, s->vpp_session_handle); + + /* Mark session as to be freed */ + s->flags |= SESSION_FLAG_SHOULD_FREE; +} static void test_recv_bytes (echo_main_t * em, echo_session_t * s, u8 * rx_buf, u32 n_read) { - int i; + u32 i; u8 expected; for (i = 0; i < n_read; i++) { expected = (s->bytes_received + i) & 0xff; - if (rx_buf[i] != expected && em->max_test_msg > 0) - { - clib_warning - ("Session[%lx][0x%lx] byte[%lld], got 0x%x but expected 0x%x", - s->session_index, s->vpp_session_handle, s->bytes_received + i, - rx_buf[i], expected); - em->max_test_msg--; - if (em->max_test_msg == 0) - clib_warning ("Too many errors, hiding next ones"); - if (em->test_return_packets == RETURN_PACKETS_ASSERT) - ASSERT (0); - } + if (rx_buf[i] == expected || em->max_test_msg > 0) + continue; + clib_warning ("Session 0x%lx byte %lld was 0x%x expected 0x%x", + s->vpp_session_handle, s->bytes_received + i, rx_buf[i], + expected); + em->max_test_msg--; + if (em->max_test_msg == 0) + clib_warning ("Too many errors, hiding next ones"); + if (em->test_return_packets == RETURN_PACKETS_ASSERT) + ECHO_FAIL ("test-bytes errored"); } } -static void +static int recv_data_chunk (echo_main_t * em, echo_session_t * s, u8 * rx_buf) { - int n_to_read, n_read; + int n_read; + n_read = app_recv_stream ((app_session_t *) s, rx_buf, vec_len (rx_buf)); + if (n_read <= 0) + return 0; + if (svm_fifo_needs_tx_ntf (s->rx_fifo, n_read)) + echo_session_dequeue_notify (s); - n_to_read = svm_fifo_max_dequeue (s->rx_fifo); - if (!n_to_read) - return; + if (em->test_return_packets) + test_recv_bytes (em, s, rx_buf, n_read); - do - { - n_read = - app_recv_stream ((app_session_t *) s, rx_buf, vec_len (rx_buf)); - if (n_read <= 0) - break; - notify_rx_data_to_vpp (s); - if (em->test_return_packets) - test_recv_bytes (em, s, rx_buf, n_read); - - ASSERT (s->bytes_to_receive >= n_read); - n_to_read -= n_read; - s->bytes_received += n_read; - s->bytes_to_receive -= n_read; - } - while (n_to_read > 0); + ASSERT 
(s->bytes_to_receive >= n_read); + s->bytes_received += n_read; + s->bytes_to_receive -= n_read; + return n_read; } -static void -send_data_chunk (echo_main_t * em, echo_session_t * s) +static int +send_data_chunk (echo_session_t * s, u8 * tx_buf, int offset, int len) { - u64 test_buf_len, bytes_this_chunk, test_buf_offset; - u8 *test_data = em->connect_test_data; int n_sent; - - test_buf_len = vec_len (test_data); - test_buf_offset = s->bytes_sent % test_buf_len; - bytes_this_chunk = clib_min (test_buf_len - test_buf_offset, - s->bytes_to_send); - - n_sent = app_send_stream ((app_session_t *) s, test_data + test_buf_offset, + int bytes_this_chunk = clib_min (s->bytes_to_send, len - offset); + if (!bytes_this_chunk) + return 0; + n_sent = app_send_stream ((app_session_t *) s, tx_buf + offset, bytes_this_chunk, 0); + if (n_sent < 0) + return 0; + s->bytes_to_send -= n_sent; + s->bytes_sent += n_sent; + return n_sent; +} - if (n_sent > 0) - { - s->bytes_to_send -= n_sent; - s->bytes_sent += n_sent; - } +static int +mirror_data_chunk (echo_main_t * em, echo_session_t * s, u8 * tx_buf, u64 len) +{ + u64 n_sent = 0; + while (n_sent < len && !em->time_to_stop) + n_sent += send_data_chunk (s, tx_buf, n_sent, len); + return n_sent; } /* * Rx/Tx polling thread per connection */ static void * -client_thread_fn (void *arg) +echo_thread_fn (void *arg) { echo_main_t *em = &echo_main; u8 *rx_buf = 0; u32 session_index = *(u32 *) arg; echo_session_t *s; - - vec_validate (rx_buf, 1 << 20); + int n_read, n_sent, idle_loop = 0; + vec_validate (rx_buf, em->rx_buf_size); while (!em->time_to_stop && em->state != STATE_READY) ; @@ -798,72 +881,81 @@ client_thread_fn (void *arg) s = pool_elt_at_index (em->sessions, session_index); while (!em->time_to_stop) { - send_data_chunk (em, s); - recv_data_chunk (em, s, rx_buf); + n_read = recv_data_chunk (em, s, rx_buf); + if (em->data_source == ECHO_TEST_DATA_SOURCE) + n_sent = + send_data_chunk (s, em->connect_test_data, + s->bytes_sent % em->tx_buf_size, em->tx_buf_size); + else if (em->data_source == ECHO_RX_DATA_SOURCE) + n_sent = mirror_data_chunk (em, s, rx_buf, n_read); + else + n_sent = 0; if (!s->bytes_to_send && !s->bytes_to_receive) break; - } + if (s->flags & SESSION_FLAG_SHOULD_CLOSE) + break; + /* check idle clients */ + if (!n_sent & !n_read) + idle_loop++; + else + idle_loop = 0; + if (idle_loop == 1e7) + { + DBG ("Idle client TX:%dB RX:%dB", s->bytes_to_send, + s->bytes_to_receive); + DBG ("Idle FIFOs TX:%dB RX:%dB", svm_fifo_max_dequeue (s->tx_fifo), + svm_fifo_max_dequeue (s->rx_fifo)); + } + } DBG ("[%lu/%lu] -> S(%x) -> [%lu/%lu]", s->bytes_received, s->bytes_received + s->bytes_to_receive, session_index, s->bytes_sent, s->bytes_sent + s->bytes_to_send); em->tx_total += s->bytes_sent; em->rx_total += s->bytes_received; - em->n_clients_connected--; - - if (em->n_clients_connected == 0) + clib_warning ("%d/%d", em->rx_total, + em->n_clients * em->n_stream_clients * em->bytes_to_receive); + if (em->rx_total == + em->n_clients * em->n_stream_clients * em->bytes_to_receive) quic_echo_notify_event (em, ECHO_EVT_LAST_BYTE); - + if (em->send_disconnects) + echo_disconnect_session (em, s); + else + { + while (!(s->flags & SESSION_FLAG_SHOULD_CLOSE) & !em->time_to_stop) + ; + echo_cleanup_session (em, s); + } pthread_exit (0); } static void -echo_send_connect (echo_main_t * em, u8 * uri, u32 opaque) -{ - vl_api_connect_uri_t *cmp; - cmp = vl_msg_api_alloc (sizeof (*cmp)); - clib_memset (cmp, 0, sizeof (*cmp)); - - cmp->_vl_msg_id = ntohs (VL_API_CONNECT_URI); 
- cmp->client_index = em->my_client_index; - cmp->context = ntohl (opaque); - memcpy (cmp->uri, uri, vec_len (uri)); - vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & cmp); -} - -static void -client_disconnect_session (echo_main_t * em, echo_session_t * s) -{ - vl_api_disconnect_session_t *dmp; - dmp = vl_msg_api_alloc (sizeof (*dmp)); - clib_memset (dmp, 0, sizeof (*dmp)); - dmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION); - dmp->client_index = em->my_client_index; - dmp->handle = s->vpp_session_handle; - DBG ("Sending Session disonnect handle %lu", dmp->handle); - vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & dmp); - pool_put (em->sessions, s); - clib_memset (s, 0xfe, sizeof (*s)); -} - -static void session_bound_handler (session_bound_msg_t * mp) { echo_main_t *em = &echo_main; + echo_session_t *listen_session; + u32 session_index; if (mp->retval) { - clib_warning ("bind failed: %U", format_api_error, - clib_net_to_host_u32 (mp->retval)); - em->state = STATE_FAILED; + ECHO_FAIL ("bind failed: %U", format_api_error, + clib_net_to_host_u32 (mp->retval)); return; } clib_warning ("listening on %U:%u", format_ip46_address, mp->lcl_ip, mp->lcl_is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6, clib_net_to_host_u16 (mp->lcl_port)); - em->listener_handle = mp->handle; + + /* Allocate local session and set it up */ + pool_get (em->sessions, listen_session); + listen_session->session_type = QUIC_SESSION_TYPE_LISTEN; + listen_session->accepted_session_count = 0; + session_index = listen_session - em->sessions; + listen_session->session_index = session_index; + hash_set (em->session_index_by_vpp_handles, mp->handle, session_index); em->state = STATE_LISTEN; + em->listen_session_index = session_index; } static void @@ -873,15 +965,18 @@ session_accepted_handler (session_accepted_msg_t * mp) session_accepted_reply_msg_t *rmp; svm_fifo_t *rx_fifo, *tx_fifo; echo_main_t *em = &echo_main; - echo_session_t *session; + echo_session_t *session, *listen_session; u32 session_index; - + uword *p; /* Allocate local session and set it up */ pool_get (em->sessions, session); session_index = session - em->sessions; if (wait_for_segment_allocation (mp->segment_handle)) - return; + { + ECHO_FAIL ("wait_for_segment_allocation errored"); + return; + } rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *); rx_fifo->client_session_index = session_index; @@ -896,8 +991,19 @@ session_accepted_handler (session_accepted_msg_t * mp) session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *); + session->accepted_session_count = 0; + p = hash_get (em->session_index_by_vpp_handles, mp->listener_handle); + if (!p) + { + ECHO_FAIL ("unknown handle 0x%lx", mp->listener_handle); + return; + } + session->listener_index = p[0]; + listen_session = pool_elt_at_index (em->sessions, p[0]); + clib_atomic_fetch_add (&listen_session->accepted_session_count, 1); + /* Add it to lookup table */ - DBG ("Accepted session handle %lx, Listener %lx idx %lu", mp->handle, + DBG ("Accepted session 0x%lx, Listener 0x%lx idx %lu", mp->handle, mp->listener_handle, session_index); hash_set (em->session_index_by_vpp_handles, mp->handle, session_index); @@ -908,24 +1014,26 @@ session_accepted_handler (session_accepted_msg_t * mp) rmp->context = mp->context; app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt); - DBG ("SSession handle is %lu", mp->handle); - if (mp->listener_handle == em->listener_handle) + if (listen_session->session_type == QUIC_SESSION_TYPE_LISTEN) { + session->session_type = QUIC_SESSION_TYPE_QUIC; 
if (em->cb_vft.quic_accepted_cb) em->cb_vft.quic_accepted_cb (mp, session_index); - em->n_quic_clients_connected++; + clib_atomic_fetch_add (&em->n_quic_clients_connected, 1); } else if (em->i_am_master) { + session->session_type = QUIC_SESSION_TYPE_STREAM; if (em->cb_vft.server_stream_accepted_cb) em->cb_vft.server_stream_accepted_cb (mp, session_index); - em->n_clients_connected++; + clib_atomic_fetch_add (&em->n_clients_connected, 1); } else { + session->session_type = QUIC_SESSION_TYPE_STREAM; if (em->cb_vft.client_stream_accepted_cb) em->cb_vft.client_stream_accepted_cb (mp, session_index); - em->n_clients_connected++; + clib_atomic_fetch_add (&em->n_clients_connected, 1); } if (em->n_clients_connected == em->n_clients * em->n_stream_clients) @@ -941,25 +1049,27 @@ static void session_connected_handler (session_connected_msg_t * mp) { echo_main_t *em = &echo_main; - echo_session_t *session; + echo_session_t *session, *listen_session; u32 session_index; + u32 listener_index = htonl (mp->context); svm_fifo_t *rx_fifo, *tx_fifo; if (mp->retval) { - clib_warning ("connection failed with code: %U", format_api_error, - clib_net_to_host_u32 (mp->retval)); - em->state = STATE_FAILED; + ECHO_FAIL ("connection failed with code: %U", format_api_error, + clib_net_to_host_u32 (mp->retval)); return; } pool_get (em->sessions, session); clib_memset (session, 0, sizeof (*session)); session_index = session - em->sessions; - DBG ("CONNECTED session[%lx][0x%lx]", session_index, mp->handle); if (wait_for_segment_allocation (mp->segment_handle)) - return; + { + ECHO_FAIL ("wait_for_segment_allocation errored"); + return; + } rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *); rx_fifo->client_session_index = session_index; @@ -974,27 +1084,37 @@ session_connected_handler (session_connected_msg_t * mp) session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *); - DBG ("Connected session handle %lx, idx %lu RX[%lx] TX[%lx]", mp->handle, - session_index, rx_fifo, tx_fifo); + session->listener_index = listener_index; + session->accepted_session_count = 0; + if (listener_index != SESSION_INVALID_INDEX) + { + listen_session = pool_elt_at_index (em->sessions, listener_index); + clib_atomic_fetch_add (&listen_session->accepted_session_count, 1); + } + + DBG ("Connected session 0x%lx", mp->handle, session_index); hash_set (em->session_index_by_vpp_handles, mp->handle, session_index); - if (mp->context == QUIC_SESSION_TYPE_QUIC) + if (listener_index == SESSION_INVALID_INDEX) { + session->session_type = QUIC_SESSION_TYPE_QUIC; if (em->cb_vft.quic_connected_cb) em->cb_vft.quic_connected_cb (mp, session_index); - em->n_quic_clients_connected++; + clib_atomic_fetch_add (&em->n_quic_clients_connected, 1); } else if (em->i_am_master) { + session->session_type = QUIC_SESSION_TYPE_STREAM; if (em->cb_vft.server_stream_connected_cb) em->cb_vft.server_stream_connected_cb (mp, session_index); - em->n_clients_connected++; + clib_atomic_fetch_add (&em->n_clients_connected, 1); } else { + session->session_type = QUIC_SESSION_TYPE_STREAM; if (em->cb_vft.client_stream_connected_cb) em->cb_vft.client_stream_connected_cb (mp, session_index); - em->n_clients_connected++; + clib_atomic_fetch_add (&em->n_clients_connected, 1); } if (em->n_clients_connected == em->n_clients * em->n_stream_clients) @@ -1018,7 +1138,7 @@ echo_on_connected_connect (session_connected_msg_t * mp, u32 session_index) { echo_main_t *em = &echo_main; u8 *uri = format (0, "QUIC://session/%lu", mp->handle); - int i; + u64 i; if 
(!em->first_sconnect_sent) { @@ -1026,13 +1146,10 @@ echo_on_connected_connect (session_connected_msg_t * mp, u32 session_index) quic_echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT); } for (i = 0; i < em->n_stream_clients; i++) - { - DBG ("CONNECT : new QUIC stream #%d: %s", i, uri); - echo_send_connect (em, uri, QUIC_SESSION_TYPE_STREAM); - } + echo_send_connect (em, uri, session_index); - clib_warning ("session %u (0x%llx) connected with local ip %U port %d", - session_index, mp->handle, format_ip46_address, &mp->lcl.ip, + clib_warning ("Qsession 0x%llx connected to %U:%d", + mp->handle, format_ip46_address, &mp->lcl.ip, mp->lcl.is_ip4, clib_net_to_host_u16 (mp->lcl.port)); } @@ -1043,22 +1160,22 @@ echo_on_connected_send (session_connected_msg_t * mp, u32 session_index) int rv; echo_session_t *session; - DBG ("Stream Session Connected"); - session = pool_elt_at_index (em->sessions, session_index); session->bytes_to_send = em->bytes_to_send; session->bytes_to_receive = em->bytes_to_receive; + DBG ("Stream session 0x%lx connected", session->vpp_session_handle); + /* * Start RX thread */ em->thread_args[em->n_clients_connected] = session_index; rv = pthread_create (&em->client_thread_handles[em->n_clients_connected], - NULL /*attr */ , client_thread_fn, + NULL /*attr */ , echo_thread_fn, (void *) &em->thread_args[em->n_clients_connected]); if (rv) { - clib_warning ("pthread_create returned %d", rv); + ECHO_FAIL ("pthread_create returned %d", rv); return; } } @@ -1066,8 +1183,8 @@ echo_on_connected_send (session_connected_msg_t * mp, u32 session_index) static void echo_on_connected_error (session_connected_msg_t * mp, u32 session_index) { - clib_warning ("Got a wrong connected on session %u [%lx]", session_index, - mp->handle); + ECHO_FAIL ("Got a wrong connected on session %u [%lx]", session_index, + mp->handle); } static void @@ -1081,29 +1198,27 @@ echo_on_accept_recv (session_accepted_msg_t * mp, u32 session_index) session->bytes_to_send = em->bytes_to_send; session->bytes_to_receive = em->bytes_to_receive; - DBG ("Stream session accepted 0x%lx, expecting %lu bytes", - session->vpp_session_handle, session->bytes_to_receive); + DBG ("Stream session 0x%lx accepted", mp->handle); /* * Start RX thread */ em->thread_args[em->n_clients_connected] = session_index; rv = pthread_create (&em->client_thread_handles[em->n_clients_connected], - NULL /*attr */ , client_thread_fn, + NULL /*attr */ , echo_thread_fn, (void *) &em->thread_args[em->n_clients_connected]); if (rv) { - clib_warning ("pthread_create returned %d", rv); + ECHO_FAIL ("pthread_create returned %d", rv); return; } - } static void echo_on_accept_connect (session_accepted_msg_t * mp, u32 session_index) { echo_main_t *em = &echo_main; - DBG ("Accept on QSession index %u", mp->handle); + DBG ("Accept on QSession 0x%lx %u", mp->handle); u8 *uri = format (0, "QUIC://session/%lu", mp->handle); u32 i; @@ -1113,17 +1228,14 @@ echo_on_accept_connect (session_accepted_msg_t * mp, u32 session_index) quic_echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT); } for (i = 0; i < em->n_stream_clients; i++) - { - DBG ("ACCEPT : new QUIC stream #%d: %s", i, uri); - echo_send_connect (em, uri, QUIC_SESSION_TYPE_STREAM); - } + echo_send_connect (em, uri, session_index); } static void echo_on_accept_error (session_accepted_msg_t * mp, u32 session_index) { - clib_warning ("Got a wrong accept on session %u [%lx]", session_index, - mp->handle); + ECHO_FAIL ("Got a wrong accept on session %u [%lx]", session_index, + mp->handle); } static void @@ -1138,26 
+1250,26 @@ echo_on_accept_log_ip (session_accepted_msg_t * mp, u32 session_index) static const quic_echo_cb_vft_t default_cb_vft = { /* Qsessions */ - .quic_accepted_cb = &echo_on_accept_log_ip, - .quic_connected_cb = &echo_on_connected_connect, + .quic_accepted_cb = echo_on_accept_log_ip, + .quic_connected_cb = echo_on_connected_connect, /* client initiated streams */ - .server_stream_accepted_cb = NULL, - .client_stream_connected_cb = &echo_on_connected_send, + .server_stream_accepted_cb = echo_on_accept_recv, + .client_stream_connected_cb = echo_on_connected_send, /* server initiated streams */ - .client_stream_accepted_cb = &echo_on_accept_error, - .server_stream_connected_cb = &echo_on_connected_error, + .client_stream_accepted_cb = echo_on_accept_error, + .server_stream_connected_cb = echo_on_connected_error, }; static const quic_echo_cb_vft_t server_stream_cb_vft = { /* Qsessions */ - .quic_accepted_cb = &echo_on_accept_connect, + .quic_accepted_cb = echo_on_accept_connect, .quic_connected_cb = NULL, /* client initiated streams */ - .server_stream_accepted_cb = &echo_on_accept_error, - .client_stream_connected_cb = &echo_on_connected_error, + .server_stream_accepted_cb = echo_on_accept_error, + .client_stream_connected_cb = echo_on_connected_error, /* server initiated streams */ - .client_stream_accepted_cb = &echo_on_accept_recv, - .server_stream_connected_cb = &echo_on_connected_send, + .client_stream_accepted_cb = echo_on_accept_recv, + .server_stream_connected_cb = echo_on_connected_send, }; static uword @@ -1165,40 +1277,12 @@ echo_unformat_quic_setup_vft (unformat_input_t * input, va_list * args) { echo_main_t *em = &echo_main; if (unformat (input, "serverstream")) - { - clib_warning ("Using QUIC server initiated streams"); - em->no_return = 1; - em->cb_vft = server_stream_cb_vft; - return 1; - } + em->cb_vft = server_stream_cb_vft; else if (unformat (input, "default")) - return 1; - return 0; -} - -static uword -echo_unformat_data (unformat_input_t * input, va_list * args) -{ - u64 _a; - u64 *a = va_arg (*args, u64 *); - if (unformat (input, "%lluGb", &_a)) - { - *a = _a << 30; - return 1; - } - else if (unformat (input, "%lluMb", &_a)) - { - *a = _a << 20; - return 1; - } - else if (unformat (input, "%lluKb", &_a)) - { - *a = _a << 10; - return 1; - } - else if (unformat (input, "%llu", a)) - return 1; - return 0; + ; + else + return 0; + return 1; } /* @@ -1213,31 +1297,33 @@ session_disconnected_handler (session_disconnected_msg_t * mp) app_session_evt_t _app_evt, *app_evt = &_app_evt; session_disconnected_reply_msg_t *rmp; echo_main_t *em = &echo_main; - echo_session_t *session = 0; + echo_session_t *s; uword *p; int rv = 0; - DBG ("Disonnected session handle %lx", mp->handle); + DBG ("passive close session 0x%lx", mp->handle); p = hash_get (em->session_index_by_vpp_handles, mp->handle); if (!p) { - clib_warning ("couldn't find session key %llx", mp->handle); + ECHO_FAIL ("couldn't find session key %llx", mp->handle); return; } - session = pool_elt_at_index (em->sessions, p[0]); - hash_unset (em->session_index_by_vpp_handles, mp->handle); - - pool_put (em->sessions, session); + s = pool_elt_at_index (em->sessions, p[0]); + if (s->session_type == QUIC_SESSION_TYPE_STREAM) + s->flags |= SESSION_FLAG_SHOULD_CLOSE; /* tell thread to close session */ + else + echo_cleanup_session (em, s); /* We can clean Qsessions right away */ - app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt, + app_alloc_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt, SESSION_CTRL_EVT_DISCONNECTED_REPLY); 
rmp = (session_disconnected_reply_msg_t *) app_evt->evt->data; rmp->retval = rv; rmp->handle = mp->handle; rmp->context = mp->context; - app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt); + app_send_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt); - session_print_stats (em, session); + if (s->session_type == QUIC_SESSION_TYPE_STREAM) + session_print_stats (em, s); } static void @@ -1250,22 +1336,18 @@ session_reset_handler (session_reset_msg_t * mp) uword *p; int rv = 0; - DBG ("Reset session handle %lx", mp->handle); + DBG ("Reset session 0x%lx", mp->handle); p = hash_get (em->session_index_by_vpp_handles, mp->handle); - if (p) - { - session = pool_elt_at_index (em->sessions, p[0]); - clib_warning ("got reset"); - /* Cleanup later */ - em->time_to_stop = 1; - } - else + if (!p) { - clib_warning ("couldn't find session key %llx", mp->handle); + ECHO_FAIL ("couldn't find session key %llx", mp->handle); return; } + session = pool_elt_at_index (em->sessions, p[0]); + /* Cleanup later */ + em->time_to_stop = 1; app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt, SESSION_CTRL_EVT_RESET_REPLY); rmp = (session_reset_reply_msg_t *) app_evt->evt->data; @@ -1275,34 +1357,26 @@ session_reset_handler (session_reset_msg_t * mp) } static void -handle_mq_event (echo_main_t * em, session_event_t * e, int handle_rx) +handle_mq_event (session_event_t * e) { switch (e->event_type) { case SESSION_CTRL_EVT_BOUND: - DBG ("SESSION_CTRL_EVT_BOUND"); session_bound_handler ((session_bound_msg_t *) e->data); break; case SESSION_CTRL_EVT_ACCEPTED: - DBG ("SESSION_CTRL_EVT_ACCEPTED"); session_accepted_handler ((session_accepted_msg_t *) e->data); break; case SESSION_CTRL_EVT_CONNECTED: - DBG ("SESSION_CTRL_EVT_CONNECTED"); session_connected_handler ((session_connected_msg_t *) e->data); break; case SESSION_CTRL_EVT_DISCONNECTED: - DBG ("SESSION_CTRL_EVT_DISCONNECTED"); session_disconnected_handler ((session_disconnected_msg_t *) e->data); break; case SESSION_CTRL_EVT_RESET: - DBG ("SESSION_CTRL_EVT_RESET"); session_reset_handler ((session_reset_msg_t *) e->data); break; case SESSION_IO_EVT_RX: - DBG ("SESSION_IO_EVT_RX"); - if (handle_rx) - echo_handle_rx (em, e); break; default: clib_warning ("unhandled event %u", e->event_type); @@ -1310,299 +1384,255 @@ handle_mq_event (echo_main_t * em, session_event_t * e, int handle_rx) } static int -clients_run (echo_main_t * em) +wait_for_state_change (echo_main_t * em, connection_state_t state, + f64 timeout) { svm_msg_q_msg_t msg; session_event_t *e; - echo_session_t *s; - hash_pair_t *p; - int i; + f64 end_time = clib_time_now (&em->clib_time) + timeout; - /* - * Attach and connect the clients - */ - if (application_attach (em)) - return -1; + while (!timeout || clib_time_now (&em->clib_time) < end_time) + { + if (em->state == state) + return 0; + if (em->time_to_stop == 1) + return 0; + if (!em->our_event_queue || em->state < STATE_ATTACHED) + continue; - quic_echo_notify_event (em, ECHO_EVT_FIRST_QCONNECT); - for (i = 0; i < em->n_clients; i++) - echo_send_connect (em, em->uri, QUIC_SESSION_TYPE_QUIC); + if (svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_NOWAIT, 0)) + continue; + e = svm_msg_q_msg_data (em->our_event_queue, &msg); + handle_mq_event (e); + svm_msg_q_free_msg (em->our_event_queue, &msg); + } + DBG ("timeout waiting for %U", format_quic_echo_state, state); + return -1; +} - wait_for_state_change (em, STATE_READY, TIMEOUT); +static void +echo_event_loop (echo_main_t * em) +{ + svm_msg_q_msg_t msg; + session_event_t *e; - /* - * Wait for client threads to 
send the data - */ DBG ("Waiting for data on %u clients", em->n_clients_connected); - while (em->n_clients_connected) + while (em->n_clients_connected | em->n_quic_clients_connected) { - if (svm_msg_q_is_empty (em->our_event_queue)) - continue; - if (svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_TIMEDWAIT, 1)) + int rc = svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_TIMEDWAIT, 1); + if (PREDICT_FALSE (rc == ETIMEDOUT && em->time_to_stop)) + break; + if (rc == ETIMEDOUT) continue; e = svm_msg_q_msg_data (em->our_event_queue, &msg); - handle_mq_event (em, e, 0 /* handle_rx */ ); + handle_mq_event (e); svm_msg_q_free_msg (em->our_event_queue, &msg); } - - /* *INDENT-OFF* */ - hash_foreach_pair (p, em->session_index_by_vpp_handles, - ({ - s = pool_elt_at_index (em->sessions, p->value[0]); - DBG ("Sending disconnect on session %lu", p->key); - client_disconnect_session (em, s); - })); - /* *INDENT-ON* */ - - wait_for_disconnected_sessions (em); - application_detach (em); - return 0; } static void -vl_api_bind_uri_reply_t_handler (vl_api_bind_uri_reply_t * mp) +clients_run (echo_main_t * em) { - echo_main_t *em = &echo_main; - if (mp->retval) + u64 i; + quic_echo_notify_event (em, ECHO_EVT_FIRST_QCONNECT); + for (i = 0; i < em->n_clients; i++) + echo_send_connect (em, em->uri, SESSION_INVALID_INDEX); + + if (wait_for_state_change (em, STATE_READY, TIMEOUT)) { - clib_warning ("bind failed: %U", format_api_error, - clib_net_to_host_u32 (mp->retval)); - em->state = STATE_FAILED; + ECHO_FAIL ("Timeout waiting for state ready"); return; } - em->state = STATE_LISTEN; + echo_event_loop (em); } static void -vl_api_unbind_uri_reply_t_handler (vl_api_unbind_uri_reply_t * mp) +server_run (echo_main_t * em) { - echo_main_t *em = &echo_main; - - if (mp->retval != 0) - clib_warning ("returned %d", ntohl (mp->retval)); + server_send_listen (em); + if (wait_for_state_change (em, STATE_READY, 0)) + { + ECHO_FAIL ("Timeout waiting for state ready"); + return; + } + echo_event_loop (em); - em->state = STATE_START; + /* Cleanup */ + server_send_unbind (em); + wait_for_state_change (em, STATE_DISCONNECTED, TIMEOUT); } -u8 * -format_ip4_address (u8 * s, va_list * args) -{ - u8 *a = va_arg (*args, u8 *); - return format (s, "%d.%d.%d.%d", a[0], a[1], a[2], a[3]); -} +/* + * + * Session API handlers + * + */ -u8 * -format_ip6_address (u8 * s, va_list * args) + +static void +vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t * + mp) { - ip6_address_t *a = va_arg (*args, ip6_address_t *); - u32 i, i_max_n_zero, max_n_zeros, i_first_zero, n_zeros, last_double_colon; + echo_main_t *em = &echo_main; + int *fds = 0; + u32 n_fds = 0; + u64 segment_handle; + segment_handle = clib_net_to_host_u64 (mp->segment_handle); + DBG ("Attached returned app %u", htons (mp->app_index)); - i_max_n_zero = ARRAY_LEN (a->as_u16); - max_n_zeros = 0; - i_first_zero = i_max_n_zero; - n_zeros = 0; - for (i = 0; i < ARRAY_LEN (a->as_u16); i++) + if (mp->retval) { - u32 is_zero = a->as_u16[i] == 0; - if (is_zero && i_first_zero >= ARRAY_LEN (a->as_u16)) - { - i_first_zero = i; - n_zeros = 0; - } - n_zeros += is_zero; - if ((!is_zero && n_zeros > max_n_zeros) - || (i + 1 >= ARRAY_LEN (a->as_u16) && n_zeros > max_n_zeros)) - { - i_max_n_zero = i_first_zero; - max_n_zeros = n_zeros; - i_first_zero = ARRAY_LEN (a->as_u16); - n_zeros = 0; - } + ECHO_FAIL ("attach failed: %U", format_api_error, + clib_net_to_host_u32 (mp->retval)); + return; } - last_double_colon = 0; - for (i = 0; i < ARRAY_LEN (a->as_u16); i++) + if 
(mp->segment_name_length == 0) { - if (i == i_max_n_zero && max_n_zeros > 1) - { - s = format (s, "::"); - i += max_n_zeros - 1; - last_double_colon = 1; - } - else - { - s = format (s, "%s%x", - (last_double_colon || i == 0) ? "" : ":", - clib_net_to_host_u16 (a->as_u16[i])); - last_double_colon = 0; - } + ECHO_FAIL ("segment_name_length zero"); + return; } - return s; -} - -/* Format an IP46 address. */ -u8 * -format_ip46_address (u8 * s, va_list * args) -{ - ip46_address_t *ip46 = va_arg (*args, ip46_address_t *); - ip46_type_t type = va_arg (*args, ip46_type_t); - int is_ip4 = 1; + ASSERT (mp->app_event_queue_address); + em->our_event_queue = uword_to_pointer (mp->app_event_queue_address, + svm_msg_q_t *); - switch (type) + if (mp->n_fds) { - case IP46_TYPE_ANY: - is_ip4 = ip46_address_is_ip4 (ip46); - break; - case IP46_TYPE_IP4: - is_ip4 = 1; - break; - case IP46_TYPE_IP6: - is_ip4 = 0; - break; - } - - return is_ip4 ? - format (s, "%U", format_ip4_address, &ip46->ip4) : - format (s, "%U", format_ip6_address, &ip46->ip6); -} + vec_validate (fds, mp->n_fds); + vl_socket_client_recv_fd_msg (fds, mp->n_fds, 5); -static void -echo_handle_rx (echo_main_t * em, session_event_t * e) -{ - int n_read, max_dequeue, n_sent; - u32 offset, to_dequeue; - echo_session_t *s; - s = pool_elt_at_index (em->sessions, e->session_index); - - /* Clear event only once. Otherwise, if we do it in the loop by calling - * app_recv_stream, we may end up with a lot of unhandled rx events on the - * message queue */ - svm_fifo_unset_event (s->rx_fifo); - max_dequeue = svm_fifo_max_dequeue (s->rx_fifo); - if (PREDICT_FALSE (!max_dequeue)) - return; - do - { - /* The options here are to limit ourselves to max_dequeue or read - * even the data that was enqueued while we were dequeueing and which - * now has an rx event in the mq. Either of the two work. 
*/ - to_dequeue = clib_min (max_dequeue, vec_len (em->rx_buf)); - n_read = app_recv_stream_raw (s->rx_fifo, em->rx_buf, to_dequeue, - 0 /* clear evt */ , 0 /* peek */ ); - - if (n_read <= 0) - break; - DBG ("Notify cause %u bytes", n_read); - notify_rx_data_to_vpp (s); - if (em->test_return_packets) - test_recv_bytes (em, s, em->rx_buf, n_read); + if (mp->fd_flags & SESSION_FD_F_VPP_MQ_SEGMENT) + if (ssvm_segment_attach (0, SSVM_SEGMENT_MEMFD, fds[n_fds++])) + { + ECHO_FAIL ("svm_fifo_segment_attach failed"); + return; + } - max_dequeue -= n_read; - s->bytes_received += n_read; - s->bytes_to_receive -= n_read; + if (mp->fd_flags & SESSION_FD_F_MEMFD_SEGMENT) + if (ssvm_segment_attach ((char *) mp->segment_name, + SSVM_SEGMENT_MEMFD, fds[n_fds++])) + { + ECHO_FAIL ("svm_fifo_segment_attach ('%s') failed", + mp->segment_name); + return; + } + if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD) + svm_msg_q_set_consumer_eventfd (em->our_event_queue, fds[n_fds++]); - /* Reflect if a non-drop session */ - if (!em->no_return) + vec_free (fds); + } + else + { + if (ssvm_segment_attach ((char *) mp->segment_name, SSVM_SEGMENT_SHM, + -1)) { - offset = 0; - do - { - n_sent = app_send_stream ((app_session_t *) s, - em->rx_buf + offset, - n_read, SVM_Q_WAIT); - if (n_sent <= 0) - continue; - n_read -= n_sent; - s->bytes_to_send -= n_sent; - s->bytes_sent += n_sent; - offset += n_sent; - } - while (n_read > 0); + ECHO_FAIL ("svm_fifo_segment_attach ('%s') failed", + mp->segment_name); + return; } } - while (max_dequeue > 0 && !em->time_to_stop); + clib_spinlock_lock (&em->segment_handles_lock); + hash_set (em->shared_segment_handles, segment_handle, 1); + clib_spinlock_unlock (&em->segment_handles_lock); + DBG ("Mapped segment 0x%lx", segment_handle); + + em->state = STATE_ATTACHED; } static void -server_handle_mq (echo_main_t * em) +vl_api_application_detach_reply_t_handler (vl_api_application_detach_reply_t * + mp) { - svm_msg_q_msg_t msg; - session_event_t *e; - - while (1) + if (mp->retval) { - int rc = svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_TIMEDWAIT, 1); - if (PREDICT_FALSE (rc == ETIMEDOUT && em->time_to_stop)) - break; - if (rc == ETIMEDOUT) - continue; - e = svm_msg_q_msg_data (em->our_event_queue, &msg); - handle_mq_event (em, e, em->state == STATE_READY /* handle_rx */ ); - svm_msg_q_free_msg (em->our_event_queue, &msg); + ECHO_FAIL ("detach returned with err: %d", mp->retval); + return; } + echo_main.state = STATE_DETACHED; } + static void -server_send_listen (echo_main_t * em) +vl_api_unmap_segment_t_handler (vl_api_unmap_segment_t * mp) { - vl_api_bind_uri_t *bmp; - bmp = vl_msg_api_alloc (sizeof (*bmp)); - clib_memset (bmp, 0, sizeof (*bmp)); + echo_main_t *em = &echo_main; + u64 segment_handle = clib_net_to_host_u64 (mp->segment_handle); - bmp->_vl_msg_id = ntohs (VL_API_BIND_URI); - bmp->client_index = em->my_client_index; - bmp->context = ntohl (0xfeedface); - memcpy (bmp->uri, em->uri, vec_len (em->uri)); - vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & bmp); + clib_spinlock_lock (&em->segment_handles_lock); + hash_unset (em->shared_segment_handles, segment_handle); + clib_spinlock_unlock (&em->segment_handles_lock); + DBG ("Unmaped segment 0x%lx", segment_handle); } static void -server_send_unbind (echo_main_t * em) +vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp) { - vl_api_unbind_uri_t *ump; - - ump = vl_msg_api_alloc (sizeof (*ump)); - clib_memset (ump, 0, sizeof (*ump)); + fifo_segment_main_t *sm = &echo_main.segment_main; + 
fifo_segment_create_args_t _a, *a = &_a; + echo_main_t *em = &echo_main; + int *fds = 0; + char *seg_name = (char *) mp->segment_name; + u64 segment_handle = clib_net_to_host_u64 (mp->segment_handle); - ump->_vl_msg_id = ntohs (VL_API_UNBIND_URI); - ump->client_index = em->my_client_index; - memcpy (ump->uri, em->uri, vec_len (em->uri)); - vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & ump); + if (mp->fd_flags & SESSION_FD_F_MEMFD_SEGMENT) + { + vec_validate (fds, 1); + vl_socket_client_recv_fd_msg (fds, 1, 5); + if (ssvm_segment_attach (seg_name, SSVM_SEGMENT_MEMFD, fds[0])) + { + ECHO_FAIL ("svm_fifo_segment_attach ('%s')" + "failed on SSVM_SEGMENT_MEMFD", seg_name); + return; + } + vec_free (fds); + } + else + { + clib_memset (a, 0, sizeof (*a)); + a->segment_name = seg_name; + a->segment_size = mp->segment_size; + /* Attach to the segment vpp created */ + if (fifo_segment_attach (sm, a)) + { + ECHO_FAIL ("svm_fifo_segment_attach ('%s') failed", seg_name); + return; + } + } + clib_spinlock_lock (&em->segment_handles_lock); + hash_set (em->shared_segment_handles, segment_handle, 1); + clib_spinlock_unlock (&em->segment_handles_lock); + DBG ("Mapped segment 0x%lx", segment_handle); } -static int -server_run (echo_main_t * em) +static void +vl_api_bind_uri_reply_t_handler (vl_api_bind_uri_reply_t * mp) { - echo_session_t *session; - int i; - - /* $$$$ hack preallocation */ - for (i = 0; i < 200000; i++) + echo_main_t *em = &echo_main; + if (mp->retval) { - pool_get (em->sessions, session); - clib_memset (session, 0, sizeof (*session)); + ECHO_FAIL ("bind failed: %U", format_api_error, + clib_net_to_host_u32 (mp->retval)); + return; } - for (i = 0; i < 200000; i++) - pool_put_index (em->sessions, i); - - if (application_attach (em)) - return -1; - - /* Bind to uri */ - server_send_listen (em); - if (wait_for_state_change (em, STATE_READY, 0)) - return -2; - /* Enter handle event loop */ - server_handle_mq (em); + em->state = STATE_LISTEN; +} - /* Cleanup */ - server_send_unbind (em); - application_detach (em); - fformat (stdout, "Test complete...\n"); - return 0; +static void +vl_api_unbind_uri_reply_t_handler (vl_api_unbind_uri_reply_t * mp) +{ + echo_session_t *listen_session; + echo_main_t *em = &echo_main; + if (mp->retval != 0) + { + /* ECHO_FAIL ("returned %d", ntohl (mp->retval)); + return; FIXME : UDPC issue */ + } + em->state = STATE_DISCONNECTED; + listen_session = pool_elt_at_index (em->sessions, em->listen_session_index); + echo_cleanup_session (em, listen_session); } static void @@ -1610,18 +1640,25 @@ vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t * mp) { echo_main_t *em = &echo_main; + echo_session_t *s; uword *p; - DBG ("Got disonnected reply for session handle %lu", mp->handle); - em->state = STATE_START; + if (mp->retval) + { + ECHO_FAIL ("vpp complained about disconnect: %d", ntohl (mp->retval)); + return; + } + + DBG ("Got disonnected reply for session 0x%lx", mp->handle); p = hash_get (em->session_index_by_vpp_handles, mp->handle); - if (p) - hash_unset (em->session_index_by_vpp_handles, mp->handle); - else - clib_warning ("couldn't find session key %llx", mp->handle); + if (!p) + { + ECHO_FAIL ("couldn't find session key %llx", mp->handle); + return; + } - if (mp->retval) - clib_warning ("vpp complained about disconnect: %d", ntohl (mp->retval)); + s = pool_elt_at_index (em->sessions, p[0]); + echo_cleanup_session (em, s); } static void @@ -1629,7 +1666,7 @@ static void (vl_api_application_tls_cert_add_reply_t * mp) { if (mp->retval) 
- clib_warning ("failed to add tls cert"); + ECHO_FAIL ("failed to add tls cert"); } static void @@ -1637,7 +1674,7 @@ static void (vl_api_application_tls_key_add_reply_t * mp) { if (mp->retval) - clib_warning ("failed to add tls key"); + ECHO_FAIL ("failed to add tls key"); } #define foreach_quic_echo_msg \ @@ -1647,6 +1684,7 @@ _(DISCONNECT_SESSION_REPLY, disconnect_session_reply) \ _(APPLICATION_ATTACH_REPLY, application_attach_reply) \ _(APPLICATION_DETACH_REPLY, application_detach_reply) \ _(MAP_ANOTHER_SEGMENT, map_another_segment) \ +_(UNMAP_SEGMENT, unmap_segment) \ _(APPLICATION_TLS_CERT_ADD_REPLY, application_tls_cert_add_reply) \ _(APPLICATION_TLS_KEY_ADD_REPLY, application_tls_key_add_reply) \ @@ -1664,6 +1702,12 @@ quic_echo_api_hookup (echo_main_t * em) #undef _ } +/* + * + * End Session API handlers + * + */ + static void print_usage_and_exit (void) { @@ -1674,6 +1718,8 @@ print_usage_and_exit (void) " use-svm-api Use SVM API to connect to VPP\n" " test-bytes[:assert] Check data correctness when receiving (assert fails on first error)\n" " fifo-size N Use N Kb fifos\n" + " rx-buf N Use N Kb RX buffer\n" + " tx-buf N Use N Kb TX test buffer\n" " appns NAMESPACE Use the namespace NAMESPACE\n" " all-scope all-scope option\n" " local-scope local-scope option\n" @@ -1683,11 +1729,13 @@ print_usage_and_exit (void) " quic-setup OPT OPT=serverstream : Client open N connections. On each one server opens M streams\n" " by default : Client open N connections. On each one client opens M streams\n" "\n" - " no-return Drop the data when received, dont reply\n" " nclients N[/M] Open N QUIC connections, each one with M streams (M defaults to 1)\n" - " send N[Kb|Mb|GB] Send N [K|M|G]bytes\n" - " recv N[Kb|Mb|GB] Expect N [K|M|G]bytes\n" - " nclients N[/M] Open N QUIC connections, each one with M streams (M defaults to 1)\n"); + " TX=1337[Kb|Mb|GB] Send 1337 [K|M|G]bytes, use TX=RX to reflect the data\n" + " RX=1337[Kb|Mb|GB] Expect 1337 [K|M|G]bytes\n" + "\n" + "Default config is :\n" + "server nclients 1/1 RX=64Kb TX=RX\n" + "client nclients 1/1 RX=64Kb TX=64Kb\n"); exit (1); } @@ -1714,8 +1762,6 @@ quic_echo_process_opts (int argc, char **argv) em->i_am_master = 1; else if (unformat (a, "client")) em->i_am_master = 0; - else if (unformat (a, "no-return")) - em->no_return = 1; else if (unformat (a, "test-bytes:assert")) em->test_return_packets = RETURN_PACKETS_ASSERT; else if (unformat (a, "test-bytes")) @@ -1726,6 +1772,10 @@ quic_echo_process_opts (int argc, char **argv) em->use_sock_api = 0; else if (unformat (a, "fifo-size %d", &tmp)) em->fifo_size = tmp << 10; + else if (unformat (a, "rx-buf %d", &tmp)) + em->rx_buf_size = tmp << 10; + else if (unformat (a, "tx-buf %d", &tmp)) + em->rx_buf_size = tmp << 10; else if (unformat (a, "nclients %d/%d", &em->n_clients, &em->n_stream_clients)) @@ -1745,12 +1795,11 @@ quic_echo_process_opts (int argc, char **argv) ; else if (unformat (a, "quic-setup %U", echo_unformat_quic_setup_vft)) ; - else - if (unformat (a, "send %U", echo_unformat_data, &em->bytes_to_send)) + else if (unformat (a, "TX=RX")) + em->data_source = ECHO_RX_DATA_SOURCE; + else if (unformat (a, "TX=%U", unformat_data, &em->bytes_to_send)) ; - else - if (unformat - (a, "recv %U", echo_unformat_data, &em->bytes_to_receive)) + else if (unformat (a, "RX=%U", unformat_data, &em->bytes_to_receive)) ; else if (unformat (a, "time %U:%U", echo_unformat_timing_event, &em->timing_start_event, @@ -1759,6 +1808,23 @@ quic_echo_process_opts (int argc, char **argv) else print_usage_and_exit 
     }
+
+  if (em->bytes_to_receive == (u64) ~ 0)
+    em->bytes_to_receive = 64 << 10;	/* default */
+  if (em->bytes_to_send == (u64) ~ 0)
+    em->bytes_to_send = 64 << 10;	/* default */
+  else if (em->bytes_to_send == 0)
+    em->data_source = ECHO_NO_DATA_SOURCE;
+  else
+    em->data_source = ECHO_TEST_DATA_SOURCE;
+
+  if (em->data_source == ECHO_INVALID_DATA_SOURCE)
+    em->data_source =
+      em->i_am_master ? ECHO_RX_DATA_SOURCE : ECHO_TEST_DATA_SOURCE;
+  if (em->data_source == ECHO_RX_DATA_SOURCE)
+    em->bytes_to_send = em->bytes_to_receive;
+
+  em->send_disconnects = !em->i_am_master;
 }
 
 int
@@ -1767,8 +1833,7 @@ main (int argc, char **argv)
   echo_main_t *em = &echo_main;
   fifo_segment_main_t *sm = &em->segment_main;
   char *app_name;
-  int i, rv;
-  u32 n_clients;
+  u32 n_clients, i;
 
   clib_mem_init_thread_safe (0, 256 << 20);
   clib_memset (em, 0, sizeof (*em));
@@ -1786,8 +1851,11 @@ main (int argc, char **argv)
   em->test_return_packets = RETURN_PACKETS_NOTEST;
   em->timing_start_event = ECHO_EVT_FIRST_QCONNECT;
   em->timing_end_event = ECHO_EVT_LAST_BYTE;
-  em->bytes_to_receive = 64 << 10;
-  em->bytes_to_send = 64 << 10;
+  em->bytes_to_receive = ~0;	/* defaulted when we know if server/client */
+  em->bytes_to_send = ~0;	/* defaulted when we know if server/client */
+  em->rx_buf_size = 1 << 20;
+  em->tx_buf_size = 1 << 20;
+  em->data_source = ECHO_INVALID_DATA_SOURCE;
   em->uri = format (0, "%s%c", "quic://0.0.0.0/1234", 0);
   em->cb_vft = default_cb_vft;
   quic_echo_process_opts (argc, argv);
@@ -1799,9 +1867,8 @@ main (int argc, char **argv)
   init_error_string_table (em);
   fifo_segment_main_init (sm, HIGH_SEGMENT_BASEVA, 20);
   clib_spinlock_init (&em->segment_handles_lock);
-  vec_validate (em->rx_buf, 4 << 20);
-  vec_validate (em->connect_test_data, 1024 * 1024 - 1);
-  for (i = 0; i < vec_len (em->connect_test_data); i++)
+  vec_validate (em->connect_test_data, em->tx_buf_size);
+  for (i = 0; i < em->tx_buf_size; i++)
     em->connect_test_data[i] = i & 0xff;
 
   setup_signal_handlers ();
@@ -1816,20 +1883,33 @@ main (int argc, char **argv)
     }
 
   quic_echo_notify_event (em, ECHO_EVT_START);
+
+  application_send_attach (em);
+  if (wait_for_state_change (em, STATE_ATTACHED, TIMEOUT))
+    {
+      fformat (stderr, "Couldn't attach to vpp, exiting...\n");
+      exit (1);
+    }
   if (em->i_am_master)
-    rv = server_run (em);
+    server_run (em);
   else
-    rv = clients_run (em);
-  if (rv)
-    exit (rv);
+    clients_run (em);
   quic_echo_notify_event (em, ECHO_EVT_EXIT);
   print_global_stats (em);
-
-  /* Make sure detach finishes */
+  echo_free_sessions (em);
+  echo_assert_test_suceeded (em);
+  application_detach (em);
   if (wait_for_state_change (em, STATE_DETACHED, TIMEOUT))
-    exit (-1);
-  disconnect_from_vpp (em);
-  exit (0);
+    {
+      fformat (stderr, "Couldn't detach from vpp, exiting...\n");
+      exit (1);
+    }
+  if (em->use_sock_api)
+    vl_socket_client_disconnect ();
+  else
+    vl_client_disconnect_from_vlib ();
+  fformat (stdout, "Test complete!\n");
+  exit (em->has_failed);
 }
 
 /*
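With the option changes above, send and receive amounts are expressed as TX=/RX= assignments and are defaulted per role once server/client is known. For instance (illustrative invocations; API and socket options omitted): "quic_echo server" expects 64Kb and reflects it back (TX=RX), while "quic_echo client nclients 2/4 RX=10Mb TX=RX" opens 2 QUIC connections with 4 streams each, expects 10Mb in total and echoes received data back instead of generating test data.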
diff --git a/src/plugins/quic/quic.c b/src/plugins/quic/quic.c
index f099d074211..93c01628e80 100644
--- a/src/plugins/quic/quic.c
+++ b/src/plugins/quic/quic.c
@@ -184,18 +184,10 @@ quic_send_datagram (session_t * udp_session, quicly_datagram_t * packet)
   tc = session_get_transport (udp_session);
   max_enqueue = svm_fifo_max_enqueue (f);
-  if (max_enqueue <= sizeof (session_dgram_hdr_t))
-    {
-      QUIC_DBG (1, "Not enough space to enqueue header");
-      return QUIC_ERROR_FULL_FIFO;
-    }
-
-  max_enqueue -= sizeof (session_dgram_hdr_t);
-
-  if (max_enqueue < len)
+  if (max_enqueue < SESSION_CONN_HDR_LEN + len)
     {
       QUIC_DBG (1, "Too much data to send, max_enqueue %u, len %u",
-		max_enqueue, len);
+		max_enqueue, len + SESSION_CONN_HDR_LEN);
       return QUIC_ERROR_FULL_FIFO;
     }
 
@@ -243,10 +235,9 @@ static int
 quic_sendable_packet_count (session_t * udp_session)
 {
   u32 max_enqueue;
+  u32 packet_size = QUIC_MAX_PACKET_SIZE + SESSION_CONN_HDR_LEN;
   max_enqueue = svm_fifo_max_enqueue (udp_session->tx_fifo);
-  return clib_min (max_enqueue /
-		   (QUIC_MAX_PACKET_SIZE + sizeof (session_dgram_hdr_t)),
-		   QUIC_SEND_PACKET_VEC_SIZE);
+  return clib_min (max_enqueue / packet_size, QUIC_SEND_PACKET_VEC_SIZE);
 }
 
 static int
@@ -259,7 +250,7 @@ quic_send_packets (quic_ctx_t * ctx)
   quicly_context_t *quicly_context;
   app_worker_t *app_wrk;
   application_t *app;
-  int err;
+  int err = 0;
 
   /* We have sctx, get qctx */
   if (ctx->c_quic_ctx_id.is_stream)
@@ -270,7 +261,10 @@ quic_send_packets (quic_ctx_t * ctx)
   ASSERT (!ctx->c_quic_ctx_id.is_stream);
 
   udp_session =
-    session_get_from_handle (ctx->c_quic_ctx_id.udp_session_handle);
+    session_get_from_handle_if_valid (ctx->c_quic_ctx_id.udp_session_handle);
+  if (!udp_session)
+    goto quicly_error;
+
   conn = ctx->c_quic_ctx_id.conn;
 
   if (!conn)
@@ -311,16 +305,21 @@ quic_send_packets (quic_ctx_t * ctx)
     }
   while (num_packets > 0 && num_packets == max_packets);
 
+stop_sending:
   if (svm_fifo_set_event (udp_session->tx_fifo))
-    session_send_io_evt_to_thread (udp_session->tx_fifo, SESSION_IO_EVT_TX);
+    if ((err =
+	 session_send_io_evt_to_thread (udp_session->tx_fifo,
+					SESSION_IO_EVT_TX)))
+      clib_warning ("Event enqueue errored %d", err);
 
-stop_sending:
+  QUIC_DBG (3, "%u[TX] %u[RX]", svm_fifo_max_dequeue (udp_session->tx_fifo),
+	    svm_fifo_max_dequeue (udp_session->rx_fifo));
   quic_update_timer (ctx);
   return 0;
 
 quicly_error:
-  if ((err != QUICLY_ERROR_PACKET_IGNORED) & (err !=
-					      QUICLY_ERROR_FREE_CONNECTION))
+  if (err && err != QUICLY_ERROR_PACKET_IGNORED
+      && err != QUICLY_ERROR_FREE_CONNECTION)
    clib_warning ("Quic error '%s'.", quic_format_err (err));
   quic_connection_closed (ctx->c_c_index, ctx->c_thread_index);
   return 1;
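The stop_sending path above uses an idiom that recurs throughout this patch: mark the fifo's event flag, then enqueue an IO event only if the flag was not already set. A self-contained sketch of that idiom (the helper name is hypothetical):

/* Sketch of the "set event, then notify" idiom. svm_fifo_set_event ()
 * returns non-zero only when the event flag was not already set, so at
 * most one SESSION_IO_EVT_TX is queued per burst of enqueues. */
static inline int
notify_udp_tx (session_t * udp_session)
{
  int err = 0;
  if (svm_fifo_set_event (udp_session->tx_fifo))
    err = session_send_io_evt_to_thread (udp_session->tx_fifo,
					 SESSION_IO_EVT_TX);
  return err;
}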
@@ -393,6 +392,31 @@ get_stream_session_from_stream (quicly_stream_t * stream)
   return session_get (ctx->c_s_index, stream_data->thread_index);
 }
 
+static void
+quic_ack_rx_data (session_t * stream_session)
+{
+  u32 max_deq;
+  quic_ctx_t *sctx;
+  svm_fifo_t *f;
+  quicly_stream_t *stream;
+  quic_stream_data_t *stream_data;
+
+  sctx =
+    quic_ctx_get (stream_session->connection_index,
+		  stream_session->thread_index);
+  ASSERT (sctx->c_quic_ctx_id.is_stream);
+  stream = sctx->c_quic_ctx_id.stream;
+  stream_data = (quic_stream_data_t *) stream->data;
+
+  f = stream_session->rx_fifo;
+  max_deq = svm_fifo_max_dequeue (f);
+
+  ASSERT (stream_data->app_rx_data_len >= max_deq);
+  quicly_stream_sync_recvbuf (stream, stream_data->app_rx_data_len - max_deq);
+  QUIC_DBG (3, "Acking %u bytes", stream_data->app_rx_data_len - max_deq);
+  stream_data->app_rx_data_len = max_deq;
+}
+
 static int
 quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
 		 size_t len)
@@ -413,26 +437,28 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
   max_enq = svm_fifo_max_enqueue_prod (f);
   QUIC_DBG (3, "Enqueuing %u at off %u in %u space", len, off, max_enq);
-  if (off + len > max_enq)
+  if (off - stream_data->app_rx_data_len + len > max_enq)
     {
-      /* TODO : can we find a better solution, listening on RX fifo evts ? */
-      QUIC_DBG (3, "Ingoring packet, RX fifo is full");
-      return QUICLY_ERROR_PACKET_IGNORED;
+      QUIC_DBG (1, "Error RX fifo is full");
+      return 1;
     }
-  if (off == 0)
+  if (off == stream_data->app_rx_data_len)
     {
+      /* Streams live on the same thread so (f, stream_data) should stay consistent */
       rlen = svm_fifo_enqueue (f, len, (u8 *) src);
+      stream_data->app_rx_data_len += rlen;
       ASSERT (rlen >= len);
-
-      quicly_stream_sync_recvbuf (stream, rlen);
       app_wrk = app_worker_get_if_valid (stream_session->app_wrk_index);
       if (PREDICT_TRUE (app_wrk != 0))
 	app_worker_lock_and_send_event (app_wrk, stream_session,
 					SESSION_IO_EVT_RX);
+      quic_ack_rx_data (stream_session);
     }
   else
     {
-      rlen = svm_fifo_enqueue_with_offset (f, off, len, (u8 *) src);
+      rlen =
+	svm_fifo_enqueue_with_offset (f, off - stream_data->app_rx_data_len,
+				      len, (u8 *) src);
       ASSERT (rlen == 0);
     }
   return 0;
@@ -443,11 +469,13 @@ quic_fifo_egress_shift (quicly_stream_t * stream, size_t delta)
 {
   session_t *stream_session;
   svm_fifo_t *f;
+  int rv;
 
   stream_session = get_stream_session_from_stream (stream);
   f = stream_session->tx_fifo;
 
-  ASSERT (svm_fifo_dequeue_drop (f, delta) == delta);
+  rv = svm_fifo_dequeue_drop (f, delta);
+  ASSERT (rv == delta);
   quicly_stream_sync_sendbuf (stream, 0);
 }
 
@@ -472,9 +500,9 @@ quic_fifo_egress_emit (quicly_stream_t * stream, size_t off, void *dst,
     }
   else
     {
-      QUIC_DBG (3, "Wrote ALL");
       *wrote_all = 1;
       *len = deq_max - off;
+      QUIC_DBG (3, "Wrote ALL, %u", *len);
     }
 
   /* TODO, use something like : return svm_fifo_peek (f, off, *len, dst); */
@@ -538,6 +566,7 @@ quic_accept_stream (void *s)
   stream_data = (quic_stream_data_t *) stream->data;
   stream_data->ctx_id = sctx_id;
   stream_data->thread_index = sctx->c_thread_index;
+  stream_data->app_rx_data_len = 0;
 
   sctx->c_s_index = stream_session->session_index;
   stream_session->session_state = SESSION_STATE_CREATED;
@@ -557,6 +586,9 @@ quic_accept_stream (void *s)
       quicly_reset_stream (stream, QUIC_APP_ALLOCATION_ERROR);
       return;
     }
+  svm_fifo_add_want_tx_ntf (stream_session->rx_fifo,
+			    SVM_FIFO_WANT_TX_NOTIF_IF_FULL |
+			    SVM_FIFO_WANT_TX_NOTIF_IF_EMPTY);
 
   rv = app_worker_accept_notify (app_wrk, stream_session);
   if (rv)
@@ -1211,6 +1243,10 @@ quic_connect_new_stream (session_endpoint_cfg_t * sep)
       return app_worker_connect_notify (app_wrk, NULL, sep->opaque);
     }
 
+  svm_fifo_add_want_tx_ntf (stream_session->rx_fifo,
+			    SVM_FIFO_WANT_TX_NOTIF_IF_FULL |
+			    SVM_FIFO_WANT_TX_NOTIF_IF_EMPTY);
+
   stream_session->session_state = SESSION_STATE_READY;
   if (app_worker_connect_notify (app_wrk, stream_session, sep->opaque))
     {
@@ -1225,6 +1261,7 @@ quic_connect_new_stream (session_endpoint_cfg_t * sep)
   stream_data = (quic_stream_data_t *) stream->data;
   stream_data->ctx_id = sctx->c_c_index;
   stream_data->thread_index = sctx->c_thread_index;
+  stream_data->app_rx_data_len = 0;
 
   return 0;
 }
@@ -1798,6 +1835,17 @@ quic_del_segment_callback (u32 client_index, u64 seg_handle)
   return 0;
 }
 
+
+static int
+quic_custom_app_rx_callback (transport_connection_t * tc)
+{
+  session_t *stream_session = session_get (tc->s_index, tc->thread_index);
+  QUIC_DBG (2, "Received app READ notification");
+  quic_ack_rx_data (stream_session);
+  svm_fifo_reset_tx_ntf (stream_session->rx_fifo);
+  return 0;
+}
+
 static int
 quic_custom_tx_callback (void *s)
 {
@@ -1806,7 +1854,6 @@ quic_custom_tx_callback (void *s)
   quic_ctx_t *ctx;
   int rv;
 
-  svm_fifo_unset_event (stream_session->tx_fifo);
   if (PREDICT_FALSE (stream_session->session_state >=
 		     SESSION_STATE_TRANSPORT_CLOSING))
     return 0;
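The quic_ack_rx_data () function added above maintains the new app_rx_data_len counter: bytes quicly has enqueued for the app, minus bytes already acknowledged back to quicly. A worked example with assumed numbers (illustration only, not code from this patch):

static void
rx_window_accounting_example (void)
{
  u32 app_rx_data_len = 10000;	/* bytes quicly wrote to the rx fifo */
  u32 max_deq = 4000;		/* bytes the app has not yet read */
  u32 acked = app_rx_data_len - max_deq;	/* 6000 bytes consumed */
  /* quic_ack_rx_data () hands 'acked' to quicly_stream_sync_recvbuf (),
   * re-opening that much receive window towards the peer, then sets
   * app_rx_data_len = max_deq. */
  ASSERT (acked == 6000);
}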
@@ -1818,6 +1865,11 @@ quic_custom_tx_callback (void *s)
       goto tx_end;		/* Most probably a reschedule */
     }
 
+  QUIC_DBG (3, "Stream TX event");
+  quic_ack_rx_data (stream_session);
+  if (!svm_fifo_max_dequeue (stream_session->tx_fifo))
+    return 0;
+
   stream = ctx->c_quic_ctx_id.stream;
   if (!quicly_sendstate_is_open (&stream->sendstate))
     {
@@ -2013,6 +2065,7 @@ quic_reset_connection (quicly_context_t * quicly_ctx, u64 udp_session_handle,
        * reset, then the next CID is highly likely to contain a non-authenticating
        * CID, ... */
       QUIC_DBG (2, "Sending stateless reset");
+      int rv;
       quicly_datagram_t *dgram;
       session_t *udp_session;
       if (packet.cid.dest.plaintext.node_id == 0
@@ -2023,7 +2076,11 @@ quic_reset_connection (quicly_context_t * quicly_ctx, u64 udp_session_handle,
       if (dgram == NULL)
 	return 1;
       udp_session = session_get_from_handle (udp_session_handle);
-      return quic_send_datagram (udp_session, dgram);	/* TODO : set event on fifo */
+      rv = quic_send_datagram (udp_session, dgram);
+      if (svm_fifo_set_event (udp_session->tx_fifo))
+	session_send_io_evt_to_thread (udp_session->tx_fifo,
+				       SESSION_IO_EVT_TX);
+      return rv;
     }
   return 0;
 }
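quic_app_rx_callback (), reworked in the following hunks, consumes datagrams laid out in the UDP session's rx fifo as a session_dgram_hdr_t followed by its payload. A condensed sketch of the peek / validate / drop pattern it follows, assuming the post-patch invariant ph.data_offset == 0 (the function name is hypothetical):

static int
read_one_dgram (svm_fifo_t * f)
{
  session_dgram_hdr_t ph;
  u32 max_deq, full_len;
  u8 *data;

  max_deq = svm_fifo_max_dequeue (f);
  if (max_deq < SESSION_CONN_HDR_LEN)
    return 0;			/* no complete header yet */
  svm_fifo_peek (f, 0, SESSION_CONN_HDR_LEN, (u8 *) & ph);
  full_len = ph.data_length + SESSION_CONN_HDR_LEN;
  if (full_len > max_deq)
    return 0;			/* wait for the rest of the datagram */
  data = malloc (ph.data_length);
  svm_fifo_peek (f, SESSION_CONN_HDR_LEN, ph.data_length, data);
  /* ... hand 'data' to quicly_decode_packet () and friends ... */
  svm_fifo_dequeue_drop (f, full_len);	/* consume header + payload */
  free (data);
  return 1;
}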
@@ -2041,8 +2098,7 @@ quic_app_rx_callback (session_t * udp_session)
   struct sockaddr_in6 sa6;
   struct sockaddr *sa = (struct sockaddr *) &sa6;
   socklen_t salen;
-  u32 max_deq, len, full_len, ctx_index = UINT32_MAX, ctx_thread =
-    UINT32_MAX, ret;
+  u32 max_deq, full_len, ctx_index = UINT32_MAX, ctx_thread = UINT32_MAX, ret;
   u8 *data;
   int err;
   u32 *opening_ctx_pool, *ctx_index_ptr;
@@ -2050,7 +2106,6 @@ quic_app_rx_callback (session_t * udp_session)
   u64 udp_session_handle = session_handle (udp_session);
   int rv = 0;
   u32 thread_index = vlib_get_thread_index ();
-
   app = application_get_if_valid (app_index);
   if (!app)
     {
@@ -2063,24 +2118,23 @@ quic_app_rx_callback (session_t * udp_session)
     {
       udp_session = session_get_from_handle (udp_session_handle);	/* session alloc might have happened */
       f = udp_session->rx_fifo;
-      svm_fifo_unset_event (f);
       max_deq = svm_fifo_max_dequeue (f);
-      if (max_deq < sizeof (session_dgram_hdr_t))
+      if (max_deq == 0)
 	return 0;
 
-      ret = svm_fifo_peek (f, 0, SESSION_CONN_HDR_LEN, (u8 *) & ph);
-      if (ret != SESSION_CONN_HDR_LEN)
+      if (max_deq < SESSION_CONN_HDR_LEN)
 	{
-	  QUIC_DBG (1, "Not enough data for header in RX");
+	  QUIC_DBG (1, "Not enough data for even a header in RX");
 	  return 1;
 	}
-      if (ph.data_length < ph.data_offset)
+      ret = svm_fifo_peek (f, 0, SESSION_CONN_HDR_LEN, (u8 *) & ph);
+      if (ret != SESSION_CONN_HDR_LEN)
 	{
-	  QUIC_DBG (1, "Not enough data vs offset in RX");
+	  QUIC_DBG (1, "Not enough data for header in RX");
 	  return 1;
 	}
-      len = ph.data_length - ph.data_offset;
-      full_len = ph.data_length + ph.data_offset + SESSION_CONN_HDR_LEN;
+      ASSERT (ph.data_offset == 0);
+      full_len = ph.data_length + SESSION_CONN_HDR_LEN;
       if (full_len > max_deq)
 	{
 	  QUIC_DBG (1, "Not enough data in fifo RX");
@@ -2090,9 +2144,7 @@ quic_app_rx_callback (session_t * udp_session)
       /* Quicly can read len bytes from the fifo at offset:
        * ph.data_offset + SESSION_CONN_HDR_LEN */
       data = malloc (ph.data_length);
-      ret =
-	svm_fifo_peek (f, ph.data_offset + SESSION_CONN_HDR_LEN,
-		       ph.data_length, data);
+      ret = svm_fifo_peek (f, SESSION_CONN_HDR_LEN, ph.data_length, data);
       if (ret != ph.data_length)
 	{
 	  QUIC_DBG (1, "Not enough data peeked in RX");
+	  free (data);
 	  return 1;
 	}
 
-      plen =
-	quicly_decode_packet ((quicly_context_t *) app->quicly_ctx, &packet,
-			      data, len);
-
       rv = 0;
       quic_build_sockaddr (sa, &salen, &ph.rmt_ip, ph.rmt_port, ph.is_ip4);
-      plen =
-	quicly_decode_packet ((quicly_context_t *) app->quicly_ctx, &packet,
-			      data, len);
+      plen = quicly_decode_packet ((quicly_context_t *) app->quicly_ctx,
+				   &packet, data, ph.data_length);
 
       if (plen != SIZE_MAX)
 	{
@@ -2157,9 +2204,7 @@ quic_app_rx_callback (session_t * udp_session)
 	    }
 	}
     ctx_search_done:
-      svm_fifo_dequeue_drop (f,
-			     ph.data_length + ph.data_offset +
-			     SESSION_CONN_HDR_LEN);
+      svm_fifo_dequeue_drop (f, full_len);
       free (data);
     }
   while (1);
@@ -2233,6 +2278,7 @@ static const transport_proto_vft_t quic_proto = {
   .get_connection = quic_connection_get,
   .get_listener = quic_listener_get,
   .update_time = quic_update_time,
+  .app_rx_evt = quic_custom_app_rx_callback,
   .custom_tx = quic_custom_tx_callback,
   .format_connection = format_quic_connection,
   .format_half_open = format_quic_half_open,
diff --git a/src/plugins/quic/quic.h b/src/plugins/quic/quic.h
index 3ba0455d733..3ecb04f0c63 100644
--- a/src/plugins/quic/quic.h
+++ b/src/plugins/quic/quic.h
@@ -32,8 +32,6 @@
  **/
 
 #define QUIC_DEBUG 0
-#define QUIC_DEBUG_LEVEL_CLIENT 0
-#define QUIC_DEBUG_LEVEL_SERVER 0
 
 #define QUIC_DEFAULT_CA_CERT_PATH "/etc/ssl/certs/ca-certificates.crt"
 
@@ -95,6 +93,7 @@ typedef struct quic_stream_data_
 {
   u32 ctx_id;
   u32 thread_index;
+  u32 app_rx_data_len;		/* bytes received, to be read by external app */
 } quic_stream_data_t;
 
 typedef struct quic_worker_ctx_
diff --git a/src/svm/svm_fifo.h b/src/svm/svm_fifo.h
index b5b26ac479b..ce73be2d0f0 100644
--- a/src/svm/svm_fifo.h
+++ b/src/svm/svm_fifo.h
@@ -45,6 +45,7 @@ typedef enum svm_fifo_tx_ntf_
   SVM_FIFO_NO_TX_NOTIF = 0,
   SVM_FIFO_WANT_TX_NOTIF = 1,
   SVM_FIFO_WANT_TX_NOTIF_IF_FULL = 2,
+  SVM_FIFO_WANT_TX_NOTIF_IF_EMPTY = 4,
 } svm_fifo_tx_ntf_t;
 
 typedef struct
@@ -820,15 +821,18 @@ svm_fifo_needs_tx_ntf (svm_fifo_t * f, u32 n_last_deq)
     return 0;
   else if (want_ntf & SVM_FIFO_WANT_TX_NOTIF)
     return 1;
-  else if (want_ntf & SVM_FIFO_WANT_TX_NOTIF_IF_FULL)
+  if (want_ntf & SVM_FIFO_WANT_TX_NOTIF_IF_FULL)
     {
       u32 max_deq = svm_fifo_max_dequeue_cons (f);
       u32 nitems = f->nitems;
       if (!f->has_tx_ntf && max_deq < nitems
 	  && max_deq + n_last_deq >= nitems)
 	return 1;
-
-      return 0;
+    }
+  if (want_ntf & SVM_FIFO_WANT_TX_NOTIF_IF_EMPTY)
+    {
+      if (!f->has_tx_ntf && svm_fifo_is_empty (f))
+	return 1;
     }
   return 0;
 }
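The IF_EMPTY flag added above complements IF_FULL: a transport can now be woken either when the app frees space in a previously full fifo or when it drains the fifo completely, which is exactly what the QUIC stream setup requests and what quic_custom_app_rx_callback () re-arms. A sketch of the full cycle (the wrapper function is hypothetical; in the tree the check is performed by the session layer):

static void
tx_ntf_cycle_sketch (svm_fifo_t * rx_fifo, u32 n_read)
{
  /* At stream setup: ask to be notified when the app un-fills a full
   * fifo or empties it completely. */
  svm_fifo_add_want_tx_ntf (rx_fifo, SVM_FIFO_WANT_TX_NOTIF_IF_FULL |
			    SVM_FIFO_WANT_TX_NOTIF_IF_EMPTY);

  /* After the app dequeues n_read bytes, decide whether to signal the
   * transport; for QUIC the event lands in quic_custom_app_rx_callback ()
   * via the new app_rx_evt hook. */
  if (svm_fifo_needs_tx_ntf (rx_fifo, n_read))
    {
      /* ... deliver the event to the transport ... */
    }

  /* The transport acks the newly freed bytes, then re-arms: */
  svm_fifo_reset_tx_ntf (rx_fifo);
}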
diff --git a/test/test_quic.py b/test/test_quic.py
index 63b86b6efeb..0592c83ef40 100644
--- a/test/test_quic.py
+++ b/test/test_quic.py
@@ -8,23 +8,19 @@ import signal
 from framework import VppTestCase, VppTestRunner, running_extended_tests, \
     Worker
 from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
-from threading import Event
 
 
 class QUICAppWorker(Worker):
     """ QUIC Test Application Worker """
     process = None
 
-    def __init__(self, build_dir, appname, args, logger, env={}, event=None):
+    def __init__(self, build_dir, appname, args, logger, env={}):
         app = "%s/vpp/bin/%s" % (build_dir, appname)
         self.args = [app] + args
-        self.event = event
         super(QUICAppWorker, self).__init__(self.args, logger, env)
 
     def run(self):
         super(QUICAppWorker, self).run()
-        if self.event:
-            self.event.set()
 
     def teardown(self, logger, timeout):
         if self.process is None:
@@ -172,7 +168,6 @@ class QUICEchoExternalTestCase(QUICTestCase):
             ["server", "appns", "server", "quic-setup", self.quic_setup]
         self.client_echo_test_args = common_args + \
             ["client", "appns", "client", "quic-setup", self.quic_setup]
-        self.event = Event()
 
     def server(self, *args):
         _args = self.server_echo_test_args + list(args)
         self.worker_server = QUICAppWorker(
             self.build_dir,
             "quic_echo",
             _args,
-            self.logger,
-            event=self.event)
+            self.logger)
         self.worker_server.start()
         self.sleep(self.pre_test_sleep)
 
     def client(self, *args):
         _args = self.client_echo_test_args + list(args)
         self.worker_client = QUICAppWorker(
             self.build_dir,
             "quic_echo",
             _args,
-            self.logger,
-            event=self.event)
+            self.logger)
         self.worker_client.start()
-        self.event.wait(self.timeout)
+        self.worker_client.join()
+        self.worker_server.join()
         self.sleep(self.post_test_sleep)
 
     def validate_external_test_results(self):
@@ -210,7 +204,7 @@ class QUICEchoExternalTestCase(QUICTestCase):
                 self.logger, self.timeout)
         if self.worker_client.result is None:
             self.worker_client.teardown(self.logger, self.timeout)
-        self.assertIsNone(server_result, "Wrong server worker return code")
+        self.assertEqual(server_result, 0, "Wrong server worker return code")
         self.assertIsNotNone(
             client_result,
             "Timeout! Client worker did not finish in %ss" %
@@ -234,8 +228,8 @@ class QUICEchoExternalServerStreamTestCase(QUICEchoExternalTestCase):
 
     @unittest.skipUnless(running_extended_tests, "part of extended tests")
     def test_quic_external_transfer_server_stream(self):
-        self.server("nclients", "1/1", "send", "1Kb", "recv", "0")
-        self.client("nclients", "1/1", "send", "0", "recv", "1Kb")
+        self.server("TX=1Kb", "RX=0")
+        self.client("TX=0", "RX=1Kb")
         self.validate_external_test_results()
 
@@ -245,8 +239,8 @@ class QUICEchoExternalServerStreamWorkersTestCase(QUICEchoExternalTestCase):
 
     @unittest.skipUnless(running_extended_tests, "part of extended tests")
     def test_quic_external_transfer_server_stream_multi_workers(self):
-        self.server("nclients", "4/4", "send", "1Kb", "recv", "0")
-        self.client("nclients", "4/4", "send", "0", "recv", "1Kb")
+        self.server("nclients", "4/4", "TX=1Kb", "RX=0")
+        self.client("nclients", "4/4", "TX=0", "RX=1Kb")
         self.validate_external_test_results()