aboutsummaryrefslogtreecommitdiffstats
path: root/src/plugins/hs_apps
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugins/hs_apps')
-rw-r--r--src/plugins/hs_apps/CMakeLists.txt2
-rw-r--r--src/plugins/hs_apps/echo_client.c14
-rw-r--r--src/plugins/hs_apps/echo_server.c21
-rw-r--r--src/plugins/hs_apps/http_cli.c226
-rw-r--r--src/plugins/hs_apps/http_client.c743
-rw-r--r--src/plugins/hs_apps/http_client_cli.c141
-rw-r--r--src/plugins/hs_apps/http_tps.c192
-rw-r--r--src/plugins/hs_apps/proxy.c857
-rw-r--r--src/plugins/hs_apps/proxy.h52
-rw-r--r--src/plugins/hs_apps/sapi/vpp_echo_common.c4
-rw-r--r--src/plugins/hs_apps/test_builtins.c192
-rw-r--r--src/plugins/hs_apps/vcl/vcl_test.h35
-rw-r--r--src/plugins/hs_apps/vcl/vcl_test_client.c9
-rw-r--r--src/plugins/hs_apps/vcl/vcl_test_protos.c429
-rw-r--r--src/plugins/hs_apps/vcl/vcl_test_server.c57
15 files changed, 2585 insertions, 389 deletions
diff --git a/src/plugins/hs_apps/CMakeLists.txt b/src/plugins/hs_apps/CMakeLists.txt
index 179c9c7a4c4..eae100949d4 100644
--- a/src/plugins/hs_apps/CMakeLists.txt
+++ b/src/plugins/hs_apps/CMakeLists.txt
@@ -21,8 +21,10 @@ add_vpp_plugin(hs_apps
hs_apps.c
http_cli.c
http_client_cli.c
+ http_client.c
http_tps.c
proxy.c
+ test_builtins.c
)
##############################################################################
diff --git a/src/plugins/hs_apps/echo_client.c b/src/plugins/hs_apps/echo_client.c
index d1443e75e80..d5edffbd02e 100644
--- a/src/plugins/hs_apps/echo_client.c
+++ b/src/plugins/hs_apps/echo_client.c
@@ -429,8 +429,11 @@ ec_init (vlib_main_t *vm)
ecm->app_is_init = 1;
+ session_enable_disable_args_t args = { .is_en = 1,
+ .rt_engine_type =
+ RT_BACKEND_ENGINE_RULE_TABLE };
vlib_worker_thread_barrier_sync (vm);
- vnet_session_enable_disable (vm, 1 /* turn on session and transports */);
+ vnet_session_enable_disable (vm, &args);
/* Turn on the builtin client input nodes */
foreach_vlib_main ()
@@ -943,15 +946,16 @@ ec_connect_rpc (void *args)
a->api_context = ci;
if (needs_crypto)
{
- session_endpoint_alloc_ext_cfg (&a->sep_ext,
- TRANSPORT_ENDPT_EXT_CFG_CRYPTO);
- a->sep_ext.ext_cfg->crypto.ckpair_index = ecm->ckpair_index;
+ transport_endpt_ext_cfg_t *ext_cfg = session_endpoint_add_ext_cfg (
+ &a->sep_ext, TRANSPORT_ENDPT_EXT_CFG_CRYPTO,
+ sizeof (transport_endpt_crypto_cfg_t));
+ ext_cfg->crypto.ckpair_index = ecm->ckpair_index;
}
rv = vnet_connect (a);
if (needs_crypto)
- clib_mem_free (a->sep_ext.ext_cfg);
+ session_endpoint_free_ext_cfgs (&a->sep_ext);
if (rv)
{
diff --git a/src/plugins/hs_apps/echo_server.c b/src/plugins/hs_apps/echo_server.c
index 0243252434a..b981e775b57 100644
--- a/src/plugins/hs_apps/echo_server.c
+++ b/src/plugins/hs_apps/echo_server.c
@@ -591,6 +591,7 @@ echo_server_listen ()
i32 rv;
echo_server_main_t *esm = &echo_server_main;
vnet_listen_args_t _args = {}, *args = &_args;
+ int needs_crypto;
if ((rv = parse_uri (esm->server_uri, &args->sep_ext)))
{
@@ -598,11 +599,14 @@ echo_server_listen ()
}
args->app_index = esm->app_index;
args->sep_ext.port = hs_make_data_port (args->sep_ext.port);
- if (echo_client_transport_needs_crypto (args->sep_ext.transport_proto))
+ needs_crypto =
+ echo_client_transport_needs_crypto (args->sep_ext.transport_proto);
+ if (needs_crypto)
{
- session_endpoint_alloc_ext_cfg (&args->sep_ext,
- TRANSPORT_ENDPT_EXT_CFG_CRYPTO);
- args->sep_ext.ext_cfg->crypto.ckpair_index = esm->ckpair_index;
+ transport_endpt_ext_cfg_t *ext_cfg = session_endpoint_add_ext_cfg (
+ &args->sep_ext, TRANSPORT_ENDPT_EXT_CFG_CRYPTO,
+ sizeof (transport_endpt_crypto_cfg_t));
+ ext_cfg->crypto.ckpair_index = esm->ckpair_index;
}
if (args->sep_ext.transport_proto == TRANSPORT_PROTO_UDP)
@@ -612,8 +616,8 @@ echo_server_listen ()
rv = vnet_listen (args);
esm->listener_handle = args->handle;
- if (args->sep_ext.ext_cfg)
- clib_mem_free (args->sep_ext.ext_cfg);
+ if (needs_crypto)
+ session_endpoint_free_ext_cfgs (&args->sep_ext);
return rv;
}
@@ -736,7 +740,10 @@ echo_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
goto cleanup;
}
- vnet_session_enable_disable (vm, 1 /* turn on TCP, etc. */ );
+ session_enable_disable_args_t args = { .is_en = 1,
+ .rt_engine_type =
+ RT_BACKEND_ENGINE_RULE_TABLE };
+ vnet_session_enable_disable (vm, &args);
if (!server_uri_set)
{
diff --git a/src/plugins/hs_apps/http_cli.c b/src/plugins/hs_apps/http_cli.c
index 4970da72012..dfa90f9eced 100644
--- a/src/plugins/hs_apps/http_cli.c
+++ b/src/plugins/hs_apps/http_cli.c
@@ -17,6 +17,8 @@
#include <vnet/session/application_interface.h>
#include <vnet/session/session.h>
#include <http/http.h>
+#include <http/http_header_names.h>
+#include <http/http_content_types.h>
#define HCS_DEBUG 0
@@ -28,6 +30,12 @@
typedef struct
{
+ u32 handle;
+ u8 *uri;
+} hcs_uri_map_t;
+
+typedef struct
+{
u32 hs_index;
u32 thread_index;
u64 node_index;
@@ -43,6 +51,7 @@ typedef struct
u8 *tx_buf;
u32 tx_offset;
u32 vpp_session_index;
+ http_header_t *resp_headers;
} hcs_session_t;
typedef struct
@@ -59,6 +68,16 @@ typedef struct
u32 fifo_size;
u8 *uri;
vlib_main_t *vlib_main;
+
+ /* hash table to store uri -> uri map pool index */
+ uword *index_by_uri;
+
+ /* pool of uri maps */
+ hcs_uri_map_t *uri_map_pool;
+
+ /* for appns */
+ u8 *appns_id;
+ u64 appns_secret;
} hcs_main_t;
static hcs_main_t hcs_main;
@@ -148,31 +167,52 @@ hcs_cli_output (uword arg, u8 *buffer, uword buffer_bytes)
}
static void
-start_send_data (hcs_session_t *hs, http_status_code_t status,
- http_content_type_t type)
+start_send_data (hcs_session_t *hs, http_status_code_t status)
{
http_msg_t msg;
session_t *ts;
+ u8 *headers_buf = 0;
int rv;
+ if (vec_len (hs->resp_headers))
+ {
+ headers_buf = http_serialize_headers (hs->resp_headers);
+ vec_free (hs->resp_headers);
+ msg.data.headers_offset = 0;
+ msg.data.headers_len = vec_len (headers_buf);
+ }
+ else
+ {
+ msg.data.headers_offset = 0;
+ msg.data.headers_len = 0;
+ }
+
msg.type = HTTP_MSG_REPLY;
msg.code = status;
- msg.content_type = type;
msg.data.type = HTTP_MSG_DATA_INLINE;
- msg.data.len = vec_len (hs->tx_buf);
+ msg.data.body_len = vec_len (hs->tx_buf);
+ msg.data.body_offset = msg.data.headers_len;
+ msg.data.len = msg.data.body_len + msg.data.headers_len;
ts = session_get (hs->vpp_session_index, hs->thread_index);
rv = svm_fifo_enqueue (ts->tx_fifo, sizeof (msg), (u8 *) &msg);
ASSERT (rv == sizeof (msg));
- if (!msg.data.len)
+ if (msg.data.headers_len)
+ {
+ rv = svm_fifo_enqueue (ts->tx_fifo, vec_len (headers_buf), headers_buf);
+ ASSERT (rv == msg.data.headers_len);
+ vec_free (headers_buf);
+ }
+
+ if (!msg.data.body_len)
goto done;
rv = svm_fifo_enqueue (ts->tx_fifo, vec_len (hs->tx_buf), hs->tx_buf);
if (rv != vec_len (hs->tx_buf))
{
- hs->tx_offset = rv;
+ hs->tx_offset = (rv > 0) ? rv : 0;
svm_fifo_add_want_deq_ntf (ts->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
}
else
@@ -183,7 +223,7 @@ start_send_data (hcs_session_t *hs, http_status_code_t status,
done:
if (svm_fifo_set_event (ts->tx_fifo))
- session_send_io_evt_to_thread (ts->tx_fifo, SESSION_IO_EVT_TX);
+ session_program_tx_io_evt (ts->handle, SESSION_IO_EVT_TX);
}
static void
@@ -203,7 +243,12 @@ send_data_to_http (void *rpc_args)
hs->tx_buf = args->buf;
if (args->plain_text)
type = HTTP_CONTENT_TEXT_PLAIN;
- start_send_data (hs, HTTP_STATUS_OK, type);
+
+ http_add_header (&hs->resp_headers,
+ http_header_name_token (HTTP_HEADER_CONTENT_TYPE),
+ http_content_type_token (type));
+
+ start_send_data (hs, HTTP_STATUS_OK);
cleanup:
@@ -325,6 +370,7 @@ hcs_ts_rx_callback (session_t *ts)
hs = hcs_session_get (ts->thread_index, ts->opaque);
hs->tx_buf = 0;
+ hs->resp_headers = 0;
/* Read the http message header */
rv = svm_fifo_dequeue (ts->rx_fifo, sizeof (msg), (u8 *) &msg);
@@ -332,16 +378,17 @@ hcs_ts_rx_callback (session_t *ts)
if (msg.type != HTTP_MSG_REQUEST || msg.method_type != HTTP_REQ_GET)
{
- start_send_data (hs, HTTP_STATUS_METHOD_NOT_ALLOWED,
- HTTP_CONTENT_TEXT_HTML);
+ http_add_header (&hs->resp_headers,
+ http_header_name_token (HTTP_HEADER_ALLOW),
+ http_token_lit ("GET"));
+ start_send_data (hs, HTTP_STATUS_METHOD_NOT_ALLOWED);
goto done;
}
if (msg.data.target_path_len == 0 ||
msg.data.target_form != HTTP_TARGET_ORIGIN_FORM)
{
- hs->tx_buf = 0;
- start_send_data (hs, HTTP_STATUS_BAD_REQUEST, HTTP_CONTENT_TEXT_HTML);
+ start_send_data (hs, HTTP_STATUS_BAD_REQUEST);
goto done;
}
@@ -353,13 +400,13 @@ hcs_ts_rx_callback (session_t *ts)
HCS_DBG ("%v", args.buf);
if (http_validate_abs_path_syntax (args.buf, &is_encoded))
{
- start_send_data (hs, HTTP_STATUS_BAD_REQUEST, HTTP_CONTENT_TEXT_HTML);
+ start_send_data (hs, HTTP_STATUS_BAD_REQUEST);
vec_free (args.buf);
goto done;
}
if (is_encoded)
{
- u8 *decoded = http_percent_decode (args.buf);
+ u8 *decoded = http_percent_decode (args.buf, vec_len (args.buf));
vec_free (args.buf);
args.buf = decoded;
}
@@ -374,13 +421,13 @@ hcs_ts_rx_callback (session_t *ts)
ASSERT (rv == msg.data.headers_len);
if (http_parse_headers (headers, &ht))
{
- start_send_data (hs, HTTP_STATUS_BAD_REQUEST,
- HTTP_CONTENT_TEXT_HTML);
+ start_send_data (hs, HTTP_STATUS_BAD_REQUEST);
vec_free (args.buf);
vec_free (headers);
goto done;
}
- const char *accept_value = http_get_header (ht, HTTP_HEADER_ACCEPT);
+ const char *accept_value =
+ http_get_header (ht, http_header_name_str (HTTP_HEADER_ACCEPT));
if (accept_value)
{
HCS_DBG ("client accept: %s", accept_value);
@@ -438,7 +485,7 @@ hcs_ts_tx_callback (session_t *ts)
}
if (svm_fifo_set_event (ts->tx_fifo))
- session_send_io_evt_to_thread (ts->tx_fifo, SESSION_IO_EVT_TX);
+ session_program_tx_io_evt (ts->handle, SESSION_IO_EVT_TX);
return 0;
}
@@ -554,6 +601,11 @@ hcs_attach ()
hcm->fifo_size ? hcm->fifo_size : 32 << 10;
a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = hcm->prealloc_fifos;
+ if (hcm->appns_id)
+ {
+ a->namespace_id = hcm->appns_id;
+ a->options[APP_OPTIONS_NAMESPACE_SECRET] = hcm->appns_secret;
+ }
if (vnet_application_attach (a))
{
@@ -588,15 +640,15 @@ hcs_listen ()
session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL;
hcs_main_t *hcm = &hcs_main;
vnet_listen_args_t _a, *a = &_a;
- char *uri = "tcp://0.0.0.0/80";
u8 need_crypto;
int rv;
+ char *uri;
clib_memset (a, 0, sizeof (*a));
a->app_index = hcm->app_index;
- if (hcm->uri)
- uri = (char *) hcm->uri;
+ uri = (char *) hcm->uri;
+ ASSERT (uri);
if (parse_uri (uri, &sep))
return -1;
@@ -608,15 +660,24 @@ hcs_listen ()
if (need_crypto)
{
- session_endpoint_alloc_ext_cfg (&a->sep_ext,
- TRANSPORT_ENDPT_EXT_CFG_CRYPTO);
- a->sep_ext.ext_cfg->crypto.ckpair_index = hcm->ckpair_index;
+ transport_endpt_ext_cfg_t *ext_cfg = session_endpoint_add_ext_cfg (
+ &a->sep_ext, TRANSPORT_ENDPT_EXT_CFG_CRYPTO,
+ sizeof (transport_endpt_crypto_cfg_t));
+ ext_cfg->crypto.ckpair_index = hcm->ckpair_index;
}
rv = vnet_listen (a);
+ if (rv == 0)
+ {
+ hcs_uri_map_t *map;
+ pool_get_zero (hcm->uri_map_pool, map);
+ map->uri = vec_dup (uri);
+ map->handle = a->handle;
+ hash_set_mem (hcm->index_by_uri, map->uri, map - hcm->uri_map_pool);
+ }
if (need_crypto)
- clib_mem_free (a->sep_ext.ext_cfg);
+ session_endpoint_free_ext_cfgs (&a->sep_ext);
return rv;
}
@@ -633,6 +694,43 @@ hcs_detach ()
}
static int
+hcs_unlisten ()
+{
+ hcs_main_t *hcm = &hcs_main;
+ vnet_unlisten_args_t _a, *a = &_a;
+ char *uri;
+ int rv = 0;
+ uword *value;
+
+ clib_memset (a, 0, sizeof (*a));
+ a->app_index = hcm->app_index;
+
+ uri = (char *) hcm->uri;
+ ASSERT (uri);
+
+ value = hash_get_mem (hcm->index_by_uri, uri);
+ if (value)
+ {
+ hcs_uri_map_t *map = pool_elt_at_index (hcm->uri_map_pool, *value);
+
+ a->handle = map->handle;
+ rv = vnet_unlisten (a);
+ if (rv == 0)
+ {
+ hash_unset_mem (hcm->index_by_uri, uri);
+ vec_free (map->uri);
+ pool_put (hcm->uri_map_pool, map);
+ if (pool_elts (hcm->uri_map_pool) == 0)
+ hcs_detach ();
+ }
+ }
+ else
+ return -1;
+
+ return rv;
+}
+
+static int
hcs_create (vlib_main_t *vm)
{
vlib_thread_main_t *vtm = vlib_get_thread_main ();
@@ -665,6 +763,8 @@ hcs_create_command_fn (vlib_main_t *vm, unformat_input_t *input,
hcs_main_t *hcm = &hcs_main;
u64 seg_size;
int rv;
+ u32 listener_add = ~0;
+ clib_error_t *error = 0;
hcm->prealloc_fifos = 0;
hcm->private_segment_size = 0;
@@ -683,13 +783,32 @@ hcs_create_command_fn (vlib_main_t *vm, unformat_input_t *input,
hcm->private_segment_size = seg_size;
else if (unformat (line_input, "fifo-size %d", &hcm->fifo_size))
hcm->fifo_size <<= 10;
- else if (unformat (line_input, "uri %s", &hcm->uri))
+ else if (unformat (line_input, "uri %_%v%_", &hcm->uri))
+ ;
+ else if (unformat (line_input, "appns %_%v%_", &hcm->appns_id))
;
+ else if (unformat (line_input, "secret %lu", &hcm->appns_secret))
+ ;
+ else if (unformat (line_input, "listener"))
+ {
+ if (unformat (line_input, "add"))
+ listener_add = 1;
+ else if (unformat (line_input, "del"))
+ listener_add = 0;
+ else
+ {
+ unformat_free (line_input);
+ error = clib_error_return (0, "unknown input `%U'",
+ format_unformat_error, line_input);
+ goto done;
+ }
+ }
else
{
unformat_free (line_input);
- return clib_error_return (0, "unknown input `%U'",
- format_unformat_error, line_input);
+ error = clib_error_return (0, "unknown input `%U'",
+ format_unformat_error, line_input);
+ goto done;
}
}
@@ -697,10 +816,43 @@ hcs_create_command_fn (vlib_main_t *vm, unformat_input_t *input,
start_server:
+ if (hcm->uri == 0)
+ hcm->uri = format (0, "tcp://0.0.0.0/80");
+
if (hcm->app_index != (u32) ~0)
- return clib_error_return (0, "test http server is already running");
+ {
+ if (hcm->appns_id && (listener_add != ~0))
+ {
+ error = clib_error_return (
+ 0, "appns must not be specified for listener add/del");
+ goto done;
+ }
+ if (listener_add == 1)
+ {
+ if (hcs_listen ())
+ error =
+ clib_error_return (0, "failed to start listening %v", hcm->uri);
+ goto done;
+ }
+ else if (listener_add == 0)
+ {
+ rv = hcs_unlisten ();
+ if (rv != 0)
+ error = clib_error_return (
+ 0, "failed to stop listening %v, rv = %d", hcm->uri, rv);
+ goto done;
+ }
+ else
+ {
+ error = clib_error_return (0, "test http server is already running");
+ goto done;
+ }
+ }
- vnet_session_enable_disable (vm, 1 /* turn on TCP, etc. */ );
+ session_enable_disable_args_t args = { .is_en = 1,
+ .rt_engine_type =
+ RT_BACKEND_ENGINE_RULE_TABLE };
+ vnet_session_enable_disable (vm, &args);
rv = hcs_create (vm);
switch (rv)
@@ -708,16 +860,23 @@ start_server:
case 0:
break;
default:
- return clib_error_return (0, "server_create returned %d", rv);
+ {
+ error = clib_error_return (0, "server_create returned %d", rv);
+ goto done;
+ }
}
- return 0;
+done:
+ vec_free (hcm->appns_id);
+ vec_free (hcm->uri);
+ return error;
}
VLIB_CLI_COMMAND (hcs_create_command, static) = {
.path = "http cli server",
.short_help = "http cli server [uri <uri>] [fifo-size <nbytes>] "
- "[private-segment-size <nMG>] [prealloc-fifos <n>]",
+ "[private-segment-size <nMG>] [prealloc-fifos <n>] "
+ "[listener <add|del>] [appns <app-ns> secret <appns-secret>]",
.function = hcs_create_command_fn,
};
@@ -728,6 +887,7 @@ hcs_main_init (vlib_main_t *vm)
hcs->app_index = ~0;
hcs->vlib_main = vm;
+ hcs->index_by_uri = hash_create_vec (0, sizeof (u8), sizeof (uword));
return 0;
}
diff --git a/src/plugins/hs_apps/http_client.c b/src/plugins/hs_apps/http_client.c
new file mode 100644
index 00000000000..05a87ec7de8
--- /dev/null
+++ b/src/plugins/hs_apps/http_client.c
@@ -0,0 +1,743 @@
+/* SPDX-License-Identifier: Apache-2.0
+ * Copyright(c) 2024 Cisco Systems, Inc.
+ */
+
+#include <vnet/session/application.h>
+#include <vnet/session/application_interface.h>
+#include <vnet/session/session.h>
+#include <http/http.h>
+#include <http/http_header_names.h>
+#include <http/http_content_types.h>
+#include <http/http_status_codes.h>
+#include <vppinfra/unix.h>
+
+typedef struct
+{
+ CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
+ u32 session_index;
+ u32 thread_index;
+ u32 vpp_session_index;
+ u64 to_recv;
+ u8 is_closed;
+} hc_session_t;
+
+typedef struct
+{
+ hc_session_t *sessions;
+ u32 thread_index;
+ vlib_main_t *vlib_main;
+} hc_worker_t;
+
+typedef struct
+{
+ u32 app_index;
+ u32 cli_node_index;
+ u8 attached;
+ u8 *uri;
+ session_endpoint_cfg_t connect_sep;
+ u8 *target;
+ u8 *headers_buf;
+ u8 *data;
+ u64 data_offset;
+ hc_worker_t *wrk;
+ u8 *resp_headers;
+ u8 *http_response;
+ u8 *response_status;
+ http_header_ht_t *custom_header;
+ u8 is_file;
+ u8 use_ptr;
+ u8 *filename;
+ bool verbose;
+ f64 timeout;
+ http_req_method_t req_method;
+} hc_main_t;
+
+typedef enum
+{
+ HC_CONNECT_FAILED = 1,
+ HC_TRANSPORT_CLOSED,
+ HC_REPLY_RECEIVED,
+} hc_cli_signal_t;
+
+static hc_main_t hc_main;
+
+static inline hc_worker_t *
+hc_worker_get (u32 thread_index)
+{
+ return &hc_main.wrk[thread_index];
+}
+
+static inline hc_session_t *
+hc_session_get (u32 session_index, u32 thread_index)
+{
+ hc_worker_t *wrk = hc_worker_get (thread_index);
+ wrk->vlib_main = vlib_get_main_by_index (thread_index);
+ return pool_elt_at_index (wrk->sessions, session_index);
+}
+
+static void
+hc_ho_session_free (u32 hs_index)
+{
+ hc_worker_t *wrk = hc_worker_get (0);
+ pool_put_index (wrk->sessions, hs_index);
+}
+
+static hc_session_t *
+hc_session_alloc (hc_worker_t *wrk)
+{
+ hc_session_t *s;
+
+ pool_get_zero (wrk->sessions, s);
+ s->session_index = s - wrk->sessions;
+ s->thread_index = wrk->thread_index;
+
+ return s;
+}
+
+static int
+hc_session_connected_callback (u32 app_index, u32 hc_session_index,
+ session_t *s, session_error_t err)
+{
+ hc_main_t *hcm = &hc_main;
+ hc_session_t *hc_session, *new_hc_session;
+ hc_worker_t *wrk;
+ http_msg_t msg;
+ u64 to_send;
+ u32 n_enq;
+ u8 n_segs;
+ int rv;
+ http_header_ht_t *header;
+ http_header_t *req_headers = 0;
+ u32 new_hc_index;
+
+ HTTP_DBG (1, "ho hc_index: %d", hc_session_index);
+
+ if (err)
+ {
+ clib_warning ("hc_session_index[%d] connected error: %U",
+ hc_session_index, format_session_error, err);
+ vlib_process_signal_event_mt (hcm->wrk->vlib_main, hcm->cli_node_index,
+ HC_CONNECT_FAILED, 0);
+ return -1;
+ }
+
+ hc_session = hc_session_get (hc_session_index, 0);
+ wrk = hc_worker_get (s->thread_index);
+ new_hc_session = hc_session_alloc (wrk);
+ new_hc_index = new_hc_session->session_index;
+ clib_memcpy_fast (new_hc_session, hc_session, sizeof (*hc_session));
+ hc_session->vpp_session_index = s->session_index;
+
+ new_hc_session->session_index = new_hc_index;
+ new_hc_session->thread_index = s->thread_index;
+ new_hc_session->vpp_session_index = s->session_index;
+ HTTP_DBG (1, "new hc_index: %d", new_hc_session->session_index);
+ s->opaque = new_hc_index;
+
+ if (hcm->req_method == HTTP_REQ_POST)
+ {
+ if (hcm->is_file)
+ http_add_header (
+ &req_headers, http_header_name_token (HTTP_HEADER_CONTENT_TYPE),
+ http_content_type_token (HTTP_CONTENT_APP_OCTET_STREAM));
+ else
+ http_add_header (
+ &req_headers, http_header_name_token (HTTP_HEADER_CONTENT_TYPE),
+ http_content_type_token (HTTP_CONTENT_APP_X_WWW_FORM_URLENCODED));
+ }
+
+ vec_foreach (header, hcm->custom_header)
+ http_add_header (&req_headers, (const char *) header->name,
+ vec_len (header->name), (const char *) header->value,
+ vec_len (header->value));
+
+ hcm->headers_buf = http_serialize_headers (req_headers);
+ vec_free (req_headers);
+
+ msg.method_type = hcm->req_method;
+ if (hcm->req_method == HTTP_REQ_POST)
+ msg.data.body_len = vec_len (hcm->data);
+ else
+ msg.data.body_len = 0;
+
+ msg.type = HTTP_MSG_REQUEST;
+ /* request target */
+ msg.data.target_form = HTTP_TARGET_ORIGIN_FORM;
+ msg.data.target_path_len = vec_len (hcm->target);
+ /* custom headers */
+ msg.data.headers_len = vec_len (hcm->headers_buf);
+ /* total length */
+ msg.data.len =
+ msg.data.target_path_len + msg.data.headers_len + msg.data.body_len;
+
+ if (hcm->use_ptr)
+ {
+ uword target = pointer_to_uword (hcm->target);
+ uword headers = pointer_to_uword (hcm->headers_buf);
+ uword body = pointer_to_uword (hcm->data);
+ msg.data.type = HTTP_MSG_DATA_PTR;
+ svm_fifo_seg_t segs[4] = {
+ { (u8 *) &msg, sizeof (msg) },
+ { (u8 *) &target, sizeof (target) },
+ { (u8 *) &headers, sizeof (headers) },
+ { (u8 *) &body, sizeof (body) },
+ };
+
+ n_segs = (hcm->req_method == HTTP_REQ_GET) ? 3 : 4;
+ rv = svm_fifo_enqueue_segments (s->tx_fifo, segs, n_segs,
+ 0 /* allow partial */);
+ if (hcm->req_method == HTTP_REQ_POST)
+ ASSERT (rv == (sizeof (msg) + sizeof (target) + sizeof (headers) +
+ sizeof (body)));
+ else
+ ASSERT (rv == (sizeof (msg) + sizeof (target) + sizeof (headers)));
+ goto done;
+ }
+
+ msg.data.type = HTTP_MSG_DATA_INLINE;
+ msg.data.target_path_offset = 0;
+ msg.data.headers_offset = msg.data.target_path_len;
+ msg.data.body_offset = msg.data.headers_offset + msg.data.headers_len;
+
+ rv = svm_fifo_enqueue (s->tx_fifo, sizeof (msg), (u8 *) &msg);
+ ASSERT (rv == sizeof (msg));
+
+ rv = svm_fifo_enqueue (s->tx_fifo, vec_len (hcm->target), hcm->target);
+ ASSERT (rv == vec_len (hcm->target));
+
+ rv = svm_fifo_enqueue (s->tx_fifo, vec_len (hcm->headers_buf),
+ hcm->headers_buf);
+ ASSERT (rv == msg.data.headers_len);
+
+ if (hcm->req_method == HTTP_REQ_POST)
+ {
+ to_send = vec_len (hcm->data);
+ n_enq = clib_min (svm_fifo_size (s->tx_fifo), to_send);
+
+ rv = svm_fifo_enqueue (s->tx_fifo, n_enq, hcm->data);
+ if (rv < to_send)
+ {
+ hcm->data_offset = (rv > 0) ? rv : 0;
+ svm_fifo_add_want_deq_ntf (s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
+ }
+ }
+
+done:
+ if (svm_fifo_set_event (s->tx_fifo))
+ session_program_tx_io_evt (s->handle, SESSION_IO_EVT_TX);
+
+ return 0;
+}
+
+static void
+hc_session_disconnect_callback (session_t *s)
+{
+ hc_main_t *hcm = &hc_main;
+ vnet_disconnect_args_t _a = { 0 }, *a = &_a;
+ int rv;
+
+ a->handle = session_handle (s);
+ a->app_index = hcm->app_index;
+ if ((rv = vnet_disconnect_session (a)))
+ clib_warning ("warning: disconnect returned: %U", format_session_error,
+ rv);
+}
+
+static void
+hc_session_transport_closed_callback (session_t *s)
+{
+ hc_main_t *hcm = &hc_main;
+ vlib_process_signal_event_mt (hcm->wrk->vlib_main, hcm->cli_node_index,
+ HC_TRANSPORT_CLOSED, 0);
+}
+
+static void
+hc_ho_cleanup_callback (session_t *ts)
+{
+ HTTP_DBG (1, "ho hc_index: %d:", ts->opaque);
+ hc_ho_session_free (ts->opaque);
+}
+
+static void
+hc_session_reset_callback (session_t *s)
+{
+ hc_main_t *hcm = &hc_main;
+ hc_session_t *hc_session;
+ vnet_disconnect_args_t _a = { 0 }, *a = &_a;
+ int rv;
+
+ hc_session = hc_session_get (s->opaque, s->thread_index);
+ hc_session->is_closed = 1;
+
+ a->handle = session_handle (s);
+ a->app_index = hcm->app_index;
+ if ((rv = vnet_disconnect_session (a)))
+ clib_warning ("warning: disconnect returned: %U", format_session_error,
+ rv);
+}
+
+static int
+hc_rx_callback (session_t *s)
+{
+ hc_main_t *hcm = &hc_main;
+ hc_session_t *hc_session;
+ http_msg_t msg;
+ int rv;
+
+ hc_session = hc_session_get (s->opaque, s->thread_index);
+
+ if (hc_session->is_closed)
+ {
+ clib_warning ("hc_session_index[%d] is closed", s->opaque);
+ return -1;
+ }
+
+ if (hc_session->to_recv == 0)
+ {
+ rv = svm_fifo_dequeue (s->rx_fifo, sizeof (msg), (u8 *) &msg);
+ ASSERT (rv == sizeof (msg));
+
+ if (msg.type != HTTP_MSG_REPLY)
+ {
+ clib_warning ("unexpected msg type %d", msg.type);
+ return -1;
+ }
+
+ if (msg.data.headers_len)
+ {
+ http_header_table_t *ht;
+ vec_validate (hcm->resp_headers, msg.data.headers_len - 1);
+ rv = svm_fifo_peek (s->rx_fifo, msg.data.headers_offset,
+ msg.data.headers_len, hcm->resp_headers);
+
+ ASSERT (rv == msg.data.headers_len);
+ HTTP_DBG (1, (char *) hcm->resp_headers);
+
+ if (http_parse_headers (hcm->resp_headers, &ht))
+ {
+ clib_warning ("invalid headers received");
+ return -1;
+ }
+ http_free_header_table (ht);
+
+ hcm->response_status =
+ format (0, "%U", format_http_status_code, msg.code);
+ }
+
+ if (msg.data.body_len == 0)
+ {
+ svm_fifo_dequeue_drop_all (s->rx_fifo);
+ goto done;
+ }
+
+ /* drop everything up to body */
+ svm_fifo_dequeue_drop (s->rx_fifo, msg.data.body_offset);
+ hc_session->to_recv = msg.data.body_len;
+ if (msg.code != HTTP_STATUS_OK && hc_session->to_recv == 0)
+ {
+ goto done;
+ }
+ vec_validate (hcm->http_response, msg.data.body_len - 1);
+ vec_reset_length (hcm->http_response);
+ }
+
+ u32 max_deq = svm_fifo_max_dequeue (s->rx_fifo);
+
+ u32 n_deq = clib_min (hc_session->to_recv, max_deq);
+ u32 curr = vec_len (hcm->http_response);
+ rv = svm_fifo_dequeue (s->rx_fifo, n_deq, hcm->http_response + curr);
+ if (rv < 0)
+ {
+ clib_warning ("app dequeue(n=%d) failed; rv = %d", n_deq, rv);
+ return -1;
+ }
+
+ ASSERT (rv == n_deq);
+ vec_set_len (hcm->http_response, curr + n_deq);
+ ASSERT (hc_session->to_recv >= rv);
+ hc_session->to_recv -= rv;
+
+done:
+ if (hc_session->to_recv == 0)
+ {
+ hc_session_disconnect_callback (s);
+ vlib_process_signal_event_mt (hcm->wrk->vlib_main, hcm->cli_node_index,
+ HC_REPLY_RECEIVED, 0);
+ }
+
+ return 0;
+}
+
+static int
+hc_tx_callback (session_t *s)
+{
+ hc_main_t *hcm = &hc_main;
+ u64 to_send;
+ int rv;
+
+ to_send = vec_len (hcm->data) - hcm->data_offset;
+ rv = svm_fifo_enqueue (s->tx_fifo, to_send, hcm->data + hcm->data_offset);
+
+ if (rv <= 0)
+ {
+ svm_fifo_add_want_deq_ntf (s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
+ return 0;
+ }
+
+ if (rv < to_send)
+ {
+ hcm->data_offset += rv;
+ svm_fifo_add_want_deq_ntf (s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
+ }
+
+ if (svm_fifo_set_event (s->tx_fifo))
+ session_program_tx_io_evt (s->handle, SESSION_IO_EVT_TX);
+
+ return 0;
+}
+
+static session_cb_vft_t hc_session_cb_vft = {
+ .session_connected_callback = hc_session_connected_callback,
+ .session_disconnect_callback = hc_session_disconnect_callback,
+ .session_transport_closed_callback = hc_session_transport_closed_callback,
+ .session_reset_callback = hc_session_reset_callback,
+ .builtin_app_rx_callback = hc_rx_callback,
+ .builtin_app_tx_callback = hc_tx_callback,
+ .half_open_cleanup_callback = hc_ho_cleanup_callback,
+};
+
+static clib_error_t *
+hc_attach ()
+{
+ hc_main_t *hcm = &hc_main;
+ vnet_app_attach_args_t _a, *a = &_a;
+ u64 options[18];
+ int rv;
+
+ clib_memset (a, 0, sizeof (*a));
+ clib_memset (options, 0, sizeof (options));
+
+ a->api_client_index = APP_INVALID_INDEX;
+ a->name = format (0, "http_client");
+ a->session_cb_vft = &hc_session_cb_vft;
+ a->options = options;
+ a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
+
+ if ((rv = vnet_application_attach (a)))
+ return clib_error_return (0, "attach returned: %U", format_session_error,
+ rv);
+
+ hcm->app_index = a->app_index;
+ vec_free (a->name);
+ hcm->attached = 1;
+
+ return 0;
+}
+
+static int
+hc_connect_rpc (void *rpc_args)
+{
+ vnet_connect_args_t *a = rpc_args;
+ int rv;
+
+ rv = vnet_connect (a);
+ if (rv > 0)
+ clib_warning (0, "connect returned: %U", format_session_error, rv);
+
+ vec_free (a);
+ return rv;
+}
+
+static void
+hc_connect ()
+{
+ hc_main_t *hcm = &hc_main;
+ vnet_connect_args_t *a = 0;
+ hc_worker_t *wrk;
+ hc_session_t *hc_session;
+
+ vec_validate (a, 0);
+ clib_memset (a, 0, sizeof (a[0]));
+
+ clib_memcpy (&a->sep_ext, &hcm->connect_sep, sizeof (hcm->connect_sep));
+ a->app_index = hcm->app_index;
+
+ /* allocate http session on main thread */
+ wrk = hc_worker_get (0);
+ hc_session = hc_session_alloc (wrk);
+ a->api_context = hc_session->session_index;
+
+ session_send_rpc_evt_to_thread_force (transport_cl_thread (), hc_connect_rpc,
+ a);
+}
+
+static clib_error_t *
+hc_run (vlib_main_t *vm)
+{
+ hc_main_t *hcm = &hc_main;
+ vlib_thread_main_t *vtm = vlib_get_thread_main ();
+ u32 num_threads;
+ hc_worker_t *wrk;
+ uword event_type, *event_data = 0;
+ clib_error_t *err;
+ FILE *file_ptr;
+
+ num_threads = 1 /* main thread */ + vtm->n_threads;
+ vec_validate (hcm->wrk, num_threads - 1);
+ vec_foreach (wrk, hcm->wrk)
+ wrk->thread_index = wrk - hcm->wrk;
+
+ if ((err = hc_attach ()))
+ return clib_error_return (0, "http client attach: %U", format_clib_error,
+ err);
+
+ hc_connect ();
+
+ vlib_process_wait_for_event_or_clock (vm, hcm->timeout);
+ event_type = vlib_process_get_events (vm, &event_data);
+ switch (event_type)
+ {
+ case ~0:
+ err = clib_error_return (0, "error: timeout");
+ break;
+ case HC_CONNECT_FAILED:
+ err = clib_error_return (0, "error: failed to connect");
+ break;
+ case HC_TRANSPORT_CLOSED:
+ err = clib_error_return (0, "error: transport closed");
+ break;
+ case HC_REPLY_RECEIVED:
+ if (hcm->filename)
+ {
+ file_ptr =
+ fopen ((char *) format (0, "/tmp/%v", hcm->filename), "w");
+ if (file_ptr == NULL)
+ {
+ vlib_cli_output (vm, "couldn't open file %v", hcm->filename);
+ }
+ else
+ {
+ fprintf (file_ptr, "< %s\n< %s\n< %s", hcm->response_status,
+ hcm->resp_headers, hcm->http_response);
+ fclose (file_ptr);
+ vlib_cli_output (vm, "file saved (/tmp/%v)", hcm->filename);
+ }
+ }
+ if (hcm->verbose)
+ vlib_cli_output (vm, "< %v\n< %v", hcm->response_status,
+ hcm->resp_headers);
+ vlib_cli_output (vm, "<\n%v", hcm->http_response);
+
+ break;
+ default:
+ err = clib_error_return (0, "error: unexpected event %d", event_type);
+ break;
+ }
+
+ vec_free (event_data);
+ return err;
+}
+
+static int
+hc_detach ()
+{
+ hc_main_t *hcm = &hc_main;
+ vnet_app_detach_args_t _da, *da = &_da;
+ int rv;
+
+ if (!hcm->attached)
+ return 0;
+
+ da->app_index = hcm->app_index;
+ da->api_client_index = APP_INVALID_INDEX;
+ rv = vnet_application_detach (da);
+ hcm->attached = 0;
+ hcm->app_index = APP_INVALID_INDEX;
+
+ return rv;
+}
+
+static void
+hcc_worker_cleanup (hc_worker_t *wrk)
+{
+ pool_free (wrk->sessions);
+}
+
+static void
+hc_cleanup ()
+{
+ hc_main_t *hcm = &hc_main;
+ hc_worker_t *wrk;
+ http_header_ht_t *header;
+
+ vec_foreach (wrk, hcm->wrk)
+ hcc_worker_cleanup (wrk);
+
+ vec_free (hcm->uri);
+ vec_free (hcm->target);
+ vec_free (hcm->headers_buf);
+ vec_free (hcm->data);
+ vec_free (hcm->resp_headers);
+ vec_free (hcm->http_response);
+ vec_free (hcm->response_status);
+ vec_free (hcm->wrk);
+ vec_free (hcm->filename);
+ vec_foreach (header, hcm->custom_header)
+ {
+ vec_free (header->name);
+ vec_free (header->value);
+ }
+ vec_free (hcm->custom_header);
+}
+
+static clib_error_t *
+hc_command_fn (vlib_main_t *vm, unformat_input_t *input,
+ vlib_cli_command_t *cmd)
+{
+ hc_main_t *hcm = &hc_main;
+ clib_error_t *err = 0;
+ unformat_input_t _line_input, *line_input = &_line_input;
+ u8 *path = 0;
+ u8 *file_data;
+ http_header_ht_t new_header;
+ u8 *name;
+ u8 *value;
+ int rv;
+ hcm->timeout = 10;
+
+ if (hcm->attached)
+ return clib_error_return (0, "failed: already running!");
+
+ hcm->use_ptr = 0;
+
+ /* Get a line of input. */
+ if (!unformat_user (input, unformat_line_input, line_input))
+ return clib_error_return (0, "expected required arguments");
+
+ hcm->req_method =
+ (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT) &&
+ unformat (line_input, "post") ?
+ HTTP_REQ_POST :
+ HTTP_REQ_GET;
+
+ while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
+ {
+ if (unformat (line_input, "uri %s", &hcm->uri))
+ ;
+ else if (unformat (line_input, "data %v", &hcm->data))
+ hcm->is_file = 0;
+ else if (unformat (line_input, "target %s", &hcm->target))
+ ;
+ else if (unformat (line_input, "file %s", &path))
+ hcm->is_file = 1;
+ else if (unformat (line_input, "use-ptr"))
+ hcm->use_ptr = 1;
+ else if (unformat (line_input, "save-to %s", &hcm->filename))
+ {
+ if (strstr ((char *) hcm->filename, "..") ||
+ strchr ((char *) hcm->filename, '/'))
+ {
+ err = clib_error_return (
+ 0, "illegal characters in filename '%v'", hcm->filename);
+ goto done;
+ }
+ }
+ else if (unformat (line_input, "header %v:%v", &name, &value))
+ {
+ new_header.name = name;
+ new_header.value = value;
+ vec_add1 (hcm->custom_header, new_header);
+ }
+ else if (unformat (line_input, "verbose"))
+ hcm->verbose = true;
+ else if (unformat (line_input, "timeout %f", &hcm->timeout))
+ ;
+ else
+ {
+ err = clib_error_return (0, "unknown input `%U'",
+ format_unformat_error, line_input);
+ goto done;
+ }
+ }
+
+ if (!hcm->uri)
+ {
+ err = clib_error_return (0, "URI not defined");
+ goto done;
+ }
+ if (!hcm->target)
+ {
+ err = clib_error_return (0, "target not defined");
+ goto done;
+ }
+ if (!hcm->data && hcm->req_method == HTTP_REQ_POST)
+ {
+ if (path)
+ {
+ err = clib_file_contents ((char *) path, &file_data);
+ if (err)
+ goto done;
+ hcm->data = file_data;
+ }
+ else
+ {
+ err = clib_error_return (0, "data not defined");
+ goto done;
+ }
+ }
+
+ if ((rv = parse_uri ((char *) hcm->uri, &hcm->connect_sep)))
+ {
+ err =
+ clib_error_return (0, "URI parse error: %U", format_session_error, rv);
+ goto done;
+ }
+
+ session_enable_disable_args_t args = { .is_en = 1,
+ .rt_engine_type =
+ RT_BACKEND_ENGINE_RULE_TABLE };
+ vlib_worker_thread_barrier_sync (vm);
+ vnet_session_enable_disable (vm, &args);
+ vlib_worker_thread_barrier_release (vm);
+
+ hcm->cli_node_index = vlib_get_current_process (vm)->node_runtime.node_index;
+
+ err = hc_run (vm);
+
+ if ((rv = hc_detach ()))
+ {
+ /* don't override last error */
+ if (!err)
+ err = clib_error_return (0, "detach returned: %U",
+ format_session_error, rv);
+ else
+ clib_warning ("warning: detach returned: %U", format_session_error,
+ rv);
+ }
+
+done:
+ vec_free (path);
+ hc_cleanup ();
+ unformat_free (line_input);
+ return err;
+}
+
+VLIB_CLI_COMMAND (hc_command, static) = {
+ .path = "http client",
+ .short_help = "[post] uri http://<ip-addr> target <origin-form> "
+ "[data <form-urlencoded> | file <file-path>] [use-ptr] "
+ "[save-to <filename>] [header <Key:Value>] [verbose] "
+ "[timeout <seconds> (default = 10)]",
+ .function = hc_command_fn,
+ .is_mp_safe = 1,
+};
+
+static clib_error_t *
+hc_main_init ()
+{
+ hc_main_t *hcm = &hc_main;
+ hcm->app_index = APP_INVALID_INDEX;
+ return 0;
+}
+
+VLIB_INIT_FUNCTION (hc_main_init);
diff --git a/src/plugins/hs_apps/http_client_cli.c b/src/plugins/hs_apps/http_client_cli.c
index a99169bafea..861af7f03e2 100644
--- a/src/plugins/hs_apps/http_client_cli.c
+++ b/src/plugins/hs_apps/http_client_cli.c
@@ -16,6 +16,9 @@
#include <vnet/session/application_interface.h>
#include <vnet/session/session.h>
#include <http/http.h>
+#include <http/http_header_names.h>
+#include <http/http_content_types.h>
+#include <http/http_status_codes.h>
#define HCC_DEBUG 0
@@ -32,14 +35,14 @@ typedef struct
u32 thread_index;
u32 rx_offset;
u32 vpp_session_index;
- u32 to_recv;
+ u64 to_recv;
u8 is_closed;
+ http_header_t *req_headers;
} hcc_session_t;
typedef struct
{
hcc_session_t *sessions;
- u8 *rx_buf;
u32 thread_index;
} hcc_worker_t;
@@ -96,10 +99,10 @@ hcc_session_get (u32 hs_index, u32 thread_index)
}
static void
-hcc_session_free (u32 thread_index, hcc_session_t *hs)
+hcc_ho_session_free (u32 hs_index)
{
- hcc_worker_t *wrk = hcc_worker_get (thread_index);
- pool_put (wrk->sessions, hs);
+ hcc_worker_t *wrk = hcc_worker_get (0);
+ pool_put_index (wrk->sessions, hs_index);
}
static int
@@ -128,9 +131,11 @@ hcc_ts_connected_callback (u32 app_index, u32 hc_index, session_t *as,
hcc_session_t *hs, *new_hs;
hcc_worker_t *wrk;
http_msg_t msg;
+ u8 *headers_buf;
+ u32 new_hs_index;
int rv;
- HCC_DBG ("hc_index: %d", hc_index);
+ HCC_DBG ("ho hc_index: %d", hc_index);
if (err)
{
@@ -141,32 +146,53 @@ hcc_ts_connected_callback (u32 app_index, u32 hc_index, session_t *as,
return -1;
}
- /* TODO delete half open session once the support is added in http layer */
hs = hcc_session_get (hc_index, 0);
wrk = hcc_worker_get (as->thread_index);
new_hs = hcc_session_alloc (wrk);
+ new_hs_index = new_hs->session_index;
clib_memcpy_fast (new_hs, hs, sizeof (*hs));
-
- hs->vpp_session_index = as->session_index;
+ new_hs->session_index = new_hs_index;
+ new_hs->thread_index = as->thread_index;
+ new_hs->vpp_session_index = as->session_index;
+ HCC_DBG ("new hc_index: %d", new_hs->session_index);
+ as->opaque = new_hs_index;
+
+ http_add_header (&new_hs->req_headers,
+ http_header_name_token (HTTP_HEADER_ACCEPT),
+ http_content_type_token (HTTP_CONTENT_TEXT_HTML));
+ headers_buf = http_serialize_headers (new_hs->req_headers);
+ vec_free (new_hs->req_headers);
msg.type = HTTP_MSG_REQUEST;
msg.method_type = HTTP_REQ_GET;
- msg.content_type = HTTP_CONTENT_TEXT_HTML;
+ /* request target */
+ msg.data.target_form = HTTP_TARGET_ORIGIN_FORM;
+ msg.data.target_path_offset = 0;
+ msg.data.target_path_len = vec_len (hcm->http_query);
+ /* custom headers */
+ msg.data.headers_offset = msg.data.target_path_len;
+ msg.data.headers_len = vec_len (headers_buf);
+ /* request body */
+ msg.data.body_len = 0;
+ /* data type and total length */
msg.data.type = HTTP_MSG_DATA_INLINE;
- msg.data.len = vec_len (hcm->http_query);
+ msg.data.len =
+ msg.data.target_path_len + msg.data.headers_len + msg.data.body_len;
- svm_fifo_seg_t segs[2] = { { (u8 *) &msg, sizeof (msg) },
- { hcm->http_query, vec_len (hcm->http_query) } };
+ svm_fifo_seg_t segs[3] = { { (u8 *) &msg, sizeof (msg) },
+ { hcm->http_query, vec_len (hcm->http_query) },
+ { headers_buf, vec_len (headers_buf) } };
- rv = svm_fifo_enqueue_segments (as->tx_fifo, segs, 2, 0 /* allow partial */);
- if (rv < 0 || rv != sizeof (msg) + vec_len (hcm->http_query))
+ rv = svm_fifo_enqueue_segments (as->tx_fifo, segs, 3, 0 /* allow partial */);
+ vec_free (headers_buf);
+ if (rv < 0 || rv != sizeof (msg) + msg.data.len)
{
clib_warning ("failed app enqueue");
return -1;
}
if (svm_fifo_set_event (as->tx_fifo))
- session_send_io_evt_to_thread (as->tx_fifo, SESSION_IO_EVT_TX);
+ session_program_tx_io_evt (as->handle, SESSION_IO_EVT_TX);
return 0;
}
@@ -221,23 +247,32 @@ hcc_ts_rx_callback (session_t *ts)
if (hs->to_recv == 0)
{
+ /* read the http message header */
rv = svm_fifo_dequeue (ts->rx_fifo, sizeof (msg), (u8 *) &msg);
ASSERT (rv == sizeof (msg));
- if (msg.type != HTTP_MSG_REPLY || msg.code != HTTP_STATUS_OK)
+ if (msg.type != HTTP_MSG_REPLY)
{
clib_warning ("unexpected msg type %d", msg.type);
return 0;
}
- vec_validate (hcm->http_response, msg.data.len - 1);
+ /* drop everything up to body */
+ svm_fifo_dequeue_drop (ts->rx_fifo, msg.data.body_offset);
+ hs->to_recv = msg.data.body_len;
+ if (msg.code != HTTP_STATUS_OK && hs->to_recv == 0)
+ {
+ hcm->http_response = format (0, "request failed, response code: %U",
+ format_http_status_code, msg.code);
+ goto done;
+ }
+ vec_validate (hcm->http_response, msg.data.body_len - 1);
vec_reset_length (hcm->http_response);
- hs->to_recv = msg.data.len;
}
u32 max_deq = svm_fifo_max_dequeue (ts->rx_fifo);
u32 n_deq = clib_min (hs->to_recv, max_deq);
- u32 curr = vec_len (hcm->http_response);
+ u64 curr = vec_len (hcm->http_response);
rv = svm_fifo_dequeue (ts->rx_fifo, n_deq, hcm->http_response + curr);
if (rv < 0)
{
@@ -251,10 +286,12 @@ hcc_ts_rx_callback (session_t *ts)
vec_set_len (hcm->http_response, curr + n_deq);
ASSERT (hs->to_recv >= rv);
hs->to_recv -= rv;
- HCC_DBG ("app rcvd %d, remains %d", rv, hs->to_recv);
+ HCC_DBG ("app rcvd %d, remains %llu", rv, hs->to_recv);
+done:
if (hs->to_recv == 0)
{
+ HCC_DBG ("all data received, going to disconnect");
hcc_session_disconnect (ts);
vlib_process_signal_event_mt (hcm->vlib_main, hcm->cli_node_index,
HCC_REPLY_RECEIVED, 0);
@@ -264,18 +301,6 @@ hcc_ts_rx_callback (session_t *ts)
}
static void
-hcc_ts_cleanup_callback (session_t *s, session_cleanup_ntf_t ntf)
-{
- hcc_session_t *hs;
-
- hs = hcc_session_get (s->thread_index, s->opaque);
- if (!hs)
- return;
-
- hcc_session_free (s->thread_index, hs);
-}
-
-static void
hcc_ts_transport_closed (session_t *s)
{
hcc_main_t *hcm = &hcc_main;
@@ -286,6 +311,13 @@ hcc_ts_transport_closed (session_t *s)
HCC_TRANSPORT_CLOSED, 0);
}
+static void
+hcc_ho_cleanup_callback (session_t *ts)
+{
+ HCC_DBG ("ho hc_index: %d:", ts->opaque);
+ hcc_ho_session_free (ts->opaque);
+}
+
static session_cb_vft_t hcc_session_cb_vft = {
.session_accept_callback = hcc_ts_accept_callback,
.session_disconnect_callback = hcc_ts_disconnect_callback,
@@ -293,8 +325,8 @@ static session_cb_vft_t hcc_session_cb_vft = {
.builtin_app_rx_callback = hcc_ts_rx_callback,
.builtin_app_tx_callback = hcc_ts_tx_callback,
.session_reset_callback = hcc_ts_reset_callback,
- .session_cleanup_callback = hcc_ts_cleanup_callback,
.session_transport_closed_callback = hcc_ts_transport_closed,
+ .half_open_cleanup_callback = hcc_ho_cleanup_callback,
};
static clib_error_t *
@@ -349,6 +381,7 @@ hcc_connect_rpc (void *rpc_args)
if (rv)
clib_warning (0, "connect returned: %U", format_session_error, rv);
+ session_endpoint_free_ext_cfgs (&a->sep_ext);
vec_free (a);
return rv;
}
@@ -367,6 +400,7 @@ hcc_connect ()
hcc_main_t *hcm = &hcc_main;
hcc_worker_t *wrk;
hcc_session_t *hs;
+ transport_endpt_ext_cfg_t *ext_cfg;
vec_validate (a, 0);
clib_memset (a, 0, sizeof (a[0]));
@@ -374,6 +408,11 @@ hcc_connect ()
clib_memcpy (&a->sep_ext, &hcm->connect_sep, sizeof (hcm->connect_sep));
a->app_index = hcm->app_index;
+ /* set http (response) timeout to 10 seconds */
+ ext_cfg = session_endpoint_add_ext_cfg (
+ &a->sep_ext, TRANSPORT_ENDPT_EXT_CFG_HTTP, sizeof (ext_cfg->opaque));
+ ext_cfg->opaque = 10;
+
/* allocate http session on main thread */
wrk = hcc_worker_get (0);
hs = hcc_session_alloc (wrk);
@@ -394,7 +433,7 @@ hcc_run (vlib_main_t *vm, int print_output)
hcc_worker_t *wrk;
num_threads = 1 /* main thread */ + vtm->n_threads;
- vec_validate (hcm->wrk, num_threads);
+ vec_validate (hcm->wrk, num_threads - 1);
vec_foreach (wrk, hcm->wrk)
{
wrk->thread_index = wrk - hcm->wrk;
@@ -423,7 +462,6 @@ hcc_run (vlib_main_t *vm, int print_output)
case HCC_REPLY_RECEIVED:
if (print_output)
vlib_cli_output (vm, "%v", hcm->http_response);
- vec_free (hcm->http_response);
break;
case HCC_TRANSPORT_CLOSED:
err = clib_error_return (0, "error, transport closed");
@@ -460,6 +498,28 @@ hcc_detach ()
return rv;
}
+static void
+hcc_worker_cleanup (hcc_worker_t *wrk)
+{
+ pool_free (wrk->sessions);
+}
+
+static void
+hcc_cleanup ()
+{
+ hcc_main_t *hcm = &hcc_main;
+ hcc_worker_t *wrk;
+
+ vec_foreach (wrk, hcm->wrk)
+ hcc_worker_cleanup (wrk);
+
+ vec_free (hcm->uri);
+ vec_free (hcm->http_query);
+ vec_free (hcm->http_response);
+ vec_free (hcm->appns_id);
+ vec_free (hcm->wrk);
+}
+
static clib_error_t *
hcc_command_fn (vlib_main_t *vm, unformat_input_t *input,
vlib_cli_command_t *cmd)
@@ -509,7 +569,6 @@ hcc_command_fn (vlib_main_t *vm, unformat_input_t *input,
}
}
- vec_free (hcm->appns_id);
hcm->appns_id = appns_id;
hcm->cli_node_index = vlib_get_current_process (vm)->node_runtime.node_index;
@@ -525,8 +584,11 @@ hcc_command_fn (vlib_main_t *vm, unformat_input_t *input,
goto done;
}
+ session_enable_disable_args_t args = { .is_en = 1,
+ .rt_engine_type =
+ RT_BACKEND_ENGINE_RULE_TABLE };
vlib_worker_thread_barrier_sync (vm);
- vnet_session_enable_disable (vm, 1 /* turn on TCP, etc. */);
+ vnet_session_enable_disable (vm, &args);
vlib_worker_thread_barrier_release (vm);
err = hcc_run (vm, print_output);
@@ -540,8 +602,7 @@ hcc_command_fn (vlib_main_t *vm, unformat_input_t *input,
}
done:
- vec_free (hcm->uri);
- vec_free (hcm->http_query);
+ hcc_cleanup ();
unformat_free (line_input);
return err;
}
diff --git a/src/plugins/hs_apps/http_tps.c b/src/plugins/hs_apps/http_tps.c
index 3a086501f86..a40a31caf63 100644
--- a/src/plugins/hs_apps/http_tps.c
+++ b/src/plugins/hs_apps/http_tps.c
@@ -17,6 +17,10 @@
#include <vnet/session/application_interface.h>
#include <vnet/session/session.h>
#include <http/http.h>
+#include <http/http_header_names.h>
+#include <http/http_content_types.h>
+
+#define HTS_RX_BUF_SIZE (64 << 10)
typedef struct
{
@@ -26,6 +30,8 @@ typedef struct
u64 data_len;
u64 data_offset;
u32 vpp_session_index;
+ u64 left_recv;
+ u64 total_recv;
union
{
/** threshold after which connection is closed */
@@ -34,6 +40,8 @@ typedef struct
u32 close_rate;
};
u8 *uri;
+ u8 *rx_buf;
+ http_header_t *resp_headers;
} hts_session_t;
typedef struct hts_listen_cfg_
@@ -102,6 +110,8 @@ hts_session_free (hts_session_t *hs)
if (htm->debug_level > 0)
clib_warning ("Freeing session %u", hs->session_index);
+ vec_free (hs->rx_buf);
+
if (CLIB_DEBUG)
clib_memset (hs, 0xfa, sizeof (*hs));
@@ -151,7 +161,7 @@ hts_session_tx_zc (hts_session_t *hs, session_t *ts)
svm_fifo_add_want_deq_ntf (ts->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
if (svm_fifo_set_event (ts->tx_fifo))
- session_send_io_evt_to_thread (ts->tx_fifo, SESSION_IO_EVT_TX);
+ session_program_tx_io_evt (ts->handle, SESSION_IO_EVT_TX);
}
static void
@@ -198,7 +208,7 @@ hts_session_tx_no_zc (hts_session_t *hs, session_t *ts)
svm_fifo_add_want_deq_ntf (ts->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
if (svm_fifo_set_event (ts->tx_fifo))
- session_send_io_evt_to_thread (ts->tx_fifo, SESSION_IO_EVT_TX);
+ session_program_tx_io_evt (ts->handle, SESSION_IO_EVT_TX);
}
static inline void
@@ -223,22 +233,46 @@ hts_start_send_data (hts_session_t *hs, http_status_code_t status)
{
http_msg_t msg;
session_t *ts;
+ u8 *headers_buf = 0;
+ u32 n_segs = 1;
+ svm_fifo_seg_t seg[2];
int rv;
+ if (vec_len (hs->resp_headers))
+ {
+ headers_buf = http_serialize_headers (hs->resp_headers);
+ vec_free (hs->resp_headers);
+ msg.data.headers_offset = 0;
+ msg.data.headers_len = vec_len (headers_buf);
+ seg[1].data = headers_buf;
+ seg[1].len = msg.data.headers_len;
+ n_segs = 2;
+ }
+ else
+ {
+ msg.data.headers_offset = 0;
+ msg.data.headers_len = 0;
+ }
+
msg.type = HTTP_MSG_REPLY;
msg.code = status;
- msg.content_type = HTTP_CONTENT_APP_OCTET_STREAM;
msg.data.type = HTTP_MSG_DATA_INLINE;
- msg.data.len = hs->data_len;
+ msg.data.body_len = hs->data_len;
+ msg.data.body_offset = msg.data.headers_len;
+ msg.data.len = msg.data.body_len + msg.data.headers_len;
+ seg[0].data = (u8 *) &msg;
+ seg[0].len = sizeof (msg);
ts = session_get (hs->vpp_session_index, hs->thread_index);
- rv = svm_fifo_enqueue (ts->tx_fifo, sizeof (msg), (u8 *) &msg);
- ASSERT (rv == sizeof (msg));
+ rv = svm_fifo_enqueue_segments (ts->tx_fifo, seg, n_segs,
+ 0 /* allow partial */);
+ vec_free (headers_buf);
+ ASSERT (rv == (sizeof (msg) + msg.data.headers_len));
- if (!msg.data.len)
+ if (!msg.data.body_len)
{
if (svm_fifo_set_event (ts->tx_fifo))
- session_send_io_evt_to_thread (ts->tx_fifo, SESSION_IO_EVT_TX);
+ session_program_tx_io_evt (ts->handle, SESSION_IO_EVT_TX);
return;
}
@@ -286,6 +320,10 @@ try_test_file (hts_session_t *hs, u8 *target)
}
}
+ http_add_header (&hs->resp_headers,
+ http_header_name_token (HTTP_HEADER_CONTENT_TYPE),
+ http_content_type_token (HTTP_CONTENT_APP_OCTET_STREAM));
+
hts_start_send_data (hs, HTTP_STATUS_OK);
done:
@@ -294,6 +332,40 @@ done:
return rc;
}
+static inline void
+hts_session_rx_body (hts_session_t *hs, session_t *ts)
+{
+ hts_main_t *htm = &hts_main;
+ u32 n_deq;
+ int rv;
+
+ n_deq = svm_fifo_max_dequeue (ts->rx_fifo);
+ if (!htm->no_zc)
+ {
+ svm_fifo_dequeue_drop_all (ts->rx_fifo);
+ }
+ else
+ {
+ n_deq = clib_min (n_deq, HTS_RX_BUF_SIZE);
+ rv = svm_fifo_dequeue (ts->rx_fifo, n_deq, hs->rx_buf);
+ ASSERT (rv == n_deq);
+ }
+ hs->left_recv -= n_deq;
+
+ if (hs->close_threshold > 0)
+ {
+ if ((f64) (hs->total_recv - hs->left_recv) / hs->total_recv >
+ hs->close_threshold)
+ hts_disconnect_transport (hs);
+ }
+
+ if (hs->left_recv == 0)
+ {
+ hts_start_send_data (hs, HTTP_STATUS_OK);
+ vec_free (hs->rx_buf);
+ }
+}
+
static int
hts_ts_rx_callback (session_t *ts)
{
@@ -305,38 +377,77 @@ hts_ts_rx_callback (session_t *ts)
hs = hts_session_get (ts->thread_index, ts->opaque);
- /* Read the http message header */
- rv = svm_fifo_dequeue (ts->rx_fifo, sizeof (msg), (u8 *) &msg);
- ASSERT (rv == sizeof (msg));
-
- if (msg.type != HTTP_MSG_REQUEST || msg.method_type != HTTP_REQ_GET)
+ if (hs->left_recv == 0)
{
- hts_start_send_data (hs, HTTP_STATUS_METHOD_NOT_ALLOWED);
- goto done;
- }
+ hs->data_len = 0;
+ hs->resp_headers = 0;
+ hs->rx_buf = 0;
- if (msg.data.target_path_len == 0 ||
- msg.data.target_form != HTTP_TARGET_ORIGIN_FORM)
- {
- hts_start_send_data (hs, HTTP_STATUS_BAD_REQUEST);
- goto done;
- }
+ /* Read the http message header */
+ rv = svm_fifo_dequeue (ts->rx_fifo, sizeof (msg), (u8 *) &msg);
+ ASSERT (rv == sizeof (msg));
- vec_validate (target, msg.data.target_path_len - 1);
- rv = svm_fifo_peek (ts->rx_fifo, msg.data.target_path_offset,
- msg.data.target_path_len, target);
- ASSERT (rv == msg.data.target_path_len);
+ if (msg.type != HTTP_MSG_REQUEST)
+ {
+ hts_start_send_data (hs, HTTP_STATUS_INTERNAL_ERROR);
+ goto done;
+ }
+ if (msg.method_type != HTTP_REQ_GET && msg.method_type != HTTP_REQ_POST)
+ {
+ http_add_header (&hs->resp_headers,
+ http_header_name_token (HTTP_HEADER_ALLOW),
+ http_token_lit ("GET, POST"));
+ hts_start_send_data (hs, HTTP_STATUS_METHOD_NOT_ALLOWED);
+ goto done;
+ }
- if (htm->debug_level)
- clib_warning ("Request target: %v", target);
+ if (msg.data.target_path_len == 0 ||
+ msg.data.target_form != HTTP_TARGET_ORIGIN_FORM)
+ {
+ hts_start_send_data (hs, HTTP_STATUS_BAD_REQUEST);
+ goto done;
+ }
- if (try_test_file (hs, target))
- hts_start_send_data (hs, HTTP_STATUS_NOT_FOUND);
+ vec_validate (target, msg.data.target_path_len - 1);
+ rv = svm_fifo_peek (ts->rx_fifo, msg.data.target_path_offset,
+ msg.data.target_path_len, target);
+ ASSERT (rv == msg.data.target_path_len);
- vec_free (target);
+ if (htm->debug_level)
+ clib_warning ("%s request target: %v",
+ msg.method_type == HTTP_REQ_GET ? "GET" : "POST",
+ target);
+
+ if (msg.method_type == HTTP_REQ_GET)
+ {
+ if (try_test_file (hs, target))
+ hts_start_send_data (hs, HTTP_STATUS_NOT_FOUND);
+ vec_free (target);
+ }
+ else
+ {
+ vec_free (target);
+ if (!msg.data.body_len)
+ {
+ hts_start_send_data (hs, HTTP_STATUS_BAD_REQUEST);
+ goto done;
+ }
+ /* drop everything up to body */
+ svm_fifo_dequeue_drop (ts->rx_fifo, msg.data.body_offset);
+ hs->left_recv = msg.data.body_len;
+ hs->total_recv = msg.data.body_len;
+ if (htm->no_zc)
+ vec_validate (hs->rx_buf, HTS_RX_BUF_SIZE - 1);
+ hts_session_rx_body (hs, ts);
+ return 0;
+ }
+
+ done:
+ svm_fifo_dequeue_drop (ts->rx_fifo, msg.data.len);
+ }
+ else
+ hts_session_rx_body (hs, ts);
-done:
- svm_fifo_dequeue_drop (ts->rx_fifo, msg.data.len);
return 0;
}
@@ -363,6 +474,7 @@ hts_ts_accept_callback (session_t *ts)
hs = hts_session_alloc (ts->thread_index);
hs->vpp_session_index = ts->session_index;
+ hs->left_recv = 0;
ts->opaque = hs->session_index;
ts->session_state = SESSION_STATE_READY;
@@ -529,15 +641,16 @@ hts_start_listen (hts_main_t *htm, session_endpoint_cfg_t *sep, u8 *uri,
if (need_crypto)
{
- session_endpoint_alloc_ext_cfg (&a->sep_ext,
- TRANSPORT_ENDPT_EXT_CFG_CRYPTO);
- a->sep_ext.ext_cfg->crypto.ckpair_index = htm->ckpair_index;
+ transport_endpt_ext_cfg_t *ext_cfg = session_endpoint_add_ext_cfg (
+ &a->sep_ext, TRANSPORT_ENDPT_EXT_CFG_CRYPTO,
+ sizeof (transport_endpt_crypto_cfg_t));
+ ext_cfg->crypto.ckpair_index = htm->ckpair_index;
}
rv = vnet_listen (a);
if (need_crypto)
- clib_mem_free (a->sep_ext.ext_cfg);
+ session_endpoint_free_ext_cfgs (&a->sep_ext);
if (rv)
return rv;
@@ -726,7 +839,10 @@ start_server:
if (htm->app_index == (u32) ~0)
{
- vnet_session_enable_disable (vm, 1 /* is_enable */);
+ session_enable_disable_args_t args = { .is_en = 1,
+ .rt_engine_type =
+ RT_BACKEND_ENGINE_RULE_TABLE };
+ vnet_session_enable_disable (vm, &args);
if (hts_create (vm))
{
diff --git a/src/plugins/hs_apps/proxy.c b/src/plugins/hs_apps/proxy.c
index e8fedf921a5..d7fe6fb54df 100644
--- a/src/plugins/hs_apps/proxy.c
+++ b/src/plugins/hs_apps/proxy.c
@@ -19,50 +19,145 @@
#include <vnet/session/application_interface.h>
#include <hs_apps/proxy.h>
#include <vnet/tcp/tcp.h>
+#include <http/http.h>
+#include <http/http_header_names.h>
proxy_main_t proxy_main;
#define TCP_MSS 1460
-typedef struct
+static proxy_session_side_ctx_t *
+proxy_session_side_ctx_alloc (proxy_worker_t *wrk)
{
- session_endpoint_cfg_t sep;
- u32 app_index;
- u32 api_context;
-} proxy_connect_args_t;
+ proxy_session_side_ctx_t *ctx;
+
+ pool_get_zero (wrk->ctx_pool, ctx);
+ ctx->sc_index = ctx - wrk->ctx_pool;
+ ctx->ps_index = ~0;
+
+ return ctx;
+}
static void
-proxy_cb_fn (void *data, u32 data_len)
+proxy_session_side_ctx_free (proxy_worker_t *wrk,
+ proxy_session_side_ctx_t *ctx)
{
- proxy_connect_args_t *pa = (proxy_connect_args_t *) data;
- vnet_connect_args_t a;
+ pool_put (wrk->ctx_pool, ctx);
+}
- clib_memset (&a, 0, sizeof (a));
- a.api_context = pa->api_context;
- a.app_index = pa->app_index;
- clib_memcpy (&a.sep_ext, &pa->sep, sizeof (pa->sep));
- vnet_connect (&a);
- if (a.sep_ext.ext_cfg)
- clib_mem_free (a.sep_ext.ext_cfg);
+static proxy_session_side_ctx_t *
+proxy_session_side_ctx_get (proxy_worker_t *wrk, u32 ctx_index)
+{
+ return pool_elt_at_index (wrk->ctx_pool, ctx_index);
}
static void
-proxy_call_main_thread (vnet_connect_args_t * a)
+proxy_send_http_resp (session_t *s, http_status_code_t sc,
+ http_header_t *resp_headers)
{
- if (vlib_get_thread_index () == 0)
+ http_msg_t msg;
+ int rv;
+ u8 *headers_buf = 0;
+
+ if (vec_len (resp_headers))
{
- vnet_connect (a);
- if (a->sep_ext.ext_cfg)
- clib_mem_free (a->sep_ext.ext_cfg);
+ headers_buf = http_serialize_headers (resp_headers);
+ msg.data.len = msg.data.headers_len = vec_len (headers_buf);
}
else
+ msg.data.len = msg.data.headers_len = 0;
+
+ msg.type = HTTP_MSG_REPLY;
+ msg.code = sc;
+ msg.data.type = HTTP_MSG_DATA_INLINE;
+ msg.data.headers_offset = 0;
+ msg.data.body_len = 0;
+ msg.data.body_offset = 0;
+ rv = svm_fifo_enqueue (s->tx_fifo, sizeof (msg), (u8 *) &msg);
+ ASSERT (rv == sizeof (msg));
+ if (msg.data.headers_len)
+ {
+ rv = svm_fifo_enqueue (s->tx_fifo, vec_len (headers_buf), headers_buf);
+ ASSERT (rv == vec_len (headers_buf));
+ vec_free (headers_buf);
+ }
+
+ if (svm_fifo_set_event (s->tx_fifo))
+ session_program_tx_io_evt (s->handle, SESSION_IO_EVT_TX);
+}
+
+static void
+proxy_do_connect (vnet_connect_args_t *a)
+{
+ ASSERT (session_vlib_thread_is_cl_thread ());
+ vnet_connect (a);
+ session_endpoint_free_ext_cfgs (&a->sep_ext);
+}
+
+static void
+proxy_handle_connects_rpc (void *args)
+{
+ u32 thread_index = pointer_to_uword (args), n_connects = 0, n_pending;
+ proxy_worker_t *wrk;
+ u32 max_connects;
+
+ wrk = proxy_worker_get (thread_index);
+
+ clib_spinlock_lock (&wrk->pending_connects_lock);
+
+ n_pending = clib_fifo_elts (wrk->pending_connects);
+ max_connects = clib_min (32, n_pending);
+ vec_validate (wrk->burst_connects, max_connects);
+
+ while (n_connects < max_connects)
+ clib_fifo_sub1 (wrk->pending_connects, wrk->burst_connects[n_connects++]);
+
+ clib_spinlock_unlock (&wrk->pending_connects_lock);
+
+ /* Do connects without locking pending_connects */
+ n_connects = 0;
+ while (n_connects < max_connects)
{
- proxy_connect_args_t args;
- args.api_context = a->api_context;
- args.app_index = a->app_index;
- clib_memcpy (&args.sep, &a->sep_ext, sizeof (a->sep_ext));
- vl_api_rpc_call_main_thread (proxy_cb_fn, (u8 *) & args, sizeof (args));
+ proxy_do_connect (&wrk->burst_connects[n_connects]);
+ n_connects += 1;
}
+
+ /* More work to do, program rpc */
+ if (max_connects < n_pending)
+ session_send_rpc_evt_to_thread_force (
+ transport_cl_thread (), proxy_handle_connects_rpc,
+ uword_to_pointer ((uword) thread_index, void *));
+}
+
+static void
+proxy_program_connect (vnet_connect_args_t *a)
+{
+ u32 connects_thread = transport_cl_thread (), thread_index, n_pending;
+ proxy_worker_t *wrk;
+
+ thread_index = vlib_get_thread_index ();
+
+ /* If already on first worker, handle request */
+ if (thread_index == connects_thread)
+ {
+ proxy_do_connect (a);
+ return;
+ }
+
+ /* If not on first worker, queue request */
+ wrk = proxy_worker_get (thread_index);
+
+ clib_spinlock_lock (&wrk->pending_connects_lock);
+
+ clib_fifo_add1 (wrk->pending_connects, *a);
+ n_pending = clib_fifo_elts (wrk->pending_connects);
+
+ clib_spinlock_unlock (&wrk->pending_connects_lock);
+
+ if (n_pending == 1)
+ session_send_rpc_evt_to_thread_force (
+ connects_thread, proxy_handle_connects_rpc,
+ uword_to_pointer ((uword) thread_index, void *));
}
static proxy_session_t *
@@ -85,16 +180,6 @@ proxy_session_get (u32 ps_index)
return pool_elt_at_index (pm->sessions, ps_index);
}
-static inline proxy_session_t *
-proxy_session_get_if_valid (u32 ps_index)
-{
- proxy_main_t *pm = &proxy_main;
-
- if (pool_is_free_index (pm->sessions, ps_index))
- return 0;
- return pool_elt_at_index (pm->sessions, ps_index);
-}
-
static void
proxy_session_free (proxy_session_t *ps)
{
@@ -115,7 +200,7 @@ proxy_session_postponed_free_rpc (void *arg)
clib_spinlock_lock_if_init (&pm->sessions_lock);
ps = proxy_session_get (ps_index);
- segment_manager_dealloc_fifos (ps->server_rx_fifo, ps->server_tx_fifo);
+ segment_manager_dealloc_fifos (ps->po.rx_fifo, ps->po.tx_fifo);
proxy_session_free (ps);
clib_spinlock_unlock_if_init (&pm->sessions_lock);
@@ -126,54 +211,79 @@ proxy_session_postponed_free_rpc (void *arg)
static void
proxy_session_postponed_free (proxy_session_t *ps)
{
- session_send_rpc_evt_to_thread (ps->po_thread_index,
+ /* Passive open session handle has been invalidated so we don't have thread
+ * index at this point */
+ session_send_rpc_evt_to_thread (ps->po.rx_fifo->master_thread_index,
proxy_session_postponed_free_rpc,
uword_to_pointer (ps->ps_index, void *));
}
static void
+proxy_session_close_po (proxy_session_t *ps)
+{
+ vnet_disconnect_args_t _a = {}, *a = &_a;
+ proxy_main_t *pm = &proxy_main;
+
+ ASSERT (!vlib_num_workers () ||
+ CLIB_SPINLOCK_IS_LOCKED (&pm->sessions_lock));
+
+ a->handle = ps->po.session_handle;
+ a->app_index = pm->server_app_index;
+ vnet_disconnect_session (a);
+
+ ps->po_disconnected = 1;
+}
+
+static void
+proxy_session_close_ao (proxy_session_t *ps)
+{
+ vnet_disconnect_args_t _a = {}, *a = &_a;
+ proxy_main_t *pm = &proxy_main;
+
+ ASSERT (!vlib_num_workers () ||
+ CLIB_SPINLOCK_IS_LOCKED (&pm->sessions_lock));
+
+ a->handle = ps->ao.session_handle;
+ a->app_index = pm->active_open_app_index;
+ vnet_disconnect_session (a);
+
+ ps->ao_disconnected = 1;
+}
+
+static void
proxy_try_close_session (session_t * s, int is_active_open)
{
proxy_main_t *pm = &proxy_main;
- proxy_session_t *ps = 0;
- vnet_disconnect_args_t _a, *a = &_a;
+ proxy_session_side_ctx_t *sc;
+ proxy_session_t *ps;
+ proxy_worker_t *wrk;
+
+ wrk = proxy_worker_get (s->thread_index);
+ sc = proxy_session_side_ctx_get (wrk, s->opaque);
clib_spinlock_lock_if_init (&pm->sessions_lock);
- ps = proxy_session_get (s->opaque);
+ ps = proxy_session_get (sc->ps_index);
if (is_active_open)
{
- a->handle = ps->vpp_active_open_handle;
- a->app_index = pm->active_open_app_index;
- vnet_disconnect_session (a);
- ps->ao_disconnected = 1;
+ proxy_session_close_ao (ps);
if (!ps->po_disconnected)
{
- ASSERT (ps->vpp_server_handle != SESSION_INVALID_HANDLE);
- a->handle = ps->vpp_server_handle;
- a->app_index = pm->server_app_index;
- vnet_disconnect_session (a);
- ps->po_disconnected = 1;
+ ASSERT (ps->po.session_handle != SESSION_INVALID_HANDLE);
+ proxy_session_close_po (ps);
}
}
else
{
- a->handle = ps->vpp_server_handle;
- a->app_index = pm->server_app_index;
- vnet_disconnect_session (a);
- ps->po_disconnected = 1;
+ proxy_session_close_po (ps);
if (!ps->ao_disconnected && !ps->active_open_establishing)
{
/* Proxy session closed before active open */
- if (ps->vpp_active_open_handle != SESSION_INVALID_HANDLE)
- {
- a->handle = ps->vpp_active_open_handle;
- a->app_index = pm->active_open_app_index;
- vnet_disconnect_session (a);
- }
+ if (ps->ao.session_handle != SESSION_INVALID_HANDLE)
+ proxy_session_close_ao (ps);
ps->ao_disconnected = 1;
}
}
@@ -181,29 +291,63 @@ proxy_try_close_session (session_t * s, int is_active_open)
}
static void
+proxy_try_side_ctx_cleanup (session_t *s)
+{
+ proxy_main_t *pm = &proxy_main;
+ proxy_session_t *ps;
+ proxy_session_side_ctx_t *sc;
+ proxy_worker_t *wrk;
+
+ wrk = proxy_worker_get (s->thread_index);
+ sc = proxy_session_side_ctx_get (wrk, s->opaque);
+ if (sc->state == PROXY_SC_S_CREATED)
+ return;
+
+ clib_spinlock_lock_if_init (&pm->sessions_lock);
+
+ ps = proxy_session_get (sc->ps_index);
+
+ if (!ps->po_disconnected)
+ proxy_session_close_po (ps);
+
+ if (!ps->ao_disconnected)
+ proxy_session_close_ao (ps);
+
+ clib_spinlock_unlock_if_init (&pm->sessions_lock);
+}
+
+static void
proxy_try_delete_session (session_t * s, u8 is_active_open)
{
proxy_main_t *pm = &proxy_main;
proxy_session_t *ps = 0;
+ proxy_session_side_ctx_t *sc;
+ proxy_worker_t *wrk;
+ u32 ps_index;
+
+ wrk = proxy_worker_get (s->thread_index);
+ sc = proxy_session_side_ctx_get (wrk, s->opaque);
+ ps_index = sc->ps_index;
+
+ proxy_session_side_ctx_free (wrk, sc);
clib_spinlock_lock_if_init (&pm->sessions_lock);
- ps = proxy_session_get (s->opaque);
+ ps = proxy_session_get (ps_index);
if (is_active_open)
{
- ps->vpp_active_open_handle = SESSION_INVALID_HANDLE;
+ ps->ao.session_handle = SESSION_INVALID_HANDLE;
/* Revert master thread index change on connect notification */
- ps->server_rx_fifo->master_thread_index = ps->po_thread_index;
+ ps->po.rx_fifo->master_thread_index =
+ ps->po.tx_fifo->master_thread_index;
/* Passive open already cleaned up */
- if (ps->vpp_server_handle == SESSION_INVALID_HANDLE)
+ if (ps->po.session_handle == SESSION_INVALID_HANDLE)
{
- ASSERT (s->rx_fifo->refcnt == 1);
-
/* The two sides of the proxy on different threads */
- if (ps->po_thread_index != s->thread_index)
+ if (ps->po.tx_fifo->master_thread_index != s->thread_index)
{
/* This is not the right thread to delete the fifos */
s->rx_fifo = 0;
@@ -211,14 +355,17 @@ proxy_try_delete_session (session_t * s, u8 is_active_open)
proxy_session_postponed_free (ps);
}
else
- proxy_session_free (ps);
+ {
+ ASSERT (s->rx_fifo->refcnt == 1);
+ proxy_session_free (ps);
+ }
}
}
else
{
- ps->vpp_server_handle = SESSION_INVALID_HANDLE;
+ ps->po.session_handle = SESSION_INVALID_HANDLE;
- if (ps->vpp_active_open_handle == SESSION_INVALID_HANDLE)
+ if (ps->ao.session_handle == SESSION_INVALID_HANDLE)
{
if (!ps->active_open_establishing)
proxy_session_free (ps);
@@ -275,16 +422,26 @@ static int
proxy_accept_callback (session_t * s)
{
proxy_main_t *pm = &proxy_main;
+ proxy_session_side_ctx_t *sc;
proxy_session_t *ps;
+ proxy_worker_t *wrk;
+ transport_proto_t tp = session_get_transport_proto (s);
+
+ wrk = proxy_worker_get (s->thread_index);
+ sc = proxy_session_side_ctx_alloc (wrk);
+ s->opaque = sc->sc_index;
clib_spinlock_lock_if_init (&pm->sessions_lock);
ps = proxy_session_alloc ();
- ps->vpp_server_handle = session_handle (s);
- ps->vpp_active_open_handle = SESSION_INVALID_HANDLE;
- ps->po_thread_index = s->thread_index;
- s->opaque = ps->ps_index;
+ ps->po.session_handle = session_handle (s);
+ ps->po.rx_fifo = s->rx_fifo;
+ ps->po.tx_fifo = s->tx_fifo;
+
+ ps->ao.session_handle = SESSION_INVALID_HANDLE;
+ sc->ps_index = ps->ps_index;
+ sc->is_http = tp == TRANSPORT_PROTO_HTTP ? 1 : 0;
clib_spinlock_unlock_if_init (&pm->sessions_lock);
@@ -325,92 +482,167 @@ proxy_transport_needs_crypto (transport_proto_t proto)
return proto == TRANSPORT_PROTO_TLS;
}
-static int
-proxy_rx_callback (session_t * s)
+static void
+proxy_session_start_connect (proxy_session_side_ctx_t *sc, session_t *s)
{
+ int actual_transfer __attribute__ ((unused));
+ vnet_connect_args_t _a = {}, *a = &_a;
proxy_main_t *pm = &proxy_main;
- u32 thread_index = vlib_get_thread_index ();
- svm_fifo_t *ao_tx_fifo;
+ u32 max_dequeue, ps_index;
proxy_session_t *ps;
-
- ASSERT (s->thread_index == thread_index);
+ transport_proto_t tp = session_get_transport_proto (s);
clib_spinlock_lock_if_init (&pm->sessions_lock);
- ps = proxy_session_get (s->opaque);
+ ps = proxy_session_get (sc->ps_index);
- if (PREDICT_TRUE (ps->vpp_active_open_handle != SESSION_INVALID_HANDLE))
+ /* maybe we were already here */
+ if (ps->active_open_establishing)
{
clib_spinlock_unlock_if_init (&pm->sessions_lock);
+ return;
+ }
- ao_tx_fifo = s->rx_fifo;
+ ps->active_open_establishing = 1;
+ ps_index = ps->ps_index;
- /*
- * Send event for active open tx fifo
- */
- if (svm_fifo_set_event (ao_tx_fifo))
+ clib_spinlock_unlock_if_init (&pm->sessions_lock);
+
+ if (tp == TRANSPORT_PROTO_HTTP)
+ {
+ http_msg_t msg;
+ u8 *target_buf = 0;
+ http_uri_t target_uri;
+ http_header_t *resp_headers = 0;
+ session_endpoint_cfg_t target_sep = SESSION_ENDPOINT_CFG_NULL;
+ int rv;
+
+ rv = svm_fifo_dequeue (s->rx_fifo, sizeof (msg), (u8 *) &msg);
+ ASSERT (rv == sizeof (msg));
+
+ if (msg.type != HTTP_MSG_REQUEST)
+ {
+ proxy_send_http_resp (s, HTTP_STATUS_INTERNAL_ERROR, 0);
+ return;
+ }
+ if (msg.method_type != HTTP_REQ_CONNECT)
+ {
+ http_add_header (&resp_headers,
+ http_header_name_token (HTTP_HEADER_ALLOW),
+ http_token_lit ("CONNECT"));
+ proxy_send_http_resp (s, HTTP_STATUS_METHOD_NOT_ALLOWED,
+ resp_headers);
+ vec_free (resp_headers);
+ return;
+ }
+
+ if (msg.data.target_form != HTTP_TARGET_AUTHORITY_FORM ||
+ msg.data.target_path_len == 0)
{
- u32 ao_thread_index = ao_tx_fifo->master_thread_index;
- u32 ao_session_index = ao_tx_fifo->shr->master_session_index;
- if (session_send_io_evt_to_thread_custom (&ao_session_index,
- ao_thread_index,
- SESSION_IO_EVT_TX))
- clib_warning ("failed to enqueue tx evt");
+ proxy_send_http_resp (s, HTTP_STATUS_BAD_REQUEST, 0);
+ return;
}
- if (svm_fifo_max_enqueue (ao_tx_fifo) <= TCP_MSS)
- svm_fifo_add_want_deq_ntf (ao_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
+ /* read target uri */
+ target_buf = vec_new (u8, msg.data.target_path_len);
+ rv = svm_fifo_peek (s->rx_fifo, msg.data.target_path_offset,
+ msg.data.target_path_len, target_buf);
+ ASSERT (rv == msg.data.target_path_len);
+ svm_fifo_dequeue_drop (s->rx_fifo, msg.data.len);
+ rv = http_parse_authority_form_target (target_buf, &target_uri);
+ vec_free (target_buf);
+ if (rv)
+ {
+ proxy_send_http_resp (s, HTTP_STATUS_BAD_REQUEST, 0);
+ return;
+ }
+ target_sep.is_ip4 = target_uri.is_ip4;
+ target_sep.ip = target_uri.ip;
+ target_sep.port = target_uri.port;
+ target_sep.transport_proto = TRANSPORT_PROTO_TCP;
+ clib_memcpy (&a->sep_ext, &target_sep, sizeof (target_sep));
}
else
{
- vnet_connect_args_t _a, *a = &_a;
- svm_fifo_t *tx_fifo, *rx_fifo;
- u32 max_dequeue, ps_index;
- int actual_transfer __attribute__ ((unused));
+ max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo);
+ if (PREDICT_FALSE (max_dequeue == 0))
+ return;
- rx_fifo = s->rx_fifo;
- tx_fifo = s->tx_fifo;
+ max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue);
+ actual_transfer =
+ svm_fifo_peek (s->rx_fifo, 0 /* relative_offset */, max_dequeue,
+ pm->rx_buf[s->thread_index]);
- ASSERT (rx_fifo->master_thread_index == thread_index);
- ASSERT (tx_fifo->master_thread_index == thread_index);
+ /* Expectation is that here actual data just received is parsed and based
+ * on its contents, the destination and parameters of the connect to the
+ * upstream are decided
+ */
- max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo);
+ clib_memcpy (&a->sep_ext, &pm->client_sep, sizeof (pm->client_sep));
+ }
- if (PREDICT_FALSE (max_dequeue == 0))
- {
- clib_spinlock_unlock_if_init (&pm->sessions_lock);
- return 0;
- }
+ a->api_context = ps_index;
+ a->app_index = pm->active_open_app_index;
- max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue);
- actual_transfer = svm_fifo_peek (rx_fifo, 0 /* relative_offset */ ,
- max_dequeue, pm->rx_buf[thread_index]);
+ if (proxy_transport_needs_crypto (a->sep.transport_proto))
+ {
+ transport_endpt_ext_cfg_t *ext_cfg = session_endpoint_add_ext_cfg (
+ &a->sep_ext, TRANSPORT_ENDPT_EXT_CFG_CRYPTO,
+ sizeof (transport_endpt_crypto_cfg_t));
+ ext_cfg->crypto.ckpair_index = pm->ckpair_index;
+ }
- /* $$$ your message in this space: parse url, etc. */
+ proxy_program_connect (a);
+}
- clib_memset (a, 0, sizeof (*a));
+static int
+proxy_rx_callback (session_t *s)
+{
+ proxy_session_side_ctx_t *sc;
+ svm_fifo_t *ao_tx_fifo;
+ proxy_session_t *ps;
+ proxy_worker_t *wrk;
- ps->server_rx_fifo = rx_fifo;
- ps->server_tx_fifo = tx_fifo;
- ps->active_open_establishing = 1;
- ps_index = ps->ps_index;
+ ASSERT (s->thread_index == vlib_get_thread_index ());
- clib_spinlock_unlock_if_init (&pm->sessions_lock);
+ wrk = proxy_worker_get (s->thread_index);
+ sc = proxy_session_side_ctx_get (wrk, s->opaque);
- clib_memcpy (&a->sep_ext, &pm->client_sep, sizeof (pm->client_sep));
- a->api_context = ps_index;
- a->app_index = pm->active_open_app_index;
+ if (PREDICT_FALSE (sc->state < PROXY_SC_S_ESTABLISHED))
+ {
+ proxy_main_t *pm = &proxy_main;
- if (proxy_transport_needs_crypto (a->sep.transport_proto))
+ if (sc->state == PROXY_SC_S_CREATED)
{
- session_endpoint_alloc_ext_cfg (&a->sep_ext,
- TRANSPORT_ENDPT_EXT_CFG_CRYPTO);
- a->sep_ext.ext_cfg->crypto.ckpair_index = pm->ckpair_index;
+ proxy_session_start_connect (sc, s);
+ sc->state = PROXY_SC_S_CONNECTING;
+ return 0;
}
- proxy_call_main_thread (a);
+ clib_spinlock_lock_if_init (&pm->sessions_lock);
+
+ ps = proxy_session_get (sc->ps_index);
+ sc->pair = ps->ao;
+
+ clib_spinlock_unlock_if_init (&pm->sessions_lock);
+
+ if (sc->pair.session_handle == SESSION_INVALID_HANDLE)
+ return 0;
+
+ sc->state = PROXY_SC_S_ESTABLISHED;
}
+ ao_tx_fifo = s->rx_fifo;
+
+ /*
+ * Send event for active open tx fifo
+ */
+ if (svm_fifo_set_event (ao_tx_fifo))
+ session_program_tx_io_evt (sc->pair.session_handle, SESSION_IO_EVT_TX);
+
+ if (svm_fifo_max_enqueue (ao_tx_fifo) <= TCP_MSS)
+ svm_fifo_add_want_deq_ntf (ao_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
+
return 0;
}
@@ -418,20 +650,20 @@ static void
proxy_force_ack (void *handlep)
{
transport_connection_t *tc;
- session_t *ao_s;
+ session_t *s;
- ao_s = session_get_from_handle (pointer_to_uword (handlep));
- if (session_get_transport_proto (ao_s) != TRANSPORT_PROTO_TCP)
+ s = session_get_from_handle (pointer_to_uword (handlep));
+ if (session_get_transport_proto (s) != TRANSPORT_PROTO_TCP)
return;
- tc = session_get_transport (ao_s);
+ tc = session_get_transport (s);
tcp_send_ack ((tcp_connection_t *) tc);
}
static int
proxy_tx_callback (session_t * proxy_s)
{
- proxy_main_t *pm = &proxy_main;
- proxy_session_t *ps;
+ proxy_session_side_ctx_t *sc;
+ proxy_worker_t *wrk;
u32 min_free;
min_free = clib_min (svm_fifo_size (proxy_s->tx_fifo) >> 3, 128 << 10);
@@ -441,21 +673,17 @@ proxy_tx_callback (session_t * proxy_s)
return 0;
}
- clib_spinlock_lock_if_init (&pm->sessions_lock);
-
- ps = proxy_session_get (proxy_s->opaque);
-
- if (ps->vpp_active_open_handle == SESSION_INVALID_HANDLE)
- goto unlock;
+ wrk = proxy_worker_get (proxy_s->thread_index);
+ sc = proxy_session_side_ctx_get (wrk, proxy_s->opaque);
+ if (sc->state < PROXY_SC_S_ESTABLISHED)
+ return 0;
/* Force ack on active open side to update rcv wnd. Make sure it's done on
* the right thread */
- void *arg = uword_to_pointer (ps->vpp_active_open_handle, void *);
- session_send_rpc_evt_to_thread (ps->server_rx_fifo->master_thread_index,
- proxy_force_ack, arg);
-
-unlock:
- clib_spinlock_unlock_if_init (&pm->sessions_lock);
+ void *arg = uword_to_pointer (sc->pair.session_handle, void *);
+ session_send_rpc_evt_to_thread (
+ session_thread_from_handle (sc->pair.session_handle), proxy_force_ack,
+ arg);
return 0;
}
@@ -464,7 +692,10 @@ static void
proxy_cleanup_callback (session_t * s, session_cleanup_ntf_t ntf)
{
if (ntf == SESSION_CLEANUP_TRANSPORT)
- return;
+ {
+ proxy_try_side_ctx_cleanup (s);
+ return;
+ }
proxy_try_delete_session (s, 0 /* is_active_open */ );
}
@@ -490,10 +721,17 @@ active_open_alloc_session_fifos (session_t *s)
clib_spinlock_lock_if_init (&pm->sessions_lock);
+ /* Active open opaque is pointing at proxy session */
ps = proxy_session_get (s->opaque);
- txf = ps->server_rx_fifo;
- rxf = ps->server_tx_fifo;
+ if (ps->po_disconnected)
+ {
+ clib_spinlock_unlock_if_init (&pm->sessions_lock);
+ return SESSION_E_ALLOC;
+ }
+
+ txf = ps->po.rx_fifo;
+ rxf = ps->po.tx_fifo;
/*
* Reset the active-open tx-fifo master indices so the active-open session
@@ -524,31 +762,43 @@ active_open_connected_callback (u32 app_index, u32 opaque,
{
proxy_main_t *pm = &proxy_main;
proxy_session_t *ps;
- u8 thread_index = vlib_get_thread_index ();
-
- /*
- * Setup proxy session handle.
- */
- clib_spinlock_lock_if_init (&pm->sessions_lock);
-
- ps = proxy_session_get (opaque);
+ proxy_worker_t *wrk;
+ proxy_session_side_ctx_t *sc;
+ session_t *po_s;
+ transport_proto_t tp;
/* Connection failed */
if (err)
{
- vnet_disconnect_args_t _a, *a = &_a;
+ clib_spinlock_lock_if_init (&pm->sessions_lock);
- a->handle = ps->vpp_server_handle;
- a->app_index = pm->server_app_index;
- vnet_disconnect_session (a);
- ps->po_disconnected = 1;
- }
- else
- {
- ps->vpp_active_open_handle = session_handle (s);
- ps->active_open_establishing = 0;
+ ps = proxy_session_get (opaque);
+ po_s = session_get_from_handle (ps->po.session_handle);
+ tp = session_get_transport_proto (po_s);
+ if (tp == TRANSPORT_PROTO_HTTP)
+ {
+ proxy_send_http_resp (po_s, HTTP_STATUS_BAD_GATEWAY, 0);
+ }
+ ps->ao_disconnected = 1;
+ proxy_session_close_po (ps);
+
+ clib_spinlock_unlock_if_init (&pm->sessions_lock);
+
+ return 0;
}
+ wrk = proxy_worker_get (s->thread_index);
+
+ clib_spinlock_lock_if_init (&pm->sessions_lock);
+
+ ps = proxy_session_get (opaque);
+
+ ps->ao.rx_fifo = s->rx_fifo;
+ ps->ao.tx_fifo = s->tx_fifo;
+ ps->ao.session_handle = session_handle (s);
+
+ ps->active_open_establishing = 0;
+
/* Passive open session was already closed! */
if (ps->po_disconnected)
{
@@ -558,21 +808,136 @@ active_open_connected_callback (u32 app_index, u32 opaque,
return -1;
}
- s->opaque = opaque;
+ po_s = session_get_from_handle (ps->po.session_handle);
+ tp = session_get_transport_proto (po_s);
+
+ sc = proxy_session_side_ctx_alloc (wrk);
+ sc->pair = ps->po;
+ sc->ps_index = ps->ps_index;
clib_spinlock_unlock_if_init (&pm->sessions_lock);
- /*
- * Send event for active open tx fifo
- */
- ASSERT (s->thread_index == thread_index);
- if (svm_fifo_set_event (s->tx_fifo))
- session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
+ sc->state = PROXY_SC_S_ESTABLISHED;
+ s->opaque = sc->sc_index;
+ sc->is_http = tp == TRANSPORT_PROTO_HTTP ? 1 : 0;
+
+ if (tp == TRANSPORT_PROTO_HTTP)
+ {
+ proxy_send_http_resp (po_s, HTTP_STATUS_OK, 0);
+ }
+ else
+ {
+ /*
+ * Send event for active open tx fifo
+ */
+ ASSERT (s->thread_index == vlib_get_thread_index ());
+ if (svm_fifo_set_event (s->tx_fifo))
+ session_program_tx_io_evt (session_handle (s), SESSION_IO_EVT_TX);
+ }
return 0;
}
static void
+active_open_migrate_po_fixup_rpc (void *arg)
+{
+ u32 ps_index = pointer_to_uword (arg);
+ proxy_session_side_ctx_t *po_sc;
+ proxy_main_t *pm = &proxy_main;
+ session_handle_t po_sh;
+ proxy_worker_t *wrk;
+ proxy_session_t *ps;
+ session_t *po_s;
+
+ wrk = proxy_worker_get (vlib_get_thread_index ());
+
+ clib_spinlock_lock_if_init (&pm->sessions_lock);
+
+ ps = proxy_session_get (ps_index);
+
+ po_s = session_get_from_handle (ps->po.session_handle);
+ po_s->rx_fifo = ps->po.rx_fifo;
+ po_s->tx_fifo = ps->po.tx_fifo;
+
+ po_sc = proxy_session_side_ctx_get (wrk, po_s->opaque);
+ po_sc->pair = ps->ao;
+ po_sh = ps->po.session_handle;
+
+ clib_spinlock_unlock_if_init (&pm->sessions_lock);
+
+ session_program_tx_io_evt (po_sh, SESSION_IO_EVT_TX);
+}
+
+static void
+active_open_migrate_rpc (void *arg)
+{
+ u32 ps_index = pointer_to_uword (arg);
+ proxy_main_t *pm = &proxy_main;
+ proxy_session_side_ctx_t *sc;
+ proxy_worker_t *wrk;
+ proxy_session_t *ps;
+ session_t *s;
+
+ wrk = proxy_worker_get (vlib_get_thread_index ());
+ sc = proxy_session_side_ctx_alloc (wrk);
+
+ clib_spinlock_lock_if_init (&pm->sessions_lock);
+
+ ps = proxy_session_get (ps_index);
+ sc->ps_index = ps->ps_index;
+
+ s = session_get_from_handle (ps->ao.session_handle);
+ s->opaque = sc->sc_index;
+ s->flags &= ~SESSION_F_IS_MIGRATING;
+
+ /* Fixup passive open session because of migration and zc */
+ ps->ao.rx_fifo = ps->po.tx_fifo = s->rx_fifo;
+ ps->ao.tx_fifo = ps->po.rx_fifo = s->tx_fifo;
+
+ ps->po.tx_fifo->shr->master_session_index =
+ session_index_from_handle (ps->po.session_handle);
+ ps->po.tx_fifo->master_thread_index =
+ session_thread_from_handle (ps->po.session_handle);
+
+ sc->pair = ps->po;
+
+ clib_spinlock_unlock_if_init (&pm->sessions_lock);
+
+ session_send_rpc_evt_to_thread (
+ session_thread_from_handle (sc->pair.session_handle),
+ active_open_migrate_po_fixup_rpc, uword_to_pointer (sc->ps_index, void *));
+}
+
+static void
+active_open_migrate_callback (session_t *s, session_handle_t new_sh)
+{
+ proxy_main_t *pm = &proxy_main;
+ proxy_session_side_ctx_t *sc;
+ proxy_session_t *ps;
+ proxy_worker_t *wrk;
+
+ wrk = proxy_worker_get (s->thread_index);
+ sc = proxy_session_side_ctx_get (wrk, s->opaque);
+
+ /* NOTE: this is just an example. ZC makes this migration rather
+ * tedious. Probably better approaches could be found */
+ clib_spinlock_lock_if_init (&pm->sessions_lock);
+
+ ps = proxy_session_get (sc->ps_index);
+ ps->ao.session_handle = new_sh;
+ ps->ao.rx_fifo = 0;
+ ps->ao.tx_fifo = 0;
+
+ clib_spinlock_unlock_if_init (&pm->sessions_lock);
+
+ session_send_rpc_evt_to_thread (session_thread_from_handle (new_sh),
+ active_open_migrate_rpc,
+ uword_to_pointer (sc->ps_index, void *));
+
+ proxy_session_side_ctx_free (wrk, sc);
+}
+
+static void
active_open_reset_callback (session_t * s)
{
proxy_try_close_session (s, 1 /* is_active_open */ );
@@ -618,10 +983,8 @@ active_open_rx_callback (session_t * s)
static int
active_open_tx_callback (session_t * ao_s)
{
- proxy_main_t *pm = &proxy_main;
- transport_connection_t *tc;
- proxy_session_t *ps;
- session_t *proxy_s;
+ proxy_session_side_ctx_t *sc;
+ proxy_worker_t *wrk;
u32 min_free;
min_free = clib_min (svm_fifo_size (ao_s->tx_fifo) >> 3, 128 << 10);
@@ -631,23 +994,27 @@ active_open_tx_callback (session_t * ao_s)
return 0;
}
- clib_spinlock_lock_if_init (&pm->sessions_lock);
+ wrk = proxy_worker_get (ao_s->thread_index);
+ sc = proxy_session_side_ctx_get (wrk, ao_s->opaque);
- ps = proxy_session_get_if_valid (ao_s->opaque);
- if (!ps)
- goto unlock;
-
- if (ps->vpp_server_handle == ~0)
- goto unlock;
-
- proxy_s = session_get_from_handle (ps->vpp_server_handle);
-
- /* Force ack on proxy side to update rcv wnd */
- tc = session_get_transport (proxy_s);
- tcp_send_ack ((tcp_connection_t *) tc);
+ if (sc->state < PROXY_SC_S_ESTABLISHED)
+ return 0;
-unlock:
- clib_spinlock_unlock_if_init (&pm->sessions_lock);
+ if (sc->is_http)
+ {
+ /* notify HTTP transport */
+ session_t *po = session_get_from_handle (sc->pair.session_handle);
+ session_send_io_evt_to_thread_custom (
+ &po->session_index, po->thread_index, SESSION_IO_EVT_RX);
+ }
+ else
+ {
+ /* Force ack on proxy side to update rcv wnd */
+ void *arg = uword_to_pointer (sc->pair.session_handle, void *);
+ session_send_rpc_evt_to_thread (
+ session_thread_from_handle (sc->pair.session_handle), proxy_force_ack,
+ arg);
+ }
return 0;
}
@@ -664,6 +1031,7 @@ active_open_cleanup_callback (session_t * s, session_cleanup_ntf_t ntf)
static session_cb_vft_t active_open_clients = {
.session_reset_callback = active_open_reset_callback,
.session_connected_callback = active_open_connected_callback,
+ .session_migrate_callback = active_open_migrate_callback,
.session_accept_callback = active_open_create_callback,
.session_disconnect_callback = active_open_disconnect_callback,
.session_cleanup_callback = active_open_cleanup_callback,
@@ -756,22 +1124,33 @@ proxy_server_listen ()
{
proxy_main_t *pm = &proxy_main;
vnet_listen_args_t _a, *a = &_a;
- int rv;
+ int rv, need_crypto;
clib_memset (a, 0, sizeof (*a));
a->app_index = pm->server_app_index;
clib_memcpy (&a->sep_ext, &pm->server_sep, sizeof (pm->server_sep));
- if (proxy_transport_needs_crypto (a->sep.transport_proto))
+ /* Make sure listener is marked connected for transports like udp */
+ a->sep_ext.transport_flags = TRANSPORT_CFG_F_CONNECTED;
+ need_crypto = proxy_transport_needs_crypto (a->sep.transport_proto);
+ if (need_crypto)
+ {
+ transport_endpt_ext_cfg_t *ext_cfg = session_endpoint_add_ext_cfg (
+ &a->sep_ext, TRANSPORT_ENDPT_EXT_CFG_CRYPTO,
+ sizeof (transport_endpt_crypto_cfg_t));
+ ext_cfg->crypto.ckpair_index = pm->ckpair_index;
+ }
+ /* set http timeout for connect-proxy */
+ if (pm->server_sep.transport_proto == TRANSPORT_PROTO_HTTP)
{
- session_endpoint_alloc_ext_cfg (&a->sep_ext,
- TRANSPORT_ENDPT_EXT_CFG_CRYPTO);
- a->sep_ext.ext_cfg->crypto.ckpair_index = pm->ckpair_index;
+ transport_endpt_ext_cfg_t *ext_cfg = session_endpoint_add_ext_cfg (
+ &a->sep_ext, TRANSPORT_ENDPT_EXT_CFG_HTTP, sizeof (ext_cfg->opaque));
+ ext_cfg->opaque = pm->idle_timeout;
}
rv = vnet_listen (a);
- if (a->sep_ext.ext_cfg)
- clib_mem_free (a->sep_ext.ext_cfg);
+ if (need_crypto)
+ session_endpoint_free_ext_cfgs (&a->sep_ext);
return rv;
}
@@ -797,15 +1176,25 @@ proxy_server_create (vlib_main_t * vm)
{
vlib_thread_main_t *vtm = vlib_get_thread_main ();
proxy_main_t *pm = &proxy_main;
+ proxy_worker_t *wrk;
u32 num_threads;
int i;
+ if (vlib_num_workers ())
+ clib_spinlock_init (&pm->sessions_lock);
+
num_threads = 1 /* main thread */ + vtm->n_threads;
vec_validate (pm->rx_buf, num_threads - 1);
for (i = 0; i < num_threads; i++)
vec_validate (pm->rx_buf[i], pm->rcv_buffer_size);
+ vec_validate (pm->workers, vlib_num_workers ());
+ vec_foreach (wrk, pm->workers)
+ {
+ clib_spinlock_init (&wrk->pending_connects_lock);
+ }
+
proxy_server_add_ckpair ();
if (proxy_server_attach ())
@@ -813,11 +1202,6 @@ proxy_server_create (vlib_main_t * vm)
clib_warning ("failed to attach server app");
return -1;
}
- if (proxy_server_listen ())
- {
- clib_warning ("failed to start listening");
- return -1;
- }
if (active_open_attach ())
{
clib_warning ("failed to attach active open app");
@@ -849,9 +1233,6 @@ proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
pm->private_segment_count = 0;
pm->segment_size = 512 << 20;
- if (vlib_num_workers ())
- clib_spinlock_init (&pm->sessions_lock);
-
if (!unformat_user (input, unformat_line_input, line_input))
return 0;
@@ -883,6 +1264,8 @@ proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
vec_add1 (server_uri, 0);
else if (unformat (line_input, "client-uri %s", &client_uri))
vec_add1 (client_uri, 0);
+ else if (unformat (line_input, "idle-timeout %d", &pm->idle_timeout))
+ ;
else
{
error = clib_error_return (0, "unknown input `%U'",
@@ -897,35 +1280,45 @@ proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
default_server_uri);
server_uri = format (0, "%s%c", default_server_uri, 0);
}
- if (!client_uri)
- {
- clib_warning ("No client-uri provided, Using default: %s",
- default_client_uri);
- client_uri = format (0, "%s%c", default_client_uri, 0);
- }
-
if (parse_uri ((char *) server_uri, &pm->server_sep))
{
error = clib_error_return (0, "Invalid server uri %v", server_uri);
goto done;
}
- if (parse_uri ((char *) client_uri, &pm->client_sep))
+
+ /* http proxy get target within request */
+ if (pm->server_sep.transport_proto != TRANSPORT_PROTO_HTTP)
{
- error = clib_error_return (0, "Invalid client uri %v", client_uri);
- goto done;
+ if (!client_uri)
+ {
+ clib_warning ("No client-uri provided, Using default: %s",
+ default_client_uri);
+ client_uri = format (0, "%s%c", default_client_uri, 0);
+ }
+ if (parse_uri ((char *) client_uri, &pm->client_sep))
+ {
+ error = clib_error_return (0, "Invalid client uri %v", client_uri);
+ goto done;
+ }
}
- vnet_session_enable_disable (vm, 1 /* turn on session and transport */ );
-
- rv = proxy_server_create (vm);
- switch (rv)
+ if (pm->server_app_index == APP_INVALID_INDEX)
{
- case 0:
- break;
- default:
- error = clib_error_return (0, "server_create returned %d", rv);
+ session_enable_disable_args_t args = { .is_en = 1,
+ .rt_engine_type =
+ RT_BACKEND_ENGINE_RULE_TABLE };
+ vnet_session_enable_disable (vm, &args);
+ rv = proxy_server_create (vm);
+ if (rv)
+ {
+ error = clib_error_return (0, "server_create returned %d", rv);
+ goto done;
+ }
}
+ if (proxy_server_listen ())
+ error = clib_error_return (0, "failed to start listening");
+
done:
unformat_free (line_input);
vec_free (client_uri);
@@ -933,14 +1326,14 @@ done:
return error;
}
-VLIB_CLI_COMMAND (proxy_create_command, static) =
-{
+VLIB_CLI_COMMAND (proxy_create_command, static) = {
.path = "test proxy server",
- .short_help = "test proxy server [server-uri <tcp://ip/port>]"
- "[client-uri <tcp://ip/port>][fifo-size <nn>[k|m]]"
- "[max-fifo-size <nn>[k|m]][high-watermark <nn>]"
- "[low-watermark <nn>][rcv-buf-size <nn>][prealloc-fifos <nn>]"
- "[private-segment-size <mem>][private-segment-count <nn>]",
+ .short_help = "test proxy server [server-uri <proto://ip/port>]"
+ "[client-uri <tcp://ip/port>][fifo-size <nn>[k|m]]"
+ "[max-fifo-size <nn>[k|m]][high-watermark <nn>]"
+ "[low-watermark <nn>][rcv-buf-size <nn>][prealloc-fifos <nn>]"
+ "[private-segment-size <mem>][private-segment-count <nn>]"
+ "[idle-timeout <nn>]",
.function = proxy_server_create_command_fn,
};
@@ -950,6 +1343,8 @@ proxy_main_init (vlib_main_t * vm)
proxy_main_t *pm = &proxy_main;
pm->server_client_index = ~0;
pm->active_open_client_index = ~0;
+ pm->server_app_index = APP_INVALID_INDEX;
+ pm->idle_timeout = 600; /* connect-proxy default idle timeout 10 minutes */
return 0;
}
diff --git a/src/plugins/hs_apps/proxy.h b/src/plugins/hs_apps/proxy.h
index 26f4de2f729..75567e4c1ba 100644
--- a/src/plugins/hs_apps/proxy.h
+++ b/src/plugins/hs_apps/proxy.h
@@ -26,23 +26,57 @@
#include <vnet/session/session.h>
#include <vnet/session/application_interface.h>
+#define foreach_proxy_session_side_state \
+ _ (CREATED, "created") \
+ _ (CONNECTING, "connecting") \
+ _ (ESTABLISHED, "establiehed") \
+ _ (CLOSED, "closed")
+
+typedef enum proxy_session_side_state_
+{
+#define _(sym, str) PROXY_SC_S_##sym,
+ foreach_proxy_session_side_state
+#undef _
+} proxy_session_side_state_t;
+typedef struct proxy_session_side_
+{
+ session_handle_t session_handle;
+ svm_fifo_t *rx_fifo;
+ svm_fifo_t *tx_fifo;
+} proxy_session_side_t;
+
+typedef struct proxy_session_side_ctx_
+{
+ proxy_session_side_t pair;
+ proxy_session_side_state_t state;
+ u32 sc_index;
+ u32 ps_index;
+ u8 is_http;
+} proxy_session_side_ctx_t;
+
typedef struct
{
- svm_fifo_t *server_rx_fifo;
- svm_fifo_t *server_tx_fifo;
+ proxy_session_side_t po; /**< passive open side */
+ proxy_session_side_t ao; /**< active open side */
- session_handle_t vpp_server_handle;
- session_handle_t vpp_active_open_handle;
volatile int active_open_establishing;
volatile int po_disconnected;
volatile int ao_disconnected;
u32 ps_index;
- u32 po_thread_index;
} proxy_session_t;
+typedef struct proxy_worker_
+{
+ proxy_session_side_ctx_t *ctx_pool;
+ clib_spinlock_t pending_connects_lock;
+ vnet_connect_args_t *pending_connects;
+ vnet_connect_args_t *burst_connects;
+} proxy_worker_t;
+
typedef struct
{
+ proxy_worker_t *workers; /**< per-thread data */
proxy_session_t *sessions; /**< session pool, shared */
clib_spinlock_t sessions_lock; /**< lock for session pool */
u8 **rx_buf; /**< intermediate rx buffers */
@@ -63,6 +97,7 @@ typedef struct
u32 private_segment_count; /**< Number of private fifo segs */
u64 segment_size; /**< size of fifo segs */
u8 prealloc_fifos; /**< Request fifo preallocation */
+ u32 idle_timeout; /**< connect-proxy timeout for idle connections */
int rcv_buffer_size;
session_endpoint_cfg_t server_sep;
session_endpoint_cfg_t client_sep;
@@ -75,6 +110,13 @@ typedef struct
extern proxy_main_t proxy_main;
+static inline proxy_worker_t *
+proxy_worker_get (u32 thread_index)
+{
+ proxy_main_t *pm = &proxy_main;
+ return vec_elt_at_index (pm->workers, thread_index);
+}
+
#endif /* __included_proxy_h__ */
/*
diff --git a/src/plugins/hs_apps/sapi/vpp_echo_common.c b/src/plugins/hs_apps/sapi/vpp_echo_common.c
index 5ce04d1b75b..09ba583cf78 100644
--- a/src/plugins/hs_apps/sapi/vpp_echo_common.c
+++ b/src/plugins/hs_apps/sapi/vpp_echo_common.c
@@ -330,8 +330,8 @@ format_transport_proto (u8 * s, va_list * args)
case TRANSPORT_PROTO_UDP:
s = format (s, "UDP");
break;
- case TRANSPORT_PROTO_NONE:
- s = format (s, "NONE");
+ case TRANSPORT_PROTO_CT:
+ s = format (s, "CT");
break;
case TRANSPORT_PROTO_TLS:
s = format (s, "TLS");
diff --git a/src/plugins/hs_apps/test_builtins.c b/src/plugins/hs_apps/test_builtins.c
new file mode 100644
index 00000000000..c314e71b5df
--- /dev/null
+++ b/src/plugins/hs_apps/test_builtins.c
@@ -0,0 +1,192 @@
+/* SPDX-License-Identifier: Apache-2.0
+ * Copyright(c) 2024 Cisco Systems, Inc.
+ */
+
+#include <http_static/http_static.h>
+#include <vppinfra/tw_timer_2t_1w_2048sl.h>
+
+typedef struct
+{
+ u32 stop_timer_handle;
+ hss_session_handle_t sh;
+} tw_timer_elt_t;
+
+typedef struct tb_main_
+{
+ tw_timer_elt_t *delayed_resps;
+ tw_timer_wheel_2t_1w_2048sl_t tw;
+ hss_session_send_fn send_data;
+ u8 *test_data;
+} tb_main_t;
+
+static tb_main_t tb_main;
+
+static uword
+test_builtins_timer_process (vlib_main_t *vm, vlib_node_runtime_t *rt,
+ vlib_frame_t *f)
+{
+ tb_main_t *tbm = &tb_main;
+ f64 now, timeout = 1.0;
+ uword *event_data = 0;
+ uword __clib_unused event_type;
+
+ while (1)
+ {
+ vlib_process_wait_for_event_or_clock (vm, timeout);
+ now = vlib_time_now (vm);
+ event_type = vlib_process_get_events (vm, (uword **) &event_data);
+
+ /* expire timers */
+ tw_timer_expire_timers_2t_1w_2048sl (&tbm->tw, now);
+
+ vec_reset_length (event_data);
+ }
+ return 0;
+}
+
+VLIB_REGISTER_NODE (test_builtins_timer_process_node) = {
+ .function = test_builtins_timer_process,
+ .type = VLIB_NODE_TYPE_PROCESS,
+ .name = "test-builtins-timer-process",
+ .state = VLIB_NODE_STATE_DISABLED,
+};
+
+static void
+send_data_to_hss (hss_session_handle_t sh, u8 *data, u8 free_vec_data)
+{
+ tb_main_t *tbm = &tb_main;
+ hss_url_handler_args_t args = {};
+
+ args.sh = sh;
+ args.data = data;
+ args.data_len = vec_len (data);
+ args.ct = HTTP_CONTENT_TEXT_PLAIN;
+ args.sc = HTTP_STATUS_OK;
+ args.free_vec_data = free_vec_data;
+
+ tbm->send_data (&args);
+}
+
+static hss_url_handler_rc_t
+handle_get_test1 (hss_url_handler_args_t *args)
+{
+ u8 *data;
+
+ clib_warning ("get request on test1");
+ data = format (0, "hello");
+ send_data_to_hss (args->sh, data, 1);
+
+ return HSS_URL_HANDLER_ASYNC;
+}
+
+static hss_url_handler_rc_t
+handle_get_test2 (hss_url_handler_args_t *args)
+{
+ u8 *data;
+
+ clib_warning ("get request on test2");
+ data = format (0, "some data");
+ send_data_to_hss (args->sh, data, 1);
+
+ return HSS_URL_HANDLER_ASYNC;
+}
+
+static void
+delayed_resp_cb (u32 *expired_timers)
+{
+ tb_main_t *tbm = &tb_main;
+ int i;
+ u32 pool_index;
+ tw_timer_elt_t *e;
+ u8 *data;
+
+ for (i = 0; i < vec_len (expired_timers); i++)
+ {
+ pool_index = expired_timers[i] & 0x7FFFFFFF;
+ e = pool_elt_at_index (tbm->delayed_resps, pool_index);
+ clib_warning ("sending delayed data");
+ data = format (0, "delayed data");
+ send_data_to_hss (e->sh, data, 1);
+ pool_put (tbm->delayed_resps, e);
+ }
+}
+
+static hss_url_handler_rc_t
+handle_get_test_delayed (hss_url_handler_args_t *args)
+{
+ tb_main_t *tbm = &tb_main;
+ tw_timer_elt_t *e;
+
+ clib_warning ("get request on test_delayed");
+ pool_get (tbm->delayed_resps, e);
+ e->sh = args->sh;
+ e->stop_timer_handle =
+ tw_timer_start_2t_1w_2048sl (&tbm->tw, e - tbm->delayed_resps, 0, 5);
+
+ return HSS_URL_HANDLER_ASYNC;
+}
+
+static hss_url_handler_rc_t
+handle_post_test3 (hss_url_handler_args_t *args)
+{
+ send_data_to_hss (args->sh, 0, 0);
+ return HSS_URL_HANDLER_ASYNC;
+}
+
+static hss_url_handler_rc_t
+handle_get_64bytes (hss_url_handler_args_t *args)
+{
+ tb_main_t *tbm = &tb_main;
+ send_data_to_hss (args->sh, tbm->test_data, 0);
+ return HSS_URL_HANDLER_ASYNC;
+}
+
+static void
+test_builtins_init (vlib_main_t *vm)
+{
+ tb_main_t *tbm = &tb_main;
+ hss_register_url_fn fp;
+ vlib_node_t *n;
+
+ fp = vlib_get_plugin_symbol ("http_static_plugin.so",
+ "hss_register_url_handler");
+
+ if (fp == 0)
+ {
+ clib_warning ("http_static_plugin.so not loaded...");
+ return;
+ }
+
+ tbm->test_data = format (
+ 0, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
+
+ (*fp) (handle_get_test1, "test1", HTTP_REQ_GET);
+ (*fp) (handle_get_test2, "test2", HTTP_REQ_GET);
+ (*fp) (handle_get_test_delayed, "test_delayed", HTTP_REQ_GET);
+ (*fp) (handle_post_test3, "test3", HTTP_REQ_POST);
+ (*fp) (handle_get_64bytes, "64B", HTTP_REQ_GET);
+
+ tbm->send_data =
+ vlib_get_plugin_symbol ("http_static_plugin.so", "hss_session_send_data");
+
+ tw_timer_wheel_init_2t_1w_2048sl (&tbm->tw, delayed_resp_cb, 1.0, ~0);
+
+ vlib_node_set_state (vm, test_builtins_timer_process_node.index,
+ VLIB_NODE_STATE_POLLING);
+ n = vlib_get_node (vm, test_builtins_timer_process_node.index);
+ vlib_start_process (vm, n->runtime_index);
+}
+
+static clib_error_t *
+test_builtins_enable_command_fn (vlib_main_t *vm, unformat_input_t *input,
+ vlib_cli_command_t *cmd)
+{
+ test_builtins_init (vm);
+ return 0;
+}
+
+VLIB_CLI_COMMAND (test_builtins_enable_command, static) = {
+ .path = "test-url-handler enable",
+ .short_help = "test-url-handler enable",
+ .function = test_builtins_enable_command_fn,
+};
diff --git a/src/plugins/hs_apps/vcl/vcl_test.h b/src/plugins/hs_apps/vcl/vcl_test.h
index 0ce27ef84e2..11667fb144a 100644
--- a/src/plugins/hs_apps/vcl/vcl_test.h
+++ b/src/plugins/hs_apps/vcl/vcl_test.h
@@ -124,7 +124,7 @@ typedef struct
typedef struct
{
- const vcl_test_proto_vft_t *protos[VPPCOM_PROTO_SRTP + 1];
+ const vcl_test_proto_vft_t *protos[VPPCOM_PROTO_HTTP + 1];
uint32_t ckpair_index;
hs_test_cfg_t cfg;
vcl_test_wrk_t *wrk;
@@ -420,6 +420,39 @@ vcl_test_write (vcl_test_session_t *ts, void *buf, uint32_t nbytes)
return (tx_bytes);
}
+static inline int
+vcl_test_write_ds (vcl_test_session_t *ts)
+{
+ vcl_test_stats_t *stats = &ts->stats;
+ int tx_bytes;
+
+ do
+ {
+ stats->tx_xacts++;
+ if (ts->ds[1].len)
+ tx_bytes = vppcom_session_write_segments (ts->fd, ts->ds, 2);
+ else
+ tx_bytes = vppcom_session_write_segments (ts->fd, ts->ds, 1);
+
+ if (tx_bytes < 0)
+ errno = -tx_bytes;
+ if ((tx_bytes == 0) ||
+ ((tx_bytes < 0) && ((errno == EAGAIN) || (errno == EWOULDBLOCK))))
+ stats->rx_eagain++;
+ }
+ while ((tx_bytes == 0) ||
+ ((tx_bytes < 0) && ((errno == EAGAIN) || (errno == EWOULDBLOCK))));
+
+ if (tx_bytes < 0)
+ {
+ vterr ("vppcom_session_write_segments()", -errno);
+ }
+ else
+ stats->tx_bytes += tx_bytes;
+
+ return (tx_bytes);
+}
+
static inline void
dump_help (void)
{
diff --git a/src/plugins/hs_apps/vcl/vcl_test_client.c b/src/plugins/hs_apps/vcl/vcl_test_client.c
index a4a10b562ff..8bac1f00b9d 100644
--- a/src/plugins/hs_apps/vcl/vcl_test_client.c
+++ b/src/plugins/hs_apps/vcl/vcl_test_client.c
@@ -419,13 +419,8 @@ vtc_worker_run_select (vcl_test_client_worker_t *wrk)
if (vcm->incremental_stats)
vtc_inc_stats_check (ts);
}
- if ((!check_rx && ts->stats.tx_bytes >= ts->cfg.total_bytes) ||
- (check_rx && ts->stats.rx_bytes >= ts->cfg.total_bytes))
- {
- clock_gettime (CLOCK_REALTIME, &ts->stats.stop);
- ts->is_done = 1;
- n_active_sessions--;
- }
+ if (vtc_session_check_is_done (ts, check_rx))
+ n_active_sessions -= 1;
}
}
diff --git a/src/plugins/hs_apps/vcl/vcl_test_protos.c b/src/plugins/hs_apps/vcl/vcl_test_protos.c
index cd1ac2b24f4..9c81c5f17a1 100644
--- a/src/plugins/hs_apps/vcl/vcl_test_protos.c
+++ b/src/plugins/hs_apps/vcl/vcl_test_protos.c
@@ -14,6 +14,23 @@
*/
#include <hs_apps/vcl/vcl_test.h>
+#include <http/http.h>
+#include <http/http_header_names.h>
+#include <http/http_content_types.h>
+
+typedef enum vcl_test_http_state_
+{
+ VCL_TEST_HTTP_IDLE = 0,
+ VCL_TEST_HTTP_IN_PROGRESS,
+ VCL_TEST_HTTP_COMPLETED,
+} vcl_test_http_state_t;
+
+typedef struct vcl_test_http_ctx_t
+{
+ u8 is_server;
+ vcl_test_http_state_t test_state;
+ u64 rem_data;
+} vcl_test_http_ctx_t;
static int
vt_tcp_connect (vcl_test_session_t *ts, vppcom_endpt_t *endpt)
@@ -978,6 +995,418 @@ static const vcl_test_proto_vft_t vcl_test_srtp = {
VCL_TEST_REGISTER_PROTO (VPPCOM_PROTO_SRTP, vcl_test_srtp);
+static void
+vt_http_session_init (vcl_test_session_t *ts, u8 is_server)
+{
+ vcl_test_http_ctx_t *http_ctx;
+
+ http_ctx = malloc (sizeof (vcl_test_http_ctx_t));
+ memset (http_ctx, 0, sizeof (*http_ctx));
+ http_ctx->is_server = is_server;
+ ts->opaque = http_ctx;
+}
+
+static inline void
+vt_http_send_reply_msg (vcl_test_session_t *ts, http_status_code_t status)
+{
+ http_msg_t msg;
+ int rv = 0;
+
+ memset (&msg, 0, sizeof (http_msg_t));
+ msg.type = HTTP_MSG_REPLY;
+ msg.code = status;
+
+ vppcom_data_segment_t segs[1] = { { (u8 *) &msg, sizeof (msg) } };
+
+ do
+ {
+ rv = vppcom_session_write_segments (ts->fd, segs, 1);
+
+ if (rv < 0)
+ {
+ errno = -rv;
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ continue;
+
+ vterr ("vppcom_session_write()", -errno);
+ break;
+ }
+ }
+ while (rv <= 0);
+}
+
+static inline int
+vt_process_http_server_read_msg (vcl_test_session_t *ts, void *buf,
+ uint32_t nbytes)
+{
+ http_msg_t msg;
+ u8 *target_path = 0;
+ vcl_test_http_ctx_t *vcl_test_http_ctx = (vcl_test_http_ctx_t *) ts->opaque;
+ vcl_test_stats_t *stats = &ts->stats;
+ int rv = 0;
+
+ do
+ {
+ stats->rx_xacts++;
+ rv = vppcom_session_read (ts->fd, buf, nbytes);
+
+ if (rv <= 0)
+ {
+ errno = -rv;
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ {
+ stats->rx_eagain++;
+ continue;
+ }
+
+ vterr ("vppcom_session_read()", -errno);
+ return 0;
+ }
+
+ if (PREDICT_TRUE (vcl_test_http_ctx->test_state ==
+ VCL_TEST_HTTP_IN_PROGRESS))
+ {
+ vcl_test_http_ctx->rem_data -= rv;
+
+ if (vcl_test_http_ctx->rem_data == 0)
+ {
+ vcl_test_http_ctx->test_state = VCL_TEST_HTTP_COMPLETED;
+ vt_http_send_reply_msg (ts, HTTP_STATUS_OK);
+ }
+ }
+ else if (PREDICT_FALSE (vcl_test_http_ctx->test_state ==
+ VCL_TEST_HTTP_IDLE))
+ {
+ msg = *(http_msg_t *) buf;
+
+ /* verify that we have received http post request from client */
+ if (msg.type != HTTP_MSG_REQUEST || msg.method_type != HTTP_REQ_POST)
+ {
+ vt_http_send_reply_msg (ts, HTTP_STATUS_METHOD_NOT_ALLOWED);
+ vterr ("error! only POST requests allowed from client", 0);
+ return 0;
+ }
+
+ if (msg.data.target_form != HTTP_TARGET_ORIGIN_FORM)
+ {
+ vt_http_send_reply_msg (ts, HTTP_STATUS_BAD_REQUEST);
+ vterr ("error! http target not in origin form", 0);
+ return 0;
+ }
+
+ /* validate target path syntax */
+ if (msg.data.target_path_len)
+ {
+ vec_validate (target_path, msg.data.target_path_len - 1);
+ memcpy (target_path,
+ buf + sizeof (msg) + msg.data.target_path_offset - 1,
+ msg.data.target_path_len + 1);
+ if (http_validate_abs_path_syntax (target_path, 0))
+ {
+ vt_http_send_reply_msg (ts, HTTP_STATUS_BAD_REQUEST);
+ vterr ("error! target path is not absolute", 0);
+ vec_free (target_path);
+ return 0;
+ }
+ vec_free (target_path);
+ }
+
+ /* read body */
+ if (msg.data.body_len)
+ {
+ vcl_test_http_ctx->rem_data = msg.data.body_len;
+ /* | <http_msg_t> | <target> | <headers> | <body> | */
+ vcl_test_http_ctx->rem_data -=
+ (rv - sizeof (msg) - msg.data.body_offset);
+ vcl_test_http_ctx->test_state = VCL_TEST_HTTP_IN_PROGRESS;
+ }
+ }
+
+ if (rv < nbytes)
+ stats->rx_incomp++;
+ }
+ while (rv <= 0);
+
+ stats->rx_bytes += rv;
+ return (rv);
+}
+
+static inline int
+vt_process_http_client_read_msg (vcl_test_session_t *ts, void *buf,
+ uint32_t nbytes)
+{
+ http_msg_t msg;
+ int rv = 0;
+
+ do
+ {
+ rv = vppcom_session_read (ts->fd, buf, nbytes);
+
+ if (rv < 0)
+ {
+ errno = -rv;
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ continue;
+
+ vterr ("vppcom_session_read()", -errno);
+ break;
+ }
+ }
+ while (!rv);
+
+ msg = *(http_msg_t *) buf;
+
+ if (msg.type == HTTP_MSG_REPLY && msg.code == HTTP_STATUS_OK)
+ vtinf ("received 200 OK from server");
+ else
+ vterr ("received unexpected reply from server", 0);
+
+ return (rv);
+}
+
+static inline int
+vt_process_http_client_write_msg (vcl_test_session_t *ts, void *buf,
+ uint32_t nbytes)
+{
+ http_msg_t msg;
+ http_header_t *req_headers = 0;
+ u8 *headers_buf = 0;
+ u8 *target;
+ vcl_test_http_ctx_t *vcl_test_http_ctx = (vcl_test_http_ctx_t *) ts->opaque;
+ vcl_test_stats_t *stats = &ts->stats;
+ int rv = 0;
+
+ if (PREDICT_TRUE (vcl_test_http_ctx->test_state ==
+ VCL_TEST_HTTP_IN_PROGRESS))
+ {
+ do
+ {
+ rv = vppcom_session_write (
+ ts->fd, buf, clib_min (nbytes, vcl_test_http_ctx->rem_data));
+
+ if (rv <= 0)
+ {
+ errno = -rv;
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ {
+ stats->tx_eagain++;
+ continue;
+ }
+
+ vterr ("vppcom_session_write()", -errno);
+ return 0;
+ }
+
+ vcl_test_http_ctx->rem_data -= rv;
+
+ if (vcl_test_http_ctx->rem_data == 0)
+ {
+ vcl_test_http_ctx->test_state = VCL_TEST_HTTP_COMPLETED;
+ vtinf ("client finished sending %ld bytes of data",
+ ts->cfg.total_bytes);
+ }
+
+ if (rv < nbytes)
+ stats->tx_incomp++;
+ }
+ while (rv <= 0);
+ }
+
+ else if (PREDICT_FALSE (vcl_test_http_ctx->test_state == VCL_TEST_HTTP_IDLE))
+ {
+ http_add_header (
+ &req_headers, http_header_name_token (HTTP_HEADER_CONTENT_TYPE),
+ http_content_type_token (HTTP_CONTENT_APP_OCTET_STREAM));
+ headers_buf = http_serialize_headers (req_headers);
+ vec_free (req_headers);
+
+ memset (&msg, 0, sizeof (http_msg_t));
+ msg.type = HTTP_MSG_REQUEST;
+ msg.method_type = HTTP_REQ_POST;
+
+ /* target */
+ msg.data.target_form = HTTP_TARGET_ORIGIN_FORM;
+ target = (u8 *) "/vcl_test_http\0";
+ msg.data.target_path_len = strlen ((char *) target);
+
+ /* headers */
+ msg.data.headers_offset = msg.data.target_path_len;
+ msg.data.headers_len = vec_len (headers_buf);
+
+ /* body */
+ msg.data.body_offset = msg.data.headers_offset + msg.data.headers_len;
+ msg.data.body_len = ts->cfg.total_bytes;
+
+ msg.data.len =
+ msg.data.target_path_len + msg.data.headers_len + msg.data.body_len;
+ msg.data.type = HTTP_MSG_DATA_INLINE;
+
+ vppcom_data_segment_t segs[3] = { { (u8 *) &msg, sizeof (msg) },
+ { target, strlen ((char *) target) },
+ { headers_buf,
+ vec_len (headers_buf) } };
+
+ do
+ {
+ rv = vppcom_session_write_segments (ts->fd, segs, 3);
+
+ if (rv <= 0)
+ {
+ errno = -rv;
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ {
+ stats->tx_eagain++;
+ continue;
+ }
+
+ vterr ("vppcom_session_write_segments()", -errno);
+ vec_free (headers_buf);
+ return 0;
+ }
+ }
+ while (rv <= 0);
+
+ vcl_test_http_ctx->test_state = VCL_TEST_HTTP_IN_PROGRESS;
+ vcl_test_http_ctx->rem_data = ts->cfg.total_bytes;
+ vec_free (headers_buf);
+ }
+
+ stats->tx_bytes += rv;
+ return (rv);
+}
+
+static inline int
+vt_process_http_server_write_msg (vcl_test_session_t *ts, void *buf,
+ uint32_t nbytes)
+{
+ return 0;
+}
+
+static inline int
+vt_http_read (vcl_test_session_t *ts, void *buf, uint32_t nbytes)
+{
+ vcl_test_http_ctx_t *vcl_test_http_ctx = (vcl_test_http_ctx_t *) ts->opaque;
+
+ if (vcl_test_http_ctx->is_server)
+ return vt_process_http_server_read_msg (ts, buf, nbytes);
+ else
+ return vt_process_http_client_read_msg (ts, buf, nbytes);
+}
+
+static inline int
+vt_http_write (vcl_test_session_t *ts, void *buf, uint32_t nbytes)
+{
+ vcl_test_http_ctx_t *vcl_test_http_ctx = (vcl_test_http_ctx_t *) ts->opaque;
+
+ if (vcl_test_http_ctx->is_server)
+ return vt_process_http_server_write_msg (ts, buf, nbytes);
+ else
+ return vt_process_http_client_write_msg (ts, buf, nbytes);
+}
+
+static int
+vt_http_connect (vcl_test_session_t *ts, vppcom_endpt_t *endpt)
+{
+ uint32_t flags, flen;
+ int rv;
+
+ ts->fd = vppcom_session_create (VPPCOM_PROTO_HTTP, ts->noblk_connect);
+ if (ts->fd < 0)
+ {
+ vterr ("vppcom_session_create()", ts->fd);
+ return ts->fd;
+ }
+
+ rv = vppcom_session_connect (ts->fd, endpt);
+ if (rv < 0 && rv != VPPCOM_EINPROGRESS)
+ {
+ vterr ("vppcom_session_connect()", rv);
+ return rv;
+ }
+
+ ts->read = vt_http_read;
+ ts->write = vt_http_write;
+
+ if (!ts->noblk_connect)
+ {
+ flags = O_NONBLOCK;
+ flen = sizeof (flags);
+ vppcom_session_attr (ts->fd, VPPCOM_ATTR_SET_FLAGS, &flags, &flen);
+ vtinf ("Test session %d (fd %d) connected.", ts->session_index, ts->fd);
+ }
+
+ vt_http_session_init (ts, 0 /* is_server */);
+
+ return 0;
+}
+
+static int
+vt_http_listen (vcl_test_session_t *ts, vppcom_endpt_t *endpt)
+{
+ int rv;
+
+ ts->fd = vppcom_session_create (VPPCOM_PROTO_HTTP, 1 /* is_nonblocking */);
+ if (ts->fd < 0)
+ {
+ vterr ("vppcom_session_create()", ts->fd);
+ return ts->fd;
+ }
+
+ rv = vppcom_session_bind (ts->fd, endpt);
+ if (rv < 0)
+ {
+ vterr ("vppcom_session_bind()", rv);
+ return rv;
+ }
+
+ rv = vppcom_session_listen (ts->fd, 10);
+ if (rv < 0)
+ {
+ vterr ("vppcom_session_listen()", rv);
+ return rv;
+ }
+
+ return 0;
+}
+
+static int
+vt_http_accept (int listen_fd, vcl_test_session_t *ts)
+{
+ int client_fd;
+
+ client_fd = vppcom_session_accept (listen_fd, &ts->endpt, 0);
+ if (client_fd < 0)
+ {
+ vterr ("vppcom_session_accept()", client_fd);
+ return client_fd;
+ }
+
+ ts->fd = client_fd;
+ ts->is_open = 1;
+ ts->read = vt_http_read;
+ ts->write = vt_http_write;
+
+ vt_http_session_init (ts, 1 /* is_server */);
+
+ return 0;
+}
+
+static int
+vt_http_close (vcl_test_session_t *ts)
+{
+ free (ts->opaque);
+ return 0;
+}
+
+static const vcl_test_proto_vft_t vcl_test_http = {
+ .open = vt_http_connect,
+ .listen = vt_http_listen,
+ .accept = vt_http_accept,
+ .close = vt_http_close,
+};
+
+VCL_TEST_REGISTER_PROTO (VPPCOM_PROTO_HTTP, vcl_test_http);
+
/*
* fd.io coding-style-patch-verification: ON
*
diff --git a/src/plugins/hs_apps/vcl/vcl_test_server.c b/src/plugins/hs_apps/vcl/vcl_test_server.c
index d17a2089ba7..008539f2585 100644
--- a/src/plugins/hs_apps/vcl/vcl_test_server.c
+++ b/src/plugins/hs_apps/vcl/vcl_test_server.c
@@ -282,11 +282,7 @@ vts_server_process_rx (vcl_test_session_t *conn, int rx_bytes)
if (conn->cfg.test == HS_TEST_TYPE_BI)
{
if (vsm->use_ds)
- {
- (void) vcl_test_write (conn, conn->ds[0].data, conn->ds[0].len);
- if (conn->ds[1].len)
- (void) vcl_test_write (conn, conn->ds[1].data, conn->ds[1].len);
- }
+ (void) vcl_test_write_ds (conn);
else
(void) vcl_test_write (conn, conn->rxbuf, rx_bytes);
}
@@ -420,36 +416,41 @@ static void
vcl_test_init_endpoint_addr (vcl_test_server_main_t * vsm)
{
struct sockaddr_storage *servaddr = &vsm->servaddr;
- memset (servaddr, 0, sizeof (*servaddr));
if (vsm->server_cfg.address_ip6)
{
struct sockaddr_in6 *server_addr = (struct sockaddr_in6 *) servaddr;
- server_addr->sin6_family = AF_INET6;
- server_addr->sin6_addr = in6addr_any;
- server_addr->sin6_port = htons (vsm->server_cfg.port);
+ vsm->server_cfg.endpt.is_ip4 = 0;
+ vsm->server_cfg.endpt.ip = (uint8_t *) &server_addr->sin6_addr;
+ vsm->server_cfg.endpt.port = htons (vsm->server_cfg.port);
}
else
{
struct sockaddr_in *server_addr = (struct sockaddr_in *) servaddr;
- server_addr->sin_family = AF_INET;
- server_addr->sin_addr.s_addr = htonl (INADDR_ANY);
- server_addr->sin_port = htons (vsm->server_cfg.port);
+ vsm->server_cfg.endpt.is_ip4 = 1;
+ vsm->server_cfg.endpt.ip = (uint8_t *) &server_addr->sin_addr;
+ vsm->server_cfg.endpt.port = htons (vsm->server_cfg.port);
}
+}
+
+static void
+vcl_test_clear_endpoint_addr (vcl_test_server_main_t *vsm)
+{
+ struct sockaddr_storage *servaddr = &vsm->servaddr;
+
+ memset (&vsm->servaddr, 0, sizeof (vsm->servaddr));
if (vsm->server_cfg.address_ip6)
{
struct sockaddr_in6 *server_addr = (struct sockaddr_in6 *) servaddr;
- vsm->server_cfg.endpt.is_ip4 = 0;
- vsm->server_cfg.endpt.ip = (uint8_t *) &server_addr->sin6_addr;
- vsm->server_cfg.endpt.port = (uint16_t) server_addr->sin6_port;
+ server_addr->sin6_family = AF_INET6;
+ server_addr->sin6_addr = in6addr_any;
}
else
{
struct sockaddr_in *server_addr = (struct sockaddr_in *) servaddr;
- vsm->server_cfg.endpt.is_ip4 = 1;
- vsm->server_cfg.endpt.ip = (uint8_t *) &server_addr->sin_addr;
- vsm->server_cfg.endpt.port = (uint16_t) server_addr->sin_port;
+ server_addr->sin_family = AF_INET;
+ server_addr->sin_addr.s_addr = htonl (INADDR_ANY);
}
}
@@ -460,9 +461,10 @@ vcl_test_server_process_opts (vcl_test_server_main_t * vsm, int argc,
int v, c;
vsm->server_cfg.proto = VPPCOM_PROTO_TCP;
+ vcl_test_clear_endpoint_addr (vsm);
opterr = 0;
- while ((c = getopt (argc, argv, "6DLsw:hp:S")) != -1)
+ while ((c = getopt (argc, argv, "6DLsw:hp:SB:")) != -1)
switch (c)
{
case '6':
@@ -473,7 +475,22 @@ vcl_test_server_process_opts (vcl_test_server_main_t * vsm, int argc,
if (vppcom_unformat_proto (&vsm->server_cfg.proto, optarg))
vtwrn ("Invalid vppcom protocol %s, defaulting to TCP", optarg);
break;
-
+ case 'B':
+ if (vsm->server_cfg.address_ip6)
+ {
+ if (inet_pton (
+ AF_INET6, optarg,
+ &((struct sockaddr_in6 *) &vsm->servaddr)->sin6_addr) != 1)
+ vtwrn ("couldn't parse ipv6 addr %s", optarg);
+ }
+ else
+ {
+ if (inet_pton (
+ AF_INET, optarg,
+ &((struct sockaddr_in *) &vsm->servaddr)->sin_addr) != 1)
+ vtwrn ("couldn't parse ipv4 addr %s", optarg);
+ }
+ break;
case 'D':
vsm->server_cfg.proto = VPPCOM_PROTO_UDP;
break;