From 3a34b1d9366551bbff7eabff3ab48ec8189249e2 Mon Sep 17 00:00:00 2001 From: Nathan Skrzypczak Date: Fri, 3 May 2019 14:20:27 +0200 Subject: Add QUIC multistream support Change-Id: I9dc746b8c62c3e7ee2f65e34a1a1dca243ed4bd9 Signed-off-by: Nathan Skrzypczak --- src/tests/vnet/session/quic_echo.c | 242 +++++++++++++++++++++++++++++-------- 1 file changed, 194 insertions(+), 48 deletions(-) (limited to 'src/tests/vnet') diff --git a/src/tests/vnet/session/quic_echo.c b/src/tests/vnet/session/quic_echo.c index 7d0cedd1d6b..29b33f5aeb7 100644 --- a/src/tests/vnet/session/quic_echo.c +++ b/src/tests/vnet/session/quic_echo.c @@ -38,10 +38,10 @@ #include #undef vl_printfun -#define TCP_ECHO_DBG 0 -#define DBG(_fmt,_args...) \ - if (TCP_ECHO_DBG) \ - clib_warning (_fmt, _args) +#define QUIC_ECHO_DBG 0 +#define DBG(_fmt, _args...) \ + if (QUIC_ECHO_DBG) \ + clib_warning (_fmt, ##_args) typedef struct { @@ -68,6 +68,13 @@ typedef enum STATE_DETACHED } connection_state_t; +enum quic_session_type_t +{ + QUIC_SESSION_TYPE_QUIC = 0, + QUIC_SESSION_TYPE_STREAM = 1, + QUIC_SESSION_TYPE_LISTEN = INT32_MAX, +}; + typedef struct { /* vpe input queue */ @@ -85,6 +92,10 @@ typedef struct /* Hash table for disconnect processing */ uword *session_index_by_vpp_handles; + /* Hash table for shared segment_names */ + uword *shared_segment_names; + clib_spinlock_t segment_names_lock; + /* intermediate rx buffer */ u8 *rx_buf; @@ -140,6 +151,7 @@ typedef struct * vpp. If sock api is used, shm binary api is subsequently bootstrapped * and all other messages are exchanged using shm IPC. */ u8 use_sock_api; + int max_test_msg; fifo_segment_main_t segment_main; } echo_main_t; @@ -182,19 +194,54 @@ init_error_string_table (echo_main_t * em) static void handle_mq_event (session_event_t * e); +#if CLIB_DEBUG > 0 +#define TIMEOUT 10.0 +#else +#define TIMEOUT 10.0 +#endif + +static int +wait_for_segment_allocation (u64 segment_handle) +{ + echo_main_t *em = &echo_main; + f64 timeout; + timeout = clib_time_now (&em->clib_time) + TIMEOUT; + uword *segment_present; + DBG ("ASKING for %lu", segment_handle); + while (clib_time_now (&em->clib_time) < timeout) + { + clib_spinlock_lock (&em->segment_names_lock); + segment_present = hash_get (em->shared_segment_names, segment_handle); + clib_spinlock_unlock (&em->segment_names_lock); + if (segment_present != 0) + return 0; + if (em->time_to_stop == 1) + return 0; + } + DBG ("timeout waiting for segment_allocation %lu", segment_handle); + return -1; +} + +static int +wait_for_disconnected_sessions (echo_main_t * em) +{ + f64 timeout; + timeout = clib_time_now (&em->clib_time) + TIMEOUT; + while (clib_time_now (&em->clib_time) < timeout) + { + if (hash_elts (em->session_index_by_vpp_handles) == 0) + return 0; + } + DBG ("timeout waiting for disconnected_sessions"); + return -1; +} + static int wait_for_state_change (echo_main_t * em, connection_state_t state) { svm_msg_q_msg_t msg; session_event_t *e; f64 timeout; - -#if CLIB_DEBUG > 0 -#define TIMEOUT 600.0 -#else -#define TIMEOUT 600.0 -#endif - timeout = clib_time_now (&em->clib_time) + TIMEOUT; while (clib_time_now (&em->clib_time) < timeout) @@ -306,7 +353,6 @@ ssvm_segment_attach (char *name, ssvm_segment_type_t type, int fd) clib_warning ("svm_fifo_segment_attach ('%s') failed", name); return rv; } - vec_reset_length (a->new_segment_indices); return 0; } @@ -318,6 +364,9 @@ vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t * echo_main_t *em = &echo_main; int *fds = 0; u32 n_fds = 0; + u64 segment_handle; + segment_handle = clib_net_to_host_u64 (mp->segment_handle); + DBG ("Attached returned app %u", htons (mp->app_index)); if (mp->retval) { @@ -361,6 +410,10 @@ vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t * -1)) goto failed; } + DBG ("SETTING for %lu", segment_handle); + clib_spinlock_lock (&em->segment_names_lock); + hash_set (em->shared_segment_names, segment_handle, 1); + clib_spinlock_unlock (&em->segment_names_lock); em->state = STATE_ATTACHED; return; @@ -456,7 +509,26 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp) { fifo_segment_main_t *sm = &echo_main.segment_main; fifo_segment_create_args_t _a, *a = &_a; + echo_main_t *em = &echo_main; int rv; + int *fds = 0; + + if (mp->fd_flags & SESSION_FD_F_MEMFD_SEGMENT) + { + vec_validate (fds, 1); + vl_socket_client_recv_fd_msg (fds, 1, 5); + if (ssvm_segment_attach + ((char *) mp->segment_name, SSVM_SEGMENT_MEMFD, fds[0])) + clib_warning + ("svm_fifo_segment_attach ('%s') failed on SSVM_SEGMENT_MEMFD", + mp->segment_name); + DBG ("SETTING for %lu", mp->segment_name); + clib_spinlock_lock (&em->segment_names_lock); + hash_set (em->shared_segment_names, mp->segment_name, 1); + clib_spinlock_unlock (&em->segment_names_lock); + vec_free (fds); + return; + } clib_memset (a, 0, sizeof (*a)); a->segment_name = (char *) mp->segment_name; @@ -471,6 +543,9 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp) } clib_warning ("Mapped new segment '%s' size %d", mp->segment_name, mp->segment_size); + clib_spinlock_lock (&em->segment_names_lock); + hash_set (em->shared_segment_names, mp->segment_name, 1); + clib_spinlock_unlock (&em->segment_names_lock); } static void @@ -486,16 +561,21 @@ session_print_stats (echo_main_t * em, echo_session_t * session) } static void -test_recv_bytes (echo_session_t * s, u8 * rx_buf, u32 n_read) +test_recv_bytes (echo_main_t * em, echo_session_t * s, u8 * rx_buf, + u32 n_read) { int i; for (i = 0; i < n_read; i++) { - if (rx_buf[i] != ((s->bytes_received + i) & 0xff)) + if (rx_buf[i] != ((s->bytes_received + i) & 0xff) + && em->max_test_msg > 0) { clib_warning ("error at byte %lld, 0x%x not 0x%x", s->bytes_received + i, rx_buf[i], ((s->bytes_received + i) & 0xff)); + em->max_test_msg--; + if (em->max_test_msg == 0) + clib_warning ("Too many errors, hiding next ones"); } } } @@ -517,12 +597,13 @@ recv_data_chunk (echo_main_t * em, echo_session_t * s, u8 * rx_buf) if (n_read > 0) { if (em->test_return_packets) - test_recv_bytes (s, rx_buf, n_read); + test_recv_bytes (em, s, rx_buf, n_read); n_to_read -= n_read; s->bytes_received += n_read; s->bytes_to_receive -= n_read; + ASSERT (s->bytes_to_receive >= 0); } else break; @@ -577,8 +658,9 @@ client_thread_fn (void *arg) break; } - clib_warning ("GOT OUT"); - DBG ("session %d done", session_index); + DBG ("session %d done send %lu to do, %lu done || recv %lu to do, %lu done", + session_index, s->bytes_to_send, s->bytes_sent, s->bytes_to_receive, + s->bytes_received); em->tx_total += s->bytes_sent; em->rx_total += s->bytes_received; em->n_active_clients--; @@ -587,7 +669,7 @@ client_thread_fn (void *arg) } void -client_send_connect (echo_main_t * em) +client_send_connect (echo_main_t * em, u8 * uri, u32 opaque) { vl_api_connect_uri_t *cmp; cmp = vl_msg_api_alloc (sizeof (*cmp)); @@ -595,8 +677,8 @@ client_send_connect (echo_main_t * em) cmp->_vl_msg_id = ntohs (VL_API_CONNECT_URI); cmp->client_index = em->my_client_index; - cmp->context = ntohl (0xfeedface); - memcpy (cmp->uri, em->connect_uri, vec_len (em->connect_uri)); + cmp->context = ntohl (opaque); + memcpy (cmp->uri, uri, vec_len (uri)); vl_msg_api_send_shmem (em->vl_input_queue, (u8 *) & cmp); } @@ -640,6 +722,13 @@ session_bound_handler (session_bound_msg_t * mp) em->state = STATE_READY; } +static void +quic_qsession_accepted_handler (session_accepted_msg_t * mp) +{ + DBG ("Accept on QSession index %u", mp->handle); +} + + static void session_accepted_handler (session_accepted_msg_t * mp) { @@ -650,8 +739,11 @@ session_accepted_handler (session_accepted_msg_t * mp) echo_session_t *session; static f64 start_time; u32 session_index; + u64 segment_handle; u8 *ip_str; + segment_handle = mp->segment_handle; + if (start_time == 0.0) start_time = clib_time_now (&em->clib_time); @@ -662,7 +754,14 @@ session_accepted_handler (session_accepted_msg_t * mp) /* Allocate local session and set it up */ pool_get (em->sessions, session); session_index = session - em->sessions; + DBG ("Setting session_index %lu", session_index); + if (wait_for_segment_allocation (segment_handle)) + { + clib_warning ("timeout waiting for segment allocation %lu", + segment_handle); + return; + } rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *); rx_fifo->client_session_index = session_index; tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *); @@ -670,12 +769,28 @@ session_accepted_handler (session_accepted_msg_t * mp) session->rx_fifo = rx_fifo; session->tx_fifo = tx_fifo; + session->vpp_session_handle = mp->handle; session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *); /* Add it to lookup table */ hash_set (em->session_index_by_vpp_handles, mp->handle, session_index); + /* + * Send accept reply to vpp + */ + app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt, + SESSION_CTRL_EVT_ACCEPTED_REPLY); + rmp = (session_accepted_reply_msg_t *) app_evt->evt->data; + rmp->handle = mp->handle; + rmp->context = mp->context; + app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt); + + /* TODO : this is very ugly */ + if (mp->rmt.is_ip4 != 255) + return quic_qsession_accepted_handler (mp); + DBG ("SSession handle is %lu", mp->handle); + em->state = STATE_READY; /* Stats printing */ @@ -687,20 +802,19 @@ session_accepted_handler (session_accepted_msg_t * mp) (f64) pool_elts (em->sessions) / (now - start_time)); } - /* - * Send accept reply to vpp - */ - app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt, - SESSION_CTRL_EVT_ACCEPTED_REPLY); - rmp = (session_accepted_reply_msg_t *) app_evt->evt->data; - rmp->handle = mp->handle; - rmp->context = mp->context; - app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt); - session->bytes_received = 0; session->start = clib_time_now (&em->clib_time); } +static void +quic_session_connected_handler (session_connected_msg_t * mp) +{ + echo_main_t *em = &echo_main; + u8 *uri = format (0, "QUIC://session/%lu", mp->handle); + DBG ("QSession Connect : %s", uri); + client_send_connect (em, uri, QUIC_SESSION_TYPE_STREAM); +} + static void session_connected_handler (session_connected_msg_t * mp) { @@ -709,6 +823,8 @@ session_connected_handler (session_connected_msg_t * mp) u32 session_index; svm_fifo_t *rx_fifo, *tx_fifo; int rv; + u64 segment_handle; + segment_handle = mp->segment_handle; if (mp->retval) { @@ -725,7 +841,14 @@ session_connected_handler (session_connected_msg_t * mp) pool_get (em->sessions, session); clib_memset (session, 0, sizeof (*session)); session_index = session - em->sessions; + DBG ("Setting session_index %lu", session_index); + if (wait_for_segment_allocation (segment_handle)) + { + clib_warning ("timeout waiting for segment allocation %lu", + segment_handle); + return; + } rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *); rx_fifo->client_session_index = session_index; tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *); @@ -740,6 +863,11 @@ session_connected_handler (session_connected_msg_t * mp) hash_set (em->session_index_by_vpp_handles, mp->handle, session_index); + if (mp->context == QUIC_SESSION_TYPE_QUIC) + return quic_session_connected_handler (mp); + + DBG ("SSession Connected"); + /* * Start RX thread */ @@ -768,6 +896,7 @@ session_disconnected_handler (session_disconnected_msg_t * mp) echo_session_t *session = 0; uword *p; int rv = 0; + DBG ("Got a SESSION_CTRL_EVT_DISCONNECTED for session %lu", mp->handle); p = hash_get (em->session_index_by_vpp_handles, mp->handle); if (!p) @@ -778,6 +907,7 @@ session_disconnected_handler (session_disconnected_msg_t * mp) session = pool_elt_at_index (em->sessions, p[0]); hash_unset (em->session_index_by_vpp_handles, mp->handle); + pool_put (em->sessions, session); app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt, @@ -830,18 +960,23 @@ handle_mq_event (session_event_t * e) switch (e->event_type) { case SESSION_CTRL_EVT_BOUND: + DBG ("SESSION_CTRL_EVT_BOUND"); session_bound_handler ((session_bound_msg_t *) e->data); break; case SESSION_CTRL_EVT_ACCEPTED: + DBG ("SESSION_CTRL_EVT_ACCEPTED"); session_accepted_handler ((session_accepted_msg_t *) e->data); break; case SESSION_CTRL_EVT_CONNECTED: + DBG ("SESSION_CTRL_EVT_CONNECTED"); session_connected_handler ((session_connected_msg_t *) e->data); break; case SESSION_CTRL_EVT_DISCONNECTED: + DBG ("SESSION_CTRL_EVT_DISCONNECTED"); session_disconnected_handler ((session_disconnected_msg_t *) e->data); break; case SESSION_CTRL_EVT_RESET: + DBG ("SESSION_CTRL_EVT_RESET"); session_reset_handler ((session_reset_msg_t *) e->data); break; default: @@ -856,6 +991,7 @@ clients_run (echo_main_t * em) svm_msg_q_msg_t msg; session_event_t *e; echo_session_t *s; + hash_pair_t *p; int i; /* Init test data */ @@ -870,7 +1006,7 @@ clients_run (echo_main_t * em) return; for (i = 0; i < em->n_clients; i++) - client_send_connect (em); + client_send_connect (em, em->connect_uri, QUIC_SESSION_TYPE_QUIC); start_time = clib_time_now (&em->clib_time); while (em->n_clients_connected < em->n_clients @@ -897,18 +1033,23 @@ clients_run (echo_main_t * em) /* * Initialize connections */ - for (i = 0; i < em->n_clients; i++) - { - s = pool_elt_at_index (em->sessions, i); + DBG ("Initialize connections on %u clients", em->n_clients); + + /* *INDENT-OFF* */ + hash_foreach_pair (p, em->session_index_by_vpp_handles, + ({ + s = pool_elt_at_index (em->sessions, p->value[0]); s->bytes_to_send = em->bytes_to_send; if (!em->no_return) s->bytes_to_receive = em->bytes_to_send; - } + })); + /* *INDENT-ON* */ em->n_active_clients = em->n_clients_connected; /* * Wait for client threads to send the data */ + DBG ("Waiting for data on %u clients", em->n_active_clients); start_time = clib_time_now (&em->clib_time); em->state = STATE_READY; while (em->n_active_clients) @@ -925,11 +1066,14 @@ clients_run (echo_main_t * em) svm_msg_q_free_msg (em->our_event_queue, &msg); } - for (i = 0; i < em->n_clients; i++) - { - s = pool_elt_at_index (em->sessions, i); + /* *INDENT-OFF* */ + hash_foreach_pair (p, em->session_index_by_vpp_handles, + ({ + s = pool_elt_at_index (em->sessions, p->value[0]); + DBG ("Sending disconnect on session %lu", p->key); client_disconnect (em, s); - } + })); + /* *INDENT-ON* */ /* * Stats and detach @@ -940,6 +1084,7 @@ clients_run (echo_main_t * em) em->tx_total / (1ULL << 30), deltat); fformat (stdout, "%.4f Gbit/second\n", (em->tx_total * 8.0) / deltat / 1e9); + wait_for_disconnected_sessions (em); application_detach (em); } @@ -1059,7 +1204,6 @@ server_handle_rx (echo_main_t * em, session_event_t * e) int n_read, max_dequeue, n_sent; u32 offset, to_dequeue; echo_session_t *s; - s = pool_elt_at_index (em->sessions, e->session_index); /* Clear event only once. Otherwise, if we do it in the loop by calling @@ -1070,7 +1214,6 @@ server_handle_rx (echo_main_t * em, session_event_t * e) max_dequeue = svm_fifo_max_dequeue (s->rx_fifo); if (PREDICT_FALSE (!max_dequeue)) return; - do { /* The options here are to limit ourselves to max_dequeue or read @@ -1079,8 +1222,12 @@ server_handle_rx (echo_main_t * em, session_event_t * e) to_dequeue = clib_min (max_dequeue, vec_len (em->rx_buf)); n_read = app_recv_stream_raw (s->rx_fifo, em->rx_buf, to_dequeue, 0 /* clear evt */ , 0 /* peek */ ); + if (n_read > 0) { + if (em->test_return_packets) + test_recv_bytes (em, s, em->rx_buf, n_read); + max_dequeue -= n_read; s->bytes_received += n_read; } @@ -1127,10 +1274,10 @@ server_handle_mq (echo_main_t * em) if (rc == ETIMEDOUT) continue; e = svm_msg_q_msg_data (em->our_event_queue, &msg); - clib_warning ("Event %d", e->event_type); switch (e->event_type) { - case FIFO_EVENT_APP_RX: + case SESSION_IO_EVT_RX: + DBG ("SESSION_IO_EVT_RX"); server_handle_rx (em, e); break; default: @@ -1220,6 +1367,7 @@ vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t * { echo_main_t *em = &echo_main; uword *p; + DBG ("Got disonnected reply for session %lu", mp->handle); if (mp->retval) { @@ -1300,12 +1448,14 @@ main (int argc, char **argv) clib_memset (em, 0, sizeof (*em)); em->session_index_by_vpp_handles = hash_create (0, sizeof (uword)); + em->shared_segment_names = hash_create (0, sizeof (uword)); + clib_spinlock_init (&em->segment_names_lock); em->my_pid = getpid (); - em->configured_segment_size = 1 << 20; em->socket_name = 0; em->use_sock_api = 1; em->fifo_size = 64 << 10; em->n_clients = 1; + em->max_test_msg = 50; clib_time_init (&em->clib_time); init_error_string_table (em); @@ -1320,10 +1470,6 @@ main (int argc, char **argv) } else if (unformat (a, "uri %s", &uri)) ; - else if (unformat (a, "segment-size %dM", &tmp)) - em->configured_segment_size = tmp << 20; - else if (unformat (a, "segment-size %dG", &tmp)) - em->configured_segment_size = tmp << 30; else if (unformat (a, "server")) i_am_server = 1; else if (unformat (a, "client")) -- cgit 1.2.3-korg