aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--extras/hs-test/proxy_test.go36
-rw-r--r--src/plugins/hs_apps/proxy.c240
-rw-r--r--src/plugins/hs_apps/proxy.h1
-rw-r--r--src/plugins/http/http.c163
-rw-r--r--src/plugins/http/http.h3
5 files changed, 368 insertions, 75 deletions
diff --git a/extras/hs-test/proxy_test.go b/extras/hs-test/proxy_test.go
index b914242d34f..5ca151f6228 100644
--- a/extras/hs-test/proxy_test.go
+++ b/extras/hs-test/proxy_test.go
@@ -7,7 +7,8 @@ import (
)
func init() {
- RegisterVppProxyTests(VppProxyHttpGetTcpTest, VppProxyHttpGetTlsTest, VppProxyHttpPutTcpTest, VppProxyHttpPutTlsTest)
+ RegisterVppProxyTests(VppProxyHttpGetTcpTest, VppProxyHttpGetTlsTest, VppProxyHttpPutTcpTest, VppProxyHttpPutTlsTest,
+ VppConnectProxyGetTest, VppConnectProxyPutTest)
RegisterEnvoyProxyTests(EnvoyProxyHttpGetTcpTest, EnvoyProxyHttpPutTcpTest)
RegisterNginxProxyTests(NginxMirroringTest)
RegisterNginxProxySoloTests(MirrorMultiThreadTest)
@@ -15,14 +16,11 @@ func init() {
func configureVppProxy(s *VppProxySuite, proto string, proxyPort uint16) {
vppProxy := s.GetContainerByName(VppProxyContainerName).VppInstance
- output := vppProxy.Vppctl(
- "test proxy server server-uri %s://%s/%d client-uri tcp://%s/%d",
- proto,
- s.VppProxyAddr(),
- proxyPort,
- s.NginxAddr(),
- s.NginxPort(),
- )
+ cmd := fmt.Sprintf("test proxy server fifo-size 512k server-uri %s://%s/%d", proto, s.VppProxyAddr(), proxyPort)
+ if proto != "http" {
+ cmd += fmt.Sprintf(" client-uri tcp://%s/%d", s.NginxAddr(), s.NginxPort())
+ }
+ output := vppProxy.Vppctl(cmd)
s.Log("proxy configured: " + output)
}
@@ -83,3 +81,23 @@ func nginxMirroring(s *NginxProxySuite, multiThreadWorkers bool) {
uri := fmt.Sprintf("http://%s:%d/httpTestFile", s.ProxyAddr(), s.ProxyPort())
s.CurlDownloadResource(uri)
}
+
+func VppConnectProxyGetTest(s *VppProxySuite) {
+ var proxyPort uint16 = 8080
+
+ configureVppProxy(s, "http", proxyPort)
+
+ targetUri := fmt.Sprintf("http://%s:%d/httpTestFile", s.NginxAddr(), s.NginxPort())
+ proxyUri := fmt.Sprintf("http://%s:%d", s.VppProxyAddr(), proxyPort)
+ s.CurlDownloadResourceViaTunnel(targetUri, proxyUri)
+}
+
+func VppConnectProxyPutTest(s *VppProxySuite) {
+ var proxyPort uint16 = 8080
+
+ configureVppProxy(s, "http", proxyPort)
+
+ proxyUri := fmt.Sprintf("http://%s:%d", s.VppProxyAddr(), proxyPort)
+ targetUri := fmt.Sprintf("http://%s:%d/upload/testFile", s.NginxAddr(), s.NginxPort())
+ s.CurlUploadResourceViaTunnel(targetUri, proxyUri, CurlContainerTestFile)
+}
diff --git a/src/plugins/hs_apps/proxy.c b/src/plugins/hs_apps/proxy.c
index 999f8d15c20..0d24ebc6525 100644
--- a/src/plugins/hs_apps/proxy.c
+++ b/src/plugins/hs_apps/proxy.c
@@ -19,6 +19,8 @@
#include <vnet/session/application_interface.h>
#include <hs_apps/proxy.h>
#include <vnet/tcp/tcp.h>
+#include <http/http.h>
+#include <http/http_header_names.h>
proxy_main_t proxy_main;
@@ -50,6 +52,41 @@ proxy_session_side_ctx_get (proxy_worker_t *wrk, u32 ctx_index)
}
static void
+proxy_send_http_resp (session_t *s, http_status_code_t sc,
+ http_header_t *resp_headers)
+{
+ http_msg_t msg;
+ int rv;
+ u8 *headers_buf = 0;
+
+ if (vec_len (resp_headers))
+ {
+ headers_buf = http_serialize_headers (resp_headers);
+ msg.data.len = msg.data.headers_len = vec_len (headers_buf);
+ }
+ else
+ msg.data.len = msg.data.headers_len = 0;
+
+ msg.type = HTTP_MSG_REPLY;
+ msg.code = sc;
+ msg.data.type = HTTP_MSG_DATA_INLINE;
+ msg.data.headers_offset = 0;
+ msg.data.body_len = 0;
+ msg.data.body_offset = 0;
+ rv = svm_fifo_enqueue (s->tx_fifo, sizeof (msg), (u8 *) &msg);
+ ASSERT (rv == sizeof (msg));
+ if (msg.data.headers_len)
+ {
+ rv = svm_fifo_enqueue (s->tx_fifo, vec_len (headers_buf), headers_buf);
+ ASSERT (rv == vec_len (headers_buf));
+ vec_free (headers_buf);
+ }
+
+ if (svm_fifo_set_event (s->tx_fifo))
+ session_program_tx_io_evt (s->handle, SESSION_IO_EVT_TX);
+}
+
+static void
proxy_do_connect (vnet_connect_args_t *a)
{
ASSERT (session_vlib_thread_is_cl_thread ());
@@ -387,6 +424,7 @@ proxy_accept_callback (session_t * s)
proxy_session_side_ctx_t *sc;
proxy_session_t *ps;
proxy_worker_t *wrk;
+ transport_proto_t tp = session_get_transport_proto (s);
wrk = proxy_worker_get (s->thread_index);
sc = proxy_session_side_ctx_alloc (wrk);
@@ -402,6 +440,7 @@ proxy_accept_callback (session_t * s)
ps->ao.session_handle = SESSION_INVALID_HANDLE;
sc->ps_index = ps->ps_index;
+ sc->is_http = tp == TRANSPORT_PROTO_HTTP ? 1 : 0;
clib_spinlock_unlock_if_init (&pm->sessions_lock);
@@ -450,6 +489,7 @@ proxy_session_start_connect (proxy_session_side_ctx_t *sc, session_t *s)
proxy_main_t *pm = &proxy_main;
u32 max_dequeue, ps_index;
proxy_session_t *ps;
+ transport_proto_t tp = session_get_transport_proto (s);
clib_spinlock_lock_if_init (&pm->sessions_lock);
@@ -467,20 +507,79 @@ proxy_session_start_connect (proxy_session_side_ctx_t *sc, session_t *s)
clib_spinlock_unlock_if_init (&pm->sessions_lock);
- max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo);
- if (PREDICT_FALSE (max_dequeue == 0))
- return;
+ if (tp == TRANSPORT_PROTO_HTTP)
+ {
+ http_msg_t msg;
+ u8 *target_buf = 0;
+ http_uri_t target_uri;
+ http_header_t *resp_headers = 0;
+ session_endpoint_cfg_t target_sep = SESSION_ENDPOINT_CFG_NULL;
+ int rv;
- max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue);
- actual_transfer = svm_fifo_peek (s->rx_fifo, 0 /* relative_offset */,
- max_dequeue, pm->rx_buf[s->thread_index]);
+ rv = svm_fifo_dequeue (s->rx_fifo, sizeof (msg), (u8 *) &msg);
+ ASSERT (rv == sizeof (msg));
- /* Expectation is that here actual data just received is parsed and based
- * on its contents, the destination and parameters of the connect to the
- * upstream are decided
- */
+ if (msg.type != HTTP_MSG_REQUEST)
+ {
+ proxy_send_http_resp (s, HTTP_STATUS_INTERNAL_ERROR, 0);
+ return;
+ }
+ if (msg.method_type != HTTP_REQ_CONNECT)
+ {
+ http_add_header (&resp_headers,
+ http_header_name_token (HTTP_HEADER_ALLOW),
+ http_token_lit ("CONNECT"));
+ proxy_send_http_resp (s, HTTP_STATUS_METHOD_NOT_ALLOWED,
+ resp_headers);
+ vec_free (resp_headers);
+ return;
+ }
+
+ if (msg.data.target_form != HTTP_TARGET_AUTHORITY_FORM ||
+ msg.data.target_path_len == 0)
+ {
+ proxy_send_http_resp (s, HTTP_STATUS_BAD_REQUEST, 0);
+ return;
+ }
+
+ /* read target uri */
+ target_buf = vec_new (u8, msg.data.target_path_len);
+ rv = svm_fifo_peek (s->rx_fifo, msg.data.target_path_offset,
+ msg.data.target_path_len, target_buf);
+ ASSERT (rv == msg.data.target_path_len);
+ svm_fifo_dequeue_drop (s->rx_fifo, msg.data.len);
+ rv = http_parse_authority_form_target (target_buf, &target_uri);
+ vec_free (target_buf);
+ if (rv)
+ {
+ proxy_send_http_resp (s, HTTP_STATUS_BAD_REQUEST, 0);
+ return;
+ }
+ target_sep.is_ip4 = target_uri.is_ip4;
+ target_sep.ip = target_uri.ip;
+ target_sep.port = target_uri.port;
+ target_sep.transport_proto = TRANSPORT_PROTO_TCP;
+ clib_memcpy (&a->sep_ext, &target_sep, sizeof (target_sep));
+ }
+ else
+ {
+ max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo);
+ if (PREDICT_FALSE (max_dequeue == 0))
+ return;
+
+ max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue);
+ actual_transfer =
+ svm_fifo_peek (s->rx_fifo, 0 /* relative_offset */, max_dequeue,
+ pm->rx_buf[s->thread_index]);
+
+ /* Expectation is that here actual data just received is parsed and based
+ * on its contents, the destination and parameters of the connect to the
+ * upstream are decided
+ */
+
+ clib_memcpy (&a->sep_ext, &pm->client_sep, sizeof (pm->client_sep));
+ }
- clib_memcpy (&a->sep_ext, &pm->client_sep, sizeof (pm->client_sep));
a->api_context = ps_index;
a->app_index = pm->active_open_app_index;
@@ -664,6 +763,8 @@ active_open_connected_callback (u32 app_index, u32 opaque,
proxy_session_t *ps;
proxy_worker_t *wrk;
proxy_session_side_ctx_t *sc;
+ session_t *po_s;
+ transport_proto_t tp;
/* Connection failed */
if (err)
@@ -671,6 +772,12 @@ active_open_connected_callback (u32 app_index, u32 opaque,
clib_spinlock_lock_if_init (&pm->sessions_lock);
ps = proxy_session_get (opaque);
+ po_s = session_get_from_handle (ps->po.session_handle);
+ tp = session_get_transport_proto (po_s);
+ if (tp == TRANSPORT_PROTO_HTTP)
+ {
+ proxy_send_http_resp (po_s, HTTP_STATUS_BAD_GATEWAY, 0);
+ }
ps->ao_disconnected = 1;
proxy_session_close_po (ps);
@@ -700,6 +807,9 @@ active_open_connected_callback (u32 app_index, u32 opaque,
return -1;
}
+ po_s = session_get_from_handle (ps->po.session_handle);
+ tp = session_get_transport_proto (po_s);
+
sc = proxy_session_side_ctx_alloc (wrk);
sc->pair = ps->po;
sc->ps_index = ps->ps_index;
@@ -708,13 +818,21 @@ active_open_connected_callback (u32 app_index, u32 opaque,
sc->state = PROXY_SC_S_ESTABLISHED;
s->opaque = sc->sc_index;
+ sc->is_http = tp == TRANSPORT_PROTO_HTTP ? 1 : 0;
- /*
- * Send event for active open tx fifo
- */
- ASSERT (s->thread_index == vlib_get_thread_index ());
- if (svm_fifo_set_event (s->tx_fifo))
- session_program_tx_io_evt (session_handle (s), SESSION_IO_EVT_TX);
+ if (tp == TRANSPORT_PROTO_HTTP)
+ {
+ proxy_send_http_resp (po_s, HTTP_STATUS_OK, 0);
+ }
+ else
+ {
+ /*
+ * Send event for active open tx fifo
+ */
+ ASSERT (s->thread_index == vlib_get_thread_index ());
+ if (svm_fifo_set_event (s->tx_fifo))
+ session_program_tx_io_evt (session_handle (s), SESSION_IO_EVT_TX);
+ }
return 0;
}
@@ -881,11 +999,21 @@ active_open_tx_callback (session_t * ao_s)
if (sc->state < PROXY_SC_S_ESTABLISHED)
return 0;
- /* Force ack on proxy side to update rcv wnd */
- void *arg = uword_to_pointer (sc->pair.session_handle, void *);
- session_send_rpc_evt_to_thread (
- session_thread_from_handle (sc->pair.session_handle), proxy_force_ack,
- arg);
+ if (sc->is_http)
+ {
+ /* notify HTTP transport */
+ session_t *po = session_get_from_handle (sc->pair.session_handle);
+ session_send_io_evt_to_thread_custom (
+ &po->session_index, po->thread_index, SESSION_IO_EVT_RX);
+ }
+ else
+ {
+ /* Force ack on proxy side to update rcv wnd */
+ void *arg = uword_to_pointer (sc->pair.session_handle, void *);
+ session_send_rpc_evt_to_thread (
+ session_thread_from_handle (sc->pair.session_handle), proxy_force_ack,
+ arg);
+ }
return 0;
}
@@ -1066,11 +1194,6 @@ proxy_server_create (vlib_main_t * vm)
clib_warning ("failed to attach server app");
return -1;
}
- if (proxy_server_listen ())
- {
- clib_warning ("failed to start listening");
- return -1;
- }
if (active_open_attach ())
{
clib_warning ("failed to attach active open app");
@@ -1147,38 +1270,45 @@ proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
default_server_uri);
server_uri = format (0, "%s%c", default_server_uri, 0);
}
- if (!client_uri)
- {
- clib_warning ("No client-uri provided, Using default: %s",
- default_client_uri);
- client_uri = format (0, "%s%c", default_client_uri, 0);
- }
-
if (parse_uri ((char *) server_uri, &pm->server_sep))
{
error = clib_error_return (0, "Invalid server uri %v", server_uri);
goto done;
}
- if (parse_uri ((char *) client_uri, &pm->client_sep))
+
+ /* http proxy get target within request */
+ if (pm->server_sep.transport_proto != TRANSPORT_PROTO_HTTP)
{
- error = clib_error_return (0, "Invalid client uri %v", client_uri);
- goto done;
+ if (!client_uri)
+ {
+ clib_warning ("No client-uri provided, Using default: %s",
+ default_client_uri);
+ client_uri = format (0, "%s%c", default_client_uri, 0);
+ }
+ if (parse_uri ((char *) client_uri, &pm->client_sep))
+ {
+ error = clib_error_return (0, "Invalid client uri %v", client_uri);
+ goto done;
+ }
}
- session_enable_disable_args_t args = { .is_en = 1,
- .rt_engine_type =
- RT_BACKEND_ENGINE_RULE_TABLE };
- vnet_session_enable_disable (vm, &args);
-
- rv = proxy_server_create (vm);
- switch (rv)
+ if (pm->server_app_index == APP_INVALID_INDEX)
{
- case 0:
- break;
- default:
- error = clib_error_return (0, "server_create returned %d", rv);
+ session_enable_disable_args_t args = { .is_en = 1,
+ .rt_engine_type =
+ RT_BACKEND_ENGINE_RULE_TABLE };
+ vnet_session_enable_disable (vm, &args);
+ rv = proxy_server_create (vm);
+ if (rv)
+ {
+ error = clib_error_return (0, "server_create returned %d", rv);
+ goto done;
+ }
}
+ if (proxy_server_listen ())
+ error = clib_error_return (0, "failed to start listening");
+
done:
unformat_free (line_input);
vec_free (client_uri);
@@ -1186,14 +1316,13 @@ done:
return error;
}
-VLIB_CLI_COMMAND (proxy_create_command, static) =
-{
+VLIB_CLI_COMMAND (proxy_create_command, static) = {
.path = "test proxy server",
- .short_help = "test proxy server [server-uri <tcp://ip/port>]"
- "[client-uri <tcp://ip/port>][fifo-size <nn>[k|m]]"
- "[max-fifo-size <nn>[k|m]][high-watermark <nn>]"
- "[low-watermark <nn>][rcv-buf-size <nn>][prealloc-fifos <nn>]"
- "[private-segment-size <mem>][private-segment-count <nn>]",
+ .short_help = "test proxy server [server-uri <proto://ip/port>]"
+ "[client-uri <tcp://ip/port>][fifo-size <nn>[k|m]]"
+ "[max-fifo-size <nn>[k|m]][high-watermark <nn>]"
+ "[low-watermark <nn>][rcv-buf-size <nn>][prealloc-fifos <nn>]"
+ "[private-segment-size <mem>][private-segment-count <nn>]",
.function = proxy_server_create_command_fn,
};
@@ -1203,6 +1332,7 @@ proxy_main_init (vlib_main_t * vm)
proxy_main_t *pm = &proxy_main;
pm->server_client_index = ~0;
pm->active_open_client_index = ~0;
+ pm->server_app_index = APP_INVALID_INDEX;
return 0;
}
diff --git a/src/plugins/hs_apps/proxy.h b/src/plugins/hs_apps/proxy.h
index 86db69c21ae..789e5613520 100644
--- a/src/plugins/hs_apps/proxy.h
+++ b/src/plugins/hs_apps/proxy.h
@@ -51,6 +51,7 @@ typedef struct proxy_session_side_ctx_
proxy_session_side_state_t state;
u32 sc_index;
u32 ps_index;
+ u8 is_http;
} proxy_session_side_ctx_t;
typedef struct
diff --git a/src/plugins/http/http.c b/src/plugins/http/http.c
index 1a92797c50b..6659de9689f 100644
--- a/src/plugins/http/http.c
+++ b/src/plugins/http/http.c
@@ -447,9 +447,9 @@ static const char *http_error_template = "HTTP/1.1 %s\r\n"
*/
static const char *http_response_template = "HTTP/1.1 %s\r\n"
"Date: %U GMT\r\n"
- "Server: %v\r\n"
- "Content-Length: %llu\r\n"
- "%s";
+ "Server: %v\r\n";
+
+static const char *content_len_template = "Content-Length: %llu\r\n";
/**
* http request boilerplate
@@ -705,6 +705,13 @@ http_parse_request_line (http_conn_t *hc, http_status_code_t *ec)
hc->method = HTTP_REQ_POST;
hc->target_path_offset = method_offset + 5;
}
+ else if (!memcmp (hc->rx_buf + method_offset, "CONNECT ", 8))
+ {
+ HTTP_DBG (0, "CONNECT method");
+ hc->method = HTTP_REQ_CONNECT;
+ hc->target_path_offset = method_offset + 8;
+ hc->is_tunnel = 1;
+ }
else
{
if (hc->rx_buf[method_offset] - 'A' <= 'Z' - 'A')
@@ -930,6 +937,11 @@ http_identify_message_body (http_conn_t *hc, http_status_code_t *ec)
HTTP_DBG (2, "no header, no message-body");
return 0;
}
+ if (hc->is_tunnel)
+ {
+ HTTP_DBG (2, "tunnel, no message-body");
+ return 0;
+ }
/* TODO check for chunked transfer coding */
@@ -1271,11 +1283,21 @@ http_state_wait_app_reply (http_conn_t *hc, transport_send_params_t *sp)
/* Date */
format_clib_timebase_time, now,
/* Server */
- hc->app_name,
- /* Length */
- msg.data.body_len,
- /* Any headers from app? */
- msg.data.headers_len ? "" : "\r\n");
+ hc->app_name);
+
+ /* RFC9110 9.3.6: A server MUST NOT send Content-Length header field in a
+ * 2xx (Successful) response to CONNECT. */
+ if (hc->is_tunnel && http_status_code_str[msg.code][0] == '2')
+ {
+ ASSERT (msg.data.body_len == 0);
+ hc->state = HTTP_CONN_STATE_TUNNEL;
+ /* cleanup some stuff we don't need anymore in tunnel mode */
+ http_conn_timer_stop (hc);
+ vec_free (hc->rx_buf);
+ http_buffer_free (&hc->tx_buf);
+ }
+ else
+ response = format (response, content_len_template, msg.data.body_len);
/* Add headers from app (if any) */
if (msg.data.headers_len)
@@ -1298,6 +1320,11 @@ http_state_wait_app_reply (http_conn_t *hc, transport_send_params_t *sp)
ASSERT (rv == msg.data.headers_len);
}
}
+ else
+ {
+ /* No headers from app */
+ response = format (response, "\r\n");
+ }
HTTP_DBG (3, "%v", response);
sent = http_send_data (hc, response, vec_len (response));
@@ -1650,6 +1677,47 @@ http_req_run_state_machine (http_conn_t *hc, transport_send_params_t *sp)
}
static int
+http_tunnel_rx (session_t *ts, http_conn_t *hc)
+{
+ u32 max_deq, max_enq, max_read, n_segs = 2;
+ svm_fifo_seg_t segs[n_segs];
+ int n_written = 0;
+ session_t *as;
+ app_worker_t *app_wrk;
+
+ HTTP_DBG (1, "tunnel received data from client");
+
+ as = session_get_from_handle (hc->h_pa_session_handle);
+
+ max_deq = svm_fifo_max_dequeue (ts->rx_fifo);
+ if (PREDICT_FALSE (max_deq == 0))
+ {
+ HTTP_DBG (1, "max_deq == 0");
+ return 0;
+ }
+ max_enq = svm_fifo_max_enqueue (as->rx_fifo);
+ if (max_enq == 0)
+ {
+ HTTP_DBG (1, "app's rx fifo full");
+ svm_fifo_add_want_deq_ntf (as->rx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
+ return 0;
+ }
+ max_read = clib_min (max_enq, max_deq);
+ svm_fifo_segments (ts->rx_fifo, 0, segs, &n_segs, max_read);
+ n_written = svm_fifo_enqueue_segments (as->rx_fifo, segs, n_segs, 0);
+ ASSERT (n_written > 0);
+ HTTP_DBG (1, "transfered %u bytes", n_written);
+ svm_fifo_dequeue_drop (ts->rx_fifo, n_written);
+ app_wrk = app_worker_get_if_valid (as->app_wrk_index);
+ if (app_wrk)
+ app_worker_rx_notify (app_wrk, as);
+ if (svm_fifo_max_dequeue_cons (ts->rx_fifo))
+ session_program_rx_io_evt (session_handle (ts));
+
+ return 0;
+}
+
+static int
http_ts_rx_callback (session_t *ts)
{
http_conn_t *hc;
@@ -1665,6 +1733,9 @@ http_ts_rx_callback (session_t *ts)
return 0;
}
+ if (hc->state == HTTP_CONN_STATE_TUNNEL)
+ return http_tunnel_rx (ts, hc);
+
if (!http_state_is_rx_valid (hc))
{
if (hc->state != HTTP_CONN_STATE_CLOSED)
@@ -1691,6 +1762,7 @@ http_ts_builtin_tx_callback (session_t *ts)
http_conn_t *hc;
hc = http_conn_get_w_thread (ts->opaque, ts->thread_index);
+ HTTP_DBG (1, "transport connection reschedule");
transport_connection_reschedule (&hc->connection);
return 0;
@@ -2018,6 +2090,54 @@ http_transport_get_listener (u32 listener_index)
}
static int
+http_tunnel_tx (http_conn_t *hc, session_t *as, transport_send_params_t *sp)
+{
+ u32 max_deq, max_enq, max_read, n_segs = 2;
+ svm_fifo_seg_t segs[n_segs];
+ session_t *ts;
+ int n_written = 0;
+
+ HTTP_DBG (1, "tunnel received data from target");
+
+ ts = session_get_from_handle (hc->h_tc_session_handle);
+
+ max_deq = svm_fifo_max_dequeue_cons (as->tx_fifo);
+ if (PREDICT_FALSE (max_deq == 0))
+ {
+ HTTP_DBG (1, "max_deq == 0");
+ goto check_fifo;
+ }
+ max_enq = svm_fifo_max_enqueue_prod (ts->tx_fifo);
+ if (max_enq == 0)
+ {
+ HTTP_DBG (1, "ts tx fifo full");
+ goto check_fifo;
+ }
+ max_read = clib_min (max_enq, max_deq);
+ max_read = clib_min (max_read, sp->max_burst_size);
+ svm_fifo_segments (as->tx_fifo, 0, segs, &n_segs, max_read);
+ n_written = svm_fifo_enqueue_segments (ts->tx_fifo, segs, n_segs, 0);
+ ASSERT (n_written > 0);
+ HTTP_DBG (1, "transfered %u bytes", n_written);
+ sp->bytes_dequeued += n_written;
+ sp->max_burst_size -= n_written;
+ svm_fifo_dequeue_drop (as->tx_fifo, n_written);
+ if (svm_fifo_set_event (ts->tx_fifo))
+ session_program_tx_io_evt (ts->handle, SESSION_IO_EVT_TX);
+
+check_fifo:
+ /* Deschedule and wait for deq notification if ts fifo is almost full */
+ if (svm_fifo_max_enqueue (ts->tx_fifo) < HTTP_FIFO_THRESH)
+ {
+ svm_fifo_add_want_deq_ntf (ts->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
+ transport_connection_deschedule (&hc->connection);
+ sp->flags |= TRANSPORT_SND_F_DESCHED;
+ }
+
+ return n_written > 0 ? clib_max (n_written / TRANSPORT_PACER_MIN_MSS, 1) : 0;
+}
+
+static int
http_app_tx_callback (void *session, transport_send_params_t *sp)
{
session_t *as = (session_t *) session;
@@ -2027,6 +2147,13 @@ http_app_tx_callback (void *session, transport_send_params_t *sp)
HTTP_DBG (1, "hc [%u]%x", as->thread_index, as->connection_index);
hc = http_conn_get_w_thread (as->connection_index, as->thread_index);
+
+ max_burst_sz = sp->max_burst_size * TRANSPORT_PACER_MIN_MSS;
+ sp->max_burst_size = max_burst_sz;
+
+ if (hc->state == HTTP_CONN_STATE_TUNNEL)
+ return http_tunnel_tx (hc, as, sp);
+
if (!http_state_is_tx_valid (hc))
{
if (hc->state != HTTP_CONN_STATE_CLOSED)
@@ -2040,9 +2167,6 @@ http_app_tx_callback (void *session, transport_send_params_t *sp)
return 0;
}
- max_burst_sz = sp->max_burst_size * TRANSPORT_PACER_MIN_MSS;
- sp->max_burst_size = max_burst_sz;
-
HTTP_DBG (1, "run state machine");
http_req_run_state_machine (hc, sp);
@@ -2057,6 +2181,19 @@ http_app_tx_callback (void *session, transport_send_params_t *sp)
return sent > 0 ? clib_max (sent / TRANSPORT_PACER_MIN_MSS, 1) : 0;
}
+static int
+http_app_rx_evt_cb (transport_connection_t *tc)
+{
+ http_conn_t *hc = (http_conn_t *) tc;
+ HTTP_DBG (1, "hc [%u]%x", vlib_get_thread_index (), hc->h_hc_index);
+ session_t *ts = session_get_from_handle (hc->h_tc_session_handle);
+
+ if (hc->state == HTTP_CONN_STATE_TUNNEL)
+ return http_tunnel_rx (ts, hc);
+
+ return 0;
+}
+
static void
http_transport_get_endpoint (u32 hc_index, u32 thread_index,
transport_endpoint_t *tep, u8 is_lcl)
@@ -2114,6 +2251,9 @@ format_http_conn_state (u8 *s, va_list *args)
case HTTP_CONN_STATE_ESTABLISHED:
s = format (s, "ESTABLISHED");
break;
+ case HTTP_CONN_STATE_TUNNEL:
+ s = format (s, "TUNNEL");
+ break;
case HTTP_CONN_STATE_TRANSPORT_CLOSED:
s = format (s, "TRANSPORT_CLOSED");
break;
@@ -2212,6 +2352,7 @@ static const transport_proto_vft_t http_proto = {
.close = http_transport_close,
.cleanup_ho = http_transport_cleanup_ho,
.custom_tx = http_app_tx_callback,
+ .app_rx_evt = http_app_rx_evt_cb,
.get_connection = http_transport_get_connection,
.get_listener = http_transport_get_listener,
.get_half_open = http_transport_get_ho,
diff --git a/src/plugins/http/http.h b/src/plugins/http/http.h
index 04c53d15ecb..a117f374efa 100644
--- a/src/plugins/http/http.h
+++ b/src/plugins/http/http.h
@@ -64,6 +64,7 @@ typedef enum http_conn_state_
HTTP_CONN_STATE_LISTEN,
HTTP_CONN_STATE_CONNECTING,
HTTP_CONN_STATE_ESTABLISHED,
+ HTTP_CONN_STATE_TUNNEL,
HTTP_CONN_STATE_TRANSPORT_CLOSED,
HTTP_CONN_STATE_APP_CLOSED,
HTTP_CONN_STATE_CLOSED
@@ -85,6 +86,7 @@ typedef enum http_req_method_
{
HTTP_REQ_GET = 0,
HTTP_REQ_POST,
+ HTTP_REQ_CONNECT,
} http_req_method_t;
typedef enum http_msg_type_
@@ -415,6 +417,7 @@ typedef struct http_tc_
u32 body_offset;
u64 body_len;
u16 status_code;
+ u8 is_tunnel;
} http_conn_t;
typedef struct http_worker_