From 9bb0762357015ca49ab8376308eda021d35d6f25 Mon Sep 17 00:00:00 2001 From: Matus Fabian Date: Wed, 4 Sep 2024 18:04:54 +0200 Subject: http: large POST handling Type: improvement Change-Id: I28b8e8ccbff6f97e669b0048011b187decbfc892 Signed-off-by: Matus Fabian --- extras/hs-test/http_test.go | 46 ++++++++++-- src/plugins/hs_apps/http_tps.c | 154 +++++++++++++++++++++++++++++---------- src/plugins/http/http.c | 71 ++++++++++++------ src/plugins/http/http_plugin.rst | 44 +++++++++-- 4 files changed, 242 insertions(+), 73 deletions(-) diff --git a/extras/hs-test/http_test.go b/extras/hs-test/http_test.go index 733ca46934d..bfd2d34483c 100644 --- a/extras/hs-test/http_test.go +++ b/extras/hs-test/http_test.go @@ -6,6 +6,7 @@ import ( "github.com/onsi/gomega/ghttp" "github.com/onsi/gomega/gmeasure" "io" + "math/rand" "net" "net/http" "net/http/httptrace" @@ -30,8 +31,8 @@ func init() { HttpInvalidContentLengthTest, HttpInvalidTargetSyntaxTest, HttpStaticPathTraversalTest, HttpUriDecodeTest, HttpHeadersTest, HttpStaticFileHandlerTest, HttpStaticFileHandlerDefaultMaxAgeTest, HttpClientTest, HttpClientErrRespTest, HttpClientPostFormTest, HttpClientPostFileTest, HttpClientPostFilePtrTest, AuthorityFormTargetTest, HttpRequestLineTest) - RegisterNoTopoSoloTests(HttpStaticPromTest, HttpTpsTest, HttpTpsInterruptModeTest, PromConcurrentConnectionsTest, - PromMemLeakTest, HttpClientPostMemLeakTest, HttpInvalidClientRequestMemLeakTest) + RegisterNoTopoSoloTests(HttpStaticPromTest, HttpGetTpsTest, HttpGetTpsInterruptModeTest, PromConcurrentConnectionsTest, + PromMemLeakTest, HttpClientPostMemLeakTest, HttpInvalidClientRequestMemLeakTest, HttpPostTpsTest, HttpPostTpsInterruptModeTest) } const wwwRootPath = "/tmp/www_root" @@ -53,18 +54,51 @@ func httpDownloadBenchmark(s *HstSuite, experiment *gmeasure.Experiment, data in experiment.RecordValue("Download Speed", (float64(resp.ContentLength)/1024/1024)/duration.Seconds(), gmeasure.Units("MB/s"), gmeasure.Precision(2)) } -func HttpTpsInterruptModeTest(s *NoTopoSuite) { - HttpTpsTest(s) +func HttpGetTpsInterruptModeTest(s *NoTopoSuite) { + HttpGetTpsTest(s) } -func HttpTpsTest(s *NoTopoSuite) { +func HttpGetTpsTest(s *NoTopoSuite) { vpp := s.GetContainerByName("vpp").VppInstance serverAddress := s.VppAddr() url := "http://" + serverAddress + ":8080/test_file_10M" vpp.Vppctl("http tps uri tcp://0.0.0.0/8080") - s.RunBenchmark("HTTP tps 10M", 10, 0, httpDownloadBenchmark, url) + s.RunBenchmark("HTTP tps download 10M", 10, 0, httpDownloadBenchmark, url) +} + +func httpUploadBenchmark(s *HstSuite, experiment *gmeasure.Experiment, data interface{}) { + url, isValid := data.(string) + s.AssertEqual(true, isValid) + body := make([]byte, 10485760) + _, err := rand.Read(body) + client := NewHttpClient() + req, err := http.NewRequest("POST", url, bytes.NewBuffer(body)) + s.AssertNil(err, fmt.Sprint(err)) + t := time.Now() + resp, err := client.Do(req) + s.AssertNil(err, fmt.Sprint(err)) + defer resp.Body.Close() + s.AssertHttpStatus(resp, 200) + _, err = io.ReadAll(resp.Body) + s.AssertNil(err, fmt.Sprint(err)) + duration := time.Since(t) + experiment.RecordValue("Upload Speed", (float64(req.ContentLength)/1024/1024)/duration.Seconds(), gmeasure.Units("MB/s"), gmeasure.Precision(2)) +} + +func HttpPostTpsInterruptModeTest(s *NoTopoSuite) { + HttpPostTpsTest(s) +} + +func HttpPostTpsTest(s *NoTopoSuite) { + vpp := s.GetContainerByName("vpp").VppInstance + serverAddress := s.VppAddr() + url := "http://" + serverAddress + ":8080/test_file_10M" + + vpp.Vppctl("http tps uri tcp://0.0.0.0/8080") + + s.RunBenchmark("HTTP tps upload 10M", 10, 0, httpUploadBenchmark, url) } func HttpPersistentConnectionTest(s *NoTopoSuite) { diff --git a/src/plugins/hs_apps/http_tps.c b/src/plugins/hs_apps/http_tps.c index 35a5802f476..39046b38d43 100644 --- a/src/plugins/hs_apps/http_tps.c +++ b/src/plugins/hs_apps/http_tps.c @@ -20,6 +20,8 @@ #include #include +#define HTS_RX_BUF_SIZE (64 << 10) + typedef struct { CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); @@ -28,6 +30,7 @@ typedef struct u64 data_len; u64 data_offset; u32 vpp_session_index; + u32 to_recv; union { /** threshold after which connection is closed */ @@ -36,6 +39,7 @@ typedef struct u32 close_rate; }; u8 *uri; + u8 *rx_buf; http_header_t *resp_headers; } hts_session_t; @@ -105,6 +109,8 @@ hts_session_free (hts_session_t *hs) if (htm->debug_level > 0) clib_warning ("Freeing session %u", hs->session_index); + vec_free (hs->rx_buf); + if (CLIB_DEBUG) clib_memset (hs, 0xfa, sizeof (*hs)); @@ -227,6 +233,8 @@ hts_start_send_data (hts_session_t *hs, http_status_code_t status) http_msg_t msg; session_t *ts; u8 *headers_buf = 0; + u32 n_segs = 1; + svm_fifo_seg_t seg[2]; int rv; if (vec_len (hs->resp_headers)) @@ -235,6 +243,9 @@ hts_start_send_data (hts_session_t *hs, http_status_code_t status) vec_free (hs->resp_headers); msg.data.headers_offset = 0; msg.data.headers_len = vec_len (headers_buf); + seg[1].data = headers_buf; + seg[1].len = msg.data.headers_len; + n_segs = 2; } else { @@ -248,17 +259,14 @@ hts_start_send_data (hts_session_t *hs, http_status_code_t status) msg.data.body_len = hs->data_len; msg.data.body_offset = msg.data.headers_len; msg.data.len = msg.data.body_len + msg.data.headers_len; + seg[0].data = (u8 *) &msg; + seg[0].len = sizeof (msg); ts = session_get (hs->vpp_session_index, hs->thread_index); - rv = svm_fifo_enqueue (ts->tx_fifo, sizeof (msg), (u8 *) &msg); - ASSERT (rv == sizeof (msg)); - - if (msg.data.headers_len) - { - rv = svm_fifo_enqueue (ts->tx_fifo, vec_len (headers_buf), headers_buf); - ASSERT (rv == msg.data.headers_len); - vec_free (headers_buf); - } + rv = svm_fifo_enqueue_segments (ts->tx_fifo, seg, n_segs, + 0 /* allow partial */); + vec_free (headers_buf); + ASSERT (rv == (sizeof (msg) + msg.data.headers_len)); if (!msg.data.body_len) { @@ -323,6 +331,40 @@ done: return rc; } +static inline void +hts_session_rx_body (hts_session_t *hs, session_t *ts) +{ + hts_main_t *htm = &hts_main; + u32 n_deq; + int rv; + + n_deq = svm_fifo_max_dequeue (ts->rx_fifo); + if (!htm->no_zc) + { + svm_fifo_dequeue_drop_all (ts->rx_fifo); + } + else + { + n_deq = clib_min (n_deq, HTS_RX_BUF_SIZE); + rv = svm_fifo_dequeue (ts->rx_fifo, n_deq, hs->rx_buf); + ASSERT (rv == n_deq); + } + hs->to_recv -= n_deq; + + if (hs->close_threshold > 0) + { + if ((f64) (hs->data_len - hs->to_recv) / hs->data_len > + hs->close_threshold) + hts_disconnect_transport (hs); + } + + if (hs->to_recv == 0) + { + hts_start_send_data (hs, HTTP_STATUS_OK); + vec_free (hs->rx_buf); + } +} + static int hts_ts_rx_callback (session_t *ts) { @@ -333,44 +375,77 @@ hts_ts_rx_callback (session_t *ts) int rv; hs = hts_session_get (ts->thread_index, ts->opaque); - hs->data_len = 0; - hs->resp_headers = 0; - /* Read the http message header */ - rv = svm_fifo_dequeue (ts->rx_fifo, sizeof (msg), (u8 *) &msg); - ASSERT (rv == sizeof (msg)); - - if (msg.type != HTTP_MSG_REQUEST || msg.method_type != HTTP_REQ_GET) + if (hs->to_recv == 0) { - http_add_header (&hs->resp_headers, - http_header_name_token (HTTP_HEADER_ALLOW), - http_token_lit ("GET")); - hts_start_send_data (hs, HTTP_STATUS_METHOD_NOT_ALLOWED); - goto done; - } + hs->data_len = 0; + hs->resp_headers = 0; + hs->rx_buf = 0; - if (msg.data.target_path_len == 0 || - msg.data.target_form != HTTP_TARGET_ORIGIN_FORM) - { - hts_start_send_data (hs, HTTP_STATUS_BAD_REQUEST); - goto done; - } + /* Read the http message header */ + rv = svm_fifo_dequeue (ts->rx_fifo, sizeof (msg), (u8 *) &msg); + ASSERT (rv == sizeof (msg)); - vec_validate (target, msg.data.target_path_len - 1); - rv = svm_fifo_peek (ts->rx_fifo, msg.data.target_path_offset, - msg.data.target_path_len, target); - ASSERT (rv == msg.data.target_path_len); + if (msg.type != HTTP_MSG_REQUEST) + { + hts_start_send_data (hs, HTTP_STATUS_INTERNAL_ERROR); + goto done; + } + if (msg.method_type != HTTP_REQ_GET && msg.method_type != HTTP_REQ_POST) + { + http_add_header (&hs->resp_headers, + http_header_name_token (HTTP_HEADER_ALLOW), + http_token_lit ("GET, POST")); + hts_start_send_data (hs, HTTP_STATUS_METHOD_NOT_ALLOWED); + goto done; + } - if (htm->debug_level) - clib_warning ("Request target: %v", target); + if (msg.data.target_path_len == 0 || + msg.data.target_form != HTTP_TARGET_ORIGIN_FORM) + { + hts_start_send_data (hs, HTTP_STATUS_BAD_REQUEST); + goto done; + } - if (try_test_file (hs, target)) - hts_start_send_data (hs, HTTP_STATUS_NOT_FOUND); + vec_validate (target, msg.data.target_path_len - 1); + rv = svm_fifo_peek (ts->rx_fifo, msg.data.target_path_offset, + msg.data.target_path_len, target); + ASSERT (rv == msg.data.target_path_len); - vec_free (target); + if (htm->debug_level) + clib_warning ("%s request target: %v", + msg.method_type == HTTP_REQ_GET ? "GET" : "POST", + target); + + if (msg.method_type == HTTP_REQ_GET) + { + if (try_test_file (hs, target)) + hts_start_send_data (hs, HTTP_STATUS_NOT_FOUND); + vec_free (target); + } + else + { + vec_free (target); + if (!msg.data.body_len) + { + hts_start_send_data (hs, HTTP_STATUS_BAD_REQUEST); + goto done; + } + /* drop everything up to body */ + svm_fifo_dequeue_drop (ts->rx_fifo, msg.data.body_offset); + hs->to_recv = msg.data.body_len; + if (htm->no_zc) + vec_validate (hs->rx_buf, HTS_RX_BUF_SIZE - 1); + hts_session_rx_body (hs, ts); + return 0; + } + + done: + svm_fifo_dequeue_drop (ts->rx_fifo, msg.data.len); + } + else + hts_session_rx_body (hs, ts); -done: - svm_fifo_dequeue_drop (ts->rx_fifo, msg.data.len); return 0; } @@ -397,6 +472,7 @@ hts_ts_accept_callback (session_t *ts) hs = hts_session_alloc (ts->thread_index); hs->vpp_session_index = ts->session_index; + hs->to_recv = 0; ts->opaque = hs->session_index; ts->session_state = SESSION_STATE_READY; diff --git a/src/plugins/http/http.c b/src/plugins/http/http.c index a32f6f6cb84..d01ee7f0fc6 100644 --- a/src/plugins/http/http.c +++ b/src/plugins/http/http.c @@ -527,13 +527,18 @@ v_find_index (u8 *vec, u32 offset, u32 num, char *str) static void http_identify_optional_query (http_conn_t *hc) { - u32 pos = vec_search (hc->rx_buf, '?'); - if (~0 != pos) + int i; + for (i = hc->target_path_offset; + i < (hc->target_path_offset + hc->target_path_len); i++) { - hc->target_query_offset = pos + 1; - hc->target_query_len = - hc->target_path_offset + hc->target_path_len - hc->target_query_offset; - hc->target_path_len = hc->target_path_len - hc->target_query_len - 1; + if (hc->rx_buf[i] == '?') + { + hc->target_query_offset = i + 1; + hc->target_query_len = hc->target_path_offset + hc->target_path_len - + hc->target_query_offset; + hc->target_path_len = hc->target_path_len - hc->target_query_len - 1; + break; + } } } @@ -674,7 +679,9 @@ http_parse_request_line (http_conn_t *hc, http_status_code_t *ec) } /* parse request-target */ + HTTP_DBG (0, "http at %d", i); target_len = i - hc->target_path_offset; + HTTP_DBG (0, "target_len %d", target_len); if (target_len < 1) { clib_warning ("request-target not present"); @@ -911,7 +918,7 @@ http_state_wait_server_reply (http_conn_t *hc, transport_send_params_t *sp) http_msg_t msg = {}; app_worker_t *app_wrk; session_t *as; - u32 len, max_enq; + u32 len, max_enq, body_sent; http_status_code_t ec; http_main_t *hm = &http_main; @@ -972,16 +979,16 @@ http_state_wait_server_reply (http_conn_t *hc, transport_send_params_t *sp) http_read_message_drop (hc, len); - if (hc->body_len == 0) + body_sent = len - hc->control_data_len; + hc->to_recv = hc->body_len - body_sent; + if (hc->to_recv == 0) { - /* no response body, we are done */ - hc->to_recv = 0; + /* all sent, we are done */ http_state_change (hc, HTTP_STATE_WAIT_APP_METHOD); } else { - /* stream response body */ - hc->to_recv = hc->body_len; + /* stream rest of the response body */ http_state_change (hc, HTTP_STATE_CLIENT_IO_MORE_DATA); } @@ -1006,7 +1013,7 @@ http_state_wait_client_method (http_conn_t *hc, transport_send_params_t *sp) http_msg_t msg; session_t *as; int rv; - u32 len, max_enq; + u32 len, max_enq, max_deq, body_sent; rv = http_read_message (hc); @@ -1034,16 +1041,20 @@ http_state_wait_client_method (http_conn_t *hc, transport_send_params_t *sp) if (rv) goto error; - /* send "control data" and request body */ + /* send at least "control data" which is necessary minimum, + * if there is some space send also portion of body */ as = session_get_from_handle (hc->h_pa_session_handle); - len = hc->control_data_len + hc->body_len; max_enq = svm_fifo_max_enqueue (as->rx_fifo); - if (max_enq < len) + if (max_enq < hc->control_data_len) { - /* TODO stream body of large POST */ - clib_warning ("not enough room for data in app's rx fifo"); + clib_warning ("not enough room for control data in app's rx fifo"); + ec = HTTP_STATUS_INTERNAL_ERROR; goto error; } + /* do not dequeue more than one HTTP request, we do not support pipelining */ + max_deq = + clib_min (hc->control_data_len + hc->body_len, vec_len (hc->rx_buf)); + len = clib_min (max_enq, max_deq); msg.type = HTTP_MSG_REQUEST; msg.method_type = hc->method; @@ -1065,9 +1076,21 @@ http_state_wait_client_method (http_conn_t *hc, transport_send_params_t *sp) rv = svm_fifo_enqueue_segments (as->rx_fifo, segs, 2, 0 /* allow partial */); ASSERT (rv == (sizeof (msg) + len)); - /* drop everything, we do not support pipelining */ - http_read_message_drop_all (hc); - http_state_change (hc, HTTP_STATE_WAIT_APP_REPLY); + body_sent = len - hc->control_data_len; + hc->to_recv = hc->body_len - body_sent; + if (hc->to_recv == 0) + { + /* drop everything, we do not support pipelining */ + http_read_message_drop_all (hc); + /* all sent, we are done */ + http_state_change (hc, HTTP_STATE_WAIT_APP_REPLY); + } + else + { + http_read_message_drop (hc, len); + /* stream rest of the response body */ + http_state_change (hc, HTTP_STATE_CLIENT_IO_MORE_DATA); + } app_wrk = app_worker_get_if_valid (as->app_wrk_index); if (app_wrk) @@ -1408,8 +1431,12 @@ http_state_client_io_more_data (http_conn_t *hc, transport_send_params_t *sp) hc->to_recv -= rv; HTTP_DBG (1, "drained %d from ts; remains %d", rv, hc->to_recv); + /* Finished transaction: + * server back to HTTP_STATE_WAIT_APP_REPLY + * client to HTTP_STATE_WAIT_APP_METHOD */ if (hc->to_recv == 0) - http_state_change (hc, HTTP_STATE_WAIT_APP_METHOD); + http_state_change (hc, hc->is_server ? HTTP_STATE_WAIT_APP_REPLY : + HTTP_STATE_WAIT_APP_METHOD); app_wrk = app_worker_get_if_valid (as->app_wrk_index); if (app_wrk) diff --git a/src/plugins/http/http_plugin.rst b/src/plugins/http/http_plugin.rst index 4daef79b9cf..feb2c7fe039 100644 --- a/src/plugins/http/http_plugin.rst +++ b/src/plugins/http/http_plugin.rst @@ -144,16 +144,48 @@ Following example shows how to parse headers: vec_free (headers); } -Finally application reads body: +Finally application reads body (if any), which might be received in multiple pieces (depends on size), so we might need some state machine in ``builtin_app_rx_callback``. +We will add following members to our session context structure: + +.. code-block:: C + + typedef struct + { + /* ... */ + u32 to_recv; + u8 *resp_body; + } session_ctx_t; + +First we prepare vector for response body, do it only once when you are reading metadata: + +.. code-block:: C + + /* drop everything up to body */ + svm_fifo_dequeue_drop (ts->rx_fifo, msg.data.body_offset); + ctx->to_recv = msg.data.body_len; + /* prepare vector for response body */ + vec_validate (ctx->resp_body, msg.data.body_len - 1); + vec_reset_length (ctx->resp_body); + +Now we can start reading body content, following block of code could be executed multiple times: .. code-block:: C - u8 *body = 0; - if (msg.data.body_len) + /* dequeue */ + u32 n_deq = svm_fifo_max_dequeue (ts->rx_fifo); + /* current offset */ + u32 curr = vec_len (ctx->resp_body); + rv = svm_fifo_dequeue (ts->rx_fifo, n_deq, ctx->resp_body + curr); + ASSERT (rv == n_deq); + /* update length of the vector */ + vec_set_len (ctx->resp_body, curr + n_deq); + /* update number of remaining bytes to receive */ + ctx->to_recv -= rv; + /* check if all data received */ + if (ctx->to_recv == 0) { - vec_validate (body, msg.data.body_len - 1); - rv = svm_fifo_peek (ts->rx_fifo, msg.data.body_offset, msg.data.body_len, body); - ASSERT (rv == msg.data.body_len); + /* we are done */ + /* send 200 OK response */ } Sending data -- cgit 1.2.3-korg