summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatus Fabian <matfabia@cisco.com>2024-09-04 18:04:54 +0200
committerFlorin Coras <florin.coras@gmail.com>2024-09-08 22:41:27 +0000
commit9bb0762357015ca49ab8376308eda021d35d6f25 (patch)
treeb7b7e3c0b4e53f6daf88e780f1e66bd16b1b0282
parentc4b5d10115d4370488ac14eb0ba7295b049a0615 (diff)
http: large POST handling
Type: improvement Change-Id: I28b8e8ccbff6f97e669b0048011b187decbfc892 Signed-off-by: Matus Fabian <matfabia@cisco.com>
-rw-r--r--extras/hs-test/http_test.go46
-rw-r--r--src/plugins/hs_apps/http_tps.c154
-rw-r--r--src/plugins/http/http.c71
-rw-r--r--src/plugins/http/http_plugin.rst44
4 files changed, 242 insertions, 73 deletions
diff --git a/extras/hs-test/http_test.go b/extras/hs-test/http_test.go
index 733ca46934d..bfd2d34483c 100644
--- a/extras/hs-test/http_test.go
+++ b/extras/hs-test/http_test.go
@@ -6,6 +6,7 @@ import (
"github.com/onsi/gomega/ghttp"
"github.com/onsi/gomega/gmeasure"
"io"
+ "math/rand"
"net"
"net/http"
"net/http/httptrace"
@@ -30,8 +31,8 @@ func init() {
HttpInvalidContentLengthTest, HttpInvalidTargetSyntaxTest, HttpStaticPathTraversalTest, HttpUriDecodeTest,
HttpHeadersTest, HttpStaticFileHandlerTest, HttpStaticFileHandlerDefaultMaxAgeTest, HttpClientTest, HttpClientErrRespTest, HttpClientPostFormTest,
HttpClientPostFileTest, HttpClientPostFilePtrTest, AuthorityFormTargetTest, HttpRequestLineTest)
- RegisterNoTopoSoloTests(HttpStaticPromTest, HttpTpsTest, HttpTpsInterruptModeTest, PromConcurrentConnectionsTest,
- PromMemLeakTest, HttpClientPostMemLeakTest, HttpInvalidClientRequestMemLeakTest)
+ RegisterNoTopoSoloTests(HttpStaticPromTest, HttpGetTpsTest, HttpGetTpsInterruptModeTest, PromConcurrentConnectionsTest,
+ PromMemLeakTest, HttpClientPostMemLeakTest, HttpInvalidClientRequestMemLeakTest, HttpPostTpsTest, HttpPostTpsInterruptModeTest)
}
const wwwRootPath = "/tmp/www_root"
@@ -53,18 +54,51 @@ func httpDownloadBenchmark(s *HstSuite, experiment *gmeasure.Experiment, data in
experiment.RecordValue("Download Speed", (float64(resp.ContentLength)/1024/1024)/duration.Seconds(), gmeasure.Units("MB/s"), gmeasure.Precision(2))
}
-func HttpTpsInterruptModeTest(s *NoTopoSuite) {
- HttpTpsTest(s)
+func HttpGetTpsInterruptModeTest(s *NoTopoSuite) {
+ HttpGetTpsTest(s)
}
-func HttpTpsTest(s *NoTopoSuite) {
+func HttpGetTpsTest(s *NoTopoSuite) {
vpp := s.GetContainerByName("vpp").VppInstance
serverAddress := s.VppAddr()
url := "http://" + serverAddress + ":8080/test_file_10M"
vpp.Vppctl("http tps uri tcp://0.0.0.0/8080")
- s.RunBenchmark("HTTP tps 10M", 10, 0, httpDownloadBenchmark, url)
+ s.RunBenchmark("HTTP tps download 10M", 10, 0, httpDownloadBenchmark, url)
+}
+
+func httpUploadBenchmark(s *HstSuite, experiment *gmeasure.Experiment, data interface{}) {
+ url, isValid := data.(string)
+ s.AssertEqual(true, isValid)
+ body := make([]byte, 10485760)
+ _, err := rand.Read(body)
+ client := NewHttpClient()
+ req, err := http.NewRequest("POST", url, bytes.NewBuffer(body))
+ s.AssertNil(err, fmt.Sprint(err))
+ t := time.Now()
+ resp, err := client.Do(req)
+ s.AssertNil(err, fmt.Sprint(err))
+ defer resp.Body.Close()
+ s.AssertHttpStatus(resp, 200)
+ _, err = io.ReadAll(resp.Body)
+ s.AssertNil(err, fmt.Sprint(err))
+ duration := time.Since(t)
+ experiment.RecordValue("Upload Speed", (float64(req.ContentLength)/1024/1024)/duration.Seconds(), gmeasure.Units("MB/s"), gmeasure.Precision(2))
+}
+
+func HttpPostTpsInterruptModeTest(s *NoTopoSuite) {
+ HttpPostTpsTest(s)
+}
+
+func HttpPostTpsTest(s *NoTopoSuite) {
+ vpp := s.GetContainerByName("vpp").VppInstance
+ serverAddress := s.VppAddr()
+ url := "http://" + serverAddress + ":8080/test_file_10M"
+
+ vpp.Vppctl("http tps uri tcp://0.0.0.0/8080")
+
+ s.RunBenchmark("HTTP tps upload 10M", 10, 0, httpUploadBenchmark, url)
}
func HttpPersistentConnectionTest(s *NoTopoSuite) {
diff --git a/src/plugins/hs_apps/http_tps.c b/src/plugins/hs_apps/http_tps.c
index 35a5802f476..39046b38d43 100644
--- a/src/plugins/hs_apps/http_tps.c
+++ b/src/plugins/hs_apps/http_tps.c
@@ -20,6 +20,8 @@
#include <http/http_header_names.h>
#include <http/http_content_types.h>
+#define HTS_RX_BUF_SIZE (64 << 10)
+
typedef struct
{
CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
@@ -28,6 +30,7 @@ typedef struct
u64 data_len;
u64 data_offset;
u32 vpp_session_index;
+ u32 to_recv;
union
{
/** threshold after which connection is closed */
@@ -36,6 +39,7 @@ typedef struct
u32 close_rate;
};
u8 *uri;
+ u8 *rx_buf;
http_header_t *resp_headers;
} hts_session_t;
@@ -105,6 +109,8 @@ hts_session_free (hts_session_t *hs)
if (htm->debug_level > 0)
clib_warning ("Freeing session %u", hs->session_index);
+ vec_free (hs->rx_buf);
+
if (CLIB_DEBUG)
clib_memset (hs, 0xfa, sizeof (*hs));
@@ -227,6 +233,8 @@ hts_start_send_data (hts_session_t *hs, http_status_code_t status)
http_msg_t msg;
session_t *ts;
u8 *headers_buf = 0;
+ u32 n_segs = 1;
+ svm_fifo_seg_t seg[2];
int rv;
if (vec_len (hs->resp_headers))
@@ -235,6 +243,9 @@ hts_start_send_data (hts_session_t *hs, http_status_code_t status)
vec_free (hs->resp_headers);
msg.data.headers_offset = 0;
msg.data.headers_len = vec_len (headers_buf);
+ seg[1].data = headers_buf;
+ seg[1].len = msg.data.headers_len;
+ n_segs = 2;
}
else
{
@@ -248,17 +259,14 @@ hts_start_send_data (hts_session_t *hs, http_status_code_t status)
msg.data.body_len = hs->data_len;
msg.data.body_offset = msg.data.headers_len;
msg.data.len = msg.data.body_len + msg.data.headers_len;
+ seg[0].data = (u8 *) &msg;
+ seg[0].len = sizeof (msg);
ts = session_get (hs->vpp_session_index, hs->thread_index);
- rv = svm_fifo_enqueue (ts->tx_fifo, sizeof (msg), (u8 *) &msg);
- ASSERT (rv == sizeof (msg));
-
- if (msg.data.headers_len)
- {
- rv = svm_fifo_enqueue (ts->tx_fifo, vec_len (headers_buf), headers_buf);
- ASSERT (rv == msg.data.headers_len);
- vec_free (headers_buf);
- }
+ rv = svm_fifo_enqueue_segments (ts->tx_fifo, seg, n_segs,
+ 0 /* allow partial */);
+ vec_free (headers_buf);
+ ASSERT (rv == (sizeof (msg) + msg.data.headers_len));
if (!msg.data.body_len)
{
@@ -323,6 +331,40 @@ done:
return rc;
}
+static inline void
+hts_session_rx_body (hts_session_t *hs, session_t *ts)
+{
+ hts_main_t *htm = &hts_main;
+ u32 n_deq;
+ int rv;
+
+ n_deq = svm_fifo_max_dequeue (ts->rx_fifo);
+ if (!htm->no_zc)
+ {
+ svm_fifo_dequeue_drop_all (ts->rx_fifo);
+ }
+ else
+ {
+ n_deq = clib_min (n_deq, HTS_RX_BUF_SIZE);
+ rv = svm_fifo_dequeue (ts->rx_fifo, n_deq, hs->rx_buf);
+ ASSERT (rv == n_deq);
+ }
+ hs->to_recv -= n_deq;
+
+ if (hs->close_threshold > 0)
+ {
+ if ((f64) (hs->data_len - hs->to_recv) / hs->data_len >
+ hs->close_threshold)
+ hts_disconnect_transport (hs);
+ }
+
+ if (hs->to_recv == 0)
+ {
+ hts_start_send_data (hs, HTTP_STATUS_OK);
+ vec_free (hs->rx_buf);
+ }
+}
+
static int
hts_ts_rx_callback (session_t *ts)
{
@@ -333,44 +375,77 @@ hts_ts_rx_callback (session_t *ts)
int rv;
hs = hts_session_get (ts->thread_index, ts->opaque);
- hs->data_len = 0;
- hs->resp_headers = 0;
- /* Read the http message header */
- rv = svm_fifo_dequeue (ts->rx_fifo, sizeof (msg), (u8 *) &msg);
- ASSERT (rv == sizeof (msg));
-
- if (msg.type != HTTP_MSG_REQUEST || msg.method_type != HTTP_REQ_GET)
+ if (hs->to_recv == 0)
{
- http_add_header (&hs->resp_headers,
- http_header_name_token (HTTP_HEADER_ALLOW),
- http_token_lit ("GET"));
- hts_start_send_data (hs, HTTP_STATUS_METHOD_NOT_ALLOWED);
- goto done;
- }
+ hs->data_len = 0;
+ hs->resp_headers = 0;
+ hs->rx_buf = 0;
- if (msg.data.target_path_len == 0 ||
- msg.data.target_form != HTTP_TARGET_ORIGIN_FORM)
- {
- hts_start_send_data (hs, HTTP_STATUS_BAD_REQUEST);
- goto done;
- }
+ /* Read the http message header */
+ rv = svm_fifo_dequeue (ts->rx_fifo, sizeof (msg), (u8 *) &msg);
+ ASSERT (rv == sizeof (msg));
- vec_validate (target, msg.data.target_path_len - 1);
- rv = svm_fifo_peek (ts->rx_fifo, msg.data.target_path_offset,
- msg.data.target_path_len, target);
- ASSERT (rv == msg.data.target_path_len);
+ if (msg.type != HTTP_MSG_REQUEST)
+ {
+ hts_start_send_data (hs, HTTP_STATUS_INTERNAL_ERROR);
+ goto done;
+ }
+ if (msg.method_type != HTTP_REQ_GET && msg.method_type != HTTP_REQ_POST)
+ {
+ http_add_header (&hs->resp_headers,
+ http_header_name_token (HTTP_HEADER_ALLOW),
+ http_token_lit ("GET, POST"));
+ hts_start_send_data (hs, HTTP_STATUS_METHOD_NOT_ALLOWED);
+ goto done;
+ }
- if (htm->debug_level)
- clib_warning ("Request target: %v", target);
+ if (msg.data.target_path_len == 0 ||
+ msg.data.target_form != HTTP_TARGET_ORIGIN_FORM)
+ {
+ hts_start_send_data (hs, HTTP_STATUS_BAD_REQUEST);
+ goto done;
+ }
- if (try_test_file (hs, target))
- hts_start_send_data (hs, HTTP_STATUS_NOT_FOUND);
+ vec_validate (target, msg.data.target_path_len - 1);
+ rv = svm_fifo_peek (ts->rx_fifo, msg.data.target_path_offset,
+ msg.data.target_path_len, target);
+ ASSERT (rv == msg.data.target_path_len);
- vec_free (target);
+ if (htm->debug_level)
+ clib_warning ("%s request target: %v",
+ msg.method_type == HTTP_REQ_GET ? "GET" : "POST",
+ target);
+
+ if (msg.method_type == HTTP_REQ_GET)
+ {
+ if (try_test_file (hs, target))
+ hts_start_send_data (hs, HTTP_STATUS_NOT_FOUND);
+ vec_free (target);
+ }
+ else
+ {
+ vec_free (target);
+ if (!msg.data.body_len)
+ {
+ hts_start_send_data (hs, HTTP_STATUS_BAD_REQUEST);
+ goto done;
+ }
+ /* drop everything up to body */
+ svm_fifo_dequeue_drop (ts->rx_fifo, msg.data.body_offset);
+ hs->to_recv = msg.data.body_len;
+ if (htm->no_zc)
+ vec_validate (hs->rx_buf, HTS_RX_BUF_SIZE - 1);
+ hts_session_rx_body (hs, ts);
+ return 0;
+ }
+
+ done:
+ svm_fifo_dequeue_drop (ts->rx_fifo, msg.data.len);
+ }
+ else
+ hts_session_rx_body (hs, ts);
-done:
- svm_fifo_dequeue_drop (ts->rx_fifo, msg.data.len);
return 0;
}
@@ -397,6 +472,7 @@ hts_ts_accept_callback (session_t *ts)
hs = hts_session_alloc (ts->thread_index);
hs->vpp_session_index = ts->session_index;
+ hs->to_recv = 0;
ts->opaque = hs->session_index;
ts->session_state = SESSION_STATE_READY;
diff --git a/src/plugins/http/http.c b/src/plugins/http/http.c
index a32f6f6cb84..d01ee7f0fc6 100644
--- a/src/plugins/http/http.c
+++ b/src/plugins/http/http.c
@@ -527,13 +527,18 @@ v_find_index (u8 *vec, u32 offset, u32 num, char *str)
static void
http_identify_optional_query (http_conn_t *hc)
{
- u32 pos = vec_search (hc->rx_buf, '?');
- if (~0 != pos)
+ int i;
+ for (i = hc->target_path_offset;
+ i < (hc->target_path_offset + hc->target_path_len); i++)
{
- hc->target_query_offset = pos + 1;
- hc->target_query_len =
- hc->target_path_offset + hc->target_path_len - hc->target_query_offset;
- hc->target_path_len = hc->target_path_len - hc->target_query_len - 1;
+ if (hc->rx_buf[i] == '?')
+ {
+ hc->target_query_offset = i + 1;
+ hc->target_query_len = hc->target_path_offset + hc->target_path_len -
+ hc->target_query_offset;
+ hc->target_path_len = hc->target_path_len - hc->target_query_len - 1;
+ break;
+ }
}
}
@@ -674,7 +679,9 @@ http_parse_request_line (http_conn_t *hc, http_status_code_t *ec)
}
/* parse request-target */
+ HTTP_DBG (0, "http at %d", i);
target_len = i - hc->target_path_offset;
+ HTTP_DBG (0, "target_len %d", target_len);
if (target_len < 1)
{
clib_warning ("request-target not present");
@@ -911,7 +918,7 @@ http_state_wait_server_reply (http_conn_t *hc, transport_send_params_t *sp)
http_msg_t msg = {};
app_worker_t *app_wrk;
session_t *as;
- u32 len, max_enq;
+ u32 len, max_enq, body_sent;
http_status_code_t ec;
http_main_t *hm = &http_main;
@@ -972,16 +979,16 @@ http_state_wait_server_reply (http_conn_t *hc, transport_send_params_t *sp)
http_read_message_drop (hc, len);
- if (hc->body_len == 0)
+ body_sent = len - hc->control_data_len;
+ hc->to_recv = hc->body_len - body_sent;
+ if (hc->to_recv == 0)
{
- /* no response body, we are done */
- hc->to_recv = 0;
+ /* all sent, we are done */
http_state_change (hc, HTTP_STATE_WAIT_APP_METHOD);
}
else
{
- /* stream response body */
- hc->to_recv = hc->body_len;
+ /* stream rest of the response body */
http_state_change (hc, HTTP_STATE_CLIENT_IO_MORE_DATA);
}
@@ -1006,7 +1013,7 @@ http_state_wait_client_method (http_conn_t *hc, transport_send_params_t *sp)
http_msg_t msg;
session_t *as;
int rv;
- u32 len, max_enq;
+ u32 len, max_enq, max_deq, body_sent;
rv = http_read_message (hc);
@@ -1034,16 +1041,20 @@ http_state_wait_client_method (http_conn_t *hc, transport_send_params_t *sp)
if (rv)
goto error;
- /* send "control data" and request body */
+ /* send at least "control data" which is necessary minimum,
+ * if there is some space send also portion of body */
as = session_get_from_handle (hc->h_pa_session_handle);
- len = hc->control_data_len + hc->body_len;
max_enq = svm_fifo_max_enqueue (as->rx_fifo);
- if (max_enq < len)
+ if (max_enq < hc->control_data_len)
{
- /* TODO stream body of large POST */
- clib_warning ("not enough room for data in app's rx fifo");
+ clib_warning ("not enough room for control data in app's rx fifo");
+ ec = HTTP_STATUS_INTERNAL_ERROR;
goto error;
}
+ /* do not dequeue more than one HTTP request, we do not support pipelining */
+ max_deq =
+ clib_min (hc->control_data_len + hc->body_len, vec_len (hc->rx_buf));
+ len = clib_min (max_enq, max_deq);
msg.type = HTTP_MSG_REQUEST;
msg.method_type = hc->method;
@@ -1065,9 +1076,21 @@ http_state_wait_client_method (http_conn_t *hc, transport_send_params_t *sp)
rv = svm_fifo_enqueue_segments (as->rx_fifo, segs, 2, 0 /* allow partial */);
ASSERT (rv == (sizeof (msg) + len));
- /* drop everything, we do not support pipelining */
- http_read_message_drop_all (hc);
- http_state_change (hc, HTTP_STATE_WAIT_APP_REPLY);
+ body_sent = len - hc->control_data_len;
+ hc->to_recv = hc->body_len - body_sent;
+ if (hc->to_recv == 0)
+ {
+ /* drop everything, we do not support pipelining */
+ http_read_message_drop_all (hc);
+ /* all sent, we are done */
+ http_state_change (hc, HTTP_STATE_WAIT_APP_REPLY);
+ }
+ else
+ {
+ http_read_message_drop (hc, len);
+ /* stream rest of the response body */
+ http_state_change (hc, HTTP_STATE_CLIENT_IO_MORE_DATA);
+ }
app_wrk = app_worker_get_if_valid (as->app_wrk_index);
if (app_wrk)
@@ -1408,8 +1431,12 @@ http_state_client_io_more_data (http_conn_t *hc, transport_send_params_t *sp)
hc->to_recv -= rv;
HTTP_DBG (1, "drained %d from ts; remains %d", rv, hc->to_recv);
+ /* Finished transaction:
+ * server back to HTTP_STATE_WAIT_APP_REPLY
+ * client to HTTP_STATE_WAIT_APP_METHOD */
if (hc->to_recv == 0)
- http_state_change (hc, HTTP_STATE_WAIT_APP_METHOD);
+ http_state_change (hc, hc->is_server ? HTTP_STATE_WAIT_APP_REPLY :
+ HTTP_STATE_WAIT_APP_METHOD);
app_wrk = app_worker_get_if_valid (as->app_wrk_index);
if (app_wrk)
diff --git a/src/plugins/http/http_plugin.rst b/src/plugins/http/http_plugin.rst
index 4daef79b9cf..feb2c7fe039 100644
--- a/src/plugins/http/http_plugin.rst
+++ b/src/plugins/http/http_plugin.rst
@@ -144,16 +144,48 @@ Following example shows how to parse headers:
vec_free (headers);
}
-Finally application reads body:
+Finally application reads body (if any), which might be received in multiple pieces (depends on size), so we might need some state machine in ``builtin_app_rx_callback``.
+We will add following members to our session context structure:
+
+.. code-block:: C
+
+ typedef struct
+ {
+ /* ... */
+ u32 to_recv;
+ u8 *resp_body;
+ } session_ctx_t;
+
+First we prepare vector for response body, do it only once when you are reading metadata:
+
+.. code-block:: C
+
+ /* drop everything up to body */
+ svm_fifo_dequeue_drop (ts->rx_fifo, msg.data.body_offset);
+ ctx->to_recv = msg.data.body_len;
+ /* prepare vector for response body */
+ vec_validate (ctx->resp_body, msg.data.body_len - 1);
+ vec_reset_length (ctx->resp_body);
+
+Now we can start reading body content, following block of code could be executed multiple times:
.. code-block:: C
- u8 *body = 0;
- if (msg.data.body_len)
+ /* dequeue */
+ u32 n_deq = svm_fifo_max_dequeue (ts->rx_fifo);
+ /* current offset */
+ u32 curr = vec_len (ctx->resp_body);
+ rv = svm_fifo_dequeue (ts->rx_fifo, n_deq, ctx->resp_body + curr);
+ ASSERT (rv == n_deq);
+ /* update length of the vector */
+ vec_set_len (ctx->resp_body, curr + n_deq);
+ /* update number of remaining bytes to receive */
+ ctx->to_recv -= rv;
+ /* check if all data received */
+ if (ctx->to_recv == 0)
{
- vec_validate (body, msg.data.body_len - 1);
- rv = svm_fifo_peek (ts->rx_fifo, msg.data.body_offset, msg.data.body_len, body);
- ASSERT (rv == msg.data.body_len);
+ /* we are done */
+ /* send 200 OK response */
}
Sending data