diff options
Diffstat (limited to 'src/plugins/hs_apps')
-rw-r--r-- | src/plugins/hs_apps/CMakeLists.txt | 2 | ||||
-rw-r--r-- | src/plugins/hs_apps/echo_client.c | 14 | ||||
-rw-r--r-- | src/plugins/hs_apps/echo_server.c | 21 | ||||
-rw-r--r-- | src/plugins/hs_apps/http_cli.c | 226 | ||||
-rw-r--r-- | src/plugins/hs_apps/http_client.c | 743 | ||||
-rw-r--r-- | src/plugins/hs_apps/http_client_cli.c | 141 | ||||
-rw-r--r-- | src/plugins/hs_apps/http_tps.c | 192 | ||||
-rw-r--r-- | src/plugins/hs_apps/proxy.c | 857 | ||||
-rw-r--r-- | src/plugins/hs_apps/proxy.h | 52 | ||||
-rw-r--r-- | src/plugins/hs_apps/sapi/vpp_echo_common.c | 4 | ||||
-rw-r--r-- | src/plugins/hs_apps/test_builtins.c | 192 | ||||
-rw-r--r-- | src/plugins/hs_apps/vcl/vcl_test.h | 35 | ||||
-rw-r--r-- | src/plugins/hs_apps/vcl/vcl_test_client.c | 9 | ||||
-rw-r--r-- | src/plugins/hs_apps/vcl/vcl_test_protos.c | 429 | ||||
-rw-r--r-- | src/plugins/hs_apps/vcl/vcl_test_server.c | 57 |
15 files changed, 2585 insertions, 389 deletions
diff --git a/src/plugins/hs_apps/CMakeLists.txt b/src/plugins/hs_apps/CMakeLists.txt index 179c9c7a4c4..eae100949d4 100644 --- a/src/plugins/hs_apps/CMakeLists.txt +++ b/src/plugins/hs_apps/CMakeLists.txt @@ -21,8 +21,10 @@ add_vpp_plugin(hs_apps hs_apps.c http_cli.c http_client_cli.c + http_client.c http_tps.c proxy.c + test_builtins.c ) ############################################################################## diff --git a/src/plugins/hs_apps/echo_client.c b/src/plugins/hs_apps/echo_client.c index d1443e75e80..d5edffbd02e 100644 --- a/src/plugins/hs_apps/echo_client.c +++ b/src/plugins/hs_apps/echo_client.c @@ -429,8 +429,11 @@ ec_init (vlib_main_t *vm) ecm->app_is_init = 1; + session_enable_disable_args_t args = { .is_en = 1, + .rt_engine_type = + RT_BACKEND_ENGINE_RULE_TABLE }; vlib_worker_thread_barrier_sync (vm); - vnet_session_enable_disable (vm, 1 /* turn on session and transports */); + vnet_session_enable_disable (vm, &args); /* Turn on the builtin client input nodes */ foreach_vlib_main () @@ -943,15 +946,16 @@ ec_connect_rpc (void *args) a->api_context = ci; if (needs_crypto) { - session_endpoint_alloc_ext_cfg (&a->sep_ext, - TRANSPORT_ENDPT_EXT_CFG_CRYPTO); - a->sep_ext.ext_cfg->crypto.ckpair_index = ecm->ckpair_index; + transport_endpt_ext_cfg_t *ext_cfg = session_endpoint_add_ext_cfg ( + &a->sep_ext, TRANSPORT_ENDPT_EXT_CFG_CRYPTO, + sizeof (transport_endpt_crypto_cfg_t)); + ext_cfg->crypto.ckpair_index = ecm->ckpair_index; } rv = vnet_connect (a); if (needs_crypto) - clib_mem_free (a->sep_ext.ext_cfg); + session_endpoint_free_ext_cfgs (&a->sep_ext); if (rv) { diff --git a/src/plugins/hs_apps/echo_server.c b/src/plugins/hs_apps/echo_server.c index 0243252434a..b981e775b57 100644 --- a/src/plugins/hs_apps/echo_server.c +++ b/src/plugins/hs_apps/echo_server.c @@ -591,6 +591,7 @@ echo_server_listen () i32 rv; echo_server_main_t *esm = &echo_server_main; vnet_listen_args_t _args = {}, *args = &_args; + int needs_crypto; if ((rv = parse_uri (esm->server_uri, &args->sep_ext))) { @@ -598,11 +599,14 @@ echo_server_listen () } args->app_index = esm->app_index; args->sep_ext.port = hs_make_data_port (args->sep_ext.port); - if (echo_client_transport_needs_crypto (args->sep_ext.transport_proto)) + needs_crypto = + echo_client_transport_needs_crypto (args->sep_ext.transport_proto); + if (needs_crypto) { - session_endpoint_alloc_ext_cfg (&args->sep_ext, - TRANSPORT_ENDPT_EXT_CFG_CRYPTO); - args->sep_ext.ext_cfg->crypto.ckpair_index = esm->ckpair_index; + transport_endpt_ext_cfg_t *ext_cfg = session_endpoint_add_ext_cfg ( + &args->sep_ext, TRANSPORT_ENDPT_EXT_CFG_CRYPTO, + sizeof (transport_endpt_crypto_cfg_t)); + ext_cfg->crypto.ckpair_index = esm->ckpair_index; } if (args->sep_ext.transport_proto == TRANSPORT_PROTO_UDP) @@ -612,8 +616,8 @@ echo_server_listen () rv = vnet_listen (args); esm->listener_handle = args->handle; - if (args->sep_ext.ext_cfg) - clib_mem_free (args->sep_ext.ext_cfg); + if (needs_crypto) + session_endpoint_free_ext_cfgs (&args->sep_ext); return rv; } @@ -736,7 +740,10 @@ echo_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input, goto cleanup; } - vnet_session_enable_disable (vm, 1 /* turn on TCP, etc. */ ); + session_enable_disable_args_t args = { .is_en = 1, + .rt_engine_type = + RT_BACKEND_ENGINE_RULE_TABLE }; + vnet_session_enable_disable (vm, &args); if (!server_uri_set) { diff --git a/src/plugins/hs_apps/http_cli.c b/src/plugins/hs_apps/http_cli.c index 4970da72012..dfa90f9eced 100644 --- a/src/plugins/hs_apps/http_cli.c +++ b/src/plugins/hs_apps/http_cli.c @@ -17,6 +17,8 @@ #include <vnet/session/application_interface.h> #include <vnet/session/session.h> #include <http/http.h> +#include <http/http_header_names.h> +#include <http/http_content_types.h> #define HCS_DEBUG 0 @@ -28,6 +30,12 @@ typedef struct { + u32 handle; + u8 *uri; +} hcs_uri_map_t; + +typedef struct +{ u32 hs_index; u32 thread_index; u64 node_index; @@ -43,6 +51,7 @@ typedef struct u8 *tx_buf; u32 tx_offset; u32 vpp_session_index; + http_header_t *resp_headers; } hcs_session_t; typedef struct @@ -59,6 +68,16 @@ typedef struct u32 fifo_size; u8 *uri; vlib_main_t *vlib_main; + + /* hash table to store uri -> uri map pool index */ + uword *index_by_uri; + + /* pool of uri maps */ + hcs_uri_map_t *uri_map_pool; + + /* for appns */ + u8 *appns_id; + u64 appns_secret; } hcs_main_t; static hcs_main_t hcs_main; @@ -148,31 +167,52 @@ hcs_cli_output (uword arg, u8 *buffer, uword buffer_bytes) } static void -start_send_data (hcs_session_t *hs, http_status_code_t status, - http_content_type_t type) +start_send_data (hcs_session_t *hs, http_status_code_t status) { http_msg_t msg; session_t *ts; + u8 *headers_buf = 0; int rv; + if (vec_len (hs->resp_headers)) + { + headers_buf = http_serialize_headers (hs->resp_headers); + vec_free (hs->resp_headers); + msg.data.headers_offset = 0; + msg.data.headers_len = vec_len (headers_buf); + } + else + { + msg.data.headers_offset = 0; + msg.data.headers_len = 0; + } + msg.type = HTTP_MSG_REPLY; msg.code = status; - msg.content_type = type; msg.data.type = HTTP_MSG_DATA_INLINE; - msg.data.len = vec_len (hs->tx_buf); + msg.data.body_len = vec_len (hs->tx_buf); + msg.data.body_offset = msg.data.headers_len; + msg.data.len = msg.data.body_len + msg.data.headers_len; ts = session_get (hs->vpp_session_index, hs->thread_index); rv = svm_fifo_enqueue (ts->tx_fifo, sizeof (msg), (u8 *) &msg); ASSERT (rv == sizeof (msg)); - if (!msg.data.len) + if (msg.data.headers_len) + { + rv = svm_fifo_enqueue (ts->tx_fifo, vec_len (headers_buf), headers_buf); + ASSERT (rv == msg.data.headers_len); + vec_free (headers_buf); + } + + if (!msg.data.body_len) goto done; rv = svm_fifo_enqueue (ts->tx_fifo, vec_len (hs->tx_buf), hs->tx_buf); if (rv != vec_len (hs->tx_buf)) { - hs->tx_offset = rv; + hs->tx_offset = (rv > 0) ? rv : 0; svm_fifo_add_want_deq_ntf (ts->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); } else @@ -183,7 +223,7 @@ start_send_data (hcs_session_t *hs, http_status_code_t status, done: if (svm_fifo_set_event (ts->tx_fifo)) - session_send_io_evt_to_thread (ts->tx_fifo, SESSION_IO_EVT_TX); + session_program_tx_io_evt (ts->handle, SESSION_IO_EVT_TX); } static void @@ -203,7 +243,12 @@ send_data_to_http (void *rpc_args) hs->tx_buf = args->buf; if (args->plain_text) type = HTTP_CONTENT_TEXT_PLAIN; - start_send_data (hs, HTTP_STATUS_OK, type); + + http_add_header (&hs->resp_headers, + http_header_name_token (HTTP_HEADER_CONTENT_TYPE), + http_content_type_token (type)); + + start_send_data (hs, HTTP_STATUS_OK); cleanup: @@ -325,6 +370,7 @@ hcs_ts_rx_callback (session_t *ts) hs = hcs_session_get (ts->thread_index, ts->opaque); hs->tx_buf = 0; + hs->resp_headers = 0; /* Read the http message header */ rv = svm_fifo_dequeue (ts->rx_fifo, sizeof (msg), (u8 *) &msg); @@ -332,16 +378,17 @@ hcs_ts_rx_callback (session_t *ts) if (msg.type != HTTP_MSG_REQUEST || msg.method_type != HTTP_REQ_GET) { - start_send_data (hs, HTTP_STATUS_METHOD_NOT_ALLOWED, - HTTP_CONTENT_TEXT_HTML); + http_add_header (&hs->resp_headers, + http_header_name_token (HTTP_HEADER_ALLOW), + http_token_lit ("GET")); + start_send_data (hs, HTTP_STATUS_METHOD_NOT_ALLOWED); goto done; } if (msg.data.target_path_len == 0 || msg.data.target_form != HTTP_TARGET_ORIGIN_FORM) { - hs->tx_buf = 0; - start_send_data (hs, HTTP_STATUS_BAD_REQUEST, HTTP_CONTENT_TEXT_HTML); + start_send_data (hs, HTTP_STATUS_BAD_REQUEST); goto done; } @@ -353,13 +400,13 @@ hcs_ts_rx_callback (session_t *ts) HCS_DBG ("%v", args.buf); if (http_validate_abs_path_syntax (args.buf, &is_encoded)) { - start_send_data (hs, HTTP_STATUS_BAD_REQUEST, HTTP_CONTENT_TEXT_HTML); + start_send_data (hs, HTTP_STATUS_BAD_REQUEST); vec_free (args.buf); goto done; } if (is_encoded) { - u8 *decoded = http_percent_decode (args.buf); + u8 *decoded = http_percent_decode (args.buf, vec_len (args.buf)); vec_free (args.buf); args.buf = decoded; } @@ -374,13 +421,13 @@ hcs_ts_rx_callback (session_t *ts) ASSERT (rv == msg.data.headers_len); if (http_parse_headers (headers, &ht)) { - start_send_data (hs, HTTP_STATUS_BAD_REQUEST, - HTTP_CONTENT_TEXT_HTML); + start_send_data (hs, HTTP_STATUS_BAD_REQUEST); vec_free (args.buf); vec_free (headers); goto done; } - const char *accept_value = http_get_header (ht, HTTP_HEADER_ACCEPT); + const char *accept_value = + http_get_header (ht, http_header_name_str (HTTP_HEADER_ACCEPT)); if (accept_value) { HCS_DBG ("client accept: %s", accept_value); @@ -438,7 +485,7 @@ hcs_ts_tx_callback (session_t *ts) } if (svm_fifo_set_event (ts->tx_fifo)) - session_send_io_evt_to_thread (ts->tx_fifo, SESSION_IO_EVT_TX); + session_program_tx_io_evt (ts->handle, SESSION_IO_EVT_TX); return 0; } @@ -554,6 +601,11 @@ hcs_attach () hcm->fifo_size ? hcm->fifo_size : 32 << 10; a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN; a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = hcm->prealloc_fifos; + if (hcm->appns_id) + { + a->namespace_id = hcm->appns_id; + a->options[APP_OPTIONS_NAMESPACE_SECRET] = hcm->appns_secret; + } if (vnet_application_attach (a)) { @@ -588,15 +640,15 @@ hcs_listen () session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL; hcs_main_t *hcm = &hcs_main; vnet_listen_args_t _a, *a = &_a; - char *uri = "tcp://0.0.0.0/80"; u8 need_crypto; int rv; + char *uri; clib_memset (a, 0, sizeof (*a)); a->app_index = hcm->app_index; - if (hcm->uri) - uri = (char *) hcm->uri; + uri = (char *) hcm->uri; + ASSERT (uri); if (parse_uri (uri, &sep)) return -1; @@ -608,15 +660,24 @@ hcs_listen () if (need_crypto) { - session_endpoint_alloc_ext_cfg (&a->sep_ext, - TRANSPORT_ENDPT_EXT_CFG_CRYPTO); - a->sep_ext.ext_cfg->crypto.ckpair_index = hcm->ckpair_index; + transport_endpt_ext_cfg_t *ext_cfg = session_endpoint_add_ext_cfg ( + &a->sep_ext, TRANSPORT_ENDPT_EXT_CFG_CRYPTO, + sizeof (transport_endpt_crypto_cfg_t)); + ext_cfg->crypto.ckpair_index = hcm->ckpair_index; } rv = vnet_listen (a); + if (rv == 0) + { + hcs_uri_map_t *map; + pool_get_zero (hcm->uri_map_pool, map); + map->uri = vec_dup (uri); + map->handle = a->handle; + hash_set_mem (hcm->index_by_uri, map->uri, map - hcm->uri_map_pool); + } if (need_crypto) - clib_mem_free (a->sep_ext.ext_cfg); + session_endpoint_free_ext_cfgs (&a->sep_ext); return rv; } @@ -633,6 +694,43 @@ hcs_detach () } static int +hcs_unlisten () +{ + hcs_main_t *hcm = &hcs_main; + vnet_unlisten_args_t _a, *a = &_a; + char *uri; + int rv = 0; + uword *value; + + clib_memset (a, 0, sizeof (*a)); + a->app_index = hcm->app_index; + + uri = (char *) hcm->uri; + ASSERT (uri); + + value = hash_get_mem (hcm->index_by_uri, uri); + if (value) + { + hcs_uri_map_t *map = pool_elt_at_index (hcm->uri_map_pool, *value); + + a->handle = map->handle; + rv = vnet_unlisten (a); + if (rv == 0) + { + hash_unset_mem (hcm->index_by_uri, uri); + vec_free (map->uri); + pool_put (hcm->uri_map_pool, map); + if (pool_elts (hcm->uri_map_pool) == 0) + hcs_detach (); + } + } + else + return -1; + + return rv; +} + +static int hcs_create (vlib_main_t *vm) { vlib_thread_main_t *vtm = vlib_get_thread_main (); @@ -665,6 +763,8 @@ hcs_create_command_fn (vlib_main_t *vm, unformat_input_t *input, hcs_main_t *hcm = &hcs_main; u64 seg_size; int rv; + u32 listener_add = ~0; + clib_error_t *error = 0; hcm->prealloc_fifos = 0; hcm->private_segment_size = 0; @@ -683,13 +783,32 @@ hcs_create_command_fn (vlib_main_t *vm, unformat_input_t *input, hcm->private_segment_size = seg_size; else if (unformat (line_input, "fifo-size %d", &hcm->fifo_size)) hcm->fifo_size <<= 10; - else if (unformat (line_input, "uri %s", &hcm->uri)) + else if (unformat (line_input, "uri %_%v%_", &hcm->uri)) + ; + else if (unformat (line_input, "appns %_%v%_", &hcm->appns_id)) ; + else if (unformat (line_input, "secret %lu", &hcm->appns_secret)) + ; + else if (unformat (line_input, "listener")) + { + if (unformat (line_input, "add")) + listener_add = 1; + else if (unformat (line_input, "del")) + listener_add = 0; + else + { + unformat_free (line_input); + error = clib_error_return (0, "unknown input `%U'", + format_unformat_error, line_input); + goto done; + } + } else { unformat_free (line_input); - return clib_error_return (0, "unknown input `%U'", - format_unformat_error, line_input); + error = clib_error_return (0, "unknown input `%U'", + format_unformat_error, line_input); + goto done; } } @@ -697,10 +816,43 @@ hcs_create_command_fn (vlib_main_t *vm, unformat_input_t *input, start_server: + if (hcm->uri == 0) + hcm->uri = format (0, "tcp://0.0.0.0/80"); + if (hcm->app_index != (u32) ~0) - return clib_error_return (0, "test http server is already running"); + { + if (hcm->appns_id && (listener_add != ~0)) + { + error = clib_error_return ( + 0, "appns must not be specified for listener add/del"); + goto done; + } + if (listener_add == 1) + { + if (hcs_listen ()) + error = + clib_error_return (0, "failed to start listening %v", hcm->uri); + goto done; + } + else if (listener_add == 0) + { + rv = hcs_unlisten (); + if (rv != 0) + error = clib_error_return ( + 0, "failed to stop listening %v, rv = %d", hcm->uri, rv); + goto done; + } + else + { + error = clib_error_return (0, "test http server is already running"); + goto done; + } + } - vnet_session_enable_disable (vm, 1 /* turn on TCP, etc. */ ); + session_enable_disable_args_t args = { .is_en = 1, + .rt_engine_type = + RT_BACKEND_ENGINE_RULE_TABLE }; + vnet_session_enable_disable (vm, &args); rv = hcs_create (vm); switch (rv) @@ -708,16 +860,23 @@ start_server: case 0: break; default: - return clib_error_return (0, "server_create returned %d", rv); + { + error = clib_error_return (0, "server_create returned %d", rv); + goto done; + } } - return 0; +done: + vec_free (hcm->appns_id); + vec_free (hcm->uri); + return error; } VLIB_CLI_COMMAND (hcs_create_command, static) = { .path = "http cli server", .short_help = "http cli server [uri <uri>] [fifo-size <nbytes>] " - "[private-segment-size <nMG>] [prealloc-fifos <n>]", + "[private-segment-size <nMG>] [prealloc-fifos <n>] " + "[listener <add|del>] [appns <app-ns> secret <appns-secret>]", .function = hcs_create_command_fn, }; @@ -728,6 +887,7 @@ hcs_main_init (vlib_main_t *vm) hcs->app_index = ~0; hcs->vlib_main = vm; + hcs->index_by_uri = hash_create_vec (0, sizeof (u8), sizeof (uword)); return 0; } diff --git a/src/plugins/hs_apps/http_client.c b/src/plugins/hs_apps/http_client.c new file mode 100644 index 00000000000..05a87ec7de8 --- /dev/null +++ b/src/plugins/hs_apps/http_client.c @@ -0,0 +1,743 @@ +/* SPDX-License-Identifier: Apache-2.0 + * Copyright(c) 2024 Cisco Systems, Inc. + */ + +#include <vnet/session/application.h> +#include <vnet/session/application_interface.h> +#include <vnet/session/session.h> +#include <http/http.h> +#include <http/http_header_names.h> +#include <http/http_content_types.h> +#include <http/http_status_codes.h> +#include <vppinfra/unix.h> + +typedef struct +{ + CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); + u32 session_index; + u32 thread_index; + u32 vpp_session_index; + u64 to_recv; + u8 is_closed; +} hc_session_t; + +typedef struct +{ + hc_session_t *sessions; + u32 thread_index; + vlib_main_t *vlib_main; +} hc_worker_t; + +typedef struct +{ + u32 app_index; + u32 cli_node_index; + u8 attached; + u8 *uri; + session_endpoint_cfg_t connect_sep; + u8 *target; + u8 *headers_buf; + u8 *data; + u64 data_offset; + hc_worker_t *wrk; + u8 *resp_headers; + u8 *http_response; + u8 *response_status; + http_header_ht_t *custom_header; + u8 is_file; + u8 use_ptr; + u8 *filename; + bool verbose; + f64 timeout; + http_req_method_t req_method; +} hc_main_t; + +typedef enum +{ + HC_CONNECT_FAILED = 1, + HC_TRANSPORT_CLOSED, + HC_REPLY_RECEIVED, +} hc_cli_signal_t; + +static hc_main_t hc_main; + +static inline hc_worker_t * +hc_worker_get (u32 thread_index) +{ + return &hc_main.wrk[thread_index]; +} + +static inline hc_session_t * +hc_session_get (u32 session_index, u32 thread_index) +{ + hc_worker_t *wrk = hc_worker_get (thread_index); + wrk->vlib_main = vlib_get_main_by_index (thread_index); + return pool_elt_at_index (wrk->sessions, session_index); +} + +static void +hc_ho_session_free (u32 hs_index) +{ + hc_worker_t *wrk = hc_worker_get (0); + pool_put_index (wrk->sessions, hs_index); +} + +static hc_session_t * +hc_session_alloc (hc_worker_t *wrk) +{ + hc_session_t *s; + + pool_get_zero (wrk->sessions, s); + s->session_index = s - wrk->sessions; + s->thread_index = wrk->thread_index; + + return s; +} + +static int +hc_session_connected_callback (u32 app_index, u32 hc_session_index, + session_t *s, session_error_t err) +{ + hc_main_t *hcm = &hc_main; + hc_session_t *hc_session, *new_hc_session; + hc_worker_t *wrk; + http_msg_t msg; + u64 to_send; + u32 n_enq; + u8 n_segs; + int rv; + http_header_ht_t *header; + http_header_t *req_headers = 0; + u32 new_hc_index; + + HTTP_DBG (1, "ho hc_index: %d", hc_session_index); + + if (err) + { + clib_warning ("hc_session_index[%d] connected error: %U", + hc_session_index, format_session_error, err); + vlib_process_signal_event_mt (hcm->wrk->vlib_main, hcm->cli_node_index, + HC_CONNECT_FAILED, 0); + return -1; + } + + hc_session = hc_session_get (hc_session_index, 0); + wrk = hc_worker_get (s->thread_index); + new_hc_session = hc_session_alloc (wrk); + new_hc_index = new_hc_session->session_index; + clib_memcpy_fast (new_hc_session, hc_session, sizeof (*hc_session)); + hc_session->vpp_session_index = s->session_index; + + new_hc_session->session_index = new_hc_index; + new_hc_session->thread_index = s->thread_index; + new_hc_session->vpp_session_index = s->session_index; + HTTP_DBG (1, "new hc_index: %d", new_hc_session->session_index); + s->opaque = new_hc_index; + + if (hcm->req_method == HTTP_REQ_POST) + { + if (hcm->is_file) + http_add_header ( + &req_headers, http_header_name_token (HTTP_HEADER_CONTENT_TYPE), + http_content_type_token (HTTP_CONTENT_APP_OCTET_STREAM)); + else + http_add_header ( + &req_headers, http_header_name_token (HTTP_HEADER_CONTENT_TYPE), + http_content_type_token (HTTP_CONTENT_APP_X_WWW_FORM_URLENCODED)); + } + + vec_foreach (header, hcm->custom_header) + http_add_header (&req_headers, (const char *) header->name, + vec_len (header->name), (const char *) header->value, + vec_len (header->value)); + + hcm->headers_buf = http_serialize_headers (req_headers); + vec_free (req_headers); + + msg.method_type = hcm->req_method; + if (hcm->req_method == HTTP_REQ_POST) + msg.data.body_len = vec_len (hcm->data); + else + msg.data.body_len = 0; + + msg.type = HTTP_MSG_REQUEST; + /* request target */ + msg.data.target_form = HTTP_TARGET_ORIGIN_FORM; + msg.data.target_path_len = vec_len (hcm->target); + /* custom headers */ + msg.data.headers_len = vec_len (hcm->headers_buf); + /* total length */ + msg.data.len = + msg.data.target_path_len + msg.data.headers_len + msg.data.body_len; + + if (hcm->use_ptr) + { + uword target = pointer_to_uword (hcm->target); + uword headers = pointer_to_uword (hcm->headers_buf); + uword body = pointer_to_uword (hcm->data); + msg.data.type = HTTP_MSG_DATA_PTR; + svm_fifo_seg_t segs[4] = { + { (u8 *) &msg, sizeof (msg) }, + { (u8 *) &target, sizeof (target) }, + { (u8 *) &headers, sizeof (headers) }, + { (u8 *) &body, sizeof (body) }, + }; + + n_segs = (hcm->req_method == HTTP_REQ_GET) ? 3 : 4; + rv = svm_fifo_enqueue_segments (s->tx_fifo, segs, n_segs, + 0 /* allow partial */); + if (hcm->req_method == HTTP_REQ_POST) + ASSERT (rv == (sizeof (msg) + sizeof (target) + sizeof (headers) + + sizeof (body))); + else + ASSERT (rv == (sizeof (msg) + sizeof (target) + sizeof (headers))); + goto done; + } + + msg.data.type = HTTP_MSG_DATA_INLINE; + msg.data.target_path_offset = 0; + msg.data.headers_offset = msg.data.target_path_len; + msg.data.body_offset = msg.data.headers_offset + msg.data.headers_len; + + rv = svm_fifo_enqueue (s->tx_fifo, sizeof (msg), (u8 *) &msg); + ASSERT (rv == sizeof (msg)); + + rv = svm_fifo_enqueue (s->tx_fifo, vec_len (hcm->target), hcm->target); + ASSERT (rv == vec_len (hcm->target)); + + rv = svm_fifo_enqueue (s->tx_fifo, vec_len (hcm->headers_buf), + hcm->headers_buf); + ASSERT (rv == msg.data.headers_len); + + if (hcm->req_method == HTTP_REQ_POST) + { + to_send = vec_len (hcm->data); + n_enq = clib_min (svm_fifo_size (s->tx_fifo), to_send); + + rv = svm_fifo_enqueue (s->tx_fifo, n_enq, hcm->data); + if (rv < to_send) + { + hcm->data_offset = (rv > 0) ? rv : 0; + svm_fifo_add_want_deq_ntf (s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); + } + } + +done: + if (svm_fifo_set_event (s->tx_fifo)) + session_program_tx_io_evt (s->handle, SESSION_IO_EVT_TX); + + return 0; +} + +static void +hc_session_disconnect_callback (session_t *s) +{ + hc_main_t *hcm = &hc_main; + vnet_disconnect_args_t _a = { 0 }, *a = &_a; + int rv; + + a->handle = session_handle (s); + a->app_index = hcm->app_index; + if ((rv = vnet_disconnect_session (a))) + clib_warning ("warning: disconnect returned: %U", format_session_error, + rv); +} + +static void +hc_session_transport_closed_callback (session_t *s) +{ + hc_main_t *hcm = &hc_main; + vlib_process_signal_event_mt (hcm->wrk->vlib_main, hcm->cli_node_index, + HC_TRANSPORT_CLOSED, 0); +} + +static void +hc_ho_cleanup_callback (session_t *ts) +{ + HTTP_DBG (1, "ho hc_index: %d:", ts->opaque); + hc_ho_session_free (ts->opaque); +} + +static void +hc_session_reset_callback (session_t *s) +{ + hc_main_t *hcm = &hc_main; + hc_session_t *hc_session; + vnet_disconnect_args_t _a = { 0 }, *a = &_a; + int rv; + + hc_session = hc_session_get (s->opaque, s->thread_index); + hc_session->is_closed = 1; + + a->handle = session_handle (s); + a->app_index = hcm->app_index; + if ((rv = vnet_disconnect_session (a))) + clib_warning ("warning: disconnect returned: %U", format_session_error, + rv); +} + +static int +hc_rx_callback (session_t *s) +{ + hc_main_t *hcm = &hc_main; + hc_session_t *hc_session; + http_msg_t msg; + int rv; + + hc_session = hc_session_get (s->opaque, s->thread_index); + + if (hc_session->is_closed) + { + clib_warning ("hc_session_index[%d] is closed", s->opaque); + return -1; + } + + if (hc_session->to_recv == 0) + { + rv = svm_fifo_dequeue (s->rx_fifo, sizeof (msg), (u8 *) &msg); + ASSERT (rv == sizeof (msg)); + + if (msg.type != HTTP_MSG_REPLY) + { + clib_warning ("unexpected msg type %d", msg.type); + return -1; + } + + if (msg.data.headers_len) + { + http_header_table_t *ht; + vec_validate (hcm->resp_headers, msg.data.headers_len - 1); + rv = svm_fifo_peek (s->rx_fifo, msg.data.headers_offset, + msg.data.headers_len, hcm->resp_headers); + + ASSERT (rv == msg.data.headers_len); + HTTP_DBG (1, (char *) hcm->resp_headers); + + if (http_parse_headers (hcm->resp_headers, &ht)) + { + clib_warning ("invalid headers received"); + return -1; + } + http_free_header_table (ht); + + hcm->response_status = + format (0, "%U", format_http_status_code, msg.code); + } + + if (msg.data.body_len == 0) + { + svm_fifo_dequeue_drop_all (s->rx_fifo); + goto done; + } + + /* drop everything up to body */ + svm_fifo_dequeue_drop (s->rx_fifo, msg.data.body_offset); + hc_session->to_recv = msg.data.body_len; + if (msg.code != HTTP_STATUS_OK && hc_session->to_recv == 0) + { + goto done; + } + vec_validate (hcm->http_response, msg.data.body_len - 1); + vec_reset_length (hcm->http_response); + } + + u32 max_deq = svm_fifo_max_dequeue (s->rx_fifo); + + u32 n_deq = clib_min (hc_session->to_recv, max_deq); + u32 curr = vec_len (hcm->http_response); + rv = svm_fifo_dequeue (s->rx_fifo, n_deq, hcm->http_response + curr); + if (rv < 0) + { + clib_warning ("app dequeue(n=%d) failed; rv = %d", n_deq, rv); + return -1; + } + + ASSERT (rv == n_deq); + vec_set_len (hcm->http_response, curr + n_deq); + ASSERT (hc_session->to_recv >= rv); + hc_session->to_recv -= rv; + +done: + if (hc_session->to_recv == 0) + { + hc_session_disconnect_callback (s); + vlib_process_signal_event_mt (hcm->wrk->vlib_main, hcm->cli_node_index, + HC_REPLY_RECEIVED, 0); + } + + return 0; +} + +static int +hc_tx_callback (session_t *s) +{ + hc_main_t *hcm = &hc_main; + u64 to_send; + int rv; + + to_send = vec_len (hcm->data) - hcm->data_offset; + rv = svm_fifo_enqueue (s->tx_fifo, to_send, hcm->data + hcm->data_offset); + + if (rv <= 0) + { + svm_fifo_add_want_deq_ntf (s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); + return 0; + } + + if (rv < to_send) + { + hcm->data_offset += rv; + svm_fifo_add_want_deq_ntf (s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); + } + + if (svm_fifo_set_event (s->tx_fifo)) + session_program_tx_io_evt (s->handle, SESSION_IO_EVT_TX); + + return 0; +} + +static session_cb_vft_t hc_session_cb_vft = { + .session_connected_callback = hc_session_connected_callback, + .session_disconnect_callback = hc_session_disconnect_callback, + .session_transport_closed_callback = hc_session_transport_closed_callback, + .session_reset_callback = hc_session_reset_callback, + .builtin_app_rx_callback = hc_rx_callback, + .builtin_app_tx_callback = hc_tx_callback, + .half_open_cleanup_callback = hc_ho_cleanup_callback, +}; + +static clib_error_t * +hc_attach () +{ + hc_main_t *hcm = &hc_main; + vnet_app_attach_args_t _a, *a = &_a; + u64 options[18]; + int rv; + + clib_memset (a, 0, sizeof (*a)); + clib_memset (options, 0, sizeof (options)); + + a->api_client_index = APP_INVALID_INDEX; + a->name = format (0, "http_client"); + a->session_cb_vft = &hc_session_cb_vft; + a->options = options; + a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN; + + if ((rv = vnet_application_attach (a))) + return clib_error_return (0, "attach returned: %U", format_session_error, + rv); + + hcm->app_index = a->app_index; + vec_free (a->name); + hcm->attached = 1; + + return 0; +} + +static int +hc_connect_rpc (void *rpc_args) +{ + vnet_connect_args_t *a = rpc_args; + int rv; + + rv = vnet_connect (a); + if (rv > 0) + clib_warning (0, "connect returned: %U", format_session_error, rv); + + vec_free (a); + return rv; +} + +static void +hc_connect () +{ + hc_main_t *hcm = &hc_main; + vnet_connect_args_t *a = 0; + hc_worker_t *wrk; + hc_session_t *hc_session; + + vec_validate (a, 0); + clib_memset (a, 0, sizeof (a[0])); + + clib_memcpy (&a->sep_ext, &hcm->connect_sep, sizeof (hcm->connect_sep)); + a->app_index = hcm->app_index; + + /* allocate http session on main thread */ + wrk = hc_worker_get (0); + hc_session = hc_session_alloc (wrk); + a->api_context = hc_session->session_index; + + session_send_rpc_evt_to_thread_force (transport_cl_thread (), hc_connect_rpc, + a); +} + +static clib_error_t * +hc_run (vlib_main_t *vm) +{ + hc_main_t *hcm = &hc_main; + vlib_thread_main_t *vtm = vlib_get_thread_main (); + u32 num_threads; + hc_worker_t *wrk; + uword event_type, *event_data = 0; + clib_error_t *err; + FILE *file_ptr; + + num_threads = 1 /* main thread */ + vtm->n_threads; + vec_validate (hcm->wrk, num_threads - 1); + vec_foreach (wrk, hcm->wrk) + wrk->thread_index = wrk - hcm->wrk; + + if ((err = hc_attach ())) + return clib_error_return (0, "http client attach: %U", format_clib_error, + err); + + hc_connect (); + + vlib_process_wait_for_event_or_clock (vm, hcm->timeout); + event_type = vlib_process_get_events (vm, &event_data); + switch (event_type) + { + case ~0: + err = clib_error_return (0, "error: timeout"); + break; + case HC_CONNECT_FAILED: + err = clib_error_return (0, "error: failed to connect"); + break; + case HC_TRANSPORT_CLOSED: + err = clib_error_return (0, "error: transport closed"); + break; + case HC_REPLY_RECEIVED: + if (hcm->filename) + { + file_ptr = + fopen ((char *) format (0, "/tmp/%v", hcm->filename), "w"); + if (file_ptr == NULL) + { + vlib_cli_output (vm, "couldn't open file %v", hcm->filename); + } + else + { + fprintf (file_ptr, "< %s\n< %s\n< %s", hcm->response_status, + hcm->resp_headers, hcm->http_response); + fclose (file_ptr); + vlib_cli_output (vm, "file saved (/tmp/%v)", hcm->filename); + } + } + if (hcm->verbose) + vlib_cli_output (vm, "< %v\n< %v", hcm->response_status, + hcm->resp_headers); + vlib_cli_output (vm, "<\n%v", hcm->http_response); + + break; + default: + err = clib_error_return (0, "error: unexpected event %d", event_type); + break; + } + + vec_free (event_data); + return err; +} + +static int +hc_detach () +{ + hc_main_t *hcm = &hc_main; + vnet_app_detach_args_t _da, *da = &_da; + int rv; + + if (!hcm->attached) + return 0; + + da->app_index = hcm->app_index; + da->api_client_index = APP_INVALID_INDEX; + rv = vnet_application_detach (da); + hcm->attached = 0; + hcm->app_index = APP_INVALID_INDEX; + + return rv; +} + +static void +hcc_worker_cleanup (hc_worker_t *wrk) +{ + pool_free (wrk->sessions); +} + +static void +hc_cleanup () +{ + hc_main_t *hcm = &hc_main; + hc_worker_t *wrk; + http_header_ht_t *header; + + vec_foreach (wrk, hcm->wrk) + hcc_worker_cleanup (wrk); + + vec_free (hcm->uri); + vec_free (hcm->target); + vec_free (hcm->headers_buf); + vec_free (hcm->data); + vec_free (hcm->resp_headers); + vec_free (hcm->http_response); + vec_free (hcm->response_status); + vec_free (hcm->wrk); + vec_free (hcm->filename); + vec_foreach (header, hcm->custom_header) + { + vec_free (header->name); + vec_free (header->value); + } + vec_free (hcm->custom_header); +} + +static clib_error_t * +hc_command_fn (vlib_main_t *vm, unformat_input_t *input, + vlib_cli_command_t *cmd) +{ + hc_main_t *hcm = &hc_main; + clib_error_t *err = 0; + unformat_input_t _line_input, *line_input = &_line_input; + u8 *path = 0; + u8 *file_data; + http_header_ht_t new_header; + u8 *name; + u8 *value; + int rv; + hcm->timeout = 10; + + if (hcm->attached) + return clib_error_return (0, "failed: already running!"); + + hcm->use_ptr = 0; + + /* Get a line of input. */ + if (!unformat_user (input, unformat_line_input, line_input)) + return clib_error_return (0, "expected required arguments"); + + hcm->req_method = + (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT) && + unformat (line_input, "post") ? + HTTP_REQ_POST : + HTTP_REQ_GET; + + while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT) + { + if (unformat (line_input, "uri %s", &hcm->uri)) + ; + else if (unformat (line_input, "data %v", &hcm->data)) + hcm->is_file = 0; + else if (unformat (line_input, "target %s", &hcm->target)) + ; + else if (unformat (line_input, "file %s", &path)) + hcm->is_file = 1; + else if (unformat (line_input, "use-ptr")) + hcm->use_ptr = 1; + else if (unformat (line_input, "save-to %s", &hcm->filename)) + { + if (strstr ((char *) hcm->filename, "..") || + strchr ((char *) hcm->filename, '/')) + { + err = clib_error_return ( + 0, "illegal characters in filename '%v'", hcm->filename); + goto done; + } + } + else if (unformat (line_input, "header %v:%v", &name, &value)) + { + new_header.name = name; + new_header.value = value; + vec_add1 (hcm->custom_header, new_header); + } + else if (unformat (line_input, "verbose")) + hcm->verbose = true; + else if (unformat (line_input, "timeout %f", &hcm->timeout)) + ; + else + { + err = clib_error_return (0, "unknown input `%U'", + format_unformat_error, line_input); + goto done; + } + } + + if (!hcm->uri) + { + err = clib_error_return (0, "URI not defined"); + goto done; + } + if (!hcm->target) + { + err = clib_error_return (0, "target not defined"); + goto done; + } + if (!hcm->data && hcm->req_method == HTTP_REQ_POST) + { + if (path) + { + err = clib_file_contents ((char *) path, &file_data); + if (err) + goto done; + hcm->data = file_data; + } + else + { + err = clib_error_return (0, "data not defined"); + goto done; + } + } + + if ((rv = parse_uri ((char *) hcm->uri, &hcm->connect_sep))) + { + err = + clib_error_return (0, "URI parse error: %U", format_session_error, rv); + goto done; + } + + session_enable_disable_args_t args = { .is_en = 1, + .rt_engine_type = + RT_BACKEND_ENGINE_RULE_TABLE }; + vlib_worker_thread_barrier_sync (vm); + vnet_session_enable_disable (vm, &args); + vlib_worker_thread_barrier_release (vm); + + hcm->cli_node_index = vlib_get_current_process (vm)->node_runtime.node_index; + + err = hc_run (vm); + + if ((rv = hc_detach ())) + { + /* don't override last error */ + if (!err) + err = clib_error_return (0, "detach returned: %U", + format_session_error, rv); + else + clib_warning ("warning: detach returned: %U", format_session_error, + rv); + } + +done: + vec_free (path); + hc_cleanup (); + unformat_free (line_input); + return err; +} + +VLIB_CLI_COMMAND (hc_command, static) = { + .path = "http client", + .short_help = "[post] uri http://<ip-addr> target <origin-form> " + "[data <form-urlencoded> | file <file-path>] [use-ptr] " + "[save-to <filename>] [header <Key:Value>] [verbose] " + "[timeout <seconds> (default = 10)]", + .function = hc_command_fn, + .is_mp_safe = 1, +}; + +static clib_error_t * +hc_main_init () +{ + hc_main_t *hcm = &hc_main; + hcm->app_index = APP_INVALID_INDEX; + return 0; +} + +VLIB_INIT_FUNCTION (hc_main_init); diff --git a/src/plugins/hs_apps/http_client_cli.c b/src/plugins/hs_apps/http_client_cli.c index a99169bafea..861af7f03e2 100644 --- a/src/plugins/hs_apps/http_client_cli.c +++ b/src/plugins/hs_apps/http_client_cli.c @@ -16,6 +16,9 @@ #include <vnet/session/application_interface.h> #include <vnet/session/session.h> #include <http/http.h> +#include <http/http_header_names.h> +#include <http/http_content_types.h> +#include <http/http_status_codes.h> #define HCC_DEBUG 0 @@ -32,14 +35,14 @@ typedef struct u32 thread_index; u32 rx_offset; u32 vpp_session_index; - u32 to_recv; + u64 to_recv; u8 is_closed; + http_header_t *req_headers; } hcc_session_t; typedef struct { hcc_session_t *sessions; - u8 *rx_buf; u32 thread_index; } hcc_worker_t; @@ -96,10 +99,10 @@ hcc_session_get (u32 hs_index, u32 thread_index) } static void -hcc_session_free (u32 thread_index, hcc_session_t *hs) +hcc_ho_session_free (u32 hs_index) { - hcc_worker_t *wrk = hcc_worker_get (thread_index); - pool_put (wrk->sessions, hs); + hcc_worker_t *wrk = hcc_worker_get (0); + pool_put_index (wrk->sessions, hs_index); } static int @@ -128,9 +131,11 @@ hcc_ts_connected_callback (u32 app_index, u32 hc_index, session_t *as, hcc_session_t *hs, *new_hs; hcc_worker_t *wrk; http_msg_t msg; + u8 *headers_buf; + u32 new_hs_index; int rv; - HCC_DBG ("hc_index: %d", hc_index); + HCC_DBG ("ho hc_index: %d", hc_index); if (err) { @@ -141,32 +146,53 @@ hcc_ts_connected_callback (u32 app_index, u32 hc_index, session_t *as, return -1; } - /* TODO delete half open session once the support is added in http layer */ hs = hcc_session_get (hc_index, 0); wrk = hcc_worker_get (as->thread_index); new_hs = hcc_session_alloc (wrk); + new_hs_index = new_hs->session_index; clib_memcpy_fast (new_hs, hs, sizeof (*hs)); - - hs->vpp_session_index = as->session_index; + new_hs->session_index = new_hs_index; + new_hs->thread_index = as->thread_index; + new_hs->vpp_session_index = as->session_index; + HCC_DBG ("new hc_index: %d", new_hs->session_index); + as->opaque = new_hs_index; + + http_add_header (&new_hs->req_headers, + http_header_name_token (HTTP_HEADER_ACCEPT), + http_content_type_token (HTTP_CONTENT_TEXT_HTML)); + headers_buf = http_serialize_headers (new_hs->req_headers); + vec_free (new_hs->req_headers); msg.type = HTTP_MSG_REQUEST; msg.method_type = HTTP_REQ_GET; - msg.content_type = HTTP_CONTENT_TEXT_HTML; + /* request target */ + msg.data.target_form = HTTP_TARGET_ORIGIN_FORM; + msg.data.target_path_offset = 0; + msg.data.target_path_len = vec_len (hcm->http_query); + /* custom headers */ + msg.data.headers_offset = msg.data.target_path_len; + msg.data.headers_len = vec_len (headers_buf); + /* request body */ + msg.data.body_len = 0; + /* data type and total length */ msg.data.type = HTTP_MSG_DATA_INLINE; - msg.data.len = vec_len (hcm->http_query); + msg.data.len = + msg.data.target_path_len + msg.data.headers_len + msg.data.body_len; - svm_fifo_seg_t segs[2] = { { (u8 *) &msg, sizeof (msg) }, - { hcm->http_query, vec_len (hcm->http_query) } }; + svm_fifo_seg_t segs[3] = { { (u8 *) &msg, sizeof (msg) }, + { hcm->http_query, vec_len (hcm->http_query) }, + { headers_buf, vec_len (headers_buf) } }; - rv = svm_fifo_enqueue_segments (as->tx_fifo, segs, 2, 0 /* allow partial */); - if (rv < 0 || rv != sizeof (msg) + vec_len (hcm->http_query)) + rv = svm_fifo_enqueue_segments (as->tx_fifo, segs, 3, 0 /* allow partial */); + vec_free (headers_buf); + if (rv < 0 || rv != sizeof (msg) + msg.data.len) { clib_warning ("failed app enqueue"); return -1; } if (svm_fifo_set_event (as->tx_fifo)) - session_send_io_evt_to_thread (as->tx_fifo, SESSION_IO_EVT_TX); + session_program_tx_io_evt (as->handle, SESSION_IO_EVT_TX); return 0; } @@ -221,23 +247,32 @@ hcc_ts_rx_callback (session_t *ts) if (hs->to_recv == 0) { + /* read the http message header */ rv = svm_fifo_dequeue (ts->rx_fifo, sizeof (msg), (u8 *) &msg); ASSERT (rv == sizeof (msg)); - if (msg.type != HTTP_MSG_REPLY || msg.code != HTTP_STATUS_OK) + if (msg.type != HTTP_MSG_REPLY) { clib_warning ("unexpected msg type %d", msg.type); return 0; } - vec_validate (hcm->http_response, msg.data.len - 1); + /* drop everything up to body */ + svm_fifo_dequeue_drop (ts->rx_fifo, msg.data.body_offset); + hs->to_recv = msg.data.body_len; + if (msg.code != HTTP_STATUS_OK && hs->to_recv == 0) + { + hcm->http_response = format (0, "request failed, response code: %U", + format_http_status_code, msg.code); + goto done; + } + vec_validate (hcm->http_response, msg.data.body_len - 1); vec_reset_length (hcm->http_response); - hs->to_recv = msg.data.len; } u32 max_deq = svm_fifo_max_dequeue (ts->rx_fifo); u32 n_deq = clib_min (hs->to_recv, max_deq); - u32 curr = vec_len (hcm->http_response); + u64 curr = vec_len (hcm->http_response); rv = svm_fifo_dequeue (ts->rx_fifo, n_deq, hcm->http_response + curr); if (rv < 0) { @@ -251,10 +286,12 @@ hcc_ts_rx_callback (session_t *ts) vec_set_len (hcm->http_response, curr + n_deq); ASSERT (hs->to_recv >= rv); hs->to_recv -= rv; - HCC_DBG ("app rcvd %d, remains %d", rv, hs->to_recv); + HCC_DBG ("app rcvd %d, remains %llu", rv, hs->to_recv); +done: if (hs->to_recv == 0) { + HCC_DBG ("all data received, going to disconnect"); hcc_session_disconnect (ts); vlib_process_signal_event_mt (hcm->vlib_main, hcm->cli_node_index, HCC_REPLY_RECEIVED, 0); @@ -264,18 +301,6 @@ hcc_ts_rx_callback (session_t *ts) } static void -hcc_ts_cleanup_callback (session_t *s, session_cleanup_ntf_t ntf) -{ - hcc_session_t *hs; - - hs = hcc_session_get (s->thread_index, s->opaque); - if (!hs) - return; - - hcc_session_free (s->thread_index, hs); -} - -static void hcc_ts_transport_closed (session_t *s) { hcc_main_t *hcm = &hcc_main; @@ -286,6 +311,13 @@ hcc_ts_transport_closed (session_t *s) HCC_TRANSPORT_CLOSED, 0); } +static void +hcc_ho_cleanup_callback (session_t *ts) +{ + HCC_DBG ("ho hc_index: %d:", ts->opaque); + hcc_ho_session_free (ts->opaque); +} + static session_cb_vft_t hcc_session_cb_vft = { .session_accept_callback = hcc_ts_accept_callback, .session_disconnect_callback = hcc_ts_disconnect_callback, @@ -293,8 +325,8 @@ static session_cb_vft_t hcc_session_cb_vft = { .builtin_app_rx_callback = hcc_ts_rx_callback, .builtin_app_tx_callback = hcc_ts_tx_callback, .session_reset_callback = hcc_ts_reset_callback, - .session_cleanup_callback = hcc_ts_cleanup_callback, .session_transport_closed_callback = hcc_ts_transport_closed, + .half_open_cleanup_callback = hcc_ho_cleanup_callback, }; static clib_error_t * @@ -349,6 +381,7 @@ hcc_connect_rpc (void *rpc_args) if (rv) clib_warning (0, "connect returned: %U", format_session_error, rv); + session_endpoint_free_ext_cfgs (&a->sep_ext); vec_free (a); return rv; } @@ -367,6 +400,7 @@ hcc_connect () hcc_main_t *hcm = &hcc_main; hcc_worker_t *wrk; hcc_session_t *hs; + transport_endpt_ext_cfg_t *ext_cfg; vec_validate (a, 0); clib_memset (a, 0, sizeof (a[0])); @@ -374,6 +408,11 @@ hcc_connect () clib_memcpy (&a->sep_ext, &hcm->connect_sep, sizeof (hcm->connect_sep)); a->app_index = hcm->app_index; + /* set http (response) timeout to 10 seconds */ + ext_cfg = session_endpoint_add_ext_cfg ( + &a->sep_ext, TRANSPORT_ENDPT_EXT_CFG_HTTP, sizeof (ext_cfg->opaque)); + ext_cfg->opaque = 10; + /* allocate http session on main thread */ wrk = hcc_worker_get (0); hs = hcc_session_alloc (wrk); @@ -394,7 +433,7 @@ hcc_run (vlib_main_t *vm, int print_output) hcc_worker_t *wrk; num_threads = 1 /* main thread */ + vtm->n_threads; - vec_validate (hcm->wrk, num_threads); + vec_validate (hcm->wrk, num_threads - 1); vec_foreach (wrk, hcm->wrk) { wrk->thread_index = wrk - hcm->wrk; @@ -423,7 +462,6 @@ hcc_run (vlib_main_t *vm, int print_output) case HCC_REPLY_RECEIVED: if (print_output) vlib_cli_output (vm, "%v", hcm->http_response); - vec_free (hcm->http_response); break; case HCC_TRANSPORT_CLOSED: err = clib_error_return (0, "error, transport closed"); @@ -460,6 +498,28 @@ hcc_detach () return rv; } +static void +hcc_worker_cleanup (hcc_worker_t *wrk) +{ + pool_free (wrk->sessions); +} + +static void +hcc_cleanup () +{ + hcc_main_t *hcm = &hcc_main; + hcc_worker_t *wrk; + + vec_foreach (wrk, hcm->wrk) + hcc_worker_cleanup (wrk); + + vec_free (hcm->uri); + vec_free (hcm->http_query); + vec_free (hcm->http_response); + vec_free (hcm->appns_id); + vec_free (hcm->wrk); +} + static clib_error_t * hcc_command_fn (vlib_main_t *vm, unformat_input_t *input, vlib_cli_command_t *cmd) @@ -509,7 +569,6 @@ hcc_command_fn (vlib_main_t *vm, unformat_input_t *input, } } - vec_free (hcm->appns_id); hcm->appns_id = appns_id; hcm->cli_node_index = vlib_get_current_process (vm)->node_runtime.node_index; @@ -525,8 +584,11 @@ hcc_command_fn (vlib_main_t *vm, unformat_input_t *input, goto done; } + session_enable_disable_args_t args = { .is_en = 1, + .rt_engine_type = + RT_BACKEND_ENGINE_RULE_TABLE }; vlib_worker_thread_barrier_sync (vm); - vnet_session_enable_disable (vm, 1 /* turn on TCP, etc. */); + vnet_session_enable_disable (vm, &args); vlib_worker_thread_barrier_release (vm); err = hcc_run (vm, print_output); @@ -540,8 +602,7 @@ hcc_command_fn (vlib_main_t *vm, unformat_input_t *input, } done: - vec_free (hcm->uri); - vec_free (hcm->http_query); + hcc_cleanup (); unformat_free (line_input); return err; } diff --git a/src/plugins/hs_apps/http_tps.c b/src/plugins/hs_apps/http_tps.c index 3a086501f86..a40a31caf63 100644 --- a/src/plugins/hs_apps/http_tps.c +++ b/src/plugins/hs_apps/http_tps.c @@ -17,6 +17,10 @@ #include <vnet/session/application_interface.h> #include <vnet/session/session.h> #include <http/http.h> +#include <http/http_header_names.h> +#include <http/http_content_types.h> + +#define HTS_RX_BUF_SIZE (64 << 10) typedef struct { @@ -26,6 +30,8 @@ typedef struct u64 data_len; u64 data_offset; u32 vpp_session_index; + u64 left_recv; + u64 total_recv; union { /** threshold after which connection is closed */ @@ -34,6 +40,8 @@ typedef struct u32 close_rate; }; u8 *uri; + u8 *rx_buf; + http_header_t *resp_headers; } hts_session_t; typedef struct hts_listen_cfg_ @@ -102,6 +110,8 @@ hts_session_free (hts_session_t *hs) if (htm->debug_level > 0) clib_warning ("Freeing session %u", hs->session_index); + vec_free (hs->rx_buf); + if (CLIB_DEBUG) clib_memset (hs, 0xfa, sizeof (*hs)); @@ -151,7 +161,7 @@ hts_session_tx_zc (hts_session_t *hs, session_t *ts) svm_fifo_add_want_deq_ntf (ts->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); if (svm_fifo_set_event (ts->tx_fifo)) - session_send_io_evt_to_thread (ts->tx_fifo, SESSION_IO_EVT_TX); + session_program_tx_io_evt (ts->handle, SESSION_IO_EVT_TX); } static void @@ -198,7 +208,7 @@ hts_session_tx_no_zc (hts_session_t *hs, session_t *ts) svm_fifo_add_want_deq_ntf (ts->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); if (svm_fifo_set_event (ts->tx_fifo)) - session_send_io_evt_to_thread (ts->tx_fifo, SESSION_IO_EVT_TX); + session_program_tx_io_evt (ts->handle, SESSION_IO_EVT_TX); } static inline void @@ -223,22 +233,46 @@ hts_start_send_data (hts_session_t *hs, http_status_code_t status) { http_msg_t msg; session_t *ts; + u8 *headers_buf = 0; + u32 n_segs = 1; + svm_fifo_seg_t seg[2]; int rv; + if (vec_len (hs->resp_headers)) + { + headers_buf = http_serialize_headers (hs->resp_headers); + vec_free (hs->resp_headers); + msg.data.headers_offset = 0; + msg.data.headers_len = vec_len (headers_buf); + seg[1].data = headers_buf; + seg[1].len = msg.data.headers_len; + n_segs = 2; + } + else + { + msg.data.headers_offset = 0; + msg.data.headers_len = 0; + } + msg.type = HTTP_MSG_REPLY; msg.code = status; - msg.content_type = HTTP_CONTENT_APP_OCTET_STREAM; msg.data.type = HTTP_MSG_DATA_INLINE; - msg.data.len = hs->data_len; + msg.data.body_len = hs->data_len; + msg.data.body_offset = msg.data.headers_len; + msg.data.len = msg.data.body_len + msg.data.headers_len; + seg[0].data = (u8 *) &msg; + seg[0].len = sizeof (msg); ts = session_get (hs->vpp_session_index, hs->thread_index); - rv = svm_fifo_enqueue (ts->tx_fifo, sizeof (msg), (u8 *) &msg); - ASSERT (rv == sizeof (msg)); + rv = svm_fifo_enqueue_segments (ts->tx_fifo, seg, n_segs, + 0 /* allow partial */); + vec_free (headers_buf); + ASSERT (rv == (sizeof (msg) + msg.data.headers_len)); - if (!msg.data.len) + if (!msg.data.body_len) { if (svm_fifo_set_event (ts->tx_fifo)) - session_send_io_evt_to_thread (ts->tx_fifo, SESSION_IO_EVT_TX); + session_program_tx_io_evt (ts->handle, SESSION_IO_EVT_TX); return; } @@ -286,6 +320,10 @@ try_test_file (hts_session_t *hs, u8 *target) } } + http_add_header (&hs->resp_headers, + http_header_name_token (HTTP_HEADER_CONTENT_TYPE), + http_content_type_token (HTTP_CONTENT_APP_OCTET_STREAM)); + hts_start_send_data (hs, HTTP_STATUS_OK); done: @@ -294,6 +332,40 @@ done: return rc; } +static inline void +hts_session_rx_body (hts_session_t *hs, session_t *ts) +{ + hts_main_t *htm = &hts_main; + u32 n_deq; + int rv; + + n_deq = svm_fifo_max_dequeue (ts->rx_fifo); + if (!htm->no_zc) + { + svm_fifo_dequeue_drop_all (ts->rx_fifo); + } + else + { + n_deq = clib_min (n_deq, HTS_RX_BUF_SIZE); + rv = svm_fifo_dequeue (ts->rx_fifo, n_deq, hs->rx_buf); + ASSERT (rv == n_deq); + } + hs->left_recv -= n_deq; + + if (hs->close_threshold > 0) + { + if ((f64) (hs->total_recv - hs->left_recv) / hs->total_recv > + hs->close_threshold) + hts_disconnect_transport (hs); + } + + if (hs->left_recv == 0) + { + hts_start_send_data (hs, HTTP_STATUS_OK); + vec_free (hs->rx_buf); + } +} + static int hts_ts_rx_callback (session_t *ts) { @@ -305,38 +377,77 @@ hts_ts_rx_callback (session_t *ts) hs = hts_session_get (ts->thread_index, ts->opaque); - /* Read the http message header */ - rv = svm_fifo_dequeue (ts->rx_fifo, sizeof (msg), (u8 *) &msg); - ASSERT (rv == sizeof (msg)); - - if (msg.type != HTTP_MSG_REQUEST || msg.method_type != HTTP_REQ_GET) + if (hs->left_recv == 0) { - hts_start_send_data (hs, HTTP_STATUS_METHOD_NOT_ALLOWED); - goto done; - } + hs->data_len = 0; + hs->resp_headers = 0; + hs->rx_buf = 0; - if (msg.data.target_path_len == 0 || - msg.data.target_form != HTTP_TARGET_ORIGIN_FORM) - { - hts_start_send_data (hs, HTTP_STATUS_BAD_REQUEST); - goto done; - } + /* Read the http message header */ + rv = svm_fifo_dequeue (ts->rx_fifo, sizeof (msg), (u8 *) &msg); + ASSERT (rv == sizeof (msg)); - vec_validate (target, msg.data.target_path_len - 1); - rv = svm_fifo_peek (ts->rx_fifo, msg.data.target_path_offset, - msg.data.target_path_len, target); - ASSERT (rv == msg.data.target_path_len); + if (msg.type != HTTP_MSG_REQUEST) + { + hts_start_send_data (hs, HTTP_STATUS_INTERNAL_ERROR); + goto done; + } + if (msg.method_type != HTTP_REQ_GET && msg.method_type != HTTP_REQ_POST) + { + http_add_header (&hs->resp_headers, + http_header_name_token (HTTP_HEADER_ALLOW), + http_token_lit ("GET, POST")); + hts_start_send_data (hs, HTTP_STATUS_METHOD_NOT_ALLOWED); + goto done; + } - if (htm->debug_level) - clib_warning ("Request target: %v", target); + if (msg.data.target_path_len == 0 || + msg.data.target_form != HTTP_TARGET_ORIGIN_FORM) + { + hts_start_send_data (hs, HTTP_STATUS_BAD_REQUEST); + goto done; + } - if (try_test_file (hs, target)) - hts_start_send_data (hs, HTTP_STATUS_NOT_FOUND); + vec_validate (target, msg.data.target_path_len - 1); + rv = svm_fifo_peek (ts->rx_fifo, msg.data.target_path_offset, + msg.data.target_path_len, target); + ASSERT (rv == msg.data.target_path_len); - vec_free (target); + if (htm->debug_level) + clib_warning ("%s request target: %v", + msg.method_type == HTTP_REQ_GET ? "GET" : "POST", + target); + + if (msg.method_type == HTTP_REQ_GET) + { + if (try_test_file (hs, target)) + hts_start_send_data (hs, HTTP_STATUS_NOT_FOUND); + vec_free (target); + } + else + { + vec_free (target); + if (!msg.data.body_len) + { + hts_start_send_data (hs, HTTP_STATUS_BAD_REQUEST); + goto done; + } + /* drop everything up to body */ + svm_fifo_dequeue_drop (ts->rx_fifo, msg.data.body_offset); + hs->left_recv = msg.data.body_len; + hs->total_recv = msg.data.body_len; + if (htm->no_zc) + vec_validate (hs->rx_buf, HTS_RX_BUF_SIZE - 1); + hts_session_rx_body (hs, ts); + return 0; + } + + done: + svm_fifo_dequeue_drop (ts->rx_fifo, msg.data.len); + } + else + hts_session_rx_body (hs, ts); -done: - svm_fifo_dequeue_drop (ts->rx_fifo, msg.data.len); return 0; } @@ -363,6 +474,7 @@ hts_ts_accept_callback (session_t *ts) hs = hts_session_alloc (ts->thread_index); hs->vpp_session_index = ts->session_index; + hs->left_recv = 0; ts->opaque = hs->session_index; ts->session_state = SESSION_STATE_READY; @@ -529,15 +641,16 @@ hts_start_listen (hts_main_t *htm, session_endpoint_cfg_t *sep, u8 *uri, if (need_crypto) { - session_endpoint_alloc_ext_cfg (&a->sep_ext, - TRANSPORT_ENDPT_EXT_CFG_CRYPTO); - a->sep_ext.ext_cfg->crypto.ckpair_index = htm->ckpair_index; + transport_endpt_ext_cfg_t *ext_cfg = session_endpoint_add_ext_cfg ( + &a->sep_ext, TRANSPORT_ENDPT_EXT_CFG_CRYPTO, + sizeof (transport_endpt_crypto_cfg_t)); + ext_cfg->crypto.ckpair_index = htm->ckpair_index; } rv = vnet_listen (a); if (need_crypto) - clib_mem_free (a->sep_ext.ext_cfg); + session_endpoint_free_ext_cfgs (&a->sep_ext); if (rv) return rv; @@ -726,7 +839,10 @@ start_server: if (htm->app_index == (u32) ~0) { - vnet_session_enable_disable (vm, 1 /* is_enable */); + session_enable_disable_args_t args = { .is_en = 1, + .rt_engine_type = + RT_BACKEND_ENGINE_RULE_TABLE }; + vnet_session_enable_disable (vm, &args); if (hts_create (vm)) { diff --git a/src/plugins/hs_apps/proxy.c b/src/plugins/hs_apps/proxy.c index e8fedf921a5..d7fe6fb54df 100644 --- a/src/plugins/hs_apps/proxy.c +++ b/src/plugins/hs_apps/proxy.c @@ -19,50 +19,145 @@ #include <vnet/session/application_interface.h> #include <hs_apps/proxy.h> #include <vnet/tcp/tcp.h> +#include <http/http.h> +#include <http/http_header_names.h> proxy_main_t proxy_main; #define TCP_MSS 1460 -typedef struct +static proxy_session_side_ctx_t * +proxy_session_side_ctx_alloc (proxy_worker_t *wrk) { - session_endpoint_cfg_t sep; - u32 app_index; - u32 api_context; -} proxy_connect_args_t; + proxy_session_side_ctx_t *ctx; + + pool_get_zero (wrk->ctx_pool, ctx); + ctx->sc_index = ctx - wrk->ctx_pool; + ctx->ps_index = ~0; + + return ctx; +} static void -proxy_cb_fn (void *data, u32 data_len) +proxy_session_side_ctx_free (proxy_worker_t *wrk, + proxy_session_side_ctx_t *ctx) { - proxy_connect_args_t *pa = (proxy_connect_args_t *) data; - vnet_connect_args_t a; + pool_put (wrk->ctx_pool, ctx); +} - clib_memset (&a, 0, sizeof (a)); - a.api_context = pa->api_context; - a.app_index = pa->app_index; - clib_memcpy (&a.sep_ext, &pa->sep, sizeof (pa->sep)); - vnet_connect (&a); - if (a.sep_ext.ext_cfg) - clib_mem_free (a.sep_ext.ext_cfg); +static proxy_session_side_ctx_t * +proxy_session_side_ctx_get (proxy_worker_t *wrk, u32 ctx_index) +{ + return pool_elt_at_index (wrk->ctx_pool, ctx_index); } static void -proxy_call_main_thread (vnet_connect_args_t * a) +proxy_send_http_resp (session_t *s, http_status_code_t sc, + http_header_t *resp_headers) { - if (vlib_get_thread_index () == 0) + http_msg_t msg; + int rv; + u8 *headers_buf = 0; + + if (vec_len (resp_headers)) { - vnet_connect (a); - if (a->sep_ext.ext_cfg) - clib_mem_free (a->sep_ext.ext_cfg); + headers_buf = http_serialize_headers (resp_headers); + msg.data.len = msg.data.headers_len = vec_len (headers_buf); } else + msg.data.len = msg.data.headers_len = 0; + + msg.type = HTTP_MSG_REPLY; + msg.code = sc; + msg.data.type = HTTP_MSG_DATA_INLINE; + msg.data.headers_offset = 0; + msg.data.body_len = 0; + msg.data.body_offset = 0; + rv = svm_fifo_enqueue (s->tx_fifo, sizeof (msg), (u8 *) &msg); + ASSERT (rv == sizeof (msg)); + if (msg.data.headers_len) + { + rv = svm_fifo_enqueue (s->tx_fifo, vec_len (headers_buf), headers_buf); + ASSERT (rv == vec_len (headers_buf)); + vec_free (headers_buf); + } + + if (svm_fifo_set_event (s->tx_fifo)) + session_program_tx_io_evt (s->handle, SESSION_IO_EVT_TX); +} + +static void +proxy_do_connect (vnet_connect_args_t *a) +{ + ASSERT (session_vlib_thread_is_cl_thread ()); + vnet_connect (a); + session_endpoint_free_ext_cfgs (&a->sep_ext); +} + +static void +proxy_handle_connects_rpc (void *args) +{ + u32 thread_index = pointer_to_uword (args), n_connects = 0, n_pending; + proxy_worker_t *wrk; + u32 max_connects; + + wrk = proxy_worker_get (thread_index); + + clib_spinlock_lock (&wrk->pending_connects_lock); + + n_pending = clib_fifo_elts (wrk->pending_connects); + max_connects = clib_min (32, n_pending); + vec_validate (wrk->burst_connects, max_connects); + + while (n_connects < max_connects) + clib_fifo_sub1 (wrk->pending_connects, wrk->burst_connects[n_connects++]); + + clib_spinlock_unlock (&wrk->pending_connects_lock); + + /* Do connects without locking pending_connects */ + n_connects = 0; + while (n_connects < max_connects) { - proxy_connect_args_t args; - args.api_context = a->api_context; - args.app_index = a->app_index; - clib_memcpy (&args.sep, &a->sep_ext, sizeof (a->sep_ext)); - vl_api_rpc_call_main_thread (proxy_cb_fn, (u8 *) & args, sizeof (args)); + proxy_do_connect (&wrk->burst_connects[n_connects]); + n_connects += 1; } + + /* More work to do, program rpc */ + if (max_connects < n_pending) + session_send_rpc_evt_to_thread_force ( + transport_cl_thread (), proxy_handle_connects_rpc, + uword_to_pointer ((uword) thread_index, void *)); +} + +static void +proxy_program_connect (vnet_connect_args_t *a) +{ + u32 connects_thread = transport_cl_thread (), thread_index, n_pending; + proxy_worker_t *wrk; + + thread_index = vlib_get_thread_index (); + + /* If already on first worker, handle request */ + if (thread_index == connects_thread) + { + proxy_do_connect (a); + return; + } + + /* If not on first worker, queue request */ + wrk = proxy_worker_get (thread_index); + + clib_spinlock_lock (&wrk->pending_connects_lock); + + clib_fifo_add1 (wrk->pending_connects, *a); + n_pending = clib_fifo_elts (wrk->pending_connects); + + clib_spinlock_unlock (&wrk->pending_connects_lock); + + if (n_pending == 1) + session_send_rpc_evt_to_thread_force ( + connects_thread, proxy_handle_connects_rpc, + uword_to_pointer ((uword) thread_index, void *)); } static proxy_session_t * @@ -85,16 +180,6 @@ proxy_session_get (u32 ps_index) return pool_elt_at_index (pm->sessions, ps_index); } -static inline proxy_session_t * -proxy_session_get_if_valid (u32 ps_index) -{ - proxy_main_t *pm = &proxy_main; - - if (pool_is_free_index (pm->sessions, ps_index)) - return 0; - return pool_elt_at_index (pm->sessions, ps_index); -} - static void proxy_session_free (proxy_session_t *ps) { @@ -115,7 +200,7 @@ proxy_session_postponed_free_rpc (void *arg) clib_spinlock_lock_if_init (&pm->sessions_lock); ps = proxy_session_get (ps_index); - segment_manager_dealloc_fifos (ps->server_rx_fifo, ps->server_tx_fifo); + segment_manager_dealloc_fifos (ps->po.rx_fifo, ps->po.tx_fifo); proxy_session_free (ps); clib_spinlock_unlock_if_init (&pm->sessions_lock); @@ -126,54 +211,79 @@ proxy_session_postponed_free_rpc (void *arg) static void proxy_session_postponed_free (proxy_session_t *ps) { - session_send_rpc_evt_to_thread (ps->po_thread_index, + /* Passive open session handle has been invalidated so we don't have thread + * index at this point */ + session_send_rpc_evt_to_thread (ps->po.rx_fifo->master_thread_index, proxy_session_postponed_free_rpc, uword_to_pointer (ps->ps_index, void *)); } static void +proxy_session_close_po (proxy_session_t *ps) +{ + vnet_disconnect_args_t _a = {}, *a = &_a; + proxy_main_t *pm = &proxy_main; + + ASSERT (!vlib_num_workers () || + CLIB_SPINLOCK_IS_LOCKED (&pm->sessions_lock)); + + a->handle = ps->po.session_handle; + a->app_index = pm->server_app_index; + vnet_disconnect_session (a); + + ps->po_disconnected = 1; +} + +static void +proxy_session_close_ao (proxy_session_t *ps) +{ + vnet_disconnect_args_t _a = {}, *a = &_a; + proxy_main_t *pm = &proxy_main; + + ASSERT (!vlib_num_workers () || + CLIB_SPINLOCK_IS_LOCKED (&pm->sessions_lock)); + + a->handle = ps->ao.session_handle; + a->app_index = pm->active_open_app_index; + vnet_disconnect_session (a); + + ps->ao_disconnected = 1; +} + +static void proxy_try_close_session (session_t * s, int is_active_open) { proxy_main_t *pm = &proxy_main; - proxy_session_t *ps = 0; - vnet_disconnect_args_t _a, *a = &_a; + proxy_session_side_ctx_t *sc; + proxy_session_t *ps; + proxy_worker_t *wrk; + + wrk = proxy_worker_get (s->thread_index); + sc = proxy_session_side_ctx_get (wrk, s->opaque); clib_spinlock_lock_if_init (&pm->sessions_lock); - ps = proxy_session_get (s->opaque); + ps = proxy_session_get (sc->ps_index); if (is_active_open) { - a->handle = ps->vpp_active_open_handle; - a->app_index = pm->active_open_app_index; - vnet_disconnect_session (a); - ps->ao_disconnected = 1; + proxy_session_close_ao (ps); if (!ps->po_disconnected) { - ASSERT (ps->vpp_server_handle != SESSION_INVALID_HANDLE); - a->handle = ps->vpp_server_handle; - a->app_index = pm->server_app_index; - vnet_disconnect_session (a); - ps->po_disconnected = 1; + ASSERT (ps->po.session_handle != SESSION_INVALID_HANDLE); + proxy_session_close_po (ps); } } else { - a->handle = ps->vpp_server_handle; - a->app_index = pm->server_app_index; - vnet_disconnect_session (a); - ps->po_disconnected = 1; + proxy_session_close_po (ps); if (!ps->ao_disconnected && !ps->active_open_establishing) { /* Proxy session closed before active open */ - if (ps->vpp_active_open_handle != SESSION_INVALID_HANDLE) - { - a->handle = ps->vpp_active_open_handle; - a->app_index = pm->active_open_app_index; - vnet_disconnect_session (a); - } + if (ps->ao.session_handle != SESSION_INVALID_HANDLE) + proxy_session_close_ao (ps); ps->ao_disconnected = 1; } } @@ -181,29 +291,63 @@ proxy_try_close_session (session_t * s, int is_active_open) } static void +proxy_try_side_ctx_cleanup (session_t *s) +{ + proxy_main_t *pm = &proxy_main; + proxy_session_t *ps; + proxy_session_side_ctx_t *sc; + proxy_worker_t *wrk; + + wrk = proxy_worker_get (s->thread_index); + sc = proxy_session_side_ctx_get (wrk, s->opaque); + if (sc->state == PROXY_SC_S_CREATED) + return; + + clib_spinlock_lock_if_init (&pm->sessions_lock); + + ps = proxy_session_get (sc->ps_index); + + if (!ps->po_disconnected) + proxy_session_close_po (ps); + + if (!ps->ao_disconnected) + proxy_session_close_ao (ps); + + clib_spinlock_unlock_if_init (&pm->sessions_lock); +} + +static void proxy_try_delete_session (session_t * s, u8 is_active_open) { proxy_main_t *pm = &proxy_main; proxy_session_t *ps = 0; + proxy_session_side_ctx_t *sc; + proxy_worker_t *wrk; + u32 ps_index; + + wrk = proxy_worker_get (s->thread_index); + sc = proxy_session_side_ctx_get (wrk, s->opaque); + ps_index = sc->ps_index; + + proxy_session_side_ctx_free (wrk, sc); clib_spinlock_lock_if_init (&pm->sessions_lock); - ps = proxy_session_get (s->opaque); + ps = proxy_session_get (ps_index); if (is_active_open) { - ps->vpp_active_open_handle = SESSION_INVALID_HANDLE; + ps->ao.session_handle = SESSION_INVALID_HANDLE; /* Revert master thread index change on connect notification */ - ps->server_rx_fifo->master_thread_index = ps->po_thread_index; + ps->po.rx_fifo->master_thread_index = + ps->po.tx_fifo->master_thread_index; /* Passive open already cleaned up */ - if (ps->vpp_server_handle == SESSION_INVALID_HANDLE) + if (ps->po.session_handle == SESSION_INVALID_HANDLE) { - ASSERT (s->rx_fifo->refcnt == 1); - /* The two sides of the proxy on different threads */ - if (ps->po_thread_index != s->thread_index) + if (ps->po.tx_fifo->master_thread_index != s->thread_index) { /* This is not the right thread to delete the fifos */ s->rx_fifo = 0; @@ -211,14 +355,17 @@ proxy_try_delete_session (session_t * s, u8 is_active_open) proxy_session_postponed_free (ps); } else - proxy_session_free (ps); + { + ASSERT (s->rx_fifo->refcnt == 1); + proxy_session_free (ps); + } } } else { - ps->vpp_server_handle = SESSION_INVALID_HANDLE; + ps->po.session_handle = SESSION_INVALID_HANDLE; - if (ps->vpp_active_open_handle == SESSION_INVALID_HANDLE) + if (ps->ao.session_handle == SESSION_INVALID_HANDLE) { if (!ps->active_open_establishing) proxy_session_free (ps); @@ -275,16 +422,26 @@ static int proxy_accept_callback (session_t * s) { proxy_main_t *pm = &proxy_main; + proxy_session_side_ctx_t *sc; proxy_session_t *ps; + proxy_worker_t *wrk; + transport_proto_t tp = session_get_transport_proto (s); + + wrk = proxy_worker_get (s->thread_index); + sc = proxy_session_side_ctx_alloc (wrk); + s->opaque = sc->sc_index; clib_spinlock_lock_if_init (&pm->sessions_lock); ps = proxy_session_alloc (); - ps->vpp_server_handle = session_handle (s); - ps->vpp_active_open_handle = SESSION_INVALID_HANDLE; - ps->po_thread_index = s->thread_index; - s->opaque = ps->ps_index; + ps->po.session_handle = session_handle (s); + ps->po.rx_fifo = s->rx_fifo; + ps->po.tx_fifo = s->tx_fifo; + + ps->ao.session_handle = SESSION_INVALID_HANDLE; + sc->ps_index = ps->ps_index; + sc->is_http = tp == TRANSPORT_PROTO_HTTP ? 1 : 0; clib_spinlock_unlock_if_init (&pm->sessions_lock); @@ -325,92 +482,167 @@ proxy_transport_needs_crypto (transport_proto_t proto) return proto == TRANSPORT_PROTO_TLS; } -static int -proxy_rx_callback (session_t * s) +static void +proxy_session_start_connect (proxy_session_side_ctx_t *sc, session_t *s) { + int actual_transfer __attribute__ ((unused)); + vnet_connect_args_t _a = {}, *a = &_a; proxy_main_t *pm = &proxy_main; - u32 thread_index = vlib_get_thread_index (); - svm_fifo_t *ao_tx_fifo; + u32 max_dequeue, ps_index; proxy_session_t *ps; - - ASSERT (s->thread_index == thread_index); + transport_proto_t tp = session_get_transport_proto (s); clib_spinlock_lock_if_init (&pm->sessions_lock); - ps = proxy_session_get (s->opaque); + ps = proxy_session_get (sc->ps_index); - if (PREDICT_TRUE (ps->vpp_active_open_handle != SESSION_INVALID_HANDLE)) + /* maybe we were already here */ + if (ps->active_open_establishing) { clib_spinlock_unlock_if_init (&pm->sessions_lock); + return; + } - ao_tx_fifo = s->rx_fifo; + ps->active_open_establishing = 1; + ps_index = ps->ps_index; - /* - * Send event for active open tx fifo - */ - if (svm_fifo_set_event (ao_tx_fifo)) + clib_spinlock_unlock_if_init (&pm->sessions_lock); + + if (tp == TRANSPORT_PROTO_HTTP) + { + http_msg_t msg; + u8 *target_buf = 0; + http_uri_t target_uri; + http_header_t *resp_headers = 0; + session_endpoint_cfg_t target_sep = SESSION_ENDPOINT_CFG_NULL; + int rv; + + rv = svm_fifo_dequeue (s->rx_fifo, sizeof (msg), (u8 *) &msg); + ASSERT (rv == sizeof (msg)); + + if (msg.type != HTTP_MSG_REQUEST) + { + proxy_send_http_resp (s, HTTP_STATUS_INTERNAL_ERROR, 0); + return; + } + if (msg.method_type != HTTP_REQ_CONNECT) + { + http_add_header (&resp_headers, + http_header_name_token (HTTP_HEADER_ALLOW), + http_token_lit ("CONNECT")); + proxy_send_http_resp (s, HTTP_STATUS_METHOD_NOT_ALLOWED, + resp_headers); + vec_free (resp_headers); + return; + } + + if (msg.data.target_form != HTTP_TARGET_AUTHORITY_FORM || + msg.data.target_path_len == 0) { - u32 ao_thread_index = ao_tx_fifo->master_thread_index; - u32 ao_session_index = ao_tx_fifo->shr->master_session_index; - if (session_send_io_evt_to_thread_custom (&ao_session_index, - ao_thread_index, - SESSION_IO_EVT_TX)) - clib_warning ("failed to enqueue tx evt"); + proxy_send_http_resp (s, HTTP_STATUS_BAD_REQUEST, 0); + return; } - if (svm_fifo_max_enqueue (ao_tx_fifo) <= TCP_MSS) - svm_fifo_add_want_deq_ntf (ao_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); + /* read target uri */ + target_buf = vec_new (u8, msg.data.target_path_len); + rv = svm_fifo_peek (s->rx_fifo, msg.data.target_path_offset, + msg.data.target_path_len, target_buf); + ASSERT (rv == msg.data.target_path_len); + svm_fifo_dequeue_drop (s->rx_fifo, msg.data.len); + rv = http_parse_authority_form_target (target_buf, &target_uri); + vec_free (target_buf); + if (rv) + { + proxy_send_http_resp (s, HTTP_STATUS_BAD_REQUEST, 0); + return; + } + target_sep.is_ip4 = target_uri.is_ip4; + target_sep.ip = target_uri.ip; + target_sep.port = target_uri.port; + target_sep.transport_proto = TRANSPORT_PROTO_TCP; + clib_memcpy (&a->sep_ext, &target_sep, sizeof (target_sep)); } else { - vnet_connect_args_t _a, *a = &_a; - svm_fifo_t *tx_fifo, *rx_fifo; - u32 max_dequeue, ps_index; - int actual_transfer __attribute__ ((unused)); + max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo); + if (PREDICT_FALSE (max_dequeue == 0)) + return; - rx_fifo = s->rx_fifo; - tx_fifo = s->tx_fifo; + max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue); + actual_transfer = + svm_fifo_peek (s->rx_fifo, 0 /* relative_offset */, max_dequeue, + pm->rx_buf[s->thread_index]); - ASSERT (rx_fifo->master_thread_index == thread_index); - ASSERT (tx_fifo->master_thread_index == thread_index); + /* Expectation is that here actual data just received is parsed and based + * on its contents, the destination and parameters of the connect to the + * upstream are decided + */ - max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo); + clib_memcpy (&a->sep_ext, &pm->client_sep, sizeof (pm->client_sep)); + } - if (PREDICT_FALSE (max_dequeue == 0)) - { - clib_spinlock_unlock_if_init (&pm->sessions_lock); - return 0; - } + a->api_context = ps_index; + a->app_index = pm->active_open_app_index; - max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue); - actual_transfer = svm_fifo_peek (rx_fifo, 0 /* relative_offset */ , - max_dequeue, pm->rx_buf[thread_index]); + if (proxy_transport_needs_crypto (a->sep.transport_proto)) + { + transport_endpt_ext_cfg_t *ext_cfg = session_endpoint_add_ext_cfg ( + &a->sep_ext, TRANSPORT_ENDPT_EXT_CFG_CRYPTO, + sizeof (transport_endpt_crypto_cfg_t)); + ext_cfg->crypto.ckpair_index = pm->ckpair_index; + } - /* $$$ your message in this space: parse url, etc. */ + proxy_program_connect (a); +} - clib_memset (a, 0, sizeof (*a)); +static int +proxy_rx_callback (session_t *s) +{ + proxy_session_side_ctx_t *sc; + svm_fifo_t *ao_tx_fifo; + proxy_session_t *ps; + proxy_worker_t *wrk; - ps->server_rx_fifo = rx_fifo; - ps->server_tx_fifo = tx_fifo; - ps->active_open_establishing = 1; - ps_index = ps->ps_index; + ASSERT (s->thread_index == vlib_get_thread_index ()); - clib_spinlock_unlock_if_init (&pm->sessions_lock); + wrk = proxy_worker_get (s->thread_index); + sc = proxy_session_side_ctx_get (wrk, s->opaque); - clib_memcpy (&a->sep_ext, &pm->client_sep, sizeof (pm->client_sep)); - a->api_context = ps_index; - a->app_index = pm->active_open_app_index; + if (PREDICT_FALSE (sc->state < PROXY_SC_S_ESTABLISHED)) + { + proxy_main_t *pm = &proxy_main; - if (proxy_transport_needs_crypto (a->sep.transport_proto)) + if (sc->state == PROXY_SC_S_CREATED) { - session_endpoint_alloc_ext_cfg (&a->sep_ext, - TRANSPORT_ENDPT_EXT_CFG_CRYPTO); - a->sep_ext.ext_cfg->crypto.ckpair_index = pm->ckpair_index; + proxy_session_start_connect (sc, s); + sc->state = PROXY_SC_S_CONNECTING; + return 0; } - proxy_call_main_thread (a); + clib_spinlock_lock_if_init (&pm->sessions_lock); + + ps = proxy_session_get (sc->ps_index); + sc->pair = ps->ao; + + clib_spinlock_unlock_if_init (&pm->sessions_lock); + + if (sc->pair.session_handle == SESSION_INVALID_HANDLE) + return 0; + + sc->state = PROXY_SC_S_ESTABLISHED; } + ao_tx_fifo = s->rx_fifo; + + /* + * Send event for active open tx fifo + */ + if (svm_fifo_set_event (ao_tx_fifo)) + session_program_tx_io_evt (sc->pair.session_handle, SESSION_IO_EVT_TX); + + if (svm_fifo_max_enqueue (ao_tx_fifo) <= TCP_MSS) + svm_fifo_add_want_deq_ntf (ao_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); + return 0; } @@ -418,20 +650,20 @@ static void proxy_force_ack (void *handlep) { transport_connection_t *tc; - session_t *ao_s; + session_t *s; - ao_s = session_get_from_handle (pointer_to_uword (handlep)); - if (session_get_transport_proto (ao_s) != TRANSPORT_PROTO_TCP) + s = session_get_from_handle (pointer_to_uword (handlep)); + if (session_get_transport_proto (s) != TRANSPORT_PROTO_TCP) return; - tc = session_get_transport (ao_s); + tc = session_get_transport (s); tcp_send_ack ((tcp_connection_t *) tc); } static int proxy_tx_callback (session_t * proxy_s) { - proxy_main_t *pm = &proxy_main; - proxy_session_t *ps; + proxy_session_side_ctx_t *sc; + proxy_worker_t *wrk; u32 min_free; min_free = clib_min (svm_fifo_size (proxy_s->tx_fifo) >> 3, 128 << 10); @@ -441,21 +673,17 @@ proxy_tx_callback (session_t * proxy_s) return 0; } - clib_spinlock_lock_if_init (&pm->sessions_lock); - - ps = proxy_session_get (proxy_s->opaque); - - if (ps->vpp_active_open_handle == SESSION_INVALID_HANDLE) - goto unlock; + wrk = proxy_worker_get (proxy_s->thread_index); + sc = proxy_session_side_ctx_get (wrk, proxy_s->opaque); + if (sc->state < PROXY_SC_S_ESTABLISHED) + return 0; /* Force ack on active open side to update rcv wnd. Make sure it's done on * the right thread */ - void *arg = uword_to_pointer (ps->vpp_active_open_handle, void *); - session_send_rpc_evt_to_thread (ps->server_rx_fifo->master_thread_index, - proxy_force_ack, arg); - -unlock: - clib_spinlock_unlock_if_init (&pm->sessions_lock); + void *arg = uword_to_pointer (sc->pair.session_handle, void *); + session_send_rpc_evt_to_thread ( + session_thread_from_handle (sc->pair.session_handle), proxy_force_ack, + arg); return 0; } @@ -464,7 +692,10 @@ static void proxy_cleanup_callback (session_t * s, session_cleanup_ntf_t ntf) { if (ntf == SESSION_CLEANUP_TRANSPORT) - return; + { + proxy_try_side_ctx_cleanup (s); + return; + } proxy_try_delete_session (s, 0 /* is_active_open */ ); } @@ -490,10 +721,17 @@ active_open_alloc_session_fifos (session_t *s) clib_spinlock_lock_if_init (&pm->sessions_lock); + /* Active open opaque is pointing at proxy session */ ps = proxy_session_get (s->opaque); - txf = ps->server_rx_fifo; - rxf = ps->server_tx_fifo; + if (ps->po_disconnected) + { + clib_spinlock_unlock_if_init (&pm->sessions_lock); + return SESSION_E_ALLOC; + } + + txf = ps->po.rx_fifo; + rxf = ps->po.tx_fifo; /* * Reset the active-open tx-fifo master indices so the active-open session @@ -524,31 +762,43 @@ active_open_connected_callback (u32 app_index, u32 opaque, { proxy_main_t *pm = &proxy_main; proxy_session_t *ps; - u8 thread_index = vlib_get_thread_index (); - - /* - * Setup proxy session handle. - */ - clib_spinlock_lock_if_init (&pm->sessions_lock); - - ps = proxy_session_get (opaque); + proxy_worker_t *wrk; + proxy_session_side_ctx_t *sc; + session_t *po_s; + transport_proto_t tp; /* Connection failed */ if (err) { - vnet_disconnect_args_t _a, *a = &_a; + clib_spinlock_lock_if_init (&pm->sessions_lock); - a->handle = ps->vpp_server_handle; - a->app_index = pm->server_app_index; - vnet_disconnect_session (a); - ps->po_disconnected = 1; - } - else - { - ps->vpp_active_open_handle = session_handle (s); - ps->active_open_establishing = 0; + ps = proxy_session_get (opaque); + po_s = session_get_from_handle (ps->po.session_handle); + tp = session_get_transport_proto (po_s); + if (tp == TRANSPORT_PROTO_HTTP) + { + proxy_send_http_resp (po_s, HTTP_STATUS_BAD_GATEWAY, 0); + } + ps->ao_disconnected = 1; + proxy_session_close_po (ps); + + clib_spinlock_unlock_if_init (&pm->sessions_lock); + + return 0; } + wrk = proxy_worker_get (s->thread_index); + + clib_spinlock_lock_if_init (&pm->sessions_lock); + + ps = proxy_session_get (opaque); + + ps->ao.rx_fifo = s->rx_fifo; + ps->ao.tx_fifo = s->tx_fifo; + ps->ao.session_handle = session_handle (s); + + ps->active_open_establishing = 0; + /* Passive open session was already closed! */ if (ps->po_disconnected) { @@ -558,21 +808,136 @@ active_open_connected_callback (u32 app_index, u32 opaque, return -1; } - s->opaque = opaque; + po_s = session_get_from_handle (ps->po.session_handle); + tp = session_get_transport_proto (po_s); + + sc = proxy_session_side_ctx_alloc (wrk); + sc->pair = ps->po; + sc->ps_index = ps->ps_index; clib_spinlock_unlock_if_init (&pm->sessions_lock); - /* - * Send event for active open tx fifo - */ - ASSERT (s->thread_index == thread_index); - if (svm_fifo_set_event (s->tx_fifo)) - session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX); + sc->state = PROXY_SC_S_ESTABLISHED; + s->opaque = sc->sc_index; + sc->is_http = tp == TRANSPORT_PROTO_HTTP ? 1 : 0; + + if (tp == TRANSPORT_PROTO_HTTP) + { + proxy_send_http_resp (po_s, HTTP_STATUS_OK, 0); + } + else + { + /* + * Send event for active open tx fifo + */ + ASSERT (s->thread_index == vlib_get_thread_index ()); + if (svm_fifo_set_event (s->tx_fifo)) + session_program_tx_io_evt (session_handle (s), SESSION_IO_EVT_TX); + } return 0; } static void +active_open_migrate_po_fixup_rpc (void *arg) +{ + u32 ps_index = pointer_to_uword (arg); + proxy_session_side_ctx_t *po_sc; + proxy_main_t *pm = &proxy_main; + session_handle_t po_sh; + proxy_worker_t *wrk; + proxy_session_t *ps; + session_t *po_s; + + wrk = proxy_worker_get (vlib_get_thread_index ()); + + clib_spinlock_lock_if_init (&pm->sessions_lock); + + ps = proxy_session_get (ps_index); + + po_s = session_get_from_handle (ps->po.session_handle); + po_s->rx_fifo = ps->po.rx_fifo; + po_s->tx_fifo = ps->po.tx_fifo; + + po_sc = proxy_session_side_ctx_get (wrk, po_s->opaque); + po_sc->pair = ps->ao; + po_sh = ps->po.session_handle; + + clib_spinlock_unlock_if_init (&pm->sessions_lock); + + session_program_tx_io_evt (po_sh, SESSION_IO_EVT_TX); +} + +static void +active_open_migrate_rpc (void *arg) +{ + u32 ps_index = pointer_to_uword (arg); + proxy_main_t *pm = &proxy_main; + proxy_session_side_ctx_t *sc; + proxy_worker_t *wrk; + proxy_session_t *ps; + session_t *s; + + wrk = proxy_worker_get (vlib_get_thread_index ()); + sc = proxy_session_side_ctx_alloc (wrk); + + clib_spinlock_lock_if_init (&pm->sessions_lock); + + ps = proxy_session_get (ps_index); + sc->ps_index = ps->ps_index; + + s = session_get_from_handle (ps->ao.session_handle); + s->opaque = sc->sc_index; + s->flags &= ~SESSION_F_IS_MIGRATING; + + /* Fixup passive open session because of migration and zc */ + ps->ao.rx_fifo = ps->po.tx_fifo = s->rx_fifo; + ps->ao.tx_fifo = ps->po.rx_fifo = s->tx_fifo; + + ps->po.tx_fifo->shr->master_session_index = + session_index_from_handle (ps->po.session_handle); + ps->po.tx_fifo->master_thread_index = + session_thread_from_handle (ps->po.session_handle); + + sc->pair = ps->po; + + clib_spinlock_unlock_if_init (&pm->sessions_lock); + + session_send_rpc_evt_to_thread ( + session_thread_from_handle (sc->pair.session_handle), + active_open_migrate_po_fixup_rpc, uword_to_pointer (sc->ps_index, void *)); +} + +static void +active_open_migrate_callback (session_t *s, session_handle_t new_sh) +{ + proxy_main_t *pm = &proxy_main; + proxy_session_side_ctx_t *sc; + proxy_session_t *ps; + proxy_worker_t *wrk; + + wrk = proxy_worker_get (s->thread_index); + sc = proxy_session_side_ctx_get (wrk, s->opaque); + + /* NOTE: this is just an example. ZC makes this migration rather + * tedious. Probably better approaches could be found */ + clib_spinlock_lock_if_init (&pm->sessions_lock); + + ps = proxy_session_get (sc->ps_index); + ps->ao.session_handle = new_sh; + ps->ao.rx_fifo = 0; + ps->ao.tx_fifo = 0; + + clib_spinlock_unlock_if_init (&pm->sessions_lock); + + session_send_rpc_evt_to_thread (session_thread_from_handle (new_sh), + active_open_migrate_rpc, + uword_to_pointer (sc->ps_index, void *)); + + proxy_session_side_ctx_free (wrk, sc); +} + +static void active_open_reset_callback (session_t * s) { proxy_try_close_session (s, 1 /* is_active_open */ ); @@ -618,10 +983,8 @@ active_open_rx_callback (session_t * s) static int active_open_tx_callback (session_t * ao_s) { - proxy_main_t *pm = &proxy_main; - transport_connection_t *tc; - proxy_session_t *ps; - session_t *proxy_s; + proxy_session_side_ctx_t *sc; + proxy_worker_t *wrk; u32 min_free; min_free = clib_min (svm_fifo_size (ao_s->tx_fifo) >> 3, 128 << 10); @@ -631,23 +994,27 @@ active_open_tx_callback (session_t * ao_s) return 0; } - clib_spinlock_lock_if_init (&pm->sessions_lock); + wrk = proxy_worker_get (ao_s->thread_index); + sc = proxy_session_side_ctx_get (wrk, ao_s->opaque); - ps = proxy_session_get_if_valid (ao_s->opaque); - if (!ps) - goto unlock; - - if (ps->vpp_server_handle == ~0) - goto unlock; - - proxy_s = session_get_from_handle (ps->vpp_server_handle); - - /* Force ack on proxy side to update rcv wnd */ - tc = session_get_transport (proxy_s); - tcp_send_ack ((tcp_connection_t *) tc); + if (sc->state < PROXY_SC_S_ESTABLISHED) + return 0; -unlock: - clib_spinlock_unlock_if_init (&pm->sessions_lock); + if (sc->is_http) + { + /* notify HTTP transport */ + session_t *po = session_get_from_handle (sc->pair.session_handle); + session_send_io_evt_to_thread_custom ( + &po->session_index, po->thread_index, SESSION_IO_EVT_RX); + } + else + { + /* Force ack on proxy side to update rcv wnd */ + void *arg = uword_to_pointer (sc->pair.session_handle, void *); + session_send_rpc_evt_to_thread ( + session_thread_from_handle (sc->pair.session_handle), proxy_force_ack, + arg); + } return 0; } @@ -664,6 +1031,7 @@ active_open_cleanup_callback (session_t * s, session_cleanup_ntf_t ntf) static session_cb_vft_t active_open_clients = { .session_reset_callback = active_open_reset_callback, .session_connected_callback = active_open_connected_callback, + .session_migrate_callback = active_open_migrate_callback, .session_accept_callback = active_open_create_callback, .session_disconnect_callback = active_open_disconnect_callback, .session_cleanup_callback = active_open_cleanup_callback, @@ -756,22 +1124,33 @@ proxy_server_listen () { proxy_main_t *pm = &proxy_main; vnet_listen_args_t _a, *a = &_a; - int rv; + int rv, need_crypto; clib_memset (a, 0, sizeof (*a)); a->app_index = pm->server_app_index; clib_memcpy (&a->sep_ext, &pm->server_sep, sizeof (pm->server_sep)); - if (proxy_transport_needs_crypto (a->sep.transport_proto)) + /* Make sure listener is marked connected for transports like udp */ + a->sep_ext.transport_flags = TRANSPORT_CFG_F_CONNECTED; + need_crypto = proxy_transport_needs_crypto (a->sep.transport_proto); + if (need_crypto) + { + transport_endpt_ext_cfg_t *ext_cfg = session_endpoint_add_ext_cfg ( + &a->sep_ext, TRANSPORT_ENDPT_EXT_CFG_CRYPTO, + sizeof (transport_endpt_crypto_cfg_t)); + ext_cfg->crypto.ckpair_index = pm->ckpair_index; + } + /* set http timeout for connect-proxy */ + if (pm->server_sep.transport_proto == TRANSPORT_PROTO_HTTP) { - session_endpoint_alloc_ext_cfg (&a->sep_ext, - TRANSPORT_ENDPT_EXT_CFG_CRYPTO); - a->sep_ext.ext_cfg->crypto.ckpair_index = pm->ckpair_index; + transport_endpt_ext_cfg_t *ext_cfg = session_endpoint_add_ext_cfg ( + &a->sep_ext, TRANSPORT_ENDPT_EXT_CFG_HTTP, sizeof (ext_cfg->opaque)); + ext_cfg->opaque = pm->idle_timeout; } rv = vnet_listen (a); - if (a->sep_ext.ext_cfg) - clib_mem_free (a->sep_ext.ext_cfg); + if (need_crypto) + session_endpoint_free_ext_cfgs (&a->sep_ext); return rv; } @@ -797,15 +1176,25 @@ proxy_server_create (vlib_main_t * vm) { vlib_thread_main_t *vtm = vlib_get_thread_main (); proxy_main_t *pm = &proxy_main; + proxy_worker_t *wrk; u32 num_threads; int i; + if (vlib_num_workers ()) + clib_spinlock_init (&pm->sessions_lock); + num_threads = 1 /* main thread */ + vtm->n_threads; vec_validate (pm->rx_buf, num_threads - 1); for (i = 0; i < num_threads; i++) vec_validate (pm->rx_buf[i], pm->rcv_buffer_size); + vec_validate (pm->workers, vlib_num_workers ()); + vec_foreach (wrk, pm->workers) + { + clib_spinlock_init (&wrk->pending_connects_lock); + } + proxy_server_add_ckpair (); if (proxy_server_attach ()) @@ -813,11 +1202,6 @@ proxy_server_create (vlib_main_t * vm) clib_warning ("failed to attach server app"); return -1; } - if (proxy_server_listen ()) - { - clib_warning ("failed to start listening"); - return -1; - } if (active_open_attach ()) { clib_warning ("failed to attach active open app"); @@ -849,9 +1233,6 @@ proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input, pm->private_segment_count = 0; pm->segment_size = 512 << 20; - if (vlib_num_workers ()) - clib_spinlock_init (&pm->sessions_lock); - if (!unformat_user (input, unformat_line_input, line_input)) return 0; @@ -883,6 +1264,8 @@ proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input, vec_add1 (server_uri, 0); else if (unformat (line_input, "client-uri %s", &client_uri)) vec_add1 (client_uri, 0); + else if (unformat (line_input, "idle-timeout %d", &pm->idle_timeout)) + ; else { error = clib_error_return (0, "unknown input `%U'", @@ -897,35 +1280,45 @@ proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input, default_server_uri); server_uri = format (0, "%s%c", default_server_uri, 0); } - if (!client_uri) - { - clib_warning ("No client-uri provided, Using default: %s", - default_client_uri); - client_uri = format (0, "%s%c", default_client_uri, 0); - } - if (parse_uri ((char *) server_uri, &pm->server_sep)) { error = clib_error_return (0, "Invalid server uri %v", server_uri); goto done; } - if (parse_uri ((char *) client_uri, &pm->client_sep)) + + /* http proxy get target within request */ + if (pm->server_sep.transport_proto != TRANSPORT_PROTO_HTTP) { - error = clib_error_return (0, "Invalid client uri %v", client_uri); - goto done; + if (!client_uri) + { + clib_warning ("No client-uri provided, Using default: %s", + default_client_uri); + client_uri = format (0, "%s%c", default_client_uri, 0); + } + if (parse_uri ((char *) client_uri, &pm->client_sep)) + { + error = clib_error_return (0, "Invalid client uri %v", client_uri); + goto done; + } } - vnet_session_enable_disable (vm, 1 /* turn on session and transport */ ); - - rv = proxy_server_create (vm); - switch (rv) + if (pm->server_app_index == APP_INVALID_INDEX) { - case 0: - break; - default: - error = clib_error_return (0, "server_create returned %d", rv); + session_enable_disable_args_t args = { .is_en = 1, + .rt_engine_type = + RT_BACKEND_ENGINE_RULE_TABLE }; + vnet_session_enable_disable (vm, &args); + rv = proxy_server_create (vm); + if (rv) + { + error = clib_error_return (0, "server_create returned %d", rv); + goto done; + } } + if (proxy_server_listen ()) + error = clib_error_return (0, "failed to start listening"); + done: unformat_free (line_input); vec_free (client_uri); @@ -933,14 +1326,14 @@ done: return error; } -VLIB_CLI_COMMAND (proxy_create_command, static) = -{ +VLIB_CLI_COMMAND (proxy_create_command, static) = { .path = "test proxy server", - .short_help = "test proxy server [server-uri <tcp://ip/port>]" - "[client-uri <tcp://ip/port>][fifo-size <nn>[k|m]]" - "[max-fifo-size <nn>[k|m]][high-watermark <nn>]" - "[low-watermark <nn>][rcv-buf-size <nn>][prealloc-fifos <nn>]" - "[private-segment-size <mem>][private-segment-count <nn>]", + .short_help = "test proxy server [server-uri <proto://ip/port>]" + "[client-uri <tcp://ip/port>][fifo-size <nn>[k|m]]" + "[max-fifo-size <nn>[k|m]][high-watermark <nn>]" + "[low-watermark <nn>][rcv-buf-size <nn>][prealloc-fifos <nn>]" + "[private-segment-size <mem>][private-segment-count <nn>]" + "[idle-timeout <nn>]", .function = proxy_server_create_command_fn, }; @@ -950,6 +1343,8 @@ proxy_main_init (vlib_main_t * vm) proxy_main_t *pm = &proxy_main; pm->server_client_index = ~0; pm->active_open_client_index = ~0; + pm->server_app_index = APP_INVALID_INDEX; + pm->idle_timeout = 600; /* connect-proxy default idle timeout 10 minutes */ return 0; } diff --git a/src/plugins/hs_apps/proxy.h b/src/plugins/hs_apps/proxy.h index 26f4de2f729..75567e4c1ba 100644 --- a/src/plugins/hs_apps/proxy.h +++ b/src/plugins/hs_apps/proxy.h @@ -26,23 +26,57 @@ #include <vnet/session/session.h> #include <vnet/session/application_interface.h> +#define foreach_proxy_session_side_state \ + _ (CREATED, "created") \ + _ (CONNECTING, "connecting") \ + _ (ESTABLISHED, "establiehed") \ + _ (CLOSED, "closed") + +typedef enum proxy_session_side_state_ +{ +#define _(sym, str) PROXY_SC_S_##sym, + foreach_proxy_session_side_state +#undef _ +} proxy_session_side_state_t; +typedef struct proxy_session_side_ +{ + session_handle_t session_handle; + svm_fifo_t *rx_fifo; + svm_fifo_t *tx_fifo; +} proxy_session_side_t; + +typedef struct proxy_session_side_ctx_ +{ + proxy_session_side_t pair; + proxy_session_side_state_t state; + u32 sc_index; + u32 ps_index; + u8 is_http; +} proxy_session_side_ctx_t; + typedef struct { - svm_fifo_t *server_rx_fifo; - svm_fifo_t *server_tx_fifo; + proxy_session_side_t po; /**< passive open side */ + proxy_session_side_t ao; /**< active open side */ - session_handle_t vpp_server_handle; - session_handle_t vpp_active_open_handle; volatile int active_open_establishing; volatile int po_disconnected; volatile int ao_disconnected; u32 ps_index; - u32 po_thread_index; } proxy_session_t; +typedef struct proxy_worker_ +{ + proxy_session_side_ctx_t *ctx_pool; + clib_spinlock_t pending_connects_lock; + vnet_connect_args_t *pending_connects; + vnet_connect_args_t *burst_connects; +} proxy_worker_t; + typedef struct { + proxy_worker_t *workers; /**< per-thread data */ proxy_session_t *sessions; /**< session pool, shared */ clib_spinlock_t sessions_lock; /**< lock for session pool */ u8 **rx_buf; /**< intermediate rx buffers */ @@ -63,6 +97,7 @@ typedef struct u32 private_segment_count; /**< Number of private fifo segs */ u64 segment_size; /**< size of fifo segs */ u8 prealloc_fifos; /**< Request fifo preallocation */ + u32 idle_timeout; /**< connect-proxy timeout for idle connections */ int rcv_buffer_size; session_endpoint_cfg_t server_sep; session_endpoint_cfg_t client_sep; @@ -75,6 +110,13 @@ typedef struct extern proxy_main_t proxy_main; +static inline proxy_worker_t * +proxy_worker_get (u32 thread_index) +{ + proxy_main_t *pm = &proxy_main; + return vec_elt_at_index (pm->workers, thread_index); +} + #endif /* __included_proxy_h__ */ /* diff --git a/src/plugins/hs_apps/sapi/vpp_echo_common.c b/src/plugins/hs_apps/sapi/vpp_echo_common.c index 5ce04d1b75b..09ba583cf78 100644 --- a/src/plugins/hs_apps/sapi/vpp_echo_common.c +++ b/src/plugins/hs_apps/sapi/vpp_echo_common.c @@ -330,8 +330,8 @@ format_transport_proto (u8 * s, va_list * args) case TRANSPORT_PROTO_UDP: s = format (s, "UDP"); break; - case TRANSPORT_PROTO_NONE: - s = format (s, "NONE"); + case TRANSPORT_PROTO_CT: + s = format (s, "CT"); break; case TRANSPORT_PROTO_TLS: s = format (s, "TLS"); diff --git a/src/plugins/hs_apps/test_builtins.c b/src/plugins/hs_apps/test_builtins.c new file mode 100644 index 00000000000..c314e71b5df --- /dev/null +++ b/src/plugins/hs_apps/test_builtins.c @@ -0,0 +1,192 @@ +/* SPDX-License-Identifier: Apache-2.0 + * Copyright(c) 2024 Cisco Systems, Inc. + */ + +#include <http_static/http_static.h> +#include <vppinfra/tw_timer_2t_1w_2048sl.h> + +typedef struct +{ + u32 stop_timer_handle; + hss_session_handle_t sh; +} tw_timer_elt_t; + +typedef struct tb_main_ +{ + tw_timer_elt_t *delayed_resps; + tw_timer_wheel_2t_1w_2048sl_t tw; + hss_session_send_fn send_data; + u8 *test_data; +} tb_main_t; + +static tb_main_t tb_main; + +static uword +test_builtins_timer_process (vlib_main_t *vm, vlib_node_runtime_t *rt, + vlib_frame_t *f) +{ + tb_main_t *tbm = &tb_main; + f64 now, timeout = 1.0; + uword *event_data = 0; + uword __clib_unused event_type; + + while (1) + { + vlib_process_wait_for_event_or_clock (vm, timeout); + now = vlib_time_now (vm); + event_type = vlib_process_get_events (vm, (uword **) &event_data); + + /* expire timers */ + tw_timer_expire_timers_2t_1w_2048sl (&tbm->tw, now); + + vec_reset_length (event_data); + } + return 0; +} + +VLIB_REGISTER_NODE (test_builtins_timer_process_node) = { + .function = test_builtins_timer_process, + .type = VLIB_NODE_TYPE_PROCESS, + .name = "test-builtins-timer-process", + .state = VLIB_NODE_STATE_DISABLED, +}; + +static void +send_data_to_hss (hss_session_handle_t sh, u8 *data, u8 free_vec_data) +{ + tb_main_t *tbm = &tb_main; + hss_url_handler_args_t args = {}; + + args.sh = sh; + args.data = data; + args.data_len = vec_len (data); + args.ct = HTTP_CONTENT_TEXT_PLAIN; + args.sc = HTTP_STATUS_OK; + args.free_vec_data = free_vec_data; + + tbm->send_data (&args); +} + +static hss_url_handler_rc_t +handle_get_test1 (hss_url_handler_args_t *args) +{ + u8 *data; + + clib_warning ("get request on test1"); + data = format (0, "hello"); + send_data_to_hss (args->sh, data, 1); + + return HSS_URL_HANDLER_ASYNC; +} + +static hss_url_handler_rc_t +handle_get_test2 (hss_url_handler_args_t *args) +{ + u8 *data; + + clib_warning ("get request on test2"); + data = format (0, "some data"); + send_data_to_hss (args->sh, data, 1); + + return HSS_URL_HANDLER_ASYNC; +} + +static void +delayed_resp_cb (u32 *expired_timers) +{ + tb_main_t *tbm = &tb_main; + int i; + u32 pool_index; + tw_timer_elt_t *e; + u8 *data; + + for (i = 0; i < vec_len (expired_timers); i++) + { + pool_index = expired_timers[i] & 0x7FFFFFFF; + e = pool_elt_at_index (tbm->delayed_resps, pool_index); + clib_warning ("sending delayed data"); + data = format (0, "delayed data"); + send_data_to_hss (e->sh, data, 1); + pool_put (tbm->delayed_resps, e); + } +} + +static hss_url_handler_rc_t +handle_get_test_delayed (hss_url_handler_args_t *args) +{ + tb_main_t *tbm = &tb_main; + tw_timer_elt_t *e; + + clib_warning ("get request on test_delayed"); + pool_get (tbm->delayed_resps, e); + e->sh = args->sh; + e->stop_timer_handle = + tw_timer_start_2t_1w_2048sl (&tbm->tw, e - tbm->delayed_resps, 0, 5); + + return HSS_URL_HANDLER_ASYNC; +} + +static hss_url_handler_rc_t +handle_post_test3 (hss_url_handler_args_t *args) +{ + send_data_to_hss (args->sh, 0, 0); + return HSS_URL_HANDLER_ASYNC; +} + +static hss_url_handler_rc_t +handle_get_64bytes (hss_url_handler_args_t *args) +{ + tb_main_t *tbm = &tb_main; + send_data_to_hss (args->sh, tbm->test_data, 0); + return HSS_URL_HANDLER_ASYNC; +} + +static void +test_builtins_init (vlib_main_t *vm) +{ + tb_main_t *tbm = &tb_main; + hss_register_url_fn fp; + vlib_node_t *n; + + fp = vlib_get_plugin_symbol ("http_static_plugin.so", + "hss_register_url_handler"); + + if (fp == 0) + { + clib_warning ("http_static_plugin.so not loaded..."); + return; + } + + tbm->test_data = format ( + 0, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"); + + (*fp) (handle_get_test1, "test1", HTTP_REQ_GET); + (*fp) (handle_get_test2, "test2", HTTP_REQ_GET); + (*fp) (handle_get_test_delayed, "test_delayed", HTTP_REQ_GET); + (*fp) (handle_post_test3, "test3", HTTP_REQ_POST); + (*fp) (handle_get_64bytes, "64B", HTTP_REQ_GET); + + tbm->send_data = + vlib_get_plugin_symbol ("http_static_plugin.so", "hss_session_send_data"); + + tw_timer_wheel_init_2t_1w_2048sl (&tbm->tw, delayed_resp_cb, 1.0, ~0); + + vlib_node_set_state (vm, test_builtins_timer_process_node.index, + VLIB_NODE_STATE_POLLING); + n = vlib_get_node (vm, test_builtins_timer_process_node.index); + vlib_start_process (vm, n->runtime_index); +} + +static clib_error_t * +test_builtins_enable_command_fn (vlib_main_t *vm, unformat_input_t *input, + vlib_cli_command_t *cmd) +{ + test_builtins_init (vm); + return 0; +} + +VLIB_CLI_COMMAND (test_builtins_enable_command, static) = { + .path = "test-url-handler enable", + .short_help = "test-url-handler enable", + .function = test_builtins_enable_command_fn, +}; diff --git a/src/plugins/hs_apps/vcl/vcl_test.h b/src/plugins/hs_apps/vcl/vcl_test.h index 0ce27ef84e2..11667fb144a 100644 --- a/src/plugins/hs_apps/vcl/vcl_test.h +++ b/src/plugins/hs_apps/vcl/vcl_test.h @@ -124,7 +124,7 @@ typedef struct typedef struct { - const vcl_test_proto_vft_t *protos[VPPCOM_PROTO_SRTP + 1]; + const vcl_test_proto_vft_t *protos[VPPCOM_PROTO_HTTP + 1]; uint32_t ckpair_index; hs_test_cfg_t cfg; vcl_test_wrk_t *wrk; @@ -420,6 +420,39 @@ vcl_test_write (vcl_test_session_t *ts, void *buf, uint32_t nbytes) return (tx_bytes); } +static inline int +vcl_test_write_ds (vcl_test_session_t *ts) +{ + vcl_test_stats_t *stats = &ts->stats; + int tx_bytes; + + do + { + stats->tx_xacts++; + if (ts->ds[1].len) + tx_bytes = vppcom_session_write_segments (ts->fd, ts->ds, 2); + else + tx_bytes = vppcom_session_write_segments (ts->fd, ts->ds, 1); + + if (tx_bytes < 0) + errno = -tx_bytes; + if ((tx_bytes == 0) || + ((tx_bytes < 0) && ((errno == EAGAIN) || (errno == EWOULDBLOCK)))) + stats->rx_eagain++; + } + while ((tx_bytes == 0) || + ((tx_bytes < 0) && ((errno == EAGAIN) || (errno == EWOULDBLOCK)))); + + if (tx_bytes < 0) + { + vterr ("vppcom_session_write_segments()", -errno); + } + else + stats->tx_bytes += tx_bytes; + + return (tx_bytes); +} + static inline void dump_help (void) { diff --git a/src/plugins/hs_apps/vcl/vcl_test_client.c b/src/plugins/hs_apps/vcl/vcl_test_client.c index a4a10b562ff..8bac1f00b9d 100644 --- a/src/plugins/hs_apps/vcl/vcl_test_client.c +++ b/src/plugins/hs_apps/vcl/vcl_test_client.c @@ -419,13 +419,8 @@ vtc_worker_run_select (vcl_test_client_worker_t *wrk) if (vcm->incremental_stats) vtc_inc_stats_check (ts); } - if ((!check_rx && ts->stats.tx_bytes >= ts->cfg.total_bytes) || - (check_rx && ts->stats.rx_bytes >= ts->cfg.total_bytes)) - { - clock_gettime (CLOCK_REALTIME, &ts->stats.stop); - ts->is_done = 1; - n_active_sessions--; - } + if (vtc_session_check_is_done (ts, check_rx)) + n_active_sessions -= 1; } } diff --git a/src/plugins/hs_apps/vcl/vcl_test_protos.c b/src/plugins/hs_apps/vcl/vcl_test_protos.c index cd1ac2b24f4..9c81c5f17a1 100644 --- a/src/plugins/hs_apps/vcl/vcl_test_protos.c +++ b/src/plugins/hs_apps/vcl/vcl_test_protos.c @@ -14,6 +14,23 @@ */ #include <hs_apps/vcl/vcl_test.h> +#include <http/http.h> +#include <http/http_header_names.h> +#include <http/http_content_types.h> + +typedef enum vcl_test_http_state_ +{ + VCL_TEST_HTTP_IDLE = 0, + VCL_TEST_HTTP_IN_PROGRESS, + VCL_TEST_HTTP_COMPLETED, +} vcl_test_http_state_t; + +typedef struct vcl_test_http_ctx_t +{ + u8 is_server; + vcl_test_http_state_t test_state; + u64 rem_data; +} vcl_test_http_ctx_t; static int vt_tcp_connect (vcl_test_session_t *ts, vppcom_endpt_t *endpt) @@ -978,6 +995,418 @@ static const vcl_test_proto_vft_t vcl_test_srtp = { VCL_TEST_REGISTER_PROTO (VPPCOM_PROTO_SRTP, vcl_test_srtp); +static void +vt_http_session_init (vcl_test_session_t *ts, u8 is_server) +{ + vcl_test_http_ctx_t *http_ctx; + + http_ctx = malloc (sizeof (vcl_test_http_ctx_t)); + memset (http_ctx, 0, sizeof (*http_ctx)); + http_ctx->is_server = is_server; + ts->opaque = http_ctx; +} + +static inline void +vt_http_send_reply_msg (vcl_test_session_t *ts, http_status_code_t status) +{ + http_msg_t msg; + int rv = 0; + + memset (&msg, 0, sizeof (http_msg_t)); + msg.type = HTTP_MSG_REPLY; + msg.code = status; + + vppcom_data_segment_t segs[1] = { { (u8 *) &msg, sizeof (msg) } }; + + do + { + rv = vppcom_session_write_segments (ts->fd, segs, 1); + + if (rv < 0) + { + errno = -rv; + if (errno == EAGAIN || errno == EWOULDBLOCK) + continue; + + vterr ("vppcom_session_write()", -errno); + break; + } + } + while (rv <= 0); +} + +static inline int +vt_process_http_server_read_msg (vcl_test_session_t *ts, void *buf, + uint32_t nbytes) +{ + http_msg_t msg; + u8 *target_path = 0; + vcl_test_http_ctx_t *vcl_test_http_ctx = (vcl_test_http_ctx_t *) ts->opaque; + vcl_test_stats_t *stats = &ts->stats; + int rv = 0; + + do + { + stats->rx_xacts++; + rv = vppcom_session_read (ts->fd, buf, nbytes); + + if (rv <= 0) + { + errno = -rv; + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + stats->rx_eagain++; + continue; + } + + vterr ("vppcom_session_read()", -errno); + return 0; + } + + if (PREDICT_TRUE (vcl_test_http_ctx->test_state == + VCL_TEST_HTTP_IN_PROGRESS)) + { + vcl_test_http_ctx->rem_data -= rv; + + if (vcl_test_http_ctx->rem_data == 0) + { + vcl_test_http_ctx->test_state = VCL_TEST_HTTP_COMPLETED; + vt_http_send_reply_msg (ts, HTTP_STATUS_OK); + } + } + else if (PREDICT_FALSE (vcl_test_http_ctx->test_state == + VCL_TEST_HTTP_IDLE)) + { + msg = *(http_msg_t *) buf; + + /* verify that we have received http post request from client */ + if (msg.type != HTTP_MSG_REQUEST || msg.method_type != HTTP_REQ_POST) + { + vt_http_send_reply_msg (ts, HTTP_STATUS_METHOD_NOT_ALLOWED); + vterr ("error! only POST requests allowed from client", 0); + return 0; + } + + if (msg.data.target_form != HTTP_TARGET_ORIGIN_FORM) + { + vt_http_send_reply_msg (ts, HTTP_STATUS_BAD_REQUEST); + vterr ("error! http target not in origin form", 0); + return 0; + } + + /* validate target path syntax */ + if (msg.data.target_path_len) + { + vec_validate (target_path, msg.data.target_path_len - 1); + memcpy (target_path, + buf + sizeof (msg) + msg.data.target_path_offset - 1, + msg.data.target_path_len + 1); + if (http_validate_abs_path_syntax (target_path, 0)) + { + vt_http_send_reply_msg (ts, HTTP_STATUS_BAD_REQUEST); + vterr ("error! target path is not absolute", 0); + vec_free (target_path); + return 0; + } + vec_free (target_path); + } + + /* read body */ + if (msg.data.body_len) + { + vcl_test_http_ctx->rem_data = msg.data.body_len; + /* | <http_msg_t> | <target> | <headers> | <body> | */ + vcl_test_http_ctx->rem_data -= + (rv - sizeof (msg) - msg.data.body_offset); + vcl_test_http_ctx->test_state = VCL_TEST_HTTP_IN_PROGRESS; + } + } + + if (rv < nbytes) + stats->rx_incomp++; + } + while (rv <= 0); + + stats->rx_bytes += rv; + return (rv); +} + +static inline int +vt_process_http_client_read_msg (vcl_test_session_t *ts, void *buf, + uint32_t nbytes) +{ + http_msg_t msg; + int rv = 0; + + do + { + rv = vppcom_session_read (ts->fd, buf, nbytes); + + if (rv < 0) + { + errno = -rv; + if (errno == EAGAIN || errno == EWOULDBLOCK) + continue; + + vterr ("vppcom_session_read()", -errno); + break; + } + } + while (!rv); + + msg = *(http_msg_t *) buf; + + if (msg.type == HTTP_MSG_REPLY && msg.code == HTTP_STATUS_OK) + vtinf ("received 200 OK from server"); + else + vterr ("received unexpected reply from server", 0); + + return (rv); +} + +static inline int +vt_process_http_client_write_msg (vcl_test_session_t *ts, void *buf, + uint32_t nbytes) +{ + http_msg_t msg; + http_header_t *req_headers = 0; + u8 *headers_buf = 0; + u8 *target; + vcl_test_http_ctx_t *vcl_test_http_ctx = (vcl_test_http_ctx_t *) ts->opaque; + vcl_test_stats_t *stats = &ts->stats; + int rv = 0; + + if (PREDICT_TRUE (vcl_test_http_ctx->test_state == + VCL_TEST_HTTP_IN_PROGRESS)) + { + do + { + rv = vppcom_session_write ( + ts->fd, buf, clib_min (nbytes, vcl_test_http_ctx->rem_data)); + + if (rv <= 0) + { + errno = -rv; + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + stats->tx_eagain++; + continue; + } + + vterr ("vppcom_session_write()", -errno); + return 0; + } + + vcl_test_http_ctx->rem_data -= rv; + + if (vcl_test_http_ctx->rem_data == 0) + { + vcl_test_http_ctx->test_state = VCL_TEST_HTTP_COMPLETED; + vtinf ("client finished sending %ld bytes of data", + ts->cfg.total_bytes); + } + + if (rv < nbytes) + stats->tx_incomp++; + } + while (rv <= 0); + } + + else if (PREDICT_FALSE (vcl_test_http_ctx->test_state == VCL_TEST_HTTP_IDLE)) + { + http_add_header ( + &req_headers, http_header_name_token (HTTP_HEADER_CONTENT_TYPE), + http_content_type_token (HTTP_CONTENT_APP_OCTET_STREAM)); + headers_buf = http_serialize_headers (req_headers); + vec_free (req_headers); + + memset (&msg, 0, sizeof (http_msg_t)); + msg.type = HTTP_MSG_REQUEST; + msg.method_type = HTTP_REQ_POST; + + /* target */ + msg.data.target_form = HTTP_TARGET_ORIGIN_FORM; + target = (u8 *) "/vcl_test_http\0"; + msg.data.target_path_len = strlen ((char *) target); + + /* headers */ + msg.data.headers_offset = msg.data.target_path_len; + msg.data.headers_len = vec_len (headers_buf); + + /* body */ + msg.data.body_offset = msg.data.headers_offset + msg.data.headers_len; + msg.data.body_len = ts->cfg.total_bytes; + + msg.data.len = + msg.data.target_path_len + msg.data.headers_len + msg.data.body_len; + msg.data.type = HTTP_MSG_DATA_INLINE; + + vppcom_data_segment_t segs[3] = { { (u8 *) &msg, sizeof (msg) }, + { target, strlen ((char *) target) }, + { headers_buf, + vec_len (headers_buf) } }; + + do + { + rv = vppcom_session_write_segments (ts->fd, segs, 3); + + if (rv <= 0) + { + errno = -rv; + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + stats->tx_eagain++; + continue; + } + + vterr ("vppcom_session_write_segments()", -errno); + vec_free (headers_buf); + return 0; + } + } + while (rv <= 0); + + vcl_test_http_ctx->test_state = VCL_TEST_HTTP_IN_PROGRESS; + vcl_test_http_ctx->rem_data = ts->cfg.total_bytes; + vec_free (headers_buf); + } + + stats->tx_bytes += rv; + return (rv); +} + +static inline int +vt_process_http_server_write_msg (vcl_test_session_t *ts, void *buf, + uint32_t nbytes) +{ + return 0; +} + +static inline int +vt_http_read (vcl_test_session_t *ts, void *buf, uint32_t nbytes) +{ + vcl_test_http_ctx_t *vcl_test_http_ctx = (vcl_test_http_ctx_t *) ts->opaque; + + if (vcl_test_http_ctx->is_server) + return vt_process_http_server_read_msg (ts, buf, nbytes); + else + return vt_process_http_client_read_msg (ts, buf, nbytes); +} + +static inline int +vt_http_write (vcl_test_session_t *ts, void *buf, uint32_t nbytes) +{ + vcl_test_http_ctx_t *vcl_test_http_ctx = (vcl_test_http_ctx_t *) ts->opaque; + + if (vcl_test_http_ctx->is_server) + return vt_process_http_server_write_msg (ts, buf, nbytes); + else + return vt_process_http_client_write_msg (ts, buf, nbytes); +} + +static int +vt_http_connect (vcl_test_session_t *ts, vppcom_endpt_t *endpt) +{ + uint32_t flags, flen; + int rv; + + ts->fd = vppcom_session_create (VPPCOM_PROTO_HTTP, ts->noblk_connect); + if (ts->fd < 0) + { + vterr ("vppcom_session_create()", ts->fd); + return ts->fd; + } + + rv = vppcom_session_connect (ts->fd, endpt); + if (rv < 0 && rv != VPPCOM_EINPROGRESS) + { + vterr ("vppcom_session_connect()", rv); + return rv; + } + + ts->read = vt_http_read; + ts->write = vt_http_write; + + if (!ts->noblk_connect) + { + flags = O_NONBLOCK; + flen = sizeof (flags); + vppcom_session_attr (ts->fd, VPPCOM_ATTR_SET_FLAGS, &flags, &flen); + vtinf ("Test session %d (fd %d) connected.", ts->session_index, ts->fd); + } + + vt_http_session_init (ts, 0 /* is_server */); + + return 0; +} + +static int +vt_http_listen (vcl_test_session_t *ts, vppcom_endpt_t *endpt) +{ + int rv; + + ts->fd = vppcom_session_create (VPPCOM_PROTO_HTTP, 1 /* is_nonblocking */); + if (ts->fd < 0) + { + vterr ("vppcom_session_create()", ts->fd); + return ts->fd; + } + + rv = vppcom_session_bind (ts->fd, endpt); + if (rv < 0) + { + vterr ("vppcom_session_bind()", rv); + return rv; + } + + rv = vppcom_session_listen (ts->fd, 10); + if (rv < 0) + { + vterr ("vppcom_session_listen()", rv); + return rv; + } + + return 0; +} + +static int +vt_http_accept (int listen_fd, vcl_test_session_t *ts) +{ + int client_fd; + + client_fd = vppcom_session_accept (listen_fd, &ts->endpt, 0); + if (client_fd < 0) + { + vterr ("vppcom_session_accept()", client_fd); + return client_fd; + } + + ts->fd = client_fd; + ts->is_open = 1; + ts->read = vt_http_read; + ts->write = vt_http_write; + + vt_http_session_init (ts, 1 /* is_server */); + + return 0; +} + +static int +vt_http_close (vcl_test_session_t *ts) +{ + free (ts->opaque); + return 0; +} + +static const vcl_test_proto_vft_t vcl_test_http = { + .open = vt_http_connect, + .listen = vt_http_listen, + .accept = vt_http_accept, + .close = vt_http_close, +}; + +VCL_TEST_REGISTER_PROTO (VPPCOM_PROTO_HTTP, vcl_test_http); + /* * fd.io coding-style-patch-verification: ON * diff --git a/src/plugins/hs_apps/vcl/vcl_test_server.c b/src/plugins/hs_apps/vcl/vcl_test_server.c index d17a2089ba7..008539f2585 100644 --- a/src/plugins/hs_apps/vcl/vcl_test_server.c +++ b/src/plugins/hs_apps/vcl/vcl_test_server.c @@ -282,11 +282,7 @@ vts_server_process_rx (vcl_test_session_t *conn, int rx_bytes) if (conn->cfg.test == HS_TEST_TYPE_BI) { if (vsm->use_ds) - { - (void) vcl_test_write (conn, conn->ds[0].data, conn->ds[0].len); - if (conn->ds[1].len) - (void) vcl_test_write (conn, conn->ds[1].data, conn->ds[1].len); - } + (void) vcl_test_write_ds (conn); else (void) vcl_test_write (conn, conn->rxbuf, rx_bytes); } @@ -420,36 +416,41 @@ static void vcl_test_init_endpoint_addr (vcl_test_server_main_t * vsm) { struct sockaddr_storage *servaddr = &vsm->servaddr; - memset (servaddr, 0, sizeof (*servaddr)); if (vsm->server_cfg.address_ip6) { struct sockaddr_in6 *server_addr = (struct sockaddr_in6 *) servaddr; - server_addr->sin6_family = AF_INET6; - server_addr->sin6_addr = in6addr_any; - server_addr->sin6_port = htons (vsm->server_cfg.port); + vsm->server_cfg.endpt.is_ip4 = 0; + vsm->server_cfg.endpt.ip = (uint8_t *) &server_addr->sin6_addr; + vsm->server_cfg.endpt.port = htons (vsm->server_cfg.port); } else { struct sockaddr_in *server_addr = (struct sockaddr_in *) servaddr; - server_addr->sin_family = AF_INET; - server_addr->sin_addr.s_addr = htonl (INADDR_ANY); - server_addr->sin_port = htons (vsm->server_cfg.port); + vsm->server_cfg.endpt.is_ip4 = 1; + vsm->server_cfg.endpt.ip = (uint8_t *) &server_addr->sin_addr; + vsm->server_cfg.endpt.port = htons (vsm->server_cfg.port); } +} + +static void +vcl_test_clear_endpoint_addr (vcl_test_server_main_t *vsm) +{ + struct sockaddr_storage *servaddr = &vsm->servaddr; + + memset (&vsm->servaddr, 0, sizeof (vsm->servaddr)); if (vsm->server_cfg.address_ip6) { struct sockaddr_in6 *server_addr = (struct sockaddr_in6 *) servaddr; - vsm->server_cfg.endpt.is_ip4 = 0; - vsm->server_cfg.endpt.ip = (uint8_t *) &server_addr->sin6_addr; - vsm->server_cfg.endpt.port = (uint16_t) server_addr->sin6_port; + server_addr->sin6_family = AF_INET6; + server_addr->sin6_addr = in6addr_any; } else { struct sockaddr_in *server_addr = (struct sockaddr_in *) servaddr; - vsm->server_cfg.endpt.is_ip4 = 1; - vsm->server_cfg.endpt.ip = (uint8_t *) &server_addr->sin_addr; - vsm->server_cfg.endpt.port = (uint16_t) server_addr->sin_port; + server_addr->sin_family = AF_INET; + server_addr->sin_addr.s_addr = htonl (INADDR_ANY); } } @@ -460,9 +461,10 @@ vcl_test_server_process_opts (vcl_test_server_main_t * vsm, int argc, int v, c; vsm->server_cfg.proto = VPPCOM_PROTO_TCP; + vcl_test_clear_endpoint_addr (vsm); opterr = 0; - while ((c = getopt (argc, argv, "6DLsw:hp:S")) != -1) + while ((c = getopt (argc, argv, "6DLsw:hp:SB:")) != -1) switch (c) { case '6': @@ -473,7 +475,22 @@ vcl_test_server_process_opts (vcl_test_server_main_t * vsm, int argc, if (vppcom_unformat_proto (&vsm->server_cfg.proto, optarg)) vtwrn ("Invalid vppcom protocol %s, defaulting to TCP", optarg); break; - + case 'B': + if (vsm->server_cfg.address_ip6) + { + if (inet_pton ( + AF_INET6, optarg, + &((struct sockaddr_in6 *) &vsm->servaddr)->sin6_addr) != 1) + vtwrn ("couldn't parse ipv6 addr %s", optarg); + } + else + { + if (inet_pton ( + AF_INET, optarg, + &((struct sockaddr_in *) &vsm->servaddr)->sin_addr) != 1) + vtwrn ("couldn't parse ipv4 addr %s", optarg); + } + break; case 'D': vsm->server_cfg.proto = VPPCOM_PROTO_UDP; break; |