From a0d5433a6c25b90e7155948fbafbe138222efdd1 Mon Sep 17 00:00:00 2001 From: Florin Coras Date: Thu, 14 Apr 2022 18:19:42 -0700 Subject: hsa: vcl test client epoll worker loop Supports more connections and track connect time. Can be used to measure CPS. Only works in unidirectional mode for now. Type: improvement Signed-off-by: Florin Coras Change-Id: I70bc6a271996407dd16a96115f509bd680a0f302 --- src/plugins/hs_apps/vcl/vcl_test.h | 4 +- src/plugins/hs_apps/vcl/vcl_test_client.c | 307 ++++++++++++++++++++++++++++-- src/plugins/hs_apps/vcl/vcl_test_server.c | 5 +- 3 files changed, 301 insertions(+), 15 deletions(-) (limited to 'src/plugins/hs_apps') diff --git a/src/plugins/hs_apps/vcl/vcl_test.h b/src/plugins/hs_apps/vcl/vcl_test.h index ac949797cdb..e2a19e1a1fc 100644 --- a/src/plugins/hs_apps/vcl/vcl_test.h +++ b/src/plugins/hs_apps/vcl/vcl_test.h @@ -65,7 +65,8 @@ #define VCL_TEST_CFG_TXBUF_SIZE_DEF 8192 #define VCL_TEST_CFG_RXBUF_SIZE_DEF (64*VCL_TEST_CFG_TXBUF_SIZE_DEF) #define VCL_TEST_CFG_BUF_SIZE_MIN 128 -#define VCL_TEST_CFG_MAX_TEST_SESS 512 +#define VCL_TEST_CFG_MAX_TEST_SESS ((uint32_t) 1e6) +#define VCL_TEST_CFG_MAX_SELECT_SESS 512 #define VCL_TEST_CFG_MAX_EPOLL_EVENTS 16 #define VCL_TEST_CTRL_LISTENER (~0 - 1) @@ -140,6 +141,7 @@ typedef struct vcl_test_session vcl_test_stats_t stats; vcl_test_stats_t old_stats; int session_index; + struct vcl_test_session *next; vppcom_endpt_t endpt; uint8_t ip[16]; vppcom_data_segment_t ds[2]; diff --git a/src/plugins/hs_apps/vcl/vcl_test_client.c b/src/plugins/hs_apps/vcl/vcl_test_client.c index 1b149b2d9bc..427dd82ae85 100644 --- a/src/plugins/hs_apps/vcl/vcl_test_client.c +++ b/src/plugins/hs_apps/vcl/vcl_test_client.c @@ -35,9 +35,21 @@ struct vtc_worker_ vcl_test_session_t *qsessions; uint32_t n_sessions; uint32_t wrk_index; - fd_set wr_fdset; - fd_set rd_fdset; - int max_fd_index; + union + { + struct + { + fd_set wr_fdset; + fd_set rd_fdset; + int max_fd_index; + }; + struct + { + uint32_t epoll_sh; + struct epoll_event ep_evts[VCL_TEST_CFG_MAX_EPOLL_EVENTS]; + vcl_test_session_t *next_to_send; + }; + }; pthread_t thread_handle; vtc_worker_run_fn *wrk_run_fn; vcl_test_cfg_t cfg; @@ -296,6 +308,19 @@ vtc_worker_start_transfer (vcl_test_client_worker_t *wrk) } } +static int +vtc_session_check_is_done (vcl_test_session_t *ts, uint8_t check_rx) +{ + if ((!check_rx && ts->stats.tx_bytes >= ts->cfg.total_bytes) || + (check_rx && ts->stats.rx_bytes >= ts->cfg.total_bytes)) + { + clock_gettime (CLOCK_REALTIME, &ts->stats.stop); + ts->is_done = 1; + return 1; + } + return 0; +} + static int vtc_worker_connect_sessions_select (vcl_test_client_worker_t *wrk) { @@ -374,16 +399,16 @@ vtc_worker_run_select (vcl_test_client_worker_t *wrk) if (ts->is_done) continue; - if (FD_ISSET (vppcom_session_index (ts->fd), rfdset) - && ts->stats.rx_bytes < ts->cfg.total_bytes) + if (FD_ISSET (vppcom_session_index (ts->fd), rfdset) && + ts->stats.rx_bytes < ts->cfg.total_bytes) { rv = ts->read (ts, ts->rxbuf, ts->rxbuf_size); if (rv < 0) break; } - if (FD_ISSET (vppcom_session_index (ts->fd), wfdset) - && ts->stats.tx_bytes < ts->cfg.total_bytes) + if (FD_ISSET (vppcom_session_index (ts->fd), wfdset) && + ts->stats.tx_bytes < ts->cfg.total_bytes) { rv = ts->write (ts, ts->txbuf, ts->cfg.txbuf_size); if (rv < 0) @@ -395,8 +420,8 @@ vtc_worker_run_select (vcl_test_client_worker_t *wrk) if (vcm->incremental_stats) vtc_inc_stats_check (ts); } - if ((!check_rx && ts->stats.tx_bytes >= ts->cfg.total_bytes) - || (check_rx && ts->stats.rx_bytes >= ts->cfg.total_bytes)) + if ((!check_rx && ts->stats.tx_bytes >= ts->cfg.total_bytes) || + (check_rx && ts->stats.rx_bytes >= ts->cfg.total_bytes)) { clock_gettime (CLOCK_REALTIME, &ts->stats.stop); ts->is_done = 1; @@ -408,6 +433,260 @@ vtc_worker_run_select (vcl_test_client_worker_t *wrk) return 0; } +static void +vtc_worker_epoll_send_add (vcl_test_client_worker_t *wrk, + vcl_test_session_t *ts) +{ + if (!wrk->next_to_send) + { + wrk->next_to_send = ts; + } + else + { + ts->next = wrk->next_to_send; + wrk->next_to_send = ts->next; + } +} + +static void +vtc_worker_epoll_send_del (vcl_test_client_worker_t *wrk, + vcl_test_session_t *ts, vcl_test_session_t *prev) +{ + if (!prev) + { + wrk->next_to_send = ts->next; + } + else + { + prev->next = ts->next; + } +} + +static int +vtc_worker_connect_sessions_epoll (vcl_test_client_worker_t *wrk) +{ + vcl_test_client_main_t *vcm = &vcl_client_main; + vcl_test_main_t *vt = &vcl_test_main; + const vcl_test_proto_vft_t *tp; + struct timespec start, end; + uint32_t n_connected = 0; + vcl_test_session_t *ts; + struct epoll_event ev; + int i, ci = 0, rv, n_ev; + double diff; + + tp = vt->protos[vcm->proto]; + wrk->epoll_sh = vppcom_epoll_create (); + + ev.events = EPOLLET | EPOLLOUT; + + clock_gettime (CLOCK_REALTIME, &start); + + while (n_connected < wrk->cfg.num_test_sessions) + { + /* + * Try to connect more sessions if under pending threshold + */ + while ((ci - n_connected) < 16 && ci < wrk->cfg.num_test_sessions) + { + ts = &wrk->sessions[ci]; + ts->noblk_connect = 1; + rv = tp->open (&wrk->sessions[ci], &vcm->server_endpt); + if (rv < 0) + { + vtwrn ("open: %d", rv); + return rv; + } + + ev.data.u64 = ci; + rv = vppcom_epoll_ctl (wrk->epoll_sh, EPOLL_CTL_ADD, ts->fd, &ev); + if (rv < 0) + { + vtwrn ("vppcom_epoll_ctl: %d", rv); + return rv; + } + ci += 1; + } + + /* + * Handle connected events + */ + n_ev = + vppcom_epoll_wait (wrk->epoll_sh, wrk->ep_evts, + VCL_TEST_CFG_MAX_EPOLL_EVENTS, 0 /* timeout */); + if (n_ev < 0) + { + vterr ("vppcom_epoll_wait() returned", n_ev); + return -1; + } + else if (n_ev == 0) + { + continue; + } + + for (i = 0; i < n_ev; i++) + { + ts = &wrk->sessions[wrk->ep_evts[i].data.u32]; + if (!(wrk->ep_evts[i].events & EPOLLOUT)) + { + vtwrn ("connect failed"); + return -1; + } + if (ts->is_open) + { + vtwrn ("connection already open?"); + return -1; + } + ts->is_open = 1; + n_connected += 1; + } + } + + clock_gettime (CLOCK_REALTIME, &end); + + diff = vcl_test_time_diff (&start, &end); + vtinf ("Connected (%u) connected in %.2f seconds (%u CPS)!", + wrk->cfg.num_test_sessions, diff, + (uint32_t) ((double) wrk->cfg.num_test_sessions / diff)); + + ev.events = EPOLLET | EPOLLIN | EPOLLOUT; + + for (i = 0; i < wrk->cfg.num_test_sessions; i++) + { + ts = &wrk->sessions[i]; + + /* No data to be sent */ + if (ts->cfg.total_bytes == 0) + { + n_connected -= 1; + clock_gettime (CLOCK_REALTIME, &ts->stats.stop); + ts->is_done = 1; + continue; + } + + ev.data.u64 = i; + rv = vppcom_epoll_ctl (wrk->epoll_sh, EPOLL_CTL_MOD, ts->fd, &ev); + if (rv < 0) + { + vtwrn ("vppcom_epoll_ctl: %d", rv); + return rv; + } + vtc_worker_epoll_send_add (wrk, ts); + } + + return n_connected; +} + +static int +vtc_worker_run_epoll (vcl_test_client_worker_t *wrk) +{ + vcl_test_client_main_t *vcm = &vcl_client_main; + uint32_t n_active_sessions, max_writes = 16, n_writes = 0; + vcl_test_session_t *ts, *prev = 0; + int i, rv, check_rx = 0, n_ev; + + rv = vtc_worker_connect_sessions_epoll (wrk); + if (rv < 0) + { + vterr ("vtc_worker_connect_sessions()", rv); + return rv; + } + + n_active_sessions = rv; + check_rx = wrk->cfg.test != VCL_TEST_TYPE_UNI; + + vtc_worker_start_transfer (wrk); + ts = wrk->next_to_send; + + while (n_active_sessions && vcm->test_running) + { + /* + * Try to write + */ + if (!ts) + { + ts = wrk->next_to_send; + if (!ts) + goto get_epoll_evts; + } + + rv = ts->write (ts, ts->txbuf, ts->cfg.txbuf_size); + if (rv > 0) + { + if (vcm->incremental_stats) + vtc_inc_stats_check (ts); + if (vtc_session_check_is_done (ts, check_rx)) + n_active_sessions -= 1; + } + else if (rv == 0) + { + vtc_worker_epoll_send_del (wrk, ts, prev); + } + else + { + vtwrn ("vppcom_test_write (%d) failed -- aborting test", ts->fd); + return -1; + } + prev = ts; + ts = ts->next; + n_writes += 1; + + if (rv > 0 && n_writes < max_writes) + continue; + + get_epoll_evts: + + /* + * Grab new events + */ + + n_ev = + vppcom_epoll_wait (wrk->epoll_sh, wrk->ep_evts, + VCL_TEST_CFG_MAX_EPOLL_EVENTS, 0 /* timeout */); + if (n_ev < 0) + { + vterr ("vppcom_epoll_wait()", n_ev); + break; + } + else if (n_ev == 0) + { + continue; + } + + for (i = 0; i < n_ev; i++) + { + ts = &wrk->sessions[wrk->ep_evts[i].data.u32]; + + if (ts->is_done) + continue; + + if (wrk->ep_evts[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) + { + vtinf ("%u finished before reading all data?", ts->fd); + break; + } + if ((wrk->ep_evts[i].events & EPOLLIN) && + ts->stats.rx_bytes < ts->cfg.total_bytes) + { + rv = ts->read (ts, ts->rxbuf, ts->rxbuf_size); + if (rv < 0) + break; + if (vtc_session_check_is_done (ts, check_rx)) + n_active_sessions -= 1; + } + if ((wrk->ep_evts[i].events & EPOLLOUT) && + ts->stats.tx_bytes < ts->cfg.total_bytes) + { + vtc_worker_epoll_send_add (wrk, ts); + } + } + + n_writes = 0; + } + + return 0; +} + static inline int vtc_worker_run (vcl_test_client_worker_t *wrk) { @@ -1078,14 +1357,18 @@ static void vtc_alloc_workers (vcl_test_client_main_t *vcm) { vcl_test_main_t *vt = &vcl_test_main; + vtc_worker_run_fn *run_fn; vcm->workers = calloc (vcm->n_workers, sizeof (vcl_test_client_worker_t)); vt->wrk = calloc (vcm->n_workers, sizeof (vcl_test_wrk_t)); + if (vcm->ctrl_session.cfg.num_test_sessions > VCL_TEST_CFG_MAX_SELECT_SESS) + run_fn = vtc_worker_run_epoll; + else + run_fn = vtc_worker_run_select; + for (int i = 0; i < vcm->n_workers; i++) - { - vcm->workers[i].wrk_run_fn = vtc_worker_run_select; - } + vcm->workers[i].wrk_run_fn = run_fn; } int diff --git a/src/plugins/hs_apps/vcl/vcl_test_server.c b/src/plugins/hs_apps/vcl/vcl_test_server.c index d1700d42d05..f5c81ce22b3 100644 --- a/src/plugins/hs_apps/vcl/vcl_test_server.c +++ b/src/plugins/hs_apps/vcl/vcl_test_server.c @@ -373,8 +373,9 @@ vts_accept_client (vcl_test_server_worker_t *wrk, int listen_fd) if (tp->accept (listen_fd, conn)) return 0; - vtinf ("Got a connection -- fd = %d (0x%08x) on listener fd = %d (0x%08x)", - conn->fd, conn->fd, listen_fd, listen_fd); + if (conn->cfg.num_test_sessions < VCL_TEST_CFG_MAX_SELECT_SESS) + vtinf ("Got a connection -- fd = %d (0x%08x) on listener fd = %d (0x%08x)", + conn->fd, conn->fd, listen_fd, listen_fd); ev.events = EPOLLET | EPOLLIN; ev.data.u64 = conn - wrk->conn_pool; -- cgit 1.2.3-korg