diff options
-rw-r--r-- | src/plugins/hs_apps/echo_client.c | 576 | ||||
-rw-r--r-- | src/plugins/hs_apps/echo_client.h | 61 |
2 files changed, 319 insertions, 318 deletions
diff --git a/src/plugins/hs_apps/echo_client.c b/src/plugins/hs_apps/echo_client.c index d2ae252cd98..1d33a855240 100644 --- a/src/plugins/hs_apps/echo_client.c +++ b/src/plugins/hs_apps/echo_client.c @@ -15,22 +15,19 @@ * limitations under the License. */ -#include <vnet/vnet.h> -#include <vlibapi/api.h> -#include <vlibmemory/api.h> #include <hs_apps/echo_client.h> -echo_client_main_t echo_client_main; +static ec_main_t ec_main; -#define ECHO_CLIENT_DBG (0) -#define DBG(_fmt, _args...) \ - if (ECHO_CLIENT_DBG) \ - clib_warning (_fmt, ##_args) +#define EC_DBG (0) +#define DBG(_fmt, _args...) \ + if (EC_DBG) \ + clib_warning (_fmt, ##_args) static void signal_evt_to_cli_i (int *code) { - echo_client_main_t *ecm = &echo_client_main; + ec_main_t *ecm = &ec_main; ASSERT (vlib_get_thread_index () == 0); vlib_process_signal_event (ecm->vlib_main, ecm->cli_node_index, *code, 0); } @@ -45,8 +42,32 @@ signal_evt_to_cli (int code) signal_evt_to_cli_i (&code); } +static inline ec_worker_t * +ec_worker_get (u32 thread_index) +{ + return vec_elt_at_index (ec_main.wrk, thread_index); +} + +static inline ec_session_t * +ec_session_alloc (ec_worker_t *wrk) +{ + ec_session_t *ecs; + + pool_get_zero (wrk->sessions, ecs); + ecs->data.session_index = ecs - wrk->sessions; + ecs->thread_index = wrk->thread_index; + + return ecs; +} + +static inline ec_session_t * +ec_session_get (ec_worker_t *wrk, u32 ec_index) +{ + return pool_elt_at_index (wrk->sessions, ec_index); +} + static void -send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s) +send_data_chunk (ec_main_t *ecm, ec_session_t *es) { u8 *test_data = ecm->connect_test_data; int test_buf_len, test_buf_offset, rv; @@ -54,27 +75,27 @@ send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s) test_buf_len = vec_len (test_data); ASSERT (test_buf_len > 0); - test_buf_offset = s->bytes_sent % test_buf_len; - bytes_this_chunk = clib_min (test_buf_len - test_buf_offset, - s->bytes_to_send); + test_buf_offset = es->bytes_sent % test_buf_len; + bytes_this_chunk = + clib_min (test_buf_len - test_buf_offset, es->bytes_to_send); if (!ecm->is_dgram) { if (ecm->no_copy) { - svm_fifo_t *f = s->data.tx_fifo; + svm_fifo_t *f = es->data.tx_fifo; rv = clib_min (svm_fifo_max_enqueue_prod (f), bytes_this_chunk); svm_fifo_enqueue_nocopy (f, rv); session_send_io_evt_to_thread_custom ( - &f->shr->master_session_index, s->thread_index, SESSION_IO_EVT_TX); + &es->vpp_session_index, es->thread_index, SESSION_IO_EVT_TX); } else - rv = app_send_stream (&s->data, test_data + test_buf_offset, + rv = app_send_stream (&es->data, test_data + test_buf_offset, bytes_this_chunk, 0); } else { - svm_fifo_t *f = s->data.tx_fifo; + svm_fifo_t *f = es->data.tx_fifo; u32 max_enqueue = svm_fifo_max_enqueue_prod (f); if (max_enqueue < sizeof (session_dgram_hdr_t)) @@ -85,7 +106,7 @@ send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s) if (ecm->no_copy) { session_dgram_hdr_t hdr; - app_session_transport_t *at = &s->data.transport; + app_session_transport_t *at = &es->data.transport; rv = clib_min (max_enqueue, bytes_this_chunk); @@ -101,12 +122,12 @@ send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s) svm_fifo_enqueue (f, sizeof (hdr), (u8 *) & hdr); svm_fifo_enqueue_nocopy (f, rv); session_send_io_evt_to_thread_custom ( - &f->shr->master_session_index, s->thread_index, SESSION_IO_EVT_TX); + &es->vpp_session_index, es->thread_index, SESSION_IO_EVT_TX); } else { bytes_this_chunk = clib_min (bytes_this_chunk, max_enqueue); - rv = app_send_dgram (&s->data, test_data + test_buf_offset, + rv = app_send_dgram (&es->data, test_data + test_buf_offset, bytes_this_chunk, 0); } } @@ -115,45 +136,43 @@ send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s) if (rv > 0) { /* Account for it... */ - s->bytes_to_send -= rv; - s->bytes_sent += rv; + es->bytes_to_send -= rv; + es->bytes_sent += rv; - if (ECHO_CLIENT_DBG) + if (EC_DBG) { - /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (e) = { .format = "tx-enq: xfer %d bytes, sent %u remain %u", .format_args = "i4i4i4", }; - /* *INDENT-ON* */ struct { u32 data[3]; } *ed; ed = ELOG_DATA (&vlib_global_main.elog_main, e); ed->data[0] = rv; - ed->data[1] = s->bytes_sent; - ed->data[2] = s->bytes_to_send; + ed->data[1] = es->bytes_sent; + ed->data[2] = es->bytes_to_send; } } } static void -receive_data_chunk (echo_client_main_t * ecm, eclient_session_t * s) +receive_data_chunk (ec_worker_t *wrk, ec_session_t *es) { - svm_fifo_t *rx_fifo = s->data.rx_fifo; - u32 thread_index = vlib_get_thread_index (); + ec_main_t *ecm = &ec_main; + svm_fifo_t *rx_fifo = es->data.rx_fifo; int n_read, i; if (ecm->test_bytes) { if (!ecm->is_dgram) - n_read = app_recv_stream (&s->data, ecm->rx_buf[thread_index], - vec_len (ecm->rx_buf[thread_index])); + n_read = + app_recv_stream (&es->data, wrk->rx_buf, vec_len (wrk->rx_buf)); else - n_read = app_recv_dgram (&s->data, ecm->rx_buf[thread_index], - vec_len (ecm->rx_buf[thread_index])); + n_read = + app_recv_dgram (&es->data, wrk->rx_buf, vec_len (wrk->rx_buf)); } else { @@ -163,15 +182,13 @@ receive_data_chunk (echo_client_main_t * ecm, eclient_session_t * s) if (n_read > 0) { - if (ECHO_CLIENT_DBG) + if (EC_DBG) { - /* *INDENT-OFF* */ ELOG_TYPE_DECLARE (e) = { .format = "rx-deq: %d bytes", .format_args = "i4", }; - /* *INDENT-ON* */ struct { u32 data[1]; @@ -184,65 +201,64 @@ receive_data_chunk (echo_client_main_t * ecm, eclient_session_t * s) { for (i = 0; i < n_read; i++) { - if (ecm->rx_buf[thread_index][i] - != ((s->bytes_received + i) & 0xff)) + if (wrk->rx_buf[i] != ((es->bytes_received + i) & 0xff)) { clib_warning ("read %d error at byte %lld, 0x%x not 0x%x", - n_read, s->bytes_received + i, - ecm->rx_buf[thread_index][i], - ((s->bytes_received + i) & 0xff)); + n_read, es->bytes_received + i, wrk->rx_buf[i], + ((es->bytes_received + i) & 0xff)); ecm->test_failed = 1; } } } - ASSERT (n_read <= s->bytes_to_receive); - s->bytes_to_receive -= n_read; - s->bytes_received += n_read; + ASSERT (n_read <= es->bytes_to_receive); + es->bytes_to_receive -= n_read; + es->bytes_received += n_read; } } static uword -echo_client_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, - vlib_frame_t * frame) +ec_node_fn (vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame) { - echo_client_main_t *ecm = &echo_client_main; - int my_thread_index = vlib_get_thread_index (); - eclient_session_t *sp; - int i; - int delete_session; - u32 *connection_indices; - u32 *connections_this_batch; - u32 nconnections_this_batch; - - connection_indices = ecm->connection_index_by_thread[my_thread_index]; - connections_this_batch = - ecm->connections_this_batch_by_thread[my_thread_index]; - - if ((ecm->run_test != ECHO_CLIENTS_RUNNING) || - ((vec_len (connection_indices) == 0) - && vec_len (connections_this_batch) == 0)) + u32 *conn_indices, *conns_this_batch, nconns_this_batch; + int thread_index = vm->thread_index, i, delete_session; + ec_main_t *ecm = &ec_main; + ec_worker_t *wrk; + ec_session_t *es; + session_t *s; + + if (ecm->run_test != EC_RUNNING) + return 0; + + wrk = ec_worker_get (thread_index); + conn_indices = wrk->conn_indices; + conns_this_batch = wrk->conns_this_batch; + + if (((vec_len (conn_indices) == 0) && vec_len (conns_this_batch) == 0)) return 0; /* Grab another pile of connections */ - if (PREDICT_FALSE (vec_len (connections_this_batch) == 0)) + if (PREDICT_FALSE (vec_len (conns_this_batch) == 0)) { - nconnections_this_batch = - clib_min (ecm->connections_per_batch, vec_len (connection_indices)); - - ASSERT (nconnections_this_batch > 0); - vec_validate (connections_this_batch, nconnections_this_batch - 1); - clib_memcpy_fast (connections_this_batch, - connection_indices + vec_len (connection_indices) - - nconnections_this_batch, - nconnections_this_batch * sizeof (u32)); - _vec_len (connection_indices) -= nconnections_this_batch; + nconns_this_batch = + clib_min (ecm->connections_per_batch, vec_len (conn_indices)); + + ASSERT (nconns_this_batch > 0); + vec_validate (conns_this_batch, nconns_this_batch - 1); + clib_memcpy_fast (conns_this_batch, + conn_indices + vec_len (conn_indices) - + nconns_this_batch, + nconns_this_batch * sizeof (u32)); + _vec_len (conn_indices) -= nconns_this_batch; } - if (PREDICT_FALSE (ecm->prev_conns != ecm->connections_per_batch - && ecm->prev_conns == vec_len (connections_this_batch))) + /* + * Track progress + */ + if (PREDICT_FALSE (ecm->prev_conns != ecm->connections_per_batch && + ecm->prev_conns == vec_len (conns_this_batch))) { ecm->repeats++; - ecm->prev_conns = vec_len (connections_this_batch); + ecm->prev_conns = vec_len (conns_this_batch); if (ecm->repeats == 500000) { clib_warning ("stuck clients"); @@ -250,32 +266,35 @@ echo_client_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, } else { - ecm->prev_conns = vec_len (connections_this_batch); + ecm->prev_conns = vec_len (conns_this_batch); ecm->repeats = 0; } - for (i = 0; i < vec_len (connections_this_batch); i++) + /* + * Handle connections in this batch + */ + for (i = 0; i < vec_len (conns_this_batch); i++) { - delete_session = 1; + es = ec_session_get (wrk, conns_this_batch[i]); - sp = pool_elt_at_index (ecm->sessions, connections_this_batch[i]); + delete_session = 1; - if (sp->bytes_to_send > 0) + if (es->bytes_to_send > 0) { - send_data_chunk (ecm, sp); + send_data_chunk (ecm, es); delete_session = 0; } - if (sp->bytes_to_receive > 0) + + if (es->bytes_to_receive > 0) { delete_session = 0; } + if (PREDICT_FALSE (delete_session == 1)) { - session_t *s; - - clib_atomic_fetch_add (&ecm->tx_total, sp->bytes_sent); - clib_atomic_fetch_add (&ecm->rx_total, sp->bytes_received); - s = session_get_from_handle_if_valid (sp->vpp_session_handle); + clib_atomic_fetch_add (&ecm->tx_total, es->bytes_sent); + clib_atomic_fetch_add (&ecm->rx_total, es->bytes_received); + s = session_get_from_handle_if_valid (es->vpp_session_handle); if (s) { @@ -284,40 +303,38 @@ echo_client_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, a->app_index = ecm->app_index; vnet_disconnect_session (a); - vec_delete (connections_this_batch, 1, i); + vec_delete (conns_this_batch, 1, i); i--; clib_atomic_fetch_add (&ecm->ready_connections, -1); } else { clib_warning ("session AWOL?"); - vec_delete (connections_this_batch, 1, i); + vec_delete (conns_this_batch, 1, i); } /* Kick the debug CLI process */ if (ecm->ready_connections == 0) { - signal_evt_to_cli (2); + signal_evt_to_cli (EC_CLI_TEST_DONE); } } } - ecm->connection_index_by_thread[my_thread_index] = connection_indices; - ecm->connections_this_batch_by_thread[my_thread_index] = - connections_this_batch; + wrk->conn_indices = conn_indices; + wrk->conns_this_batch = conns_this_batch; return 0; } -VLIB_REGISTER_NODE (echo_clients_node) = -{ - .function = echo_client_node_fn, +VLIB_REGISTER_NODE (echo_clients_node) = { + .function = ec_node_fn, .name = "echo-clients", .type = VLIB_NODE_TYPE_INPUT, .state = VLIB_NODE_STATE_DISABLED, }; static void -ec_reset_runtime_config (echo_client_main_t *ecm) +ec_reset_runtime_config (ec_main_t *ecm) { ecm->n_clients = 1; ecm->quic_streams = 1; @@ -332,7 +349,7 @@ ec_reset_runtime_config (echo_client_main_t *ecm) ecm->test_failed = 0; ecm->tls_engine = CRYPTO_ENGINE_OPENSSL; ecm->no_copy = 0; - ecm->run_test = ECHO_CLIENTS_STARTING; + ecm->run_test = EC_STARTING; ecm->ready_connections = 0; ecm->connect_conn_index = 0; ecm->rx_total = 0; @@ -349,10 +366,11 @@ ec_reset_runtime_config (echo_client_main_t *ecm) } static int -echo_clients_init (vlib_main_t * vm) +ec_init (vlib_main_t *vm) { - echo_client_main_t *ecm = &echo_client_main; + ec_main_t *ecm = &ec_main; vlib_thread_main_t *vtm = vlib_get_thread_main (); + ec_worker_t *wrk; u32 num_threads; int i; @@ -380,32 +398,30 @@ echo_clients_init (vlib_main_t * vm) */ vlib_node_set_state (vm, session_queue_node.index, VLIB_NODE_STATE_POLLING); - - clib_spinlock_init (&ecm->sessions_lock); } /* App init done only once */ if (ecm->app_is_init) return 0; - num_threads = 1 /* main thread */ + vtm->n_threads; /* Init test data. Big buffer */ vec_validate (ecm->connect_test_data, 4 * 1024 * 1024 - 1); for (i = 0; i < vec_len (ecm->connect_test_data); i++) ecm->connect_test_data[i] = i & 0xff; - vec_validate (ecm->rx_buf, num_threads - 1); - for (i = 0; i < num_threads; i++) - vec_validate (ecm->rx_buf[i], vec_len (ecm->connect_test_data) - 1); + num_threads = 1 /* main thread */ + vtm->n_threads; + vec_validate (ecm->wrk, num_threads); + vec_foreach (wrk, ecm->wrk) + { + vec_validate (wrk->rx_buf, vec_len (ecm->connect_test_data) - 1); + wrk->thread_index = wrk - ecm->wrk; + wrk->vpp_event_queue = + session_main_get_vpp_event_queue (wrk->thread_index); + } ecm->app_is_init = 1; - vec_validate (ecm->connection_index_by_thread, vtm->n_vlib_mains); - vec_validate (ecm->connections_this_batch_by_thread, vtm->n_vlib_mains); - vec_validate (ecm->quic_session_index_by_thread, vtm->n_vlib_mains); - vec_validate (ecm->vpp_event_queue, vtm->n_vlib_mains); - vlib_worker_thread_barrier_sync (vm); vnet_session_enable_disable (vm, 1 /* turn on session and transports */); vlib_worker_thread_barrier_release (vm); @@ -419,38 +435,51 @@ echo_clients_init (vlib_main_t * vm) } static void -echo_clients_cleanup (echo_client_main_t *ecm) +ec_prealloc_sessions (ec_main_t *ecm) { - int i; + u32 sessions_per_wrk, n_wrks; + ec_worker_t *wrk; - for (i = 0; i < vec_len (ecm->connection_index_by_thread); i++) - { - vec_reset_length (ecm->connection_index_by_thread[i]); - vec_reset_length (ecm->connections_this_batch_by_thread[i]); - vec_reset_length (ecm->quic_session_index_by_thread[i]); - } + n_wrks = vlib_num_workers () ? vlib_num_workers () : 1; + + sessions_per_wrk = ecm->n_clients / n_wrks; + vec_foreach (wrk, ecm->wrk) + pool_init_fixed (wrk->sessions, 1.1 * sessions_per_wrk); +} + +static void +ec_worker_cleanup (ec_worker_t *wrk) +{ + pool_free (wrk->sessions); + vec_free (wrk->conn_indices); + vec_free (wrk->conns_this_batch); +} + +static void +ec_cleanup (ec_main_t *ecm) +{ + ec_worker_t *wrk; + + vec_foreach (wrk, ecm->wrk) + ec_worker_cleanup (wrk); - pool_free (ecm->sessions); vec_free (ecm->connect_uri); vec_free (ecm->appns_id); - clib_spinlock_free (&ecm->sessions_lock); if (ecm->barrier_acq_needed) vlib_worker_thread_barrier_sync (ecm->vlib_main); } static int -quic_echo_clients_qsession_connected_callback (u32 app_index, u32 api_context, - session_t * s, - session_error_t err) +quic_ec_qsession_connected_callback (u32 app_index, u32 api_context, + session_t *s, session_error_t err) { - echo_client_main_t *ecm = &echo_client_main; - vnet_connect_args_t *a = 0; - int rv; - u8 thread_index = vlib_get_thread_index (); session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL; - u32 stream_n; + ec_main_t *ecm = &ec_main; + vnet_connect_args_t *a = 0; session_handle_t handle; + u32 stream_n; + int rv; DBG ("QUIC Connection handle %d", session_handle (s)); @@ -479,102 +508,94 @@ quic_echo_clients_qsession_connected_callback (u32 app_index, u32 api_context, * 's' is no longer valid, its underlying pool could have been moved in * vnet_connect() */ - vec_add1 (ecm->quic_session_index_by_thread[thread_index], handle); vec_free (a); return 0; } static int -quic_echo_clients_session_connected_callback (u32 app_index, u32 api_context, - session_t * s, - session_error_t err) +quic_ec_session_connected_callback (u32 app_index, u32 api_context, + session_t *s, session_error_t err) { - echo_client_main_t *ecm = &echo_client_main; - eclient_session_t *session; - u32 session_index; - u8 thread_index; + ec_main_t *ecm = &ec_main; + ec_session_t *es; + ec_worker_t *wrk; + u32 thread_index; - if (PREDICT_FALSE (ecm->run_test != ECHO_CLIENTS_STARTING)) + if (PREDICT_FALSE (ecm->run_test != EC_STARTING)) return -1; if (err) { clib_warning ("connection %d failed!", api_context); - ecm->run_test = ECHO_CLIENTS_EXITING; - signal_evt_to_cli (-1); + ecm->run_test = EC_EXITING; + signal_evt_to_cli (EC_CLI_CONNECTS_FAILED); return 0; } if (s->listener_handle == SESSION_INVALID_HANDLE) - return quic_echo_clients_qsession_connected_callback (app_index, - api_context, s, - err); + return quic_ec_qsession_connected_callback (app_index, api_context, s, + err); DBG ("STREAM Connection callback %d", api_context); thread_index = s->thread_index; ASSERT (thread_index == vlib_get_thread_index () || session_transport_service_type (s) == TRANSPORT_SERVICE_CL); - if (!ecm->vpp_event_queue[thread_index]) - ecm->vpp_event_queue[thread_index] = - session_main_get_vpp_event_queue (thread_index); + wrk = ec_worker_get (thread_index); /* * Setup session */ - clib_spinlock_lock_if_init (&ecm->sessions_lock); - pool_get (ecm->sessions, session); - clib_spinlock_unlock_if_init (&ecm->sessions_lock); - - clib_memset (session, 0, sizeof (*session)); - session_index = session - ecm->sessions; - session->bytes_to_send = ecm->bytes_to_send; - session->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send; - session->data.rx_fifo = s->rx_fifo; - session->data.rx_fifo->shr->client_session_index = session_index; - session->data.tx_fifo = s->tx_fifo; - session->data.tx_fifo->shr->client_session_index = session_index; - session->data.vpp_evt_q = ecm->vpp_event_queue[thread_index]; - session->vpp_session_handle = session_handle (s); + es = ec_session_alloc (wrk); + + es->bytes_to_send = ecm->bytes_to_send; + es->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send; + es->data.rx_fifo = s->rx_fifo; + es->data.rx_fifo->shr->client_session_index = es->data.session_index; + es->data.tx_fifo = s->tx_fifo; + es->data.tx_fifo->shr->client_session_index = es->data.session_index; + es->data.vpp_evt_q = wrk->vpp_event_queue; + es->vpp_session_handle = session_handle (s); + es->vpp_session_index = s->session_index; + s->opaque = es->data.session_index; if (ecm->is_dgram) { transport_connection_t *tc; tc = session_get_transport (s); - clib_memcpy_fast (&session->data.transport, tc, - sizeof (session->data.transport)); - session->data.is_dgram = 1; + clib_memcpy_fast (&es->data.transport, tc, sizeof (es->data.transport)); + es->data.is_dgram = 1; } - vec_add1 (ecm->connection_index_by_thread[thread_index], session_index); + vec_add1 (wrk->conn_indices, es->data.session_index); clib_atomic_fetch_add (&ecm->ready_connections, 1); if (ecm->ready_connections == ecm->expected_connections) { - ecm->run_test = ECHO_CLIENTS_RUNNING; + ecm->run_test = EC_RUNNING; /* Signal the CLI process that the action is starting... */ - signal_evt_to_cli (1); + signal_evt_to_cli (EC_CLI_CONNECTS_DONE); } return 0; } static int -echo_clients_session_connected_callback (u32 app_index, u32 api_context, - session_t * s, session_error_t err) +ec_session_connected_callback (u32 app_index, u32 api_context, session_t *s, + session_error_t err) { - echo_client_main_t *ecm = &echo_client_main; - eclient_session_t *session; - u32 session_index; - u8 thread_index; + ec_main_t *ecm = &ec_main; + ec_session_t *es; + u32 thread_index; + ec_worker_t *wrk; - if (PREDICT_FALSE (ecm->run_test != ECHO_CLIENTS_STARTING)) + if (PREDICT_FALSE (ecm->run_test != EC_STARTING)) return -1; if (err) { clib_warning ("connection %d failed!", api_context); - ecm->run_test = ECHO_CLIENTS_EXITING; - signal_evt_to_cli (-1); + ecm->run_test = EC_EXITING; + signal_evt_to_cli (EC_CLI_CONNECTS_FAILED); return 0; } @@ -582,53 +603,48 @@ echo_clients_session_connected_callback (u32 app_index, u32 api_context, ASSERT (thread_index == vlib_get_thread_index () || session_transport_service_type (s) == TRANSPORT_SERVICE_CL); - if (!ecm->vpp_event_queue[thread_index]) - ecm->vpp_event_queue[thread_index] = - session_main_get_vpp_event_queue (thread_index); + wrk = ec_worker_get (thread_index); /* * Setup session */ - clib_spinlock_lock_if_init (&ecm->sessions_lock); - pool_get (ecm->sessions, session); - clib_spinlock_unlock_if_init (&ecm->sessions_lock); - - clib_memset (session, 0, sizeof (*session)); - session_index = session - ecm->sessions; - session->bytes_to_send = ecm->bytes_to_send; - session->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send; - session->data.rx_fifo = s->rx_fifo; - session->data.rx_fifo->shr->client_session_index = session_index; - session->data.tx_fifo = s->tx_fifo; - session->data.tx_fifo->shr->client_session_index = session_index; - session->data.vpp_evt_q = ecm->vpp_event_queue[thread_index]; - session->vpp_session_handle = session_handle (s); + es = ec_session_alloc (wrk); + + es->bytes_to_send = ecm->bytes_to_send; + es->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send; + es->data.rx_fifo = s->rx_fifo; + es->data.rx_fifo->shr->client_session_index = es->data.session_index; + es->data.tx_fifo = s->tx_fifo; + es->data.tx_fifo->shr->client_session_index = es->data.session_index; + es->data.vpp_evt_q = wrk->vpp_event_queue; + es->vpp_session_handle = session_handle (s); + es->vpp_session_index = s->session_index; + s->opaque = es->data.session_index; if (ecm->is_dgram) { transport_connection_t *tc; tc = session_get_transport (s); - clib_memcpy_fast (&session->data.transport, tc, - sizeof (session->data.transport)); - session->data.is_dgram = 1; + clib_memcpy_fast (&es->data.transport, tc, sizeof (es->data.transport)); + es->data.is_dgram = 1; } - vec_add1 (ecm->connection_index_by_thread[thread_index], session_index); + vec_add1 (wrk->conn_indices, es->data.session_index); clib_atomic_fetch_add (&ecm->ready_connections, 1); if (ecm->ready_connections == ecm->expected_connections) { - ecm->run_test = ECHO_CLIENTS_RUNNING; + ecm->run_test = EC_RUNNING; /* Signal the CLI process that the action is starting... */ - signal_evt_to_cli (1); + signal_evt_to_cli (EC_CLI_CONNECTS_DONE); } return 0; } static void -echo_clients_session_reset_callback (session_t * s) +ec_session_reset_callback (session_t *s) { - echo_client_main_t *ecm = &echo_client_main; + ec_main_t *ecm = &ec_main; vnet_disconnect_args_t _a = { 0 }, *a = &_a; if (s->session_state == SESSION_STATE_READY) @@ -641,15 +657,15 @@ echo_clients_session_reset_callback (session_t * s) } static int -echo_clients_session_create_callback (session_t * s) +ec_session_accept_callback (session_t *s) { return 0; } static void -echo_clients_session_disconnect_callback (session_t * s) +ec_session_disconnect_callback (session_t *s) { - echo_client_main_t *ecm = &echo_client_main; + ec_main_t *ecm = &ec_main; vnet_disconnect_args_t _a = { 0 }, *a = &_a; a->handle = session_handle (s); a->app_index = ecm->app_index; @@ -658,9 +674,9 @@ echo_clients_session_disconnect_callback (session_t * s) } void -echo_clients_session_disconnect (session_t * s) +ec_session_disconnect (session_t *s) { - echo_client_main_t *ecm = &echo_client_main; + ec_main_t *ecm = &ec_main; vnet_disconnect_args_t _a = { 0 }, *a = &_a; a->handle = session_handle (s); a->app_index = ecm->app_index; @@ -668,20 +684,22 @@ echo_clients_session_disconnect (session_t * s) } static int -echo_clients_rx_callback (session_t * s) +ec_session_rx_callback (session_t *s) { - echo_client_main_t *ecm = &echo_client_main; - eclient_session_t *sp; + ec_main_t *ecm = &ec_main; + ec_worker_t *wrk; + ec_session_t *es; - if (PREDICT_FALSE (ecm->run_test != ECHO_CLIENTS_RUNNING)) + if (PREDICT_FALSE (ecm->run_test != EC_RUNNING)) { - echo_clients_session_disconnect (s); + ec_session_disconnect (s); return -1; } - sp = - pool_elt_at_index (ecm->sessions, s->rx_fifo->shr->client_session_index); - receive_data_chunk (ecm, sp); + wrk = ec_worker_get (s->thread_index); + es = ec_session_get (wrk, s->opaque); + + receive_data_chunk (wrk, es); if (svm_fifo_max_dequeue_cons (s->rx_fifo)) { @@ -691,27 +709,34 @@ echo_clients_rx_callback (session_t * s) return 0; } -int -echo_client_add_segment_callback (u32 client_index, u64 segment_handle) +static int +ec_add_segment_callback (u32 app_index, u64 segment_handle) +{ + /* New segments may be added */ + return 0; +} + +static int +ec_del_segment_callback (u32 app_index, u64 segment_handle) { - /* New heaps may be added */ return 0; } -static session_cb_vft_t echo_clients = { - .session_reset_callback = echo_clients_session_reset_callback, - .session_connected_callback = echo_clients_session_connected_callback, - .session_accept_callback = echo_clients_session_create_callback, - .session_disconnect_callback = echo_clients_session_disconnect_callback, - .builtin_app_rx_callback = echo_clients_rx_callback, - .add_segment_callback = echo_client_add_segment_callback +static session_cb_vft_t ec_cb_vft = { + .session_reset_callback = ec_session_reset_callback, + .session_connected_callback = ec_session_connected_callback, + .session_accept_callback = ec_session_accept_callback, + .session_disconnect_callback = ec_session_disconnect_callback, + .builtin_app_rx_callback = ec_session_rx_callback, + .add_segment_callback = ec_add_segment_callback, + .del_segment_callback = ec_del_segment_callback, }; static clib_error_t * -echo_clients_attach () +ec_attach () { vnet_app_add_cert_key_pair_args_t _ck_pair, *ck_pair = &_ck_pair; - echo_client_main_t *ecm = &echo_client_main; + ec_main_t *ecm = &ec_main; vnet_app_attach_args_t _a, *a = &_a; u32 prealloc_fifos; u64 options[18]; @@ -723,9 +748,8 @@ echo_clients_attach () a->api_client_index = ~0; a->name = format (0, "echo_client"); if (ecm->transport_proto == TRANSPORT_PROTO_QUIC) - echo_clients.session_connected_callback = - quic_echo_clients_session_connected_callback; - a->session_cb_vft = &echo_clients; + ec_cb_vft.session_connected_callback = quic_ec_session_connected_callback; + a->session_cb_vft = &ec_cb_vft; prealloc_fifos = ecm->prealloc_fifos ? ecm->expected_connections : 1; @@ -767,9 +791,9 @@ echo_clients_attach () } static int -echo_clients_detach () +ec_detach () { - echo_client_main_t *ecm = &echo_client_main; + ec_main_t *ecm = &ec_main; vnet_app_detach_args_t _da, *da = &_da; int rv; @@ -786,30 +810,6 @@ echo_clients_detach () return rv; } -static void * -echo_client_thread_fn (void *arg) -{ - return 0; -} - -/** Start a transmit thread */ -int -echo_clients_start_tx_pthread (echo_client_main_t * ecm) -{ - if (ecm->client_thread_handle == 0) - { - int rv = pthread_create (&ecm->client_thread_handle, - NULL /*attr */ , - echo_client_thread_fn, 0); - if (rv) - { - ecm->client_thread_handle = 0; - return -1; - } - } - return 0; -} - static int ec_transport_needs_crypto (transport_proto_t proto) { @@ -817,10 +817,10 @@ ec_transport_needs_crypto (transport_proto_t proto) proto == TRANSPORT_PROTO_QUIC; } -int -echo_clients_connect_rpc (void *args) +static int +ec_connect_rpc (void *args) { - echo_client_main_t *ecm = &echo_client_main; + ec_main_t *ecm = &ec_main; vnet_connect_args_t _a = {}, *a = &_a; vlib_main_t *vm = vlib_get_main (); int rv, needs_crypto; @@ -860,7 +860,8 @@ echo_clients_connect_rpc (void *args) if (rv) { clib_warning ("connect returned: %U", format_session_error, rv); - signal_evt_to_cli (2); + ecm->run_test = EC_EXITING; + signal_evt_to_cli (EC_CLI_CONNECTS_FAILED); break; } @@ -870,15 +871,15 @@ echo_clients_connect_rpc (void *args) vlib_worker_thread_barrier_release (vm); if (ci < ecm->expected_connections) - echo_clients_program_connects (); + ec_program_connects (); return 0; } void -echo_clients_program_connects (void) +ec_program_connects (void) { - session_send_rpc_evt_to_thread_force (0, echo_clients_connect_rpc, 0); + session_send_rpc_evt_to_thread_force (0, ec_connect_rpc, 0); } #define ec_cli(_fmt, _args...) \ @@ -886,12 +887,12 @@ echo_clients_program_connects (void) vlib_cli_output (vm, _fmt, ##_args) static clib_error_t * -echo_clients_command_fn (vlib_main_t *vm, unformat_input_t *input, - vlib_cli_command_t *cmd) +ec_command_fn (vlib_main_t *vm, unformat_input_t *input, + vlib_cli_command_t *cmd) { unformat_input_t _line_input, *line_input = &_line_input; char *default_uri = "tcp://6.0.1.1/1234", *transfer_type; - echo_client_main_t *ecm = &echo_client_main; + ec_main_t *ecm = &ec_main; uword *event_data = 0, event_type; clib_error_t *error = 0; int rv, had_config = 1; @@ -901,7 +902,7 @@ echo_clients_command_fn (vlib_main_t *vm, unformat_input_t *input, if (ecm->test_client_attached) return clib_error_return (0, "failed: already running!"); - if (echo_clients_init (vm)) + if (ec_init (vm)) { error = clib_error_return (0, "failed init"); goto cleanup; @@ -993,9 +994,9 @@ parse_config: ecm->is_dgram = (ecm->transport_proto == TRANSPORT_PROTO_UDP); if (ecm->prealloc_sessions) - pool_init_fixed (ecm->sessions, 1.1 * ecm->n_clients); + ec_prealloc_sessions (ecm); - if ((error = echo_clients_attach ())) + if ((error = ec_attach ())) { clib_error_report (error); goto cleanup; @@ -1006,7 +1007,7 @@ parse_config: */ ecm->syn_start_time = vlib_time_now (vm); - echo_clients_program_connects (); + ec_program_connects (); /* * Park until the sessions come up, or syn_timeout seconds pass @@ -1023,16 +1024,17 @@ parse_config: ecm->ready_connections); goto cleanup; - case 1: + case EC_CLI_CONNECTS_DONE: delta = vlib_time_now (vm) - ecm->syn_start_time; if (delta != 0.0) ec_cli ("%d three-way handshakes in %.2f seconds %.2f/s", ecm->n_clients, delta, ((f64) ecm->n_clients) / delta); break; - case 2: + case EC_CLI_CONNECTS_FAILED: error = clib_error_return (0, "failed: connect returned"); goto cleanup; + default: ec_cli ("unexpected event(1): %d", event_type); error = clib_error_return (0, "failed: unexpected event(1): %d", @@ -1056,7 +1058,7 @@ parse_config: ecm->ready_connections); goto cleanup; - case 2: + case EC_CLI_TEST_DONE: ecm->test_end_time = vlib_time_now (vm); ec_cli ("Test finished at %.6f", ecm->test_end_time); break; @@ -1096,17 +1098,17 @@ cleanup: /* * Cleanup */ - ecm->run_test = ECHO_CLIENTS_EXITING; + ecm->run_test = EC_EXITING; vlib_process_wait_for_event_or_clock (vm, 10e-3); /* Detach the application, so we can use different fifo sizes next time */ - if (echo_clients_detach ()) + if (ec_detach ()) { error = clib_error_return (0, "failed: app detach"); ec_cli ("WARNING: app detach failed..."); } - echo_clients_cleanup (ecm); + ec_cleanup (ecm); if (had_config) unformat_free (line_input); @@ -1116,27 +1118,27 @@ cleanup: return error; } -VLIB_CLI_COMMAND (echo_clients_command, static) = -{ +VLIB_CLI_COMMAND (ec_command, static) = { .path = "test echo clients", - .short_help = "test echo clients [nclients %d][[m|g]bytes <bytes>]" - "[test-timeout <time>][syn-timeout <time>][no-return][fifo-size <size>]" - "[private-segment-count <count>][private-segment-size <bytes>[m|g]]" - "[preallocate-fifos][preallocate-sessions][client-batch <batch-size>]" - "[uri <tcp://ip/port>][test-bytes][no-output]", - .function = echo_clients_command_fn, + .short_help = + "test echo clients [nclients %d][[m|g]bytes <bytes>]" + "[test-timeout <time>][syn-timeout <time>][no-return][fifo-size <size>]" + "[private-segment-count <count>][private-segment-size <bytes>[m|g]]" + "[preallocate-fifos][preallocate-sessions][client-batch <batch-size>]" + "[uri <tcp://ip/port>][test-bytes][no-output]", + .function = ec_command_fn, .is_mp_safe = 1, }; clib_error_t * -echo_clients_main_init (vlib_main_t * vm) +ec_main_init (vlib_main_t *vm) { - echo_client_main_t *ecm = &echo_client_main; + ec_main_t *ecm = &ec_main; ecm->app_is_init = 0; return 0; } -VLIB_INIT_FUNCTION (echo_clients_main_init); +VLIB_INIT_FUNCTION (ec_main_init); /* * fd.io coding-style-patch-verification: ON diff --git a/src/plugins/hs_apps/echo_client.h b/src/plugins/hs_apps/echo_client.h index a0e844d7e6d..b5f6c981f7b 100644 --- a/src/plugins/hs_apps/echo_client.h +++ b/src/plugins/hs_apps/echo_client.h @@ -18,42 +18,39 @@ #ifndef __included_echo_client_h__ #define __included_echo_client_h__ -#include <vnet/vnet.h> -#include <vnet/ip/ip.h> -#include <vnet/ethernet/ethernet.h> - -#include <vppinfra/hash.h> -#include <vppinfra/error.h> #include <vnet/session/session.h> #include <vnet/session/application_interface.h> -typedef struct +typedef struct ec_session_ { CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); app_session_t data; + u32 vpp_session_index; + u32 thread_index; u64 bytes_to_send; u64 bytes_sent; u64 bytes_to_receive; u64 bytes_received; u64 vpp_session_handle; - u8 thread_index; -} eclient_session_t; +} ec_session_t; + +typedef struct ec_worker_ +{ + CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); + ec_session_t *sessions; /**< session pool */ + u8 *rx_buf; /**< prealloced rx buffer */ + u32 *conn_indices; /**< sessions handled by worker */ + u32 *conns_this_batch; /**< sessions handled in batch */ + svm_msg_q_t *vpp_event_queue; /**< session layer worker mq */ + u32 thread_index; /**< thread index for worker */ +} ec_worker_t; typedef struct { - /* - * Test state variables - */ - eclient_session_t *sessions; /**< Session pool, shared */ - clib_spinlock_t sessions_lock; /**< Session pool lock */ - u8 **rx_buf; /**< intermediate rx buffers */ + ec_worker_t *wrk; /**< Per-thread state */ u8 *connect_test_data; /**< Pre-computed test data */ - u32 **quic_session_index_by_thread; - u32 **connection_index_by_thread; - u32 **connections_this_batch_by_thread; /**< active connection batch */ volatile u32 ready_connections; - volatile u32 finished_connections; volatile u64 rx_total; volatile u64 tx_total; volatile int run_test; /**< Signal start of test */ @@ -64,16 +61,14 @@ typedef struct u32 prev_conns; u32 repeats; - u32 connect_conn_index; /**< Conencts attempted progress */ + u32 connect_conn_index; /**< Connects attempted progress */ /* * Application setup parameters */ - svm_msg_q_t **vpp_event_queue; u32 cli_node_index; /**< cli process node index */ u32 app_index; /**< app index after attach */ - pthread_t client_thread_handle; /* * Configuration params @@ -116,19 +111,23 @@ typedef struct u8 barrier_acq_needed; vlib_main_t *vlib_main; -} echo_client_main_t; +} ec_main_t; -enum +typedef enum ec_state_ { - ECHO_CLIENTS_STARTING, - ECHO_CLIENTS_RUNNING, - ECHO_CLIENTS_EXITING -} echo_clients_test_state_e; + EC_STARTING, + EC_RUNNING, + EC_EXITING +} ec_state_t; -extern echo_client_main_t echo_client_main; -vlib_node_registration_t echo_clients_node; +typedef enum ec_cli_signal_ +{ + EC_CLI_CONNECTS_DONE = 1, + EC_CLI_CONNECTS_FAILED, + EC_CLI_TEST_DONE +} ec_cli_signal_t; -void echo_clients_program_connects (void); +void ec_program_connects (void); #endif /* __included_echo_client_h__ */ |