aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAdrian Villin <avillin@cisco.com>2025-01-24 13:56:22 +0100
committerFlorin Coras <florin.coras@gmail.com>2025-02-11 20:14:07 +0000
commitc8174f3660d2577c3819b51d65a3f2836d8afe31 (patch)
treee7ee8aec9423fb1914b07c4680ee6d9d9f34231f
parent29b1ece1d07477676e467e1b0f51987d47bd2997 (diff)
hsa: http client parallel sessions
- client is now able to use multiple workers to send requests (sometimes it uses multiple sessions on a single worker) Type: feature Change-Id: I2d83d47a9768011b3d8d05ed320852606841e4b8 Signed-off-by: Adrian Villin <avillin@cisco.com>
-rw-r--r--extras/hs-test/http_test.go97
-rw-r--r--extras/hs-test/infra/suite_no_topo.go37
-rw-r--r--extras/hs-test/topo-containers/single.yaml8
-rw-r--r--src/plugins/hs_apps/http_client.c448
4 files changed, 394 insertions, 196 deletions
diff --git a/extras/hs-test/http_test.go b/extras/hs-test/http_test.go
index 68934550b69..b143e559244 100644
--- a/extras/hs-test/http_test.go
+++ b/extras/hs-test/http_test.go
@@ -7,7 +7,6 @@ import (
"math/rand"
"net"
"net/http"
- "net/http/httptest"
"net/http/httptrace"
"os"
"strconv"
@@ -36,10 +35,10 @@ func init() {
HttpClientErrRespTest, HttpClientPostFormTest, HttpClientGet128kbResponseTest, HttpClientGetResponseBodyTest,
HttpClientGetNoResponseBodyTest, HttpClientPostFileTest, HttpClientPostFilePtrTest, HttpUnitTest,
HttpRequestLineTest, HttpClientGetTimeout, HttpStaticFileHandlerWrkTest, HttpStaticUrlHandlerWrkTest, HttpConnTimeoutTest,
- HttpClientGetRepeat, HttpClientPostRepeat, HttpIgnoreH2UpgradeTest, HttpInvalidAuthorityFormUriTest, HttpHeaderErrorConnectionDropTest)
+ HttpClientGetRepeatTest, HttpClientPostRepeatTest, HttpIgnoreH2UpgradeTest, HttpInvalidAuthorityFormUriTest, HttpHeaderErrorConnectionDropTest)
RegisterNoTopoSoloTests(HttpStaticPromTest, HttpGetTpsTest, HttpGetTpsInterruptModeTest, PromConcurrentConnectionsTest,
PromMemLeakTest, HttpClientPostMemLeakTest, HttpInvalidClientRequestMemLeakTest, HttpPostTpsTest, HttpPostTpsInterruptModeTest,
- PromConsecutiveConnectionsTest, HttpGetTpsTlsTest, HttpPostTpsTlsTest)
+ PromConsecutiveConnectionsTest, HttpGetTpsTlsTest, HttpPostTpsTlsTest, HttpClientGetRepeatMTTest, HttpClientPtrGetRepeatMTTest)
}
const wwwRootPath = "/tmp/www_root"
@@ -382,35 +381,37 @@ func httpClientGet(s *NoTopoSuite, response string, size int) {
s.AssertContains(file_contents, response)
}
-func startSimpleServer(s *NoTopoSuite, replyCount *int, serverAddress string) (server *httptest.Server) {
- var err error
- server = httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- fmt.Fprintf(w, "Hello")
- *replyCount++
- }))
- server.Listener, err = net.Listen("tcp", serverAddress+":80")
- s.AssertNil(err, "Error while creating listener.")
-
- server.Start()
+func HttpClientGetRepeatMTTest(s *NoTopoSuite) {
+ httpClientRepeat(s, "", "sessions 2")
+}
- return server
+func HttpClientPtrGetRepeatMTTest(s *NoTopoSuite) {
+ httpClientRepeat(s, "", "use-ptr sessions 2")
}
-func HttpClientGetRepeat(s *NoTopoSuite) {
- httpClientRepeat(s, "")
+func HttpClientGetRepeatTest(s *NoTopoSuite) {
+ httpClientRepeat(s, "", "")
}
-func HttpClientPostRepeat(s *NoTopoSuite) {
- httpClientRepeat(s, "post")
+func HttpClientPostRepeatTest(s *NoTopoSuite) {
+ httpClientRepeat(s, "post", "")
}
-func httpClientRepeat(s *NoTopoSuite, requestMethod string) {
- replyCount := 0
+func httpClientRepeat(s *NoTopoSuite, requestMethod string, clientArgs string) {
vpp := s.Containers.Vpp.VppInstance
- serverAddress := s.HostAddr()
+ logPath := s.Containers.NginxServer.GetContainerWorkDir() + "/" + s.Containers.NginxServer.Name + "-access.log"
+ serverAddress := s.Interfaces.Tap.Ip4AddressString()
+ replyCountInt := 0
repeatAmount := 10000
- server := startSimpleServer(s, &replyCount, serverAddress)
- defer server.Close()
+ durationInSec := 10
+ var err error
+
+ // recreate interfaces with RX-queues
+ s.AssertNil(vpp.DeleteTap(s.Interfaces.Tap))
+ s.AssertNil(vpp.CreateTap(s.Interfaces.Tap, 2, 2))
+
+ s.CreateNginxServer()
+ s.AssertNil(s.Containers.NginxServer.Start())
if requestMethod == "post" {
fileName := "/tmp/test_file.txt"
@@ -419,41 +420,45 @@ func httpClientRepeat(s *NoTopoSuite, requestMethod string) {
requestMethod += " file /tmp/test_file.txt"
}
- uri := "http://" + serverAddress + "/80"
- cmd := fmt.Sprintf("http client %s use-ptr duration 10 header Hello:World uri %s target /index.html",
- requestMethod, uri)
+ uri := "http://" + serverAddress + "/" + s.GetPortFromPpid()
+ cmd := fmt.Sprintf("http client %s %s duration %d header Hello:World uri %s target /index.html",
+ requestMethod, clientArgs, durationInSec, uri)
- s.Log("Duration 10s")
+ s.Log("Duration %ds", durationInSec)
o := vpp.Vppctl(cmd)
- outputLen := len(o)
- if outputLen > 500 {
- s.Log(o[:500])
- s.Log("* HST Framework: output limited to 500 chars to avoid flooding the console. Output length: " + fmt.Sprint(outputLen))
- } else {
- s.Log(o)
+ s.Log(o)
+
+ replyCount := s.Containers.NginxServer.Exec(false, "awk 'END { print NR }' "+logPath)
+ if replyCount != "" {
+ replyCountInt, err = strconv.Atoi(replyCount[:len(replyCount)-1])
+ s.AssertNil(err)
}
- s.Log("Server response count: %d", replyCount)
+ // empty the log file
+ s.Containers.NginxServer.Exec(false, "truncate -s 0 "+logPath)
+
+ s.Log("Server response count: %d", replyCountInt)
s.AssertNotNil(o)
s.AssertNotContains(o, "error")
- s.AssertGreaterThan(replyCount, 15000)
+ s.AssertGreaterThan(replyCountInt, 15000)
- cmd = fmt.Sprintf("http client %s use-ptr repeat %d header Hello:World uri %s target /index.html",
- requestMethod, repeatAmount, uri)
+ replyCount = ""
+ cmd = fmt.Sprintf("http client %s %s repeat %d header Hello:World uri %s target /index.html",
+ requestMethod, clientArgs, repeatAmount, uri)
- replyCount = 0
+ s.AssertNil(err, fmt.Sprint(err))
s.Log("Repeat %d", repeatAmount)
o = vpp.Vppctl(cmd)
- outputLen = len(o)
- if outputLen > 500 {
- s.Log(o[:500])
- s.Log("* HST Framework: output limited to 500 chars to avoid flooding the console. Output length: " + fmt.Sprint(outputLen))
- } else {
- s.Log(o)
+ s.Log(o)
+
+ replyCount = s.Containers.NginxServer.Exec(false, "awk 'END { print NR }' "+logPath)
+ if replyCount != "" {
+ replyCountInt, err = strconv.Atoi(replyCount[:len(replyCount)-1])
+ s.AssertNil(err)
}
- s.Log("Server response count: %d", replyCount)
+ s.Log("Server response count: %d", replyCountInt)
s.AssertNotNil(o)
s.AssertNotContains(o, "error")
- s.AssertEqual(repeatAmount, replyCount)
+ s.AssertEqual(repeatAmount, replyCountInt)
}
func HttpClientGetTimeout(s *NoTopoSuite) {
diff --git a/extras/hs-test/infra/suite_no_topo.go b/extras/hs-test/infra/suite_no_topo.go
index 1c7b6fe91c3..d084413f7e6 100644
--- a/extras/hs-test/infra/suite_no_topo.go
+++ b/extras/hs-test/infra/suite_no_topo.go
@@ -18,13 +18,15 @@ type NoTopoSuite struct {
Tap *NetInterface
}
Containers struct {
- Vpp *Container
- Nginx *Container
- NginxHttp3 *Container
- Wrk *Container
- Curl *Container
- Ab *Container
+ Vpp *Container
+ Nginx *Container
+ NginxHttp3 *Container
+ NginxServer *Container
+ Wrk *Container
+ Curl *Container
+ Ab *Container
}
+ NginxServerPort string
}
func RegisterNoTopoTests(tests ...func(s *NoTopoSuite)) {
@@ -42,6 +44,7 @@ func (s *NoTopoSuite) SetupSuite() {
s.Containers.Vpp = s.GetContainerByName("vpp")
s.Containers.Nginx = s.GetContainerByName("nginx")
s.Containers.NginxHttp3 = s.GetContainerByName("nginx-http3")
+ s.Containers.NginxServer = s.GetTransientContainerByName("nginx-server")
s.Containers.Wrk = s.GetContainerByName("wrk")
s.Containers.Curl = s.GetContainerByName("curl")
s.Containers.Ab = s.GetContainerByName("ab")
@@ -101,6 +104,28 @@ func (s *NoTopoSuite) CreateNginxConfig(container *Container, multiThreadWorkers
)
}
+// Creates container and config.
+func (s *NoTopoSuite) CreateNginxServer() {
+ s.AssertNil(s.Containers.NginxServer.Create())
+ s.NginxServerPort = s.GetPortFromPpid()
+ nginxSettings := struct {
+ LogPrefix string
+ Address string
+ Port string
+ Timeout int
+ }{
+ LogPrefix: s.Containers.NginxServer.Name,
+ Address: s.Interfaces.Tap.Ip4AddressString(),
+ Port: s.NginxServerPort,
+ Timeout: 600,
+ }
+ s.Containers.NginxServer.CreateConfigFromTemplate(
+ "/nginx.conf",
+ "./resources/nginx/nginx_server.conf",
+ nginxSettings,
+ )
+}
+
func (s *NoTopoSuite) AddNginxVclConfig(multiThreadWorkers bool) {
vclFileName := s.Containers.Nginx.GetHostWorkDir() + "/vcl.conf"
appSocketApi := fmt.Sprintf("app-socket-api %s/var/run/app_ns_sockets/default",
diff --git a/extras/hs-test/topo-containers/single.yaml b/extras/hs-test/topo-containers/single.yaml
index b4449dc1918..2f5f31b6879 100644
--- a/extras/hs-test/topo-containers/single.yaml
+++ b/extras/hs-test/topo-containers/single.yaml
@@ -28,6 +28,14 @@ containers:
image: "hs-test/nginx-http3"
is-optional: true
+ - name: "nginx-server"
+ volumes:
+ - <<: *shared-vol
+ container-dir: "/tmp/nginx"
+ is-default-work-dir: true
+ image: "hs-test/nginx-server"
+ is-optional: true
+
- name: "ab"
image: "hs-test/ab"
is-optional: true
diff --git a/src/plugins/hs_apps/http_client.c b/src/plugins/hs_apps/http_client.c
index 20271fc4aea..e4759317cbe 100644
--- a/src/plugins/hs_apps/http_client.c
+++ b/src/plugins/hs_apps/http_client.c
@@ -1,5 +1,5 @@
/* SPDX-License-Identifier: Apache-2.0
- * Copyright(c) 2024 Cisco Systems, Inc.
+ * Copyright(c) 2025 Cisco Systems, Inc.
*/
#include <vnet/session/application.h>
@@ -12,29 +12,36 @@
typedef struct
{
+ u64 req_per_wrk;
+ u64 request_count;
+ f64 start, end;
+ f64 elapsed_time;
+} hc_stats_t;
+
+typedef struct
+{
CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
u32 session_index;
u32 thread_index;
- u32 vpp_session_index;
u64 to_recv;
u8 is_closed;
+ hc_stats_t stats;
+ u64 data_offset;
+ u8 *resp_headers;
+ u8 *http_response;
+ u8 *response_status;
} hc_session_t;
typedef struct
{
- u64 request_count;
- f64 start, end;
- f64 elapsed_time;
-} hc_stats_t;
-
-typedef struct
-{
hc_session_t *sessions;
u32 thread_index;
vlib_main_t *vlib_main;
u8 *headers_buf;
http_headers_ctx_t req_headers;
http_msg_t msg;
+ u32 session_index;
+ bool has_common_headers;
} hc_worker_t;
typedef struct
@@ -52,11 +59,7 @@ typedef struct
session_endpoint_cfg_t connect_sep;
u8 *target;
u8 *data;
- u64 data_offset;
hc_worker_t *wrk;
- u8 *resp_headers;
- u8 *http_response;
- u8 *response_status;
hc_http_header_t *custom_header;
u8 is_file;
u8 use_ptr;
@@ -67,6 +70,18 @@ typedef struct
u64 repeat_count;
f64 duration;
bool repeat;
+ bool multi_session;
+ u32 done_count;
+ u32 connected_counter;
+ u32 worker_index;
+ u32 max_sessions;
+ u32 private_segment_size;
+ u32 prealloc_fifos;
+ u32 fifo_size;
+ u8 *appns_id;
+ u64 appns_secret;
+ clib_spinlock_t lock;
+ bool was_transport_closed;
} hc_main_t;
typedef enum
@@ -95,13 +110,6 @@ hc_session_get (u32 session_index, u32 thread_index)
return pool_elt_at_index (wrk->sessions, session_index);
}
-static void
-hc_ho_session_free (u32 hs_index)
-{
- hc_worker_t *wrk = hc_worker_get (0);
- pool_put_index (wrk->sessions, hs_index);
-}
-
static hc_session_t *
hc_session_alloc (hc_worker_t *wrk)
{
@@ -115,14 +123,14 @@ hc_session_alloc (hc_worker_t *wrk)
}
static int
-hc_request (session_t *s, session_error_t err)
+hc_request (session_t *s, hc_worker_t *wrk, hc_session_t *hc_session,
+ session_error_t err)
{
hc_main_t *hcm = &hc_main;
u64 to_send;
u32 n_enq;
u8 n_segs;
int rv;
- hc_worker_t *wrk = hc_worker_get (s->thread_index);
if (hcm->use_ptr)
{
@@ -166,7 +174,7 @@ hc_request (session_t *s, session_error_t err)
rv = svm_fifo_enqueue (s->tx_fifo, n_enq, hcm->data);
if (rv < to_send)
{
- hcm->data_offset = (rv > 0) ? rv : 0;
+ hc_session->data_offset = (rv > 0) ? rv : 0;
svm_fifo_add_want_deq_ntf (s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
}
}
@@ -185,9 +193,8 @@ hc_session_connected_callback (u32 app_index, u32 hc_session_index,
{
hc_main_t *hcm = &hc_main;
hc_worker_t *wrk;
- u32 new_hc_index;
+ hc_session_t *hc_session;
hc_http_header_t *header;
- HTTP_DBG (1, "ho hc_index: %d", hc_session_index);
if (err)
{
@@ -199,68 +206,89 @@ hc_session_connected_callback (u32 app_index, u32 hc_session_index,
}
wrk = hc_worker_get (s->thread_index);
- hc_session_t *hc_session, *new_hc_session = hc_session_alloc (wrk);
- hc_session = hc_session_get (hc_session_index, 0);
- new_hc_index = new_hc_session->session_index;
- clib_memcpy_fast (new_hc_session, hc_session, sizeof (*hc_session));
- new_hc_session->session_index = new_hc_index;
- new_hc_session->thread_index = s->thread_index;
- new_hc_session->vpp_session_index = s->session_index;
- HTTP_DBG (1, "new hc_index: %d", new_hc_session->session_index);
- s->opaque = new_hc_index;
+ hc_session = hc_session_alloc (wrk);
+ clib_spinlock_lock_if_init (&hcm->lock);
+ hcm->connected_counter++;
+ clib_spinlock_unlock_if_init (&hcm->lock);
- if (hcm->req_method == HTTP_REQ_POST)
+ hc_session->thread_index = s->thread_index;
+ s->opaque = hc_session->session_index;
+ wrk->session_index = hc_session->session_index;
+
+ if (hcm->multi_session)
{
- if (hcm->is_file)
- http_add_header (
- &wrk->req_headers, HTTP_HEADER_CONTENT_TYPE,
- http_content_type_token (HTTP_CONTENT_APP_OCTET_STREAM));
- else
- http_add_header (
- &wrk->req_headers, HTTP_HEADER_CONTENT_TYPE,
- http_content_type_token (HTTP_CONTENT_APP_X_WWW_FORM_URLENCODED));
+ hc_session->stats.req_per_wrk = hcm->repeat_count / hcm->max_sessions;
+ clib_spinlock_lock_if_init (&hcm->lock);
+ /* add remaining requests to the first connected session */
+ if (hcm->connected_counter == 1)
+ {
+ hc_session->stats.req_per_wrk +=
+ hcm->repeat_count % hcm->max_sessions;
+ }
+ clib_spinlock_unlock_if_init (&hcm->lock);
}
- http_add_header (&wrk->req_headers, HTTP_HEADER_ACCEPT, "*", 1);
-
- vec_foreach (header, hcm->custom_header)
- http_add_custom_header (
- &wrk->req_headers, (const char *) header->name, vec_len (header->name),
- (const char *) header->value, vec_len (header->value));
-
- clib_warning ("%U", format_http_bytes, wrk->headers_buf,
- wrk->req_headers.tail_offset);
- wrk->msg.method_type = hcm->req_method;
- if (hcm->req_method == HTTP_REQ_POST)
- wrk->msg.data.body_len = vec_len (hcm->data);
else
- wrk->msg.data.body_len = 0;
-
- wrk->msg.type = HTTP_MSG_REQUEST;
- /* request target */
- wrk->msg.data.target_path_len = vec_len (hcm->target);
- /* custom headers */
- wrk->msg.data.headers_len = wrk->req_headers.tail_offset;
- /* total length */
- wrk->msg.data.len = wrk->msg.data.target_path_len +
- wrk->msg.data.headers_len + wrk->msg.data.body_len;
-
- if (hcm->use_ptr)
{
- wrk->msg.data.type = HTTP_MSG_DATA_PTR;
+ hc_session->stats.req_per_wrk = hcm->repeat_count;
+ hcm->worker_index = s->thread_index;
}
- else
+
+ if (!wrk->has_common_headers)
{
- wrk->msg.data.type = HTTP_MSG_DATA_INLINE;
- wrk->msg.data.target_path_offset = 0;
- wrk->msg.data.headers_offset = wrk->msg.data.target_path_len;
- wrk->msg.data.body_offset =
- wrk->msg.data.headers_offset + wrk->msg.data.headers_len;
+ wrk->has_common_headers = true;
+ if (hcm->req_method == HTTP_REQ_POST)
+ {
+ if (hcm->is_file)
+ http_add_header (
+ &wrk->req_headers, HTTP_HEADER_CONTENT_TYPE,
+ http_content_type_token (HTTP_CONTENT_APP_OCTET_STREAM));
+ else
+ http_add_header (&wrk->req_headers, HTTP_HEADER_CONTENT_TYPE,
+ http_content_type_token (
+ HTTP_CONTENT_APP_X_WWW_FORM_URLENCODED));
+ }
+ http_add_header (&wrk->req_headers, HTTP_HEADER_ACCEPT, "*", 1);
+
+ vec_foreach (header, hcm->custom_header)
+ http_add_custom_header (&wrk->req_headers, (const char *) header->name,
+ vec_len (header->name),
+ (const char *) header->value,
+ vec_len (header->value));
+
+ wrk->msg.method_type = hcm->req_method;
+ if (hcm->req_method == HTTP_REQ_POST)
+ wrk->msg.data.body_len = vec_len (hcm->data);
+ else
+ wrk->msg.data.body_len = 0;
+
+ wrk->msg.type = HTTP_MSG_REQUEST;
+ /* request target */
+ wrk->msg.data.target_path_len = vec_len (hcm->target);
+ /* custom headers */
+ wrk->msg.data.headers_len = wrk->req_headers.tail_offset;
+ /* total length */
+ wrk->msg.data.len = wrk->msg.data.target_path_len +
+ wrk->msg.data.headers_len + wrk->msg.data.body_len;
+
+ if (hcm->use_ptr)
+ {
+ wrk->msg.data.type = HTTP_MSG_DATA_PTR;
+ }
+ else
+ {
+ wrk->msg.data.type = HTTP_MSG_DATA_INLINE;
+ wrk->msg.data.target_path_offset = 0;
+ wrk->msg.data.headers_offset = wrk->msg.data.target_path_len;
+ wrk->msg.data.body_offset =
+ wrk->msg.data.headers_offset + wrk->msg.data.headers_len;
+ }
}
if (hcm->repeat)
- hc_stats.start = vlib_time_now (vlib_get_main_by_index (s->thread_index));
+ hc_session->stats.start =
+ vlib_time_now (vlib_get_main_by_index (s->thread_index));
- return hc_request (s, err);
+ return hc_request (s, wrk, hc_session, err);
}
static void
@@ -275,21 +303,38 @@ hc_session_disconnect_callback (session_t *s)
if ((rv = vnet_disconnect_session (a)))
clib_warning ("warning: disconnect returned: %U", format_session_error,
rv);
+ clib_spinlock_lock_if_init (&hcm->lock);
+ hcm->done_count++;
+ clib_spinlock_unlock_if_init (&hcm->lock);
}
static void
hc_session_transport_closed_callback (session_t *s)
{
hc_main_t *hcm = &hc_main;
- vlib_process_signal_event_mt (hcm->wrk->vlib_main, hcm->cli_node_index,
- HC_TRANSPORT_CLOSED, 0);
-}
+ hc_worker_t *wrk = hc_worker_get (s->thread_index);
-static void
-hc_ho_cleanup_callback (session_t *s)
-{
- HTTP_DBG (1, "ho hc_index: %d:", s->opaque);
- hc_ho_session_free (s->opaque);
+ clib_spinlock_lock_if_init (&hcm->lock);
+ if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
+ {
+ hcm->was_transport_closed = true;
+ }
+
+ /* send an event when all sessions are closed */
+ if (hcm->done_count >= hcm->max_sessions)
+ {
+ if (hcm->was_transport_closed)
+ {
+ vlib_process_signal_event_mt (wrk->vlib_main, hcm->cli_node_index,
+ HC_TRANSPORT_CLOSED, 0);
+ }
+ else
+ {
+ vlib_process_signal_event_mt (wrk->vlib_main, hcm->cli_node_index,
+ HC_REPEAT_DONE, 0);
+ }
+ }
+ clib_spinlock_unlock_if_init (&hcm->lock);
}
static void
@@ -315,20 +360,23 @@ hc_rx_callback (session_t *s)
{
hc_main_t *hcm = &hc_main;
hc_worker_t *wrk = hc_worker_get (s->thread_index);
- hc_session_t *hc_session;
+ hc_session_t *hc_session = hc_session_get (s->opaque, s->thread_index);
http_msg_t msg;
int rv;
+ u32 max_deq;
session_error_t session_err = 0;
int send_err = 0;
- hc_session = hc_session_get (s->opaque, s->thread_index);
-
if (hc_session->is_closed)
{
clib_warning ("hc_session_index[%d] is closed", s->opaque);
return -1;
}
+ max_deq = svm_fifo_max_dequeue_cons (s->rx_fifo);
+ if (PREDICT_FALSE (max_deq == 0))
+ goto done;
+
if (hc_session->to_recv == 0)
{
rv = svm_fifo_dequeue (s->rx_fifo, sizeof (msg), (u8 *) &msg);
@@ -344,17 +392,20 @@ hc_rx_callback (session_t *s)
if (msg.data.headers_len)
{
- hcm->response_status =
- format (0, "%U", format_http_status_code, msg.code);
+
+ if (!hcm->repeat)
+ hc_session->response_status =
+ format (0, "%U", format_http_status_code, msg.code);
+
svm_fifo_dequeue_drop (s->rx_fifo, msg.data.headers_offset);
- vec_validate (hcm->resp_headers, msg.data.headers_len - 1);
- vec_set_len (hcm->resp_headers, msg.data.headers_len);
+ vec_validate (hc_session->resp_headers, msg.data.headers_len - 1);
+ vec_set_len (hc_session->resp_headers, msg.data.headers_len);
rv = svm_fifo_dequeue (s->rx_fifo, msg.data.headers_len,
- hcm->resp_headers);
+ hc_session->resp_headers);
ASSERT (rv == msg.data.headers_len);
- HTTP_DBG (1, (char *) format (0, "%v", hcm->resp_headers));
+ HTTP_DBG (1, (char *) format (0, "%v", hc_session->resp_headers));
msg.data.body_offset -=
msg.data.headers_len + msg.data.headers_offset;
}
@@ -372,18 +423,18 @@ hc_rx_callback (session_t *s)
{
goto done;
}
- vec_validate (hcm->http_response, msg.data.body_len - 1);
- vec_reset_length (hcm->http_response);
+ vec_validate (hc_session->http_response, msg.data.body_len - 1);
+ vec_reset_length (hc_session->http_response);
}
- u32 max_deq = svm_fifo_max_dequeue (s->rx_fifo);
+ max_deq = svm_fifo_max_dequeue (s->rx_fifo);
if (!max_deq)
{
goto done;
}
u32 n_deq = clib_min (hc_session->to_recv, max_deq);
- u32 curr = vec_len (hcm->http_response);
- rv = svm_fifo_dequeue (s->rx_fifo, n_deq, hcm->http_response + curr);
+ u32 curr = vec_len (hc_session->http_response);
+ rv = svm_fifo_dequeue (s->rx_fifo, n_deq, hc_session->http_response + curr);
if (rv < 0)
{
clib_warning ("app dequeue(n=%d) failed; rv = %d", n_deq, rv);
@@ -393,7 +444,7 @@ hc_rx_callback (session_t *s)
}
ASSERT (rv == n_deq);
- vec_set_len (hcm->http_response, curr + n_deq);
+ vec_set_len (hc_session->http_response, curr + n_deq);
ASSERT (hc_session->to_recv >= rv);
hc_session->to_recv -= rv;
@@ -402,20 +453,19 @@ done:
{
if (hcm->repeat)
{
- hc_stats.request_count++;
- hc_stats.end = vlib_time_now (wrk->vlib_main);
- hc_stats.elapsed_time = hc_stats.end - hc_stats.start;
+ hc_session->stats.request_count++;
+ hc_session->stats.end = vlib_time_now (wrk->vlib_main);
+ hc_session->stats.elapsed_time =
+ hc_session->stats.end - hc_session->stats.start;
- if (hc_stats.elapsed_time >= hcm->duration &&
- hc_stats.request_count >= hcm->repeat_count)
+ if (hc_session->stats.elapsed_time >= hcm->duration &&
+ hc_session->stats.request_count >= hc_session->stats.req_per_wrk)
{
- vlib_process_signal_event_mt (
- wrk->vlib_main, hcm->cli_node_index, HC_REPEAT_DONE, 0);
hc_session_disconnect_callback (s);
}
else
{
- send_err = hc_request (s, session_err);
+ send_err = hc_request (s, wrk, hc_session, session_err);
if (send_err)
clib_warning ("failed to send request, error %d", send_err);
}
@@ -434,11 +484,13 @@ static int
hc_tx_callback (session_t *s)
{
hc_main_t *hcm = &hc_main;
+ hc_session_t *hc_session = hc_session_get (s->opaque, s->thread_index);
u64 to_send;
int rv;
- to_send = vec_len (hcm->data) - hcm->data_offset;
- rv = svm_fifo_enqueue (s->tx_fifo, to_send, hcm->data + hcm->data_offset);
+ to_send = vec_len (hcm->data) - hc_session->data_offset;
+ rv = svm_fifo_enqueue (s->tx_fifo, to_send,
+ hcm->data + hc_session->data_offset);
if (rv <= 0)
{
@@ -448,7 +500,7 @@ hc_tx_callback (session_t *s)
if (rv < to_send)
{
- hcm->data_offset += rv;
+ hc_session->data_offset += rv;
svm_fifo_add_want_deq_ntf (s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
}
@@ -465,7 +517,6 @@ static session_cb_vft_t hc_session_cb_vft = {
.session_reset_callback = hc_session_reset_callback,
.builtin_app_rx_callback = hc_rx_callback,
.builtin_app_tx_callback = hc_tx_callback,
- .half_open_cleanup_callback = hc_ho_cleanup_callback,
};
static clib_error_t *
@@ -474,8 +525,12 @@ hc_attach ()
hc_main_t *hcm = &hc_main;
vnet_app_attach_args_t _a, *a = &_a;
u64 options[18];
+ u32 segment_size = 128 << 20;
int rv;
+ if (hcm->private_segment_size)
+ segment_size = hcm->private_segment_size;
+
clib_memset (a, 0, sizeof (*a));
clib_memset (options, 0, sizeof (options));
@@ -483,7 +538,19 @@ hc_attach ()
a->name = format (0, "http_client");
a->session_cb_vft = &hc_session_cb_vft;
a->options = options;
+ a->options[APP_OPTIONS_SEGMENT_SIZE] = segment_size;
+ a->options[APP_OPTIONS_ADD_SEGMENT_SIZE] = segment_size;
+ a->options[APP_OPTIONS_RX_FIFO_SIZE] =
+ hcm->fifo_size ? hcm->fifo_size : 8 << 10;
+ a->options[APP_OPTIONS_TX_FIFO_SIZE] =
+ hcm->fifo_size ? hcm->fifo_size : 32 << 10;
a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
+ a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = hcm->prealloc_fifos;
+ if (hcm->appns_id)
+ {
+ a->namespace_id = hcm->appns_id;
+ a->options[APP_OPTIONS_NAMESPACE_SECRET] = hcm->appns_secret;
+ }
if ((rv = vnet_application_attach (a)))
return clib_error_return (0, "attach returned: %U", format_session_error,
@@ -500,14 +567,19 @@ static int
hc_connect_rpc (void *rpc_args)
{
vnet_connect_args_t *a = rpc_args;
- int rv;
+ int rv = ~0;
+ hc_main_t *hcm = &hc_main;
- rv = vnet_connect (a);
- if (rv > 0)
- clib_warning (0, "connect returned: %U", format_session_error, rv);
+ for (u32 i = 0; i < hcm->max_sessions; i++)
+ {
+ rv = vnet_connect (a);
+ if (rv > 0)
+ clib_warning (0, "connect returned: %U", format_session_error, rv);
+ }
session_endpoint_free_ext_cfgs (&a->sep_ext);
vec_free (a);
+
return rv;
}
@@ -516,14 +588,10 @@ hc_connect ()
{
hc_main_t *hcm = &hc_main;
vnet_connect_args_t *a = 0;
- hc_worker_t *wrk;
- hc_session_t *hc_session;
transport_endpt_ext_cfg_t *ext_cfg;
transport_endpt_cfg_http_t http_cfg = { (u32) hcm->timeout, 0 };
-
vec_validate (a, 0);
clib_memset (a, 0, sizeof (a[0]));
-
clib_memcpy (&a->sep_ext, &hcm->connect_sep, sizeof (hcm->connect_sep));
a->app_index = hcm->app_index;
@@ -531,15 +599,41 @@ hc_connect ()
&a->sep_ext, TRANSPORT_ENDPT_EXT_CFG_HTTP, sizeof (http_cfg));
clib_memcpy (ext_cfg->data, &http_cfg, sizeof (http_cfg));
- /* allocate http session on main thread */
- wrk = hc_worker_get (0);
- hc_session = hc_session_alloc (wrk);
- a->api_context = hc_session->session_index;
-
session_send_rpc_evt_to_thread_force (transport_cl_thread (), hc_connect_rpc,
a);
}
+static void
+hc_get_repeat_stats (vlib_main_t *vm)
+{
+ hc_main_t *hcm = &hc_main;
+ hc_worker_t *wrk;
+ hc_session_t *hc_session;
+
+ if (hcm->repeat)
+ {
+ vec_foreach (wrk, hcm->wrk)
+ {
+ vec_foreach (hc_session, wrk->sessions)
+ {
+ hc_stats.request_count += hc_session->stats.request_count;
+ hc_session->stats.request_count = 0;
+ if (hc_stats.elapsed_time < hc_session->stats.elapsed_time)
+ {
+ hc_stats.elapsed_time = hc_session->stats.elapsed_time;
+ hc_session->stats.elapsed_time = 0;
+ }
+ }
+ }
+ vlib_cli_output (vm,
+ "< %d request(s) in %.6fs\n< avg latency "
+ "%.4fms\n< %.2f req/sec",
+ hc_stats.request_count, hc_stats.elapsed_time,
+ (hc_stats.elapsed_time / hc_stats.request_count) * 1000,
+ hc_stats.request_count / hc_stats.elapsed_time);
+ }
+}
+
static clib_error_t *
hc_get_event (vlib_main_t *vm)
{
@@ -548,6 +642,8 @@ hc_get_event (vlib_main_t *vm)
clib_error_t *err = NULL;
FILE *file_ptr;
u64 event_timeout;
+ hc_worker_t *wrk;
+ hc_session_t *hc_session;
event_timeout = hcm->timeout ? hcm->timeout : 10;
if (event_timeout == hcm->duration)
@@ -558,20 +654,26 @@ hc_get_event (vlib_main_t *vm)
switch (event_type)
{
case ~0:
+ hc_get_repeat_stats (vm);
err = clib_error_return (0, "error: timeout");
break;
case HC_CONNECT_FAILED:
+ hc_get_repeat_stats (vm);
err = clib_error_return (0, "error: failed to connect");
break;
case HC_TRANSPORT_CLOSED:
+ hc_get_repeat_stats (vm);
err = clib_error_return (0, "error: transport closed");
break;
case HC_GENERIC_ERR:
+ hc_get_repeat_stats (vm);
err = clib_error_return (0, "error: unknown");
break;
case HC_REPLY_RECEIVED:
if (hcm->filename)
{
+ wrk = hc_worker_get (hcm->worker_index);
+ hc_session = hc_session_get (wrk->session_index, wrk->thread_index);
file_ptr =
fopen ((char *) format (0, "/tmp/%v", hcm->filename), "a");
if (file_ptr == NULL)
@@ -580,26 +682,27 @@ hc_get_event (vlib_main_t *vm)
}
else
{
- fprintf (file_ptr, "< %s\n< %s\n< %s", hcm->response_status,
- hcm->resp_headers, hcm->http_response);
+ fprintf (file_ptr, "< %s\n< %s\n< %s",
+ hc_session->response_status, hc_session->resp_headers,
+ hc_session->http_response);
fclose (file_ptr);
vlib_cli_output (vm, "file saved (/tmp/%v)", hcm->filename);
}
}
if (hcm->verbose)
- vlib_cli_output (vm, "< %v< %v", hcm->response_status,
- hcm->resp_headers);
- vlib_cli_output (vm, "\n%v\n", hcm->http_response);
+ {
+ wrk = hc_worker_get (hcm->worker_index);
+ hc_session = hc_session_get (wrk->session_index, wrk->thread_index);
+ vlib_cli_output (vm, "< %v< %v", hc_session->response_status,
+ hc_session->resp_headers);
+ vlib_cli_output (vm, "\n%v\n", hc_session->http_response);
+ }
break;
case HC_REPEAT_DONE:
- vlib_cli_output (vm,
- "< %d request(s) in %.6fs\n< avg latency "
- "%.4fms\n< %.2f req/sec",
- hc_stats.request_count, hc_stats.elapsed_time,
- (hc_stats.elapsed_time / hc_stats.request_count) * 1000,
- hc_stats.request_count / hc_stats.elapsed_time);
+ hc_get_repeat_stats (vm);
break;
default:
+ hc_get_repeat_stats (vm);
err = clib_error_return (0, "error: unexpected event %d", event_type);
break;
}
@@ -612,15 +715,17 @@ static clib_error_t *
hc_run (vlib_main_t *vm)
{
hc_main_t *hcm = &hc_main;
- vlib_thread_main_t *vtm = vlib_get_thread_main ();
u32 num_threads;
hc_worker_t *wrk;
clib_error_t *err;
- num_threads = 1 /* main thread */ + vtm->n_threads;
+ num_threads = 1 /* main thread */ + vlib_num_workers ();
+ if (vlib_num_workers ())
+ clib_spinlock_init (&hcm->lock);
vec_validate (hcm->wrk, num_threads - 1);
vec_foreach (wrk, hcm->wrk)
{
+ wrk->has_common_headers = false;
wrk->thread_index = wrk - hcm->wrk;
/* 4k for headers should be enough */
vec_validate (wrk->headers_buf, 4095);
@@ -657,10 +762,18 @@ hc_detach ()
}
static void
-hcc_worker_cleanup (hc_worker_t *wrk)
+hc_worker_cleanup (hc_worker_t *wrk)
{
- HTTP_DBG (1, "worker cleanup");
+ hc_session_t *hc_session;
+ HTTP_DBG (1, "worker and worker sessions cleanup");
+
vec_free (wrk->headers_buf);
+ vec_foreach (hc_session, wrk->sessions)
+ {
+ vec_free (hc_session->resp_headers);
+ vec_free (hc_session->http_response);
+ vec_free (hc_session->response_status);
+ }
pool_free (wrk->sessions);
}
@@ -673,16 +786,14 @@ hc_cleanup ()
hc_http_header_t *header;
vec_foreach (wrk, hcm->wrk)
- hcc_worker_cleanup (wrk);
+ hc_worker_cleanup (wrk);
vec_free (hcm->uri);
vec_free (hcm->target);
vec_free (hcm->data);
- vec_free (hcm->resp_headers);
- vec_free (hcm->http_response);
- vec_free (hcm->response_status);
vec_free (hcm->wrk);
vec_free (hcm->filename);
+ vec_free (hcm->appns_id);
vec_foreach (header, hcm->custom_header)
{
vec_free (header->name);
@@ -698,6 +809,8 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input,
hc_main_t *hcm = &hc_main;
clib_error_t *err = 0;
unformat_input_t _line_input, *line_input = &_line_input;
+ u64 mem_size;
+ u8 *appns_id = 0;
u8 *path = 0;
u8 *file_data;
hc_http_header_t new_header;
@@ -708,7 +821,16 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input,
hcm->repeat_count = 0;
hcm->duration = 0;
hcm->repeat = false;
+ hcm->multi_session = false;
+ hcm->done_count = 0;
+ hcm->connected_counter = 0;
+ hcm->max_sessions = 1;
+ hcm->prealloc_fifos = 0;
+ hcm->private_segment_size = 0;
+ hcm->fifo_size = 0;
+ hcm->was_transport_closed = false;
hc_stats.request_count = 0;
+ hc_stats.elapsed_time = 0;
if (hcm->attached)
return clib_error_return (0, "failed: already running!");
@@ -761,6 +883,29 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input,
}
else if (unformat (line_input, "duration %f", &hcm->duration))
hcm->repeat = true;
+ else if (unformat (line_input, "sessions %d", &hcm->max_sessions))
+ {
+ hcm->multi_session = true;
+ if (hcm->max_sessions <= 1)
+ {
+ err = clib_error_return (0, "sessions must be > 1");
+ goto done;
+ }
+ }
+ else if (unformat (line_input, "prealloc-fifos %d",
+ &hcm->prealloc_fifos))
+ ;
+ else if (unformat (line_input, "private-segment-size %U",
+ unformat_memory_size, &mem_size))
+ hcm->private_segment_size = mem_size;
+ else if (unformat (line_input, "fifo-size %U", unformat_memory_size,
+ &mem_size))
+ hcm->fifo_size = mem_size;
+ else if (unformat (line_input, "appns %_%v%_", &appns_id))
+ ;
+ else if (unformat (line_input, "secret %lu", &hcm->appns_secret))
+ ;
+
else
{
err = clib_error_return (0, "unknown input `%U'",
@@ -801,6 +946,13 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input,
goto done;
}
+ if (hcm->multi_session && !hcm->repeat)
+ {
+ err = clib_error_return (
+ 0, "multiple sessions are only supported with request repeating");
+ goto done;
+ }
+
if ((rv = parse_uri ((char *) hcm->uri, &hcm->connect_sep)))
{
err =
@@ -808,6 +960,12 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input,
goto done;
}
+ if (hcm->duration >= hcm->timeout)
+ {
+ hcm->timeout = hcm->duration + 10;
+ }
+ hcm->appns_id = appns_id;
+
if (hcm->repeat)
vlib_cli_output (vm, "Running, please wait...");
@@ -845,7 +1003,9 @@ VLIB_CLI_COMMAND (hc_command, static) = {
"[post] uri http://<ip-addr> target <origin-form> "
"[data <form-urlencoded> | file <file-path>] [use-ptr] "
"[save-to <filename>] [header <Key:Value>] [verbose] "
- "[timeout <seconds> (default = 10)] [repeat <count> | duration <seconds>]",
+ "[timeout <seconds> (default = 10)] [repeat <count> | duration <seconds>] "
+ "[sessions <# of sessions>] [appns <app-ns> secret <appns-secret>] "
+ "[fifo-size <nM|G>] [private-segment-size <nM|G>] [prealloc-fifos <n>]",
.function = hc_command_fn,
.is_mp_safe = 1,
};