diff options
author | 2025-01-24 13:56:22 +0100 | |
---|---|---|
committer | 2025-02-11 20:14:07 +0000 | |
commit | c8174f3660d2577c3819b51d65a3f2836d8afe31 (patch) | |
tree | e7ee8aec9423fb1914b07c4680ee6d9d9f34231f | |
parent | 29b1ece1d07477676e467e1b0f51987d47bd2997 (diff) |
hsa: http client parallel sessions
- client is now able to use multiple workers to send requests
(sometimes it uses multiple sessions on a single worker)
Type: feature
Change-Id: I2d83d47a9768011b3d8d05ed320852606841e4b8
Signed-off-by: Adrian Villin <avillin@cisco.com>
-rw-r--r-- | extras/hs-test/http_test.go | 97 | ||||
-rw-r--r-- | extras/hs-test/infra/suite_no_topo.go | 37 | ||||
-rw-r--r-- | extras/hs-test/topo-containers/single.yaml | 8 | ||||
-rw-r--r-- | src/plugins/hs_apps/http_client.c | 448 |
4 files changed, 394 insertions, 196 deletions
diff --git a/extras/hs-test/http_test.go b/extras/hs-test/http_test.go index 68934550b69..b143e559244 100644 --- a/extras/hs-test/http_test.go +++ b/extras/hs-test/http_test.go @@ -7,7 +7,6 @@ import ( "math/rand" "net" "net/http" - "net/http/httptest" "net/http/httptrace" "os" "strconv" @@ -36,10 +35,10 @@ func init() { HttpClientErrRespTest, HttpClientPostFormTest, HttpClientGet128kbResponseTest, HttpClientGetResponseBodyTest, HttpClientGetNoResponseBodyTest, HttpClientPostFileTest, HttpClientPostFilePtrTest, HttpUnitTest, HttpRequestLineTest, HttpClientGetTimeout, HttpStaticFileHandlerWrkTest, HttpStaticUrlHandlerWrkTest, HttpConnTimeoutTest, - HttpClientGetRepeat, HttpClientPostRepeat, HttpIgnoreH2UpgradeTest, HttpInvalidAuthorityFormUriTest, HttpHeaderErrorConnectionDropTest) + HttpClientGetRepeatTest, HttpClientPostRepeatTest, HttpIgnoreH2UpgradeTest, HttpInvalidAuthorityFormUriTest, HttpHeaderErrorConnectionDropTest) RegisterNoTopoSoloTests(HttpStaticPromTest, HttpGetTpsTest, HttpGetTpsInterruptModeTest, PromConcurrentConnectionsTest, PromMemLeakTest, HttpClientPostMemLeakTest, HttpInvalidClientRequestMemLeakTest, HttpPostTpsTest, HttpPostTpsInterruptModeTest, - PromConsecutiveConnectionsTest, HttpGetTpsTlsTest, HttpPostTpsTlsTest) + PromConsecutiveConnectionsTest, HttpGetTpsTlsTest, HttpPostTpsTlsTest, HttpClientGetRepeatMTTest, HttpClientPtrGetRepeatMTTest) } const wwwRootPath = "/tmp/www_root" @@ -382,35 +381,37 @@ func httpClientGet(s *NoTopoSuite, response string, size int) { s.AssertContains(file_contents, response) } -func startSimpleServer(s *NoTopoSuite, replyCount *int, serverAddress string) (server *httptest.Server) { - var err error - server = httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintf(w, "Hello") - *replyCount++ - })) - server.Listener, err = net.Listen("tcp", serverAddress+":80") - s.AssertNil(err, "Error while creating listener.") - - server.Start() +func HttpClientGetRepeatMTTest(s *NoTopoSuite) { + httpClientRepeat(s, "", "sessions 2") +} - return server +func HttpClientPtrGetRepeatMTTest(s *NoTopoSuite) { + httpClientRepeat(s, "", "use-ptr sessions 2") } -func HttpClientGetRepeat(s *NoTopoSuite) { - httpClientRepeat(s, "") +func HttpClientGetRepeatTest(s *NoTopoSuite) { + httpClientRepeat(s, "", "") } -func HttpClientPostRepeat(s *NoTopoSuite) { - httpClientRepeat(s, "post") +func HttpClientPostRepeatTest(s *NoTopoSuite) { + httpClientRepeat(s, "post", "") } -func httpClientRepeat(s *NoTopoSuite, requestMethod string) { - replyCount := 0 +func httpClientRepeat(s *NoTopoSuite, requestMethod string, clientArgs string) { vpp := s.Containers.Vpp.VppInstance - serverAddress := s.HostAddr() + logPath := s.Containers.NginxServer.GetContainerWorkDir() + "/" + s.Containers.NginxServer.Name + "-access.log" + serverAddress := s.Interfaces.Tap.Ip4AddressString() + replyCountInt := 0 repeatAmount := 10000 - server := startSimpleServer(s, &replyCount, serverAddress) - defer server.Close() + durationInSec := 10 + var err error + + // recreate interfaces with RX-queues + s.AssertNil(vpp.DeleteTap(s.Interfaces.Tap)) + s.AssertNil(vpp.CreateTap(s.Interfaces.Tap, 2, 2)) + + s.CreateNginxServer() + s.AssertNil(s.Containers.NginxServer.Start()) if requestMethod == "post" { fileName := "/tmp/test_file.txt" @@ -419,41 +420,45 @@ func httpClientRepeat(s *NoTopoSuite, requestMethod string) { requestMethod += " file /tmp/test_file.txt" } - uri := "http://" + serverAddress + "/80" - cmd := fmt.Sprintf("http client %s use-ptr duration 10 header Hello:World uri %s target /index.html", - requestMethod, uri) + uri := "http://" + serverAddress + "/" + s.GetPortFromPpid() + cmd := fmt.Sprintf("http client %s %s duration %d header Hello:World uri %s target /index.html", + requestMethod, clientArgs, durationInSec, uri) - s.Log("Duration 10s") + s.Log("Duration %ds", durationInSec) o := vpp.Vppctl(cmd) - outputLen := len(o) - if outputLen > 500 { - s.Log(o[:500]) - s.Log("* HST Framework: output limited to 500 chars to avoid flooding the console. Output length: " + fmt.Sprint(outputLen)) - } else { - s.Log(o) + s.Log(o) + + replyCount := s.Containers.NginxServer.Exec(false, "awk 'END { print NR }' "+logPath) + if replyCount != "" { + replyCountInt, err = strconv.Atoi(replyCount[:len(replyCount)-1]) + s.AssertNil(err) } - s.Log("Server response count: %d", replyCount) + // empty the log file + s.Containers.NginxServer.Exec(false, "truncate -s 0 "+logPath) + + s.Log("Server response count: %d", replyCountInt) s.AssertNotNil(o) s.AssertNotContains(o, "error") - s.AssertGreaterThan(replyCount, 15000) + s.AssertGreaterThan(replyCountInt, 15000) - cmd = fmt.Sprintf("http client %s use-ptr repeat %d header Hello:World uri %s target /index.html", - requestMethod, repeatAmount, uri) + replyCount = "" + cmd = fmt.Sprintf("http client %s %s repeat %d header Hello:World uri %s target /index.html", + requestMethod, clientArgs, repeatAmount, uri) - replyCount = 0 + s.AssertNil(err, fmt.Sprint(err)) s.Log("Repeat %d", repeatAmount) o = vpp.Vppctl(cmd) - outputLen = len(o) - if outputLen > 500 { - s.Log(o[:500]) - s.Log("* HST Framework: output limited to 500 chars to avoid flooding the console. Output length: " + fmt.Sprint(outputLen)) - } else { - s.Log(o) + s.Log(o) + + replyCount = s.Containers.NginxServer.Exec(false, "awk 'END { print NR }' "+logPath) + if replyCount != "" { + replyCountInt, err = strconv.Atoi(replyCount[:len(replyCount)-1]) + s.AssertNil(err) } - s.Log("Server response count: %d", replyCount) + s.Log("Server response count: %d", replyCountInt) s.AssertNotNil(o) s.AssertNotContains(o, "error") - s.AssertEqual(repeatAmount, replyCount) + s.AssertEqual(repeatAmount, replyCountInt) } func HttpClientGetTimeout(s *NoTopoSuite) { diff --git a/extras/hs-test/infra/suite_no_topo.go b/extras/hs-test/infra/suite_no_topo.go index 1c7b6fe91c3..d084413f7e6 100644 --- a/extras/hs-test/infra/suite_no_topo.go +++ b/extras/hs-test/infra/suite_no_topo.go @@ -18,13 +18,15 @@ type NoTopoSuite struct { Tap *NetInterface } Containers struct { - Vpp *Container - Nginx *Container - NginxHttp3 *Container - Wrk *Container - Curl *Container - Ab *Container + Vpp *Container + Nginx *Container + NginxHttp3 *Container + NginxServer *Container + Wrk *Container + Curl *Container + Ab *Container } + NginxServerPort string } func RegisterNoTopoTests(tests ...func(s *NoTopoSuite)) { @@ -42,6 +44,7 @@ func (s *NoTopoSuite) SetupSuite() { s.Containers.Vpp = s.GetContainerByName("vpp") s.Containers.Nginx = s.GetContainerByName("nginx") s.Containers.NginxHttp3 = s.GetContainerByName("nginx-http3") + s.Containers.NginxServer = s.GetTransientContainerByName("nginx-server") s.Containers.Wrk = s.GetContainerByName("wrk") s.Containers.Curl = s.GetContainerByName("curl") s.Containers.Ab = s.GetContainerByName("ab") @@ -101,6 +104,28 @@ func (s *NoTopoSuite) CreateNginxConfig(container *Container, multiThreadWorkers ) } +// Creates container and config. +func (s *NoTopoSuite) CreateNginxServer() { + s.AssertNil(s.Containers.NginxServer.Create()) + s.NginxServerPort = s.GetPortFromPpid() + nginxSettings := struct { + LogPrefix string + Address string + Port string + Timeout int + }{ + LogPrefix: s.Containers.NginxServer.Name, + Address: s.Interfaces.Tap.Ip4AddressString(), + Port: s.NginxServerPort, + Timeout: 600, + } + s.Containers.NginxServer.CreateConfigFromTemplate( + "/nginx.conf", + "./resources/nginx/nginx_server.conf", + nginxSettings, + ) +} + func (s *NoTopoSuite) AddNginxVclConfig(multiThreadWorkers bool) { vclFileName := s.Containers.Nginx.GetHostWorkDir() + "/vcl.conf" appSocketApi := fmt.Sprintf("app-socket-api %s/var/run/app_ns_sockets/default", diff --git a/extras/hs-test/topo-containers/single.yaml b/extras/hs-test/topo-containers/single.yaml index b4449dc1918..2f5f31b6879 100644 --- a/extras/hs-test/topo-containers/single.yaml +++ b/extras/hs-test/topo-containers/single.yaml @@ -28,6 +28,14 @@ containers: image: "hs-test/nginx-http3" is-optional: true + - name: "nginx-server" + volumes: + - <<: *shared-vol + container-dir: "/tmp/nginx" + is-default-work-dir: true + image: "hs-test/nginx-server" + is-optional: true + - name: "ab" image: "hs-test/ab" is-optional: true diff --git a/src/plugins/hs_apps/http_client.c b/src/plugins/hs_apps/http_client.c index 20271fc4aea..e4759317cbe 100644 --- a/src/plugins/hs_apps/http_client.c +++ b/src/plugins/hs_apps/http_client.c @@ -1,5 +1,5 @@ /* SPDX-License-Identifier: Apache-2.0 - * Copyright(c) 2024 Cisco Systems, Inc. + * Copyright(c) 2025 Cisco Systems, Inc. */ #include <vnet/session/application.h> @@ -12,29 +12,36 @@ typedef struct { + u64 req_per_wrk; + u64 request_count; + f64 start, end; + f64 elapsed_time; +} hc_stats_t; + +typedef struct +{ CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); u32 session_index; u32 thread_index; - u32 vpp_session_index; u64 to_recv; u8 is_closed; + hc_stats_t stats; + u64 data_offset; + u8 *resp_headers; + u8 *http_response; + u8 *response_status; } hc_session_t; typedef struct { - u64 request_count; - f64 start, end; - f64 elapsed_time; -} hc_stats_t; - -typedef struct -{ hc_session_t *sessions; u32 thread_index; vlib_main_t *vlib_main; u8 *headers_buf; http_headers_ctx_t req_headers; http_msg_t msg; + u32 session_index; + bool has_common_headers; } hc_worker_t; typedef struct @@ -52,11 +59,7 @@ typedef struct session_endpoint_cfg_t connect_sep; u8 *target; u8 *data; - u64 data_offset; hc_worker_t *wrk; - u8 *resp_headers; - u8 *http_response; - u8 *response_status; hc_http_header_t *custom_header; u8 is_file; u8 use_ptr; @@ -67,6 +70,18 @@ typedef struct u64 repeat_count; f64 duration; bool repeat; + bool multi_session; + u32 done_count; + u32 connected_counter; + u32 worker_index; + u32 max_sessions; + u32 private_segment_size; + u32 prealloc_fifos; + u32 fifo_size; + u8 *appns_id; + u64 appns_secret; + clib_spinlock_t lock; + bool was_transport_closed; } hc_main_t; typedef enum @@ -95,13 +110,6 @@ hc_session_get (u32 session_index, u32 thread_index) return pool_elt_at_index (wrk->sessions, session_index); } -static void -hc_ho_session_free (u32 hs_index) -{ - hc_worker_t *wrk = hc_worker_get (0); - pool_put_index (wrk->sessions, hs_index); -} - static hc_session_t * hc_session_alloc (hc_worker_t *wrk) { @@ -115,14 +123,14 @@ hc_session_alloc (hc_worker_t *wrk) } static int -hc_request (session_t *s, session_error_t err) +hc_request (session_t *s, hc_worker_t *wrk, hc_session_t *hc_session, + session_error_t err) { hc_main_t *hcm = &hc_main; u64 to_send; u32 n_enq; u8 n_segs; int rv; - hc_worker_t *wrk = hc_worker_get (s->thread_index); if (hcm->use_ptr) { @@ -166,7 +174,7 @@ hc_request (session_t *s, session_error_t err) rv = svm_fifo_enqueue (s->tx_fifo, n_enq, hcm->data); if (rv < to_send) { - hcm->data_offset = (rv > 0) ? rv : 0; + hc_session->data_offset = (rv > 0) ? rv : 0; svm_fifo_add_want_deq_ntf (s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); } } @@ -185,9 +193,8 @@ hc_session_connected_callback (u32 app_index, u32 hc_session_index, { hc_main_t *hcm = &hc_main; hc_worker_t *wrk; - u32 new_hc_index; + hc_session_t *hc_session; hc_http_header_t *header; - HTTP_DBG (1, "ho hc_index: %d", hc_session_index); if (err) { @@ -199,68 +206,89 @@ hc_session_connected_callback (u32 app_index, u32 hc_session_index, } wrk = hc_worker_get (s->thread_index); - hc_session_t *hc_session, *new_hc_session = hc_session_alloc (wrk); - hc_session = hc_session_get (hc_session_index, 0); - new_hc_index = new_hc_session->session_index; - clib_memcpy_fast (new_hc_session, hc_session, sizeof (*hc_session)); - new_hc_session->session_index = new_hc_index; - new_hc_session->thread_index = s->thread_index; - new_hc_session->vpp_session_index = s->session_index; - HTTP_DBG (1, "new hc_index: %d", new_hc_session->session_index); - s->opaque = new_hc_index; + hc_session = hc_session_alloc (wrk); + clib_spinlock_lock_if_init (&hcm->lock); + hcm->connected_counter++; + clib_spinlock_unlock_if_init (&hcm->lock); - if (hcm->req_method == HTTP_REQ_POST) + hc_session->thread_index = s->thread_index; + s->opaque = hc_session->session_index; + wrk->session_index = hc_session->session_index; + + if (hcm->multi_session) { - if (hcm->is_file) - http_add_header ( - &wrk->req_headers, HTTP_HEADER_CONTENT_TYPE, - http_content_type_token (HTTP_CONTENT_APP_OCTET_STREAM)); - else - http_add_header ( - &wrk->req_headers, HTTP_HEADER_CONTENT_TYPE, - http_content_type_token (HTTP_CONTENT_APP_X_WWW_FORM_URLENCODED)); + hc_session->stats.req_per_wrk = hcm->repeat_count / hcm->max_sessions; + clib_spinlock_lock_if_init (&hcm->lock); + /* add remaining requests to the first connected session */ + if (hcm->connected_counter == 1) + { + hc_session->stats.req_per_wrk += + hcm->repeat_count % hcm->max_sessions; + } + clib_spinlock_unlock_if_init (&hcm->lock); } - http_add_header (&wrk->req_headers, HTTP_HEADER_ACCEPT, "*", 1); - - vec_foreach (header, hcm->custom_header) - http_add_custom_header ( - &wrk->req_headers, (const char *) header->name, vec_len (header->name), - (const char *) header->value, vec_len (header->value)); - - clib_warning ("%U", format_http_bytes, wrk->headers_buf, - wrk->req_headers.tail_offset); - wrk->msg.method_type = hcm->req_method; - if (hcm->req_method == HTTP_REQ_POST) - wrk->msg.data.body_len = vec_len (hcm->data); else - wrk->msg.data.body_len = 0; - - wrk->msg.type = HTTP_MSG_REQUEST; - /* request target */ - wrk->msg.data.target_path_len = vec_len (hcm->target); - /* custom headers */ - wrk->msg.data.headers_len = wrk->req_headers.tail_offset; - /* total length */ - wrk->msg.data.len = wrk->msg.data.target_path_len + - wrk->msg.data.headers_len + wrk->msg.data.body_len; - - if (hcm->use_ptr) { - wrk->msg.data.type = HTTP_MSG_DATA_PTR; + hc_session->stats.req_per_wrk = hcm->repeat_count; + hcm->worker_index = s->thread_index; } - else + + if (!wrk->has_common_headers) { - wrk->msg.data.type = HTTP_MSG_DATA_INLINE; - wrk->msg.data.target_path_offset = 0; - wrk->msg.data.headers_offset = wrk->msg.data.target_path_len; - wrk->msg.data.body_offset = - wrk->msg.data.headers_offset + wrk->msg.data.headers_len; + wrk->has_common_headers = true; + if (hcm->req_method == HTTP_REQ_POST) + { + if (hcm->is_file) + http_add_header ( + &wrk->req_headers, HTTP_HEADER_CONTENT_TYPE, + http_content_type_token (HTTP_CONTENT_APP_OCTET_STREAM)); + else + http_add_header (&wrk->req_headers, HTTP_HEADER_CONTENT_TYPE, + http_content_type_token ( + HTTP_CONTENT_APP_X_WWW_FORM_URLENCODED)); + } + http_add_header (&wrk->req_headers, HTTP_HEADER_ACCEPT, "*", 1); + + vec_foreach (header, hcm->custom_header) + http_add_custom_header (&wrk->req_headers, (const char *) header->name, + vec_len (header->name), + (const char *) header->value, + vec_len (header->value)); + + wrk->msg.method_type = hcm->req_method; + if (hcm->req_method == HTTP_REQ_POST) + wrk->msg.data.body_len = vec_len (hcm->data); + else + wrk->msg.data.body_len = 0; + + wrk->msg.type = HTTP_MSG_REQUEST; + /* request target */ + wrk->msg.data.target_path_len = vec_len (hcm->target); + /* custom headers */ + wrk->msg.data.headers_len = wrk->req_headers.tail_offset; + /* total length */ + wrk->msg.data.len = wrk->msg.data.target_path_len + + wrk->msg.data.headers_len + wrk->msg.data.body_len; + + if (hcm->use_ptr) + { + wrk->msg.data.type = HTTP_MSG_DATA_PTR; + } + else + { + wrk->msg.data.type = HTTP_MSG_DATA_INLINE; + wrk->msg.data.target_path_offset = 0; + wrk->msg.data.headers_offset = wrk->msg.data.target_path_len; + wrk->msg.data.body_offset = + wrk->msg.data.headers_offset + wrk->msg.data.headers_len; + } } if (hcm->repeat) - hc_stats.start = vlib_time_now (vlib_get_main_by_index (s->thread_index)); + hc_session->stats.start = + vlib_time_now (vlib_get_main_by_index (s->thread_index)); - return hc_request (s, err); + return hc_request (s, wrk, hc_session, err); } static void @@ -275,21 +303,38 @@ hc_session_disconnect_callback (session_t *s) if ((rv = vnet_disconnect_session (a))) clib_warning ("warning: disconnect returned: %U", format_session_error, rv); + clib_spinlock_lock_if_init (&hcm->lock); + hcm->done_count++; + clib_spinlock_unlock_if_init (&hcm->lock); } static void hc_session_transport_closed_callback (session_t *s) { hc_main_t *hcm = &hc_main; - vlib_process_signal_event_mt (hcm->wrk->vlib_main, hcm->cli_node_index, - HC_TRANSPORT_CLOSED, 0); -} + hc_worker_t *wrk = hc_worker_get (s->thread_index); -static void -hc_ho_cleanup_callback (session_t *s) -{ - HTTP_DBG (1, "ho hc_index: %d:", s->opaque); - hc_ho_session_free (s->opaque); + clib_spinlock_lock_if_init (&hcm->lock); + if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED) + { + hcm->was_transport_closed = true; + } + + /* send an event when all sessions are closed */ + if (hcm->done_count >= hcm->max_sessions) + { + if (hcm->was_transport_closed) + { + vlib_process_signal_event_mt (wrk->vlib_main, hcm->cli_node_index, + HC_TRANSPORT_CLOSED, 0); + } + else + { + vlib_process_signal_event_mt (wrk->vlib_main, hcm->cli_node_index, + HC_REPEAT_DONE, 0); + } + } + clib_spinlock_unlock_if_init (&hcm->lock); } static void @@ -315,20 +360,23 @@ hc_rx_callback (session_t *s) { hc_main_t *hcm = &hc_main; hc_worker_t *wrk = hc_worker_get (s->thread_index); - hc_session_t *hc_session; + hc_session_t *hc_session = hc_session_get (s->opaque, s->thread_index); http_msg_t msg; int rv; + u32 max_deq; session_error_t session_err = 0; int send_err = 0; - hc_session = hc_session_get (s->opaque, s->thread_index); - if (hc_session->is_closed) { clib_warning ("hc_session_index[%d] is closed", s->opaque); return -1; } + max_deq = svm_fifo_max_dequeue_cons (s->rx_fifo); + if (PREDICT_FALSE (max_deq == 0)) + goto done; + if (hc_session->to_recv == 0) { rv = svm_fifo_dequeue (s->rx_fifo, sizeof (msg), (u8 *) &msg); @@ -344,17 +392,20 @@ hc_rx_callback (session_t *s) if (msg.data.headers_len) { - hcm->response_status = - format (0, "%U", format_http_status_code, msg.code); + + if (!hcm->repeat) + hc_session->response_status = + format (0, "%U", format_http_status_code, msg.code); + svm_fifo_dequeue_drop (s->rx_fifo, msg.data.headers_offset); - vec_validate (hcm->resp_headers, msg.data.headers_len - 1); - vec_set_len (hcm->resp_headers, msg.data.headers_len); + vec_validate (hc_session->resp_headers, msg.data.headers_len - 1); + vec_set_len (hc_session->resp_headers, msg.data.headers_len); rv = svm_fifo_dequeue (s->rx_fifo, msg.data.headers_len, - hcm->resp_headers); + hc_session->resp_headers); ASSERT (rv == msg.data.headers_len); - HTTP_DBG (1, (char *) format (0, "%v", hcm->resp_headers)); + HTTP_DBG (1, (char *) format (0, "%v", hc_session->resp_headers)); msg.data.body_offset -= msg.data.headers_len + msg.data.headers_offset; } @@ -372,18 +423,18 @@ hc_rx_callback (session_t *s) { goto done; } - vec_validate (hcm->http_response, msg.data.body_len - 1); - vec_reset_length (hcm->http_response); + vec_validate (hc_session->http_response, msg.data.body_len - 1); + vec_reset_length (hc_session->http_response); } - u32 max_deq = svm_fifo_max_dequeue (s->rx_fifo); + max_deq = svm_fifo_max_dequeue (s->rx_fifo); if (!max_deq) { goto done; } u32 n_deq = clib_min (hc_session->to_recv, max_deq); - u32 curr = vec_len (hcm->http_response); - rv = svm_fifo_dequeue (s->rx_fifo, n_deq, hcm->http_response + curr); + u32 curr = vec_len (hc_session->http_response); + rv = svm_fifo_dequeue (s->rx_fifo, n_deq, hc_session->http_response + curr); if (rv < 0) { clib_warning ("app dequeue(n=%d) failed; rv = %d", n_deq, rv); @@ -393,7 +444,7 @@ hc_rx_callback (session_t *s) } ASSERT (rv == n_deq); - vec_set_len (hcm->http_response, curr + n_deq); + vec_set_len (hc_session->http_response, curr + n_deq); ASSERT (hc_session->to_recv >= rv); hc_session->to_recv -= rv; @@ -402,20 +453,19 @@ done: { if (hcm->repeat) { - hc_stats.request_count++; - hc_stats.end = vlib_time_now (wrk->vlib_main); - hc_stats.elapsed_time = hc_stats.end - hc_stats.start; + hc_session->stats.request_count++; + hc_session->stats.end = vlib_time_now (wrk->vlib_main); + hc_session->stats.elapsed_time = + hc_session->stats.end - hc_session->stats.start; - if (hc_stats.elapsed_time >= hcm->duration && - hc_stats.request_count >= hcm->repeat_count) + if (hc_session->stats.elapsed_time >= hcm->duration && + hc_session->stats.request_count >= hc_session->stats.req_per_wrk) { - vlib_process_signal_event_mt ( - wrk->vlib_main, hcm->cli_node_index, HC_REPEAT_DONE, 0); hc_session_disconnect_callback (s); } else { - send_err = hc_request (s, session_err); + send_err = hc_request (s, wrk, hc_session, session_err); if (send_err) clib_warning ("failed to send request, error %d", send_err); } @@ -434,11 +484,13 @@ static int hc_tx_callback (session_t *s) { hc_main_t *hcm = &hc_main; + hc_session_t *hc_session = hc_session_get (s->opaque, s->thread_index); u64 to_send; int rv; - to_send = vec_len (hcm->data) - hcm->data_offset; - rv = svm_fifo_enqueue (s->tx_fifo, to_send, hcm->data + hcm->data_offset); + to_send = vec_len (hcm->data) - hc_session->data_offset; + rv = svm_fifo_enqueue (s->tx_fifo, to_send, + hcm->data + hc_session->data_offset); if (rv <= 0) { @@ -448,7 +500,7 @@ hc_tx_callback (session_t *s) if (rv < to_send) { - hcm->data_offset += rv; + hc_session->data_offset += rv; svm_fifo_add_want_deq_ntf (s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); } @@ -465,7 +517,6 @@ static session_cb_vft_t hc_session_cb_vft = { .session_reset_callback = hc_session_reset_callback, .builtin_app_rx_callback = hc_rx_callback, .builtin_app_tx_callback = hc_tx_callback, - .half_open_cleanup_callback = hc_ho_cleanup_callback, }; static clib_error_t * @@ -474,8 +525,12 @@ hc_attach () hc_main_t *hcm = &hc_main; vnet_app_attach_args_t _a, *a = &_a; u64 options[18]; + u32 segment_size = 128 << 20; int rv; + if (hcm->private_segment_size) + segment_size = hcm->private_segment_size; + clib_memset (a, 0, sizeof (*a)); clib_memset (options, 0, sizeof (options)); @@ -483,7 +538,19 @@ hc_attach () a->name = format (0, "http_client"); a->session_cb_vft = &hc_session_cb_vft; a->options = options; + a->options[APP_OPTIONS_SEGMENT_SIZE] = segment_size; + a->options[APP_OPTIONS_ADD_SEGMENT_SIZE] = segment_size; + a->options[APP_OPTIONS_RX_FIFO_SIZE] = + hcm->fifo_size ? hcm->fifo_size : 8 << 10; + a->options[APP_OPTIONS_TX_FIFO_SIZE] = + hcm->fifo_size ? hcm->fifo_size : 32 << 10; a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN; + a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = hcm->prealloc_fifos; + if (hcm->appns_id) + { + a->namespace_id = hcm->appns_id; + a->options[APP_OPTIONS_NAMESPACE_SECRET] = hcm->appns_secret; + } if ((rv = vnet_application_attach (a))) return clib_error_return (0, "attach returned: %U", format_session_error, @@ -500,14 +567,19 @@ static int hc_connect_rpc (void *rpc_args) { vnet_connect_args_t *a = rpc_args; - int rv; + int rv = ~0; + hc_main_t *hcm = &hc_main; - rv = vnet_connect (a); - if (rv > 0) - clib_warning (0, "connect returned: %U", format_session_error, rv); + for (u32 i = 0; i < hcm->max_sessions; i++) + { + rv = vnet_connect (a); + if (rv > 0) + clib_warning (0, "connect returned: %U", format_session_error, rv); + } session_endpoint_free_ext_cfgs (&a->sep_ext); vec_free (a); + return rv; } @@ -516,14 +588,10 @@ hc_connect () { hc_main_t *hcm = &hc_main; vnet_connect_args_t *a = 0; - hc_worker_t *wrk; - hc_session_t *hc_session; transport_endpt_ext_cfg_t *ext_cfg; transport_endpt_cfg_http_t http_cfg = { (u32) hcm->timeout, 0 }; - vec_validate (a, 0); clib_memset (a, 0, sizeof (a[0])); - clib_memcpy (&a->sep_ext, &hcm->connect_sep, sizeof (hcm->connect_sep)); a->app_index = hcm->app_index; @@ -531,15 +599,41 @@ hc_connect () &a->sep_ext, TRANSPORT_ENDPT_EXT_CFG_HTTP, sizeof (http_cfg)); clib_memcpy (ext_cfg->data, &http_cfg, sizeof (http_cfg)); - /* allocate http session on main thread */ - wrk = hc_worker_get (0); - hc_session = hc_session_alloc (wrk); - a->api_context = hc_session->session_index; - session_send_rpc_evt_to_thread_force (transport_cl_thread (), hc_connect_rpc, a); } +static void +hc_get_repeat_stats (vlib_main_t *vm) +{ + hc_main_t *hcm = &hc_main; + hc_worker_t *wrk; + hc_session_t *hc_session; + + if (hcm->repeat) + { + vec_foreach (wrk, hcm->wrk) + { + vec_foreach (hc_session, wrk->sessions) + { + hc_stats.request_count += hc_session->stats.request_count; + hc_session->stats.request_count = 0; + if (hc_stats.elapsed_time < hc_session->stats.elapsed_time) + { + hc_stats.elapsed_time = hc_session->stats.elapsed_time; + hc_session->stats.elapsed_time = 0; + } + } + } + vlib_cli_output (vm, + "< %d request(s) in %.6fs\n< avg latency " + "%.4fms\n< %.2f req/sec", + hc_stats.request_count, hc_stats.elapsed_time, + (hc_stats.elapsed_time / hc_stats.request_count) * 1000, + hc_stats.request_count / hc_stats.elapsed_time); + } +} + static clib_error_t * hc_get_event (vlib_main_t *vm) { @@ -548,6 +642,8 @@ hc_get_event (vlib_main_t *vm) clib_error_t *err = NULL; FILE *file_ptr; u64 event_timeout; + hc_worker_t *wrk; + hc_session_t *hc_session; event_timeout = hcm->timeout ? hcm->timeout : 10; if (event_timeout == hcm->duration) @@ -558,20 +654,26 @@ hc_get_event (vlib_main_t *vm) switch (event_type) { case ~0: + hc_get_repeat_stats (vm); err = clib_error_return (0, "error: timeout"); break; case HC_CONNECT_FAILED: + hc_get_repeat_stats (vm); err = clib_error_return (0, "error: failed to connect"); break; case HC_TRANSPORT_CLOSED: + hc_get_repeat_stats (vm); err = clib_error_return (0, "error: transport closed"); break; case HC_GENERIC_ERR: + hc_get_repeat_stats (vm); err = clib_error_return (0, "error: unknown"); break; case HC_REPLY_RECEIVED: if (hcm->filename) { + wrk = hc_worker_get (hcm->worker_index); + hc_session = hc_session_get (wrk->session_index, wrk->thread_index); file_ptr = fopen ((char *) format (0, "/tmp/%v", hcm->filename), "a"); if (file_ptr == NULL) @@ -580,26 +682,27 @@ hc_get_event (vlib_main_t *vm) } else { - fprintf (file_ptr, "< %s\n< %s\n< %s", hcm->response_status, - hcm->resp_headers, hcm->http_response); + fprintf (file_ptr, "< %s\n< %s\n< %s", + hc_session->response_status, hc_session->resp_headers, + hc_session->http_response); fclose (file_ptr); vlib_cli_output (vm, "file saved (/tmp/%v)", hcm->filename); } } if (hcm->verbose) - vlib_cli_output (vm, "< %v< %v", hcm->response_status, - hcm->resp_headers); - vlib_cli_output (vm, "\n%v\n", hcm->http_response); + { + wrk = hc_worker_get (hcm->worker_index); + hc_session = hc_session_get (wrk->session_index, wrk->thread_index); + vlib_cli_output (vm, "< %v< %v", hc_session->response_status, + hc_session->resp_headers); + vlib_cli_output (vm, "\n%v\n", hc_session->http_response); + } break; case HC_REPEAT_DONE: - vlib_cli_output (vm, - "< %d request(s) in %.6fs\n< avg latency " - "%.4fms\n< %.2f req/sec", - hc_stats.request_count, hc_stats.elapsed_time, - (hc_stats.elapsed_time / hc_stats.request_count) * 1000, - hc_stats.request_count / hc_stats.elapsed_time); + hc_get_repeat_stats (vm); break; default: + hc_get_repeat_stats (vm); err = clib_error_return (0, "error: unexpected event %d", event_type); break; } @@ -612,15 +715,17 @@ static clib_error_t * hc_run (vlib_main_t *vm) { hc_main_t *hcm = &hc_main; - vlib_thread_main_t *vtm = vlib_get_thread_main (); u32 num_threads; hc_worker_t *wrk; clib_error_t *err; - num_threads = 1 /* main thread */ + vtm->n_threads; + num_threads = 1 /* main thread */ + vlib_num_workers (); + if (vlib_num_workers ()) + clib_spinlock_init (&hcm->lock); vec_validate (hcm->wrk, num_threads - 1); vec_foreach (wrk, hcm->wrk) { + wrk->has_common_headers = false; wrk->thread_index = wrk - hcm->wrk; /* 4k for headers should be enough */ vec_validate (wrk->headers_buf, 4095); @@ -657,10 +762,18 @@ hc_detach () } static void -hcc_worker_cleanup (hc_worker_t *wrk) +hc_worker_cleanup (hc_worker_t *wrk) { - HTTP_DBG (1, "worker cleanup"); + hc_session_t *hc_session; + HTTP_DBG (1, "worker and worker sessions cleanup"); + vec_free (wrk->headers_buf); + vec_foreach (hc_session, wrk->sessions) + { + vec_free (hc_session->resp_headers); + vec_free (hc_session->http_response); + vec_free (hc_session->response_status); + } pool_free (wrk->sessions); } @@ -673,16 +786,14 @@ hc_cleanup () hc_http_header_t *header; vec_foreach (wrk, hcm->wrk) - hcc_worker_cleanup (wrk); + hc_worker_cleanup (wrk); vec_free (hcm->uri); vec_free (hcm->target); vec_free (hcm->data); - vec_free (hcm->resp_headers); - vec_free (hcm->http_response); - vec_free (hcm->response_status); vec_free (hcm->wrk); vec_free (hcm->filename); + vec_free (hcm->appns_id); vec_foreach (header, hcm->custom_header) { vec_free (header->name); @@ -698,6 +809,8 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input, hc_main_t *hcm = &hc_main; clib_error_t *err = 0; unformat_input_t _line_input, *line_input = &_line_input; + u64 mem_size; + u8 *appns_id = 0; u8 *path = 0; u8 *file_data; hc_http_header_t new_header; @@ -708,7 +821,16 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input, hcm->repeat_count = 0; hcm->duration = 0; hcm->repeat = false; + hcm->multi_session = false; + hcm->done_count = 0; + hcm->connected_counter = 0; + hcm->max_sessions = 1; + hcm->prealloc_fifos = 0; + hcm->private_segment_size = 0; + hcm->fifo_size = 0; + hcm->was_transport_closed = false; hc_stats.request_count = 0; + hc_stats.elapsed_time = 0; if (hcm->attached) return clib_error_return (0, "failed: already running!"); @@ -761,6 +883,29 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input, } else if (unformat (line_input, "duration %f", &hcm->duration)) hcm->repeat = true; + else if (unformat (line_input, "sessions %d", &hcm->max_sessions)) + { + hcm->multi_session = true; + if (hcm->max_sessions <= 1) + { + err = clib_error_return (0, "sessions must be > 1"); + goto done; + } + } + else if (unformat (line_input, "prealloc-fifos %d", + &hcm->prealloc_fifos)) + ; + else if (unformat (line_input, "private-segment-size %U", + unformat_memory_size, &mem_size)) + hcm->private_segment_size = mem_size; + else if (unformat (line_input, "fifo-size %U", unformat_memory_size, + &mem_size)) + hcm->fifo_size = mem_size; + else if (unformat (line_input, "appns %_%v%_", &appns_id)) + ; + else if (unformat (line_input, "secret %lu", &hcm->appns_secret)) + ; + else { err = clib_error_return (0, "unknown input `%U'", @@ -801,6 +946,13 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input, goto done; } + if (hcm->multi_session && !hcm->repeat) + { + err = clib_error_return ( + 0, "multiple sessions are only supported with request repeating"); + goto done; + } + if ((rv = parse_uri ((char *) hcm->uri, &hcm->connect_sep))) { err = @@ -808,6 +960,12 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input, goto done; } + if (hcm->duration >= hcm->timeout) + { + hcm->timeout = hcm->duration + 10; + } + hcm->appns_id = appns_id; + if (hcm->repeat) vlib_cli_output (vm, "Running, please wait..."); @@ -845,7 +1003,9 @@ VLIB_CLI_COMMAND (hc_command, static) = { "[post] uri http://<ip-addr> target <origin-form> " "[data <form-urlencoded> | file <file-path>] [use-ptr] " "[save-to <filename>] [header <Key:Value>] [verbose] " - "[timeout <seconds> (default = 10)] [repeat <count> | duration <seconds>]", + "[timeout <seconds> (default = 10)] [repeat <count> | duration <seconds>] " + "[sessions <# of sessions>] [appns <app-ns> secret <appns-secret>] " + "[fifo-size <nM|G>] [private-segment-size <nM|G>] [prealloc-fifos <n>]", .function = hc_command_fn, .is_mp_safe = 1, }; |