diff options
author | Florin Coras <fcoras@cisco.com> | 2021-04-23 14:01:01 -0700 |
---|---|---|
committer | Dave Barach <openvpp@barachs.net> | 2021-04-26 15:01:11 +0000 |
commit | 7992bdcecea6935b630203b5a73c4df641d46b5d (patch) | |
tree | b48d260d728a0781369e7113cda34d801961b2c1 /src/plugins/hs_apps/vcl/vcl_test_server.c | |
parent | 856d062ce67f93d83f1ff302a394e8a4448ad017 (diff) |
hsa: use tcp for vcl test control channel
Also, only exchange config over control session.
Type: improvement
Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: I001df635896762bc5330cebb7d5744e3e754482d
Diffstat (limited to 'src/plugins/hs_apps/vcl/vcl_test_server.c')
-rw-r--r-- | src/plugins/hs_apps/vcl/vcl_test_server.c | 318 |
1 files changed, 201 insertions, 117 deletions
diff --git a/src/plugins/hs_apps/vcl/vcl_test_server.c b/src/plugins/hs_apps/vcl/vcl_test_server.c index b4966bf168b..5b02fc08e8e 100644 --- a/src/plugins/hs_apps/vcl/vcl_test_server.c +++ b/src/plugins/hs_apps/vcl/vcl_test_server.c @@ -31,6 +31,7 @@ typedef struct { uint8_t is_alloc; + uint8_t is_open; int fd; uint8_t *buf; uint32_t buf_size; @@ -67,6 +68,8 @@ typedef struct vcl_test_server_cfg_t cfg; vcl_test_server_worker_t *workers; + vcl_test_server_conn_t *ctrl; + int ctrl_listen_fd; struct sockaddr_storage servaddr; volatile int worker_fails; volatile int active_workers; @@ -157,33 +160,81 @@ sync_config_and_reply (vcl_test_server_conn_t * conn, vcl_test_cfg_t * rx_cfg) } static void -vts_server_start_stop (vcl_test_server_worker_t * wrk, - vcl_test_server_conn_t * conn, vcl_test_cfg_t * rx_cfg) +vts_session_close (vcl_test_server_conn_t *conn) +{ + if (!conn->is_open) + return; + vppcom_session_close (conn->fd); + conn->is_open = 0; +} + +static void +vts_session_cleanup (vcl_test_server_conn_t *conn) +{ + vts_session_close (conn); + conn_pool_free (conn); +} + +static void +vts_wrk_cleanup_all (vcl_test_server_worker_t *wrk) +{ + vcl_test_server_conn_t *conn; + int i; + + for (i = 0; i < wrk->conn_pool_size; i++) + { + conn = &wrk->conn_pool[i]; + vts_session_cleanup (conn); + } + + wrk->nfds = 0; +} + +static void +vts_test_cmd (vcl_test_server_worker_t *wrk, vcl_test_server_conn_t *conn, + vcl_test_cfg_t *rx_cfg) { u8 is_bi = rx_cfg->test == VCL_TEST_TYPE_BI; vcl_test_server_conn_t *tc; char buf[64]; int i; - if (rx_cfg->ctrl_handle == conn->fd) + if (rx_cfg->cmd == VCL_TEST_CMD_STOP) { + struct timespec stop; + clock_gettime (CLOCK_REALTIME, &stop); + + /* Test session are not closed, e.g., connection-less or errors */ + if (wrk->nfds > 1) + { + vtinf ("%u sessions are still open", wrk->nfds - 1); + stop.tv_sec -= VCL_TEST_DELAY_DISCONNECT; + conn->stats.stop = stop; + } + + /* Accumulate stats over all of the worker's sessions */ for (i = 0; i < wrk->conn_pool_size; i++) { tc = &wrk->conn_pool[i]; - if (tc->cfg.ctrl_handle != conn->fd) + if (tc == conn) continue; vcl_test_stats_accumulate (&conn->stats, &tc->stats); - if (vcl_comp_tspec (&conn->stats.stop, &tc->stats.stop) < 0) - conn->stats.stop = tc->stats.stop; - /* Client delays sending of disconnect */ - conn->stats.stop.tv_sec -= VCL_TEST_DELAY_DISCONNECT; - if (conn->cfg.verbose) + if (tc->is_open) { - snprintf (buf, sizeof (buf), "SERVER (fd %d) RESULTS", tc->fd); - vcl_test_stats_dump (buf, &tc->stats, 1 /* show_rx */ , - is_bi /* show tx */ , conn->cfg.verbose); + vts_session_close (tc); + continue; } + /* Only relevant if all connections previously closed */ + if (vcl_comp_tspec (&conn->stats.stop, &tc->stats.stop) < 0) + conn->stats.stop = tc->stats.stop; + } + + if (conn->cfg.verbose) + { + snprintf (buf, sizeof (buf), "SERVER (fd %d) RESULTS", conn->fd); + vcl_test_stats_dump (buf, &conn->stats, 1 /* show_rx */, + is_bi /* show tx */, conn->cfg.verbose); } vcl_test_stats_dump ("SERVER RESULTS", &conn->stats, 1 /* show_rx */ , @@ -202,19 +253,17 @@ vts_server_start_stop (vcl_test_server_worker_t * wrk, sync_config_and_reply (conn, rx_cfg); memset (&conn->stats, 0, sizeof (conn->stats)); } - else + else if (rx_cfg->cmd == VCL_TEST_CMD_SYNC) { - if (rx_cfg->ctrl_handle == ~0) - { - rx_cfg->ctrl_handle = conn->fd; - vtinf ("Set control fd %d for test!", conn->fd); - } - else - { - vtinf ("Starting %s-directional Stream Test (fd %d)!", - is_bi ? "Bi" : "Uni", conn->fd); - } - + rx_cfg->ctrl_handle = conn->fd; + vtinf ("Set control fd %d for test!", conn->fd); + sync_config_and_reply (conn, rx_cfg); + } + else if (rx_cfg->cmd == VCL_TEST_CMD_START) + { + vtinf ("Starting %s-directional Stream Test (fd %d)!", + is_bi ? "Bi" : "Uni", conn->fd); + rx_cfg->ctrl_handle = conn->fd; sync_config_and_reply (conn, rx_cfg); /* read the 1st chunk, record start time */ @@ -224,7 +273,7 @@ vts_server_start_stop (vcl_test_server_worker_t * wrk, } static inline void -vts_server_rx (vcl_test_server_conn_t * conn, int rx_bytes) +vts_server_process_rx (vcl_test_server_conn_t *conn, int rx_bytes) { vcl_test_server_main_t *vsm = &vcl_server_main; int client_fd = conn->fd; @@ -290,8 +339,8 @@ vts_server_echo (vcl_test_server_conn_t * conn, int rx_bytes) vtinf ("(fd %d): TX (%d bytes) - '%s'", conn->fd, tx_bytes, conn->buf); } -static void -vts_new_client (vcl_test_server_worker_t * wrk, int listen_fd) +static vcl_test_server_conn_t * +vts_accept_client (vcl_test_server_worker_t *wrk, int listen_fd) { vcl_test_server_conn_t *conn; struct epoll_event ev; @@ -301,16 +350,17 @@ vts_new_client (vcl_test_server_worker_t * wrk, int listen_fd) if (!conn) { vtwrn ("No free connections!"); - return; + return 0; } client_fd = vppcom_session_accept (listen_fd, &conn->endpt, 0); if (client_fd < 0) { vterr ("vppcom_session_accept()", client_fd); - return; + return 0; } conn->fd = client_fd; + conn->is_open = 1; vtinf ("Got a connection -- fd = %d (0x%08x) on listener fd = %d (0x%08x)", client_fd, client_fd, listen_fd, listen_fd); @@ -321,9 +371,11 @@ vts_new_client (vcl_test_server_worker_t * wrk, int listen_fd) if (rv < 0) { vterr ("vppcom_epoll_ctl()", rv); - return; + return 0; } wrk->nfds++; + + return conn; } static void @@ -467,8 +519,8 @@ vts_clean_connected_listeners (vcl_test_server_worker_t * wrk, } int -vts_handle_cfg (vcl_test_server_worker_t * wrk, vcl_test_cfg_t * rx_cfg, - vcl_test_server_conn_t * conn, int rx_bytes) +vts_handle_ctrl_cfg (vcl_test_server_worker_t *wrk, vcl_test_cfg_t *rx_cfg, + vcl_test_server_conn_t *conn, int rx_bytes) { int listener_fd; if (rx_cfg->verbose) @@ -502,17 +554,17 @@ vts_handle_cfg (vcl_test_server_worker_t * wrk, vcl_test_cfg_t * rx_cfg, case VCL_TEST_TYPE_BI: case VCL_TEST_TYPE_UNI: - vts_server_start_stop (wrk, conn, rx_cfg); + vts_test_cmd (wrk, conn, rx_cfg); break; case VCL_TEST_TYPE_EXIT: - vtinf ("Session fd %d closing!", conn->fd); - clock_gettime (CLOCK_REALTIME, &conn->stats.stop); + vtinf ("Ctrl session fd %d closing!", conn->fd); listener_fd = vppcom_session_listener (conn->fd); - vppcom_session_close (conn->fd); - conn_pool_free (conn); - wrk->nfds--; vts_clean_connected_listeners (wrk, listener_fd); + vts_session_cleanup (conn); + wrk->nfds--; + if (wrk->nfds) + vts_wrk_cleanup_all (wrk); break; default: @@ -583,12 +635,16 @@ vts_worker_init (vcl_test_server_worker_t * wrk) vtfail ("vppcom_session_listen()", rv); } - wrk->epfd = vppcom_epoll_create (); - if (wrk->epfd < 0) - vtfail ("vppcom_epoll_create()", wrk->epfd); + /* First worker already has epoll fd */ + if (wrk->wrk_index) + { + wrk->epfd = vppcom_epoll_create (); + if (wrk->epfd < 0) + vtfail ("vppcom_epoll_create()", wrk->epfd); + } listen_ev.events = EPOLLIN; - listen_ev.data.u32 = ~0; + listen_ev.data.u32 = VCL_TEST_DATA_LISTENER; rv = vppcom_epoll_ctl (wrk->epfd, EPOLL_CTL_ADD, wrk->listen_fd, &listen_ev); if (rv < 0) @@ -598,30 +654,6 @@ vts_worker_init (vcl_test_server_worker_t * wrk) vtinf ("Waiting for a client to connect on port %d ...", vsm->cfg.port); } -static int -vts_conn_expect_config (vcl_test_server_conn_t * conn) -{ - if (conn->cfg.test == VCL_TEST_TYPE_ECHO) - return 1; - - return (conn->stats.rx_bytes < 128 - || conn->stats.rx_bytes > conn->cfg.total_bytes); -} - -static vcl_test_cfg_t * -vts_conn_read_config (vcl_test_server_conn_t * conn) -{ - vcl_test_server_main_t *vsm = &vcl_server_main; - - if (vsm->use_ds) - { - /* We could avoid the copy if the first segment is big enough but this - * just simplifies things */ - vts_copy_ds (conn->buf, conn->ds, sizeof (vcl_test_cfg_t)); - } - return (vcl_test_cfg_t *) conn->buf; -} - static inline int vts_conn_read (vcl_test_server_conn_t * conn) { @@ -632,17 +664,6 @@ vts_conn_read (vcl_test_server_conn_t * conn) return vcl_test_read (conn->fd, conn->buf, conn->buf_size, &conn->stats); } -static inline int -vts_conn_has_ascii (vcl_test_server_conn_t * conn) -{ - vcl_test_server_main_t *vsm = &vcl_server_main; - - if (vsm->use_ds) - return isascii (conn->ds[0].data[0]); - else - return isascii (conn->buf[0]); -} - static void * vts_worker_loop (void *arg) { @@ -672,13 +693,15 @@ vts_worker_loop (void *arg) for (i = 0; i < num_ev; i++) { conn = &wrk->conn_pool[wrk->wait_events[i].data.u32]; + /* + * Check for close events + */ if (wrk->wait_events[i].events & (EPOLLHUP | EPOLLRDHUP)) { - vtinf ("Closing session %d on HUP", conn->fd); listener_fd = vppcom_session_listener (conn->fd); - vppcom_session_close (conn->fd); - wrk->nfds--; vts_clean_connected_listeners (wrk, listener_fd); + vts_session_close (conn); + wrk->nfds--; if (!wrk->nfds) { vtinf ("All client connections closed\n"); @@ -686,17 +709,66 @@ vts_worker_loop (void *arg) } continue; } - if (wrk->wait_events[i].data.u32 == ~0) + + /* + * Check if new session needs to be accepted + */ + + if (wrk->wait_events[i].data.u32 == VCL_TEST_CTRL_LISTENER) + { + if (vsm->ctrl) + { + vtwrn ("ctrl already exists"); + continue; + } + vsm->ctrl = vts_accept_client (wrk, vsm->ctrl_listen_fd); + continue; + } + if (wrk->wait_events[i].data.u32 == VCL_TEST_DATA_LISTENER) { - vts_new_client (wrk, wrk->listen_fd); + conn = vts_accept_client (wrk, wrk->listen_fd); + conn->cfg = vsm->ctrl->cfg; continue; } else if (vppcom_session_is_connectable_listener (conn->fd)) { - vts_new_client (wrk, conn->fd); + vts_accept_client (wrk, conn->fd); + continue; + } + + /* + * Message on control session + */ + + if (!wrk->wrk_index && conn->fd == vsm->ctrl->fd) + { + rx_bytes = vcl_test_read (conn->fd, conn->buf, conn->buf_size, + &conn->stats); + rx_cfg = (vcl_test_cfg_t *) conn->buf; + if (rx_cfg->magic == VCL_TEST_CFG_CTRL_MAGIC) + { + vts_handle_ctrl_cfg (wrk, rx_cfg, conn, rx_bytes); + if (!wrk->nfds) + { + vtinf ("All client connections closed\n"); + goto done; + } + } + else if (isascii (conn->buf[0])) + { + vts_server_echo (conn, rx_bytes); + } + else + { + vtwrn ("FIFO not drained! extra bytes %d", rx_bytes); + } continue; } + /* + * Read perf test data + */ + if (EPOLLIN & wrk->wait_events[i].events) { read_again: @@ -712,40 +784,11 @@ vts_worker_loop (void *arg) else continue; } - - if (vts_conn_expect_config (conn)) - { - rx_cfg = vts_conn_read_config (conn); - if (rx_cfg->magic == VCL_TEST_CFG_CTRL_MAGIC) - { - if (vsm->use_ds) - vppcom_session_free_segments (conn->fd, rx_bytes); - vts_handle_cfg (wrk, rx_cfg, conn, rx_bytes); - if (!wrk->nfds) - { - vtinf ("All client connections closed\n"); - goto done; - } - continue; - } - } - if ((conn->cfg.test == VCL_TEST_TYPE_UNI) - || (conn->cfg.test == VCL_TEST_TYPE_BI)) - { - vts_server_rx (conn, rx_bytes); - if (vppcom_session_attr (conn->fd, VPPCOM_ATTR_GET_NREAD, 0, - 0) > 0) - goto read_again; - continue; - } - if (vts_conn_has_ascii (conn)) - { - vts_server_echo (conn, rx_bytes); - } - else - { - vtwrn ("FIFO not drained! extra bytes %d", rx_bytes); - } + vts_server_process_rx (conn, rx_bytes); + if (vppcom_session_attr (conn->fd, VPPCOM_ATTR_GET_NREAD, 0, 0) > + 0) + goto read_again; + continue; } else { @@ -766,6 +809,42 @@ done: return 0; } +static void +vts_ctrl_session_init (vcl_test_server_worker_t *wrk) +{ + vcl_test_server_main_t *vsm = &vcl_server_main; + struct epoll_event listen_ev; + int rv; + + vtinf ("Initializing main ctrl session ..."); + + vsm->ctrl_listen_fd = + vppcom_session_create (VPPCOM_PROTO_TCP, 0 /* is_nonblocking */); + if (vsm->ctrl_listen_fd < 0) + vtfail ("vppcom_session_create()", vsm->ctrl_listen_fd); + + rv = vppcom_session_bind (vsm->ctrl_listen_fd, &vsm->cfg.endpt); + if (rv < 0) + vtfail ("vppcom_session_bind()", rv); + + rv = vppcom_session_listen (vsm->ctrl_listen_fd, 10); + if (rv < 0) + vtfail ("vppcom_session_listen()", rv); + + wrk->epfd = vppcom_epoll_create (); + if (wrk->epfd < 0) + vtfail ("vppcom_epoll_create()", wrk->epfd); + + listen_ev.events = EPOLLIN; + listen_ev.data.u32 = VCL_TEST_CTRL_LISTENER; + rv = vppcom_epoll_ctl (wrk->epfd, EPOLL_CTL_ADD, vsm->ctrl_listen_fd, + &listen_ev); + if (rv < 0) + vtfail ("vppcom_epoll_ctl", rv); + + vtinf ("Waiting for a client to connect on port %d ...", vsm->cfg.port); +} + int main (int argc, char **argv) { @@ -783,6 +862,10 @@ main (int argc, char **argv) vtfail ("vppcom_app_create()", rv); vsm->workers = calloc (vsm->cfg.workers, sizeof (*vsm->workers)); + vts_ctrl_session_init (&vsm->workers[0]); + + /* Update ctrl port to data port */ + vsm->cfg.endpt.port += 1; vts_worker_init (&vsm->workers[0]); for (i = 1; i < vsm->cfg.workers; i++) { @@ -790,6 +873,7 @@ main (int argc, char **argv) rv = pthread_create (&vsm->workers[i].thread_handle, NULL, vts_worker_loop, (void *) &vsm->workers[i]); } + vts_worker_loop (&vsm->workers[0]); while (vsm->active_workers > 0) |