diff options
author | Matus Fabian <matfabia@cisco.com> | 2024-09-20 16:34:59 +0200 |
---|---|---|
committer | Florin Coras <florin.coras@gmail.com> | 2024-11-01 22:25:45 +0000 |
commit | afce28764522fcc6c94f9ebbbc580fcd1250b63d (patch) | |
tree | af00c7e4eab074dc320e3b74c9a45645cfce7e28 | |
parent | 7e70b1f0c48bda18905f37353317e20e766dd800 (diff) |
http: CONNECT method for tunnelling
Type: improvement
Change-Id: I6af16ddcc6734bb831227ce65cb39e87294fc4cd
Signed-off-by: Matus Fabian <matfabia@cisco.com>
-rw-r--r-- | extras/hs-test/proxy_test.go | 36 | ||||
-rw-r--r-- | src/plugins/hs_apps/proxy.c | 240 | ||||
-rw-r--r-- | src/plugins/hs_apps/proxy.h | 1 | ||||
-rw-r--r-- | src/plugins/http/http.c | 163 | ||||
-rw-r--r-- | src/plugins/http/http.h | 3 |
5 files changed, 368 insertions, 75 deletions
diff --git a/extras/hs-test/proxy_test.go b/extras/hs-test/proxy_test.go index b914242d34f..5ca151f6228 100644 --- a/extras/hs-test/proxy_test.go +++ b/extras/hs-test/proxy_test.go @@ -7,7 +7,8 @@ import ( ) func init() { - RegisterVppProxyTests(VppProxyHttpGetTcpTest, VppProxyHttpGetTlsTest, VppProxyHttpPutTcpTest, VppProxyHttpPutTlsTest) + RegisterVppProxyTests(VppProxyHttpGetTcpTest, VppProxyHttpGetTlsTest, VppProxyHttpPutTcpTest, VppProxyHttpPutTlsTest, + VppConnectProxyGetTest, VppConnectProxyPutTest) RegisterEnvoyProxyTests(EnvoyProxyHttpGetTcpTest, EnvoyProxyHttpPutTcpTest) RegisterNginxProxyTests(NginxMirroringTest) RegisterNginxProxySoloTests(MirrorMultiThreadTest) @@ -15,14 +16,11 @@ func init() { func configureVppProxy(s *VppProxySuite, proto string, proxyPort uint16) { vppProxy := s.GetContainerByName(VppProxyContainerName).VppInstance - output := vppProxy.Vppctl( - "test proxy server server-uri %s://%s/%d client-uri tcp://%s/%d", - proto, - s.VppProxyAddr(), - proxyPort, - s.NginxAddr(), - s.NginxPort(), - ) + cmd := fmt.Sprintf("test proxy server fifo-size 512k server-uri %s://%s/%d", proto, s.VppProxyAddr(), proxyPort) + if proto != "http" { + cmd += fmt.Sprintf(" client-uri tcp://%s/%d", s.NginxAddr(), s.NginxPort()) + } + output := vppProxy.Vppctl(cmd) s.Log("proxy configured: " + output) } @@ -83,3 +81,23 @@ func nginxMirroring(s *NginxProxySuite, multiThreadWorkers bool) { uri := fmt.Sprintf("http://%s:%d/httpTestFile", s.ProxyAddr(), s.ProxyPort()) s.CurlDownloadResource(uri) } + +func VppConnectProxyGetTest(s *VppProxySuite) { + var proxyPort uint16 = 8080 + + configureVppProxy(s, "http", proxyPort) + + targetUri := fmt.Sprintf("http://%s:%d/httpTestFile", s.NginxAddr(), s.NginxPort()) + proxyUri := fmt.Sprintf("http://%s:%d", s.VppProxyAddr(), proxyPort) + s.CurlDownloadResourceViaTunnel(targetUri, proxyUri) +} + +func VppConnectProxyPutTest(s *VppProxySuite) { + var proxyPort uint16 = 8080 + + configureVppProxy(s, "http", proxyPort) + + proxyUri := fmt.Sprintf("http://%s:%d", s.VppProxyAddr(), proxyPort) + targetUri := fmt.Sprintf("http://%s:%d/upload/testFile", s.NginxAddr(), s.NginxPort()) + s.CurlUploadResourceViaTunnel(targetUri, proxyUri, CurlContainerTestFile) +} diff --git a/src/plugins/hs_apps/proxy.c b/src/plugins/hs_apps/proxy.c index 999f8d15c20..0d24ebc6525 100644 --- a/src/plugins/hs_apps/proxy.c +++ b/src/plugins/hs_apps/proxy.c @@ -19,6 +19,8 @@ #include <vnet/session/application_interface.h> #include <hs_apps/proxy.h> #include <vnet/tcp/tcp.h> +#include <http/http.h> +#include <http/http_header_names.h> proxy_main_t proxy_main; @@ -50,6 +52,41 @@ proxy_session_side_ctx_get (proxy_worker_t *wrk, u32 ctx_index) } static void +proxy_send_http_resp (session_t *s, http_status_code_t sc, + http_header_t *resp_headers) +{ + http_msg_t msg; + int rv; + u8 *headers_buf = 0; + + if (vec_len (resp_headers)) + { + headers_buf = http_serialize_headers (resp_headers); + msg.data.len = msg.data.headers_len = vec_len (headers_buf); + } + else + msg.data.len = msg.data.headers_len = 0; + + msg.type = HTTP_MSG_REPLY; + msg.code = sc; + msg.data.type = HTTP_MSG_DATA_INLINE; + msg.data.headers_offset = 0; + msg.data.body_len = 0; + msg.data.body_offset = 0; + rv = svm_fifo_enqueue (s->tx_fifo, sizeof (msg), (u8 *) &msg); + ASSERT (rv == sizeof (msg)); + if (msg.data.headers_len) + { + rv = svm_fifo_enqueue (s->tx_fifo, vec_len (headers_buf), headers_buf); + ASSERT (rv == vec_len (headers_buf)); + vec_free (headers_buf); + } + + if (svm_fifo_set_event (s->tx_fifo)) + session_program_tx_io_evt (s->handle, SESSION_IO_EVT_TX); +} + +static void proxy_do_connect (vnet_connect_args_t *a) { ASSERT (session_vlib_thread_is_cl_thread ()); @@ -387,6 +424,7 @@ proxy_accept_callback (session_t * s) proxy_session_side_ctx_t *sc; proxy_session_t *ps; proxy_worker_t *wrk; + transport_proto_t tp = session_get_transport_proto (s); wrk = proxy_worker_get (s->thread_index); sc = proxy_session_side_ctx_alloc (wrk); @@ -402,6 +440,7 @@ proxy_accept_callback (session_t * s) ps->ao.session_handle = SESSION_INVALID_HANDLE; sc->ps_index = ps->ps_index; + sc->is_http = tp == TRANSPORT_PROTO_HTTP ? 1 : 0; clib_spinlock_unlock_if_init (&pm->sessions_lock); @@ -450,6 +489,7 @@ proxy_session_start_connect (proxy_session_side_ctx_t *sc, session_t *s) proxy_main_t *pm = &proxy_main; u32 max_dequeue, ps_index; proxy_session_t *ps; + transport_proto_t tp = session_get_transport_proto (s); clib_spinlock_lock_if_init (&pm->sessions_lock); @@ -467,20 +507,79 @@ proxy_session_start_connect (proxy_session_side_ctx_t *sc, session_t *s) clib_spinlock_unlock_if_init (&pm->sessions_lock); - max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo); - if (PREDICT_FALSE (max_dequeue == 0)) - return; + if (tp == TRANSPORT_PROTO_HTTP) + { + http_msg_t msg; + u8 *target_buf = 0; + http_uri_t target_uri; + http_header_t *resp_headers = 0; + session_endpoint_cfg_t target_sep = SESSION_ENDPOINT_CFG_NULL; + int rv; - max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue); - actual_transfer = svm_fifo_peek (s->rx_fifo, 0 /* relative_offset */, - max_dequeue, pm->rx_buf[s->thread_index]); + rv = svm_fifo_dequeue (s->rx_fifo, sizeof (msg), (u8 *) &msg); + ASSERT (rv == sizeof (msg)); - /* Expectation is that here actual data just received is parsed and based - * on its contents, the destination and parameters of the connect to the - * upstream are decided - */ + if (msg.type != HTTP_MSG_REQUEST) + { + proxy_send_http_resp (s, HTTP_STATUS_INTERNAL_ERROR, 0); + return; + } + if (msg.method_type != HTTP_REQ_CONNECT) + { + http_add_header (&resp_headers, + http_header_name_token (HTTP_HEADER_ALLOW), + http_token_lit ("CONNECT")); + proxy_send_http_resp (s, HTTP_STATUS_METHOD_NOT_ALLOWED, + resp_headers); + vec_free (resp_headers); + return; + } + + if (msg.data.target_form != HTTP_TARGET_AUTHORITY_FORM || + msg.data.target_path_len == 0) + { + proxy_send_http_resp (s, HTTP_STATUS_BAD_REQUEST, 0); + return; + } + + /* read target uri */ + target_buf = vec_new (u8, msg.data.target_path_len); + rv = svm_fifo_peek (s->rx_fifo, msg.data.target_path_offset, + msg.data.target_path_len, target_buf); + ASSERT (rv == msg.data.target_path_len); + svm_fifo_dequeue_drop (s->rx_fifo, msg.data.len); + rv = http_parse_authority_form_target (target_buf, &target_uri); + vec_free (target_buf); + if (rv) + { + proxy_send_http_resp (s, HTTP_STATUS_BAD_REQUEST, 0); + return; + } + target_sep.is_ip4 = target_uri.is_ip4; + target_sep.ip = target_uri.ip; + target_sep.port = target_uri.port; + target_sep.transport_proto = TRANSPORT_PROTO_TCP; + clib_memcpy (&a->sep_ext, &target_sep, sizeof (target_sep)); + } + else + { + max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo); + if (PREDICT_FALSE (max_dequeue == 0)) + return; + + max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue); + actual_transfer = + svm_fifo_peek (s->rx_fifo, 0 /* relative_offset */, max_dequeue, + pm->rx_buf[s->thread_index]); + + /* Expectation is that here actual data just received is parsed and based + * on its contents, the destination and parameters of the connect to the + * upstream are decided + */ + + clib_memcpy (&a->sep_ext, &pm->client_sep, sizeof (pm->client_sep)); + } - clib_memcpy (&a->sep_ext, &pm->client_sep, sizeof (pm->client_sep)); a->api_context = ps_index; a->app_index = pm->active_open_app_index; @@ -664,6 +763,8 @@ active_open_connected_callback (u32 app_index, u32 opaque, proxy_session_t *ps; proxy_worker_t *wrk; proxy_session_side_ctx_t *sc; + session_t *po_s; + transport_proto_t tp; /* Connection failed */ if (err) @@ -671,6 +772,12 @@ active_open_connected_callback (u32 app_index, u32 opaque, clib_spinlock_lock_if_init (&pm->sessions_lock); ps = proxy_session_get (opaque); + po_s = session_get_from_handle (ps->po.session_handle); + tp = session_get_transport_proto (po_s); + if (tp == TRANSPORT_PROTO_HTTP) + { + proxy_send_http_resp (po_s, HTTP_STATUS_BAD_GATEWAY, 0); + } ps->ao_disconnected = 1; proxy_session_close_po (ps); @@ -700,6 +807,9 @@ active_open_connected_callback (u32 app_index, u32 opaque, return -1; } + po_s = session_get_from_handle (ps->po.session_handle); + tp = session_get_transport_proto (po_s); + sc = proxy_session_side_ctx_alloc (wrk); sc->pair = ps->po; sc->ps_index = ps->ps_index; @@ -708,13 +818,21 @@ active_open_connected_callback (u32 app_index, u32 opaque, sc->state = PROXY_SC_S_ESTABLISHED; s->opaque = sc->sc_index; + sc->is_http = tp == TRANSPORT_PROTO_HTTP ? 1 : 0; - /* - * Send event for active open tx fifo - */ - ASSERT (s->thread_index == vlib_get_thread_index ()); - if (svm_fifo_set_event (s->tx_fifo)) - session_program_tx_io_evt (session_handle (s), SESSION_IO_EVT_TX); + if (tp == TRANSPORT_PROTO_HTTP) + { + proxy_send_http_resp (po_s, HTTP_STATUS_OK, 0); + } + else + { + /* + * Send event for active open tx fifo + */ + ASSERT (s->thread_index == vlib_get_thread_index ()); + if (svm_fifo_set_event (s->tx_fifo)) + session_program_tx_io_evt (session_handle (s), SESSION_IO_EVT_TX); + } return 0; } @@ -881,11 +999,21 @@ active_open_tx_callback (session_t * ao_s) if (sc->state < PROXY_SC_S_ESTABLISHED) return 0; - /* Force ack on proxy side to update rcv wnd */ - void *arg = uword_to_pointer (sc->pair.session_handle, void *); - session_send_rpc_evt_to_thread ( - session_thread_from_handle (sc->pair.session_handle), proxy_force_ack, - arg); + if (sc->is_http) + { + /* notify HTTP transport */ + session_t *po = session_get_from_handle (sc->pair.session_handle); + session_send_io_evt_to_thread_custom ( + &po->session_index, po->thread_index, SESSION_IO_EVT_RX); + } + else + { + /* Force ack on proxy side to update rcv wnd */ + void *arg = uword_to_pointer (sc->pair.session_handle, void *); + session_send_rpc_evt_to_thread ( + session_thread_from_handle (sc->pair.session_handle), proxy_force_ack, + arg); + } return 0; } @@ -1066,11 +1194,6 @@ proxy_server_create (vlib_main_t * vm) clib_warning ("failed to attach server app"); return -1; } - if (proxy_server_listen ()) - { - clib_warning ("failed to start listening"); - return -1; - } if (active_open_attach ()) { clib_warning ("failed to attach active open app"); @@ -1147,38 +1270,45 @@ proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input, default_server_uri); server_uri = format (0, "%s%c", default_server_uri, 0); } - if (!client_uri) - { - clib_warning ("No client-uri provided, Using default: %s", - default_client_uri); - client_uri = format (0, "%s%c", default_client_uri, 0); - } - if (parse_uri ((char *) server_uri, &pm->server_sep)) { error = clib_error_return (0, "Invalid server uri %v", server_uri); goto done; } - if (parse_uri ((char *) client_uri, &pm->client_sep)) + + /* http proxy get target within request */ + if (pm->server_sep.transport_proto != TRANSPORT_PROTO_HTTP) { - error = clib_error_return (0, "Invalid client uri %v", client_uri); - goto done; + if (!client_uri) + { + clib_warning ("No client-uri provided, Using default: %s", + default_client_uri); + client_uri = format (0, "%s%c", default_client_uri, 0); + } + if (parse_uri ((char *) client_uri, &pm->client_sep)) + { + error = clib_error_return (0, "Invalid client uri %v", client_uri); + goto done; + } } - session_enable_disable_args_t args = { .is_en = 1, - .rt_engine_type = - RT_BACKEND_ENGINE_RULE_TABLE }; - vnet_session_enable_disable (vm, &args); - - rv = proxy_server_create (vm); - switch (rv) + if (pm->server_app_index == APP_INVALID_INDEX) { - case 0: - break; - default: - error = clib_error_return (0, "server_create returned %d", rv); + session_enable_disable_args_t args = { .is_en = 1, + .rt_engine_type = + RT_BACKEND_ENGINE_RULE_TABLE }; + vnet_session_enable_disable (vm, &args); + rv = proxy_server_create (vm); + if (rv) + { + error = clib_error_return (0, "server_create returned %d", rv); + goto done; + } } + if (proxy_server_listen ()) + error = clib_error_return (0, "failed to start listening"); + done: unformat_free (line_input); vec_free (client_uri); @@ -1186,14 +1316,13 @@ done: return error; } -VLIB_CLI_COMMAND (proxy_create_command, static) = -{ +VLIB_CLI_COMMAND (proxy_create_command, static) = { .path = "test proxy server", - .short_help = "test proxy server [server-uri <tcp://ip/port>]" - "[client-uri <tcp://ip/port>][fifo-size <nn>[k|m]]" - "[max-fifo-size <nn>[k|m]][high-watermark <nn>]" - "[low-watermark <nn>][rcv-buf-size <nn>][prealloc-fifos <nn>]" - "[private-segment-size <mem>][private-segment-count <nn>]", + .short_help = "test proxy server [server-uri <proto://ip/port>]" + "[client-uri <tcp://ip/port>][fifo-size <nn>[k|m]]" + "[max-fifo-size <nn>[k|m]][high-watermark <nn>]" + "[low-watermark <nn>][rcv-buf-size <nn>][prealloc-fifos <nn>]" + "[private-segment-size <mem>][private-segment-count <nn>]", .function = proxy_server_create_command_fn, }; @@ -1203,6 +1332,7 @@ proxy_main_init (vlib_main_t * vm) proxy_main_t *pm = &proxy_main; pm->server_client_index = ~0; pm->active_open_client_index = ~0; + pm->server_app_index = APP_INVALID_INDEX; return 0; } diff --git a/src/plugins/hs_apps/proxy.h b/src/plugins/hs_apps/proxy.h index 86db69c21ae..789e5613520 100644 --- a/src/plugins/hs_apps/proxy.h +++ b/src/plugins/hs_apps/proxy.h @@ -51,6 +51,7 @@ typedef struct proxy_session_side_ctx_ proxy_session_side_state_t state; u32 sc_index; u32 ps_index; + u8 is_http; } proxy_session_side_ctx_t; typedef struct diff --git a/src/plugins/http/http.c b/src/plugins/http/http.c index 1a92797c50b..6659de9689f 100644 --- a/src/plugins/http/http.c +++ b/src/plugins/http/http.c @@ -447,9 +447,9 @@ static const char *http_error_template = "HTTP/1.1 %s\r\n" */ static const char *http_response_template = "HTTP/1.1 %s\r\n" "Date: %U GMT\r\n" - "Server: %v\r\n" - "Content-Length: %llu\r\n" - "%s"; + "Server: %v\r\n"; + +static const char *content_len_template = "Content-Length: %llu\r\n"; /** * http request boilerplate @@ -705,6 +705,13 @@ http_parse_request_line (http_conn_t *hc, http_status_code_t *ec) hc->method = HTTP_REQ_POST; hc->target_path_offset = method_offset + 5; } + else if (!memcmp (hc->rx_buf + method_offset, "CONNECT ", 8)) + { + HTTP_DBG (0, "CONNECT method"); + hc->method = HTTP_REQ_CONNECT; + hc->target_path_offset = method_offset + 8; + hc->is_tunnel = 1; + } else { if (hc->rx_buf[method_offset] - 'A' <= 'Z' - 'A') @@ -930,6 +937,11 @@ http_identify_message_body (http_conn_t *hc, http_status_code_t *ec) HTTP_DBG (2, "no header, no message-body"); return 0; } + if (hc->is_tunnel) + { + HTTP_DBG (2, "tunnel, no message-body"); + return 0; + } /* TODO check for chunked transfer coding */ @@ -1271,11 +1283,21 @@ http_state_wait_app_reply (http_conn_t *hc, transport_send_params_t *sp) /* Date */ format_clib_timebase_time, now, /* Server */ - hc->app_name, - /* Length */ - msg.data.body_len, - /* Any headers from app? */ - msg.data.headers_len ? "" : "\r\n"); + hc->app_name); + + /* RFC9110 9.3.6: A server MUST NOT send Content-Length header field in a + * 2xx (Successful) response to CONNECT. */ + if (hc->is_tunnel && http_status_code_str[msg.code][0] == '2') + { + ASSERT (msg.data.body_len == 0); + hc->state = HTTP_CONN_STATE_TUNNEL; + /* cleanup some stuff we don't need anymore in tunnel mode */ + http_conn_timer_stop (hc); + vec_free (hc->rx_buf); + http_buffer_free (&hc->tx_buf); + } + else + response = format (response, content_len_template, msg.data.body_len); /* Add headers from app (if any) */ if (msg.data.headers_len) @@ -1298,6 +1320,11 @@ http_state_wait_app_reply (http_conn_t *hc, transport_send_params_t *sp) ASSERT (rv == msg.data.headers_len); } } + else + { + /* No headers from app */ + response = format (response, "\r\n"); + } HTTP_DBG (3, "%v", response); sent = http_send_data (hc, response, vec_len (response)); @@ -1650,6 +1677,47 @@ http_req_run_state_machine (http_conn_t *hc, transport_send_params_t *sp) } static int +http_tunnel_rx (session_t *ts, http_conn_t *hc) +{ + u32 max_deq, max_enq, max_read, n_segs = 2; + svm_fifo_seg_t segs[n_segs]; + int n_written = 0; + session_t *as; + app_worker_t *app_wrk; + + HTTP_DBG (1, "tunnel received data from client"); + + as = session_get_from_handle (hc->h_pa_session_handle); + + max_deq = svm_fifo_max_dequeue (ts->rx_fifo); + if (PREDICT_FALSE (max_deq == 0)) + { + HTTP_DBG (1, "max_deq == 0"); + return 0; + } + max_enq = svm_fifo_max_enqueue (as->rx_fifo); + if (max_enq == 0) + { + HTTP_DBG (1, "app's rx fifo full"); + svm_fifo_add_want_deq_ntf (as->rx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); + return 0; + } + max_read = clib_min (max_enq, max_deq); + svm_fifo_segments (ts->rx_fifo, 0, segs, &n_segs, max_read); + n_written = svm_fifo_enqueue_segments (as->rx_fifo, segs, n_segs, 0); + ASSERT (n_written > 0); + HTTP_DBG (1, "transfered %u bytes", n_written); + svm_fifo_dequeue_drop (ts->rx_fifo, n_written); + app_wrk = app_worker_get_if_valid (as->app_wrk_index); + if (app_wrk) + app_worker_rx_notify (app_wrk, as); + if (svm_fifo_max_dequeue_cons (ts->rx_fifo)) + session_program_rx_io_evt (session_handle (ts)); + + return 0; +} + +static int http_ts_rx_callback (session_t *ts) { http_conn_t *hc; @@ -1665,6 +1733,9 @@ http_ts_rx_callback (session_t *ts) return 0; } + if (hc->state == HTTP_CONN_STATE_TUNNEL) + return http_tunnel_rx (ts, hc); + if (!http_state_is_rx_valid (hc)) { if (hc->state != HTTP_CONN_STATE_CLOSED) @@ -1691,6 +1762,7 @@ http_ts_builtin_tx_callback (session_t *ts) http_conn_t *hc; hc = http_conn_get_w_thread (ts->opaque, ts->thread_index); + HTTP_DBG (1, "transport connection reschedule"); transport_connection_reschedule (&hc->connection); return 0; @@ -2018,6 +2090,54 @@ http_transport_get_listener (u32 listener_index) } static int +http_tunnel_tx (http_conn_t *hc, session_t *as, transport_send_params_t *sp) +{ + u32 max_deq, max_enq, max_read, n_segs = 2; + svm_fifo_seg_t segs[n_segs]; + session_t *ts; + int n_written = 0; + + HTTP_DBG (1, "tunnel received data from target"); + + ts = session_get_from_handle (hc->h_tc_session_handle); + + max_deq = svm_fifo_max_dequeue_cons (as->tx_fifo); + if (PREDICT_FALSE (max_deq == 0)) + { + HTTP_DBG (1, "max_deq == 0"); + goto check_fifo; + } + max_enq = svm_fifo_max_enqueue_prod (ts->tx_fifo); + if (max_enq == 0) + { + HTTP_DBG (1, "ts tx fifo full"); + goto check_fifo; + } + max_read = clib_min (max_enq, max_deq); + max_read = clib_min (max_read, sp->max_burst_size); + svm_fifo_segments (as->tx_fifo, 0, segs, &n_segs, max_read); + n_written = svm_fifo_enqueue_segments (ts->tx_fifo, segs, n_segs, 0); + ASSERT (n_written > 0); + HTTP_DBG (1, "transfered %u bytes", n_written); + sp->bytes_dequeued += n_written; + sp->max_burst_size -= n_written; + svm_fifo_dequeue_drop (as->tx_fifo, n_written); + if (svm_fifo_set_event (ts->tx_fifo)) + session_program_tx_io_evt (ts->handle, SESSION_IO_EVT_TX); + +check_fifo: + /* Deschedule and wait for deq notification if ts fifo is almost full */ + if (svm_fifo_max_enqueue (ts->tx_fifo) < HTTP_FIFO_THRESH) + { + svm_fifo_add_want_deq_ntf (ts->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); + transport_connection_deschedule (&hc->connection); + sp->flags |= TRANSPORT_SND_F_DESCHED; + } + + return n_written > 0 ? clib_max (n_written / TRANSPORT_PACER_MIN_MSS, 1) : 0; +} + +static int http_app_tx_callback (void *session, transport_send_params_t *sp) { session_t *as = (session_t *) session; @@ -2027,6 +2147,13 @@ http_app_tx_callback (void *session, transport_send_params_t *sp) HTTP_DBG (1, "hc [%u]%x", as->thread_index, as->connection_index); hc = http_conn_get_w_thread (as->connection_index, as->thread_index); + + max_burst_sz = sp->max_burst_size * TRANSPORT_PACER_MIN_MSS; + sp->max_burst_size = max_burst_sz; + + if (hc->state == HTTP_CONN_STATE_TUNNEL) + return http_tunnel_tx (hc, as, sp); + if (!http_state_is_tx_valid (hc)) { if (hc->state != HTTP_CONN_STATE_CLOSED) @@ -2040,9 +2167,6 @@ http_app_tx_callback (void *session, transport_send_params_t *sp) return 0; } - max_burst_sz = sp->max_burst_size * TRANSPORT_PACER_MIN_MSS; - sp->max_burst_size = max_burst_sz; - HTTP_DBG (1, "run state machine"); http_req_run_state_machine (hc, sp); @@ -2057,6 +2181,19 @@ http_app_tx_callback (void *session, transport_send_params_t *sp) return sent > 0 ? clib_max (sent / TRANSPORT_PACER_MIN_MSS, 1) : 0; } +static int +http_app_rx_evt_cb (transport_connection_t *tc) +{ + http_conn_t *hc = (http_conn_t *) tc; + HTTP_DBG (1, "hc [%u]%x", vlib_get_thread_index (), hc->h_hc_index); + session_t *ts = session_get_from_handle (hc->h_tc_session_handle); + + if (hc->state == HTTP_CONN_STATE_TUNNEL) + return http_tunnel_rx (ts, hc); + + return 0; +} + static void http_transport_get_endpoint (u32 hc_index, u32 thread_index, transport_endpoint_t *tep, u8 is_lcl) @@ -2114,6 +2251,9 @@ format_http_conn_state (u8 *s, va_list *args) case HTTP_CONN_STATE_ESTABLISHED: s = format (s, "ESTABLISHED"); break; + case HTTP_CONN_STATE_TUNNEL: + s = format (s, "TUNNEL"); + break; case HTTP_CONN_STATE_TRANSPORT_CLOSED: s = format (s, "TRANSPORT_CLOSED"); break; @@ -2212,6 +2352,7 @@ static const transport_proto_vft_t http_proto = { .close = http_transport_close, .cleanup_ho = http_transport_cleanup_ho, .custom_tx = http_app_tx_callback, + .app_rx_evt = http_app_rx_evt_cb, .get_connection = http_transport_get_connection, .get_listener = http_transport_get_listener, .get_half_open = http_transport_get_ho, diff --git a/src/plugins/http/http.h b/src/plugins/http/http.h index 04c53d15ecb..a117f374efa 100644 --- a/src/plugins/http/http.h +++ b/src/plugins/http/http.h @@ -64,6 +64,7 @@ typedef enum http_conn_state_ HTTP_CONN_STATE_LISTEN, HTTP_CONN_STATE_CONNECTING, HTTP_CONN_STATE_ESTABLISHED, + HTTP_CONN_STATE_TUNNEL, HTTP_CONN_STATE_TRANSPORT_CLOSED, HTTP_CONN_STATE_APP_CLOSED, HTTP_CONN_STATE_CLOSED @@ -85,6 +86,7 @@ typedef enum http_req_method_ { HTTP_REQ_GET = 0, HTTP_REQ_POST, + HTTP_REQ_CONNECT, } http_req_method_t; typedef enum http_msg_type_ @@ -415,6 +417,7 @@ typedef struct http_tc_ u32 body_offset; u64 body_len; u16 status_code; + u8 is_tunnel; } http_conn_t; typedef struct http_worker_ |