diff options
Diffstat (limited to 'src/plugins/hs_apps/proxy.c')
-rw-r--r-- | src/plugins/hs_apps/proxy.c | 857 |
1 files changed, 626 insertions, 231 deletions
diff --git a/src/plugins/hs_apps/proxy.c b/src/plugins/hs_apps/proxy.c index e8fedf921a5..d7fe6fb54df 100644 --- a/src/plugins/hs_apps/proxy.c +++ b/src/plugins/hs_apps/proxy.c @@ -19,50 +19,145 @@ #include <vnet/session/application_interface.h> #include <hs_apps/proxy.h> #include <vnet/tcp/tcp.h> +#include <http/http.h> +#include <http/http_header_names.h> proxy_main_t proxy_main; #define TCP_MSS 1460 -typedef struct +static proxy_session_side_ctx_t * +proxy_session_side_ctx_alloc (proxy_worker_t *wrk) { - session_endpoint_cfg_t sep; - u32 app_index; - u32 api_context; -} proxy_connect_args_t; + proxy_session_side_ctx_t *ctx; + + pool_get_zero (wrk->ctx_pool, ctx); + ctx->sc_index = ctx - wrk->ctx_pool; + ctx->ps_index = ~0; + + return ctx; +} static void -proxy_cb_fn (void *data, u32 data_len) +proxy_session_side_ctx_free (proxy_worker_t *wrk, + proxy_session_side_ctx_t *ctx) { - proxy_connect_args_t *pa = (proxy_connect_args_t *) data; - vnet_connect_args_t a; + pool_put (wrk->ctx_pool, ctx); +} - clib_memset (&a, 0, sizeof (a)); - a.api_context = pa->api_context; - a.app_index = pa->app_index; - clib_memcpy (&a.sep_ext, &pa->sep, sizeof (pa->sep)); - vnet_connect (&a); - if (a.sep_ext.ext_cfg) - clib_mem_free (a.sep_ext.ext_cfg); +static proxy_session_side_ctx_t * +proxy_session_side_ctx_get (proxy_worker_t *wrk, u32 ctx_index) +{ + return pool_elt_at_index (wrk->ctx_pool, ctx_index); } static void -proxy_call_main_thread (vnet_connect_args_t * a) +proxy_send_http_resp (session_t *s, http_status_code_t sc, + http_header_t *resp_headers) { - if (vlib_get_thread_index () == 0) + http_msg_t msg; + int rv; + u8 *headers_buf = 0; + + if (vec_len (resp_headers)) { - vnet_connect (a); - if (a->sep_ext.ext_cfg) - clib_mem_free (a->sep_ext.ext_cfg); + headers_buf = http_serialize_headers (resp_headers); + msg.data.len = msg.data.headers_len = vec_len (headers_buf); } else + msg.data.len = msg.data.headers_len = 0; + + msg.type = HTTP_MSG_REPLY; + msg.code = sc; + msg.data.type = HTTP_MSG_DATA_INLINE; + msg.data.headers_offset = 0; + msg.data.body_len = 0; + msg.data.body_offset = 0; + rv = svm_fifo_enqueue (s->tx_fifo, sizeof (msg), (u8 *) &msg); + ASSERT (rv == sizeof (msg)); + if (msg.data.headers_len) + { + rv = svm_fifo_enqueue (s->tx_fifo, vec_len (headers_buf), headers_buf); + ASSERT (rv == vec_len (headers_buf)); + vec_free (headers_buf); + } + + if (svm_fifo_set_event (s->tx_fifo)) + session_program_tx_io_evt (s->handle, SESSION_IO_EVT_TX); +} + +static void +proxy_do_connect (vnet_connect_args_t *a) +{ + ASSERT (session_vlib_thread_is_cl_thread ()); + vnet_connect (a); + session_endpoint_free_ext_cfgs (&a->sep_ext); +} + +static void +proxy_handle_connects_rpc (void *args) +{ + u32 thread_index = pointer_to_uword (args), n_connects = 0, n_pending; + proxy_worker_t *wrk; + u32 max_connects; + + wrk = proxy_worker_get (thread_index); + + clib_spinlock_lock (&wrk->pending_connects_lock); + + n_pending = clib_fifo_elts (wrk->pending_connects); + max_connects = clib_min (32, n_pending); + vec_validate (wrk->burst_connects, max_connects); + + while (n_connects < max_connects) + clib_fifo_sub1 (wrk->pending_connects, wrk->burst_connects[n_connects++]); + + clib_spinlock_unlock (&wrk->pending_connects_lock); + + /* Do connects without locking pending_connects */ + n_connects = 0; + while (n_connects < max_connects) { - proxy_connect_args_t args; - args.api_context = a->api_context; - args.app_index = a->app_index; - clib_memcpy (&args.sep, &a->sep_ext, sizeof (a->sep_ext)); - vl_api_rpc_call_main_thread (proxy_cb_fn, (u8 *) & args, sizeof (args)); + proxy_do_connect (&wrk->burst_connects[n_connects]); + n_connects += 1; } + + /* More work to do, program rpc */ + if (max_connects < n_pending) + session_send_rpc_evt_to_thread_force ( + transport_cl_thread (), proxy_handle_connects_rpc, + uword_to_pointer ((uword) thread_index, void *)); +} + +static void +proxy_program_connect (vnet_connect_args_t *a) +{ + u32 connects_thread = transport_cl_thread (), thread_index, n_pending; + proxy_worker_t *wrk; + + thread_index = vlib_get_thread_index (); + + /* If already on first worker, handle request */ + if (thread_index == connects_thread) + { + proxy_do_connect (a); + return; + } + + /* If not on first worker, queue request */ + wrk = proxy_worker_get (thread_index); + + clib_spinlock_lock (&wrk->pending_connects_lock); + + clib_fifo_add1 (wrk->pending_connects, *a); + n_pending = clib_fifo_elts (wrk->pending_connects); + + clib_spinlock_unlock (&wrk->pending_connects_lock); + + if (n_pending == 1) + session_send_rpc_evt_to_thread_force ( + connects_thread, proxy_handle_connects_rpc, + uword_to_pointer ((uword) thread_index, void *)); } static proxy_session_t * @@ -85,16 +180,6 @@ proxy_session_get (u32 ps_index) return pool_elt_at_index (pm->sessions, ps_index); } -static inline proxy_session_t * -proxy_session_get_if_valid (u32 ps_index) -{ - proxy_main_t *pm = &proxy_main; - - if (pool_is_free_index (pm->sessions, ps_index)) - return 0; - return pool_elt_at_index (pm->sessions, ps_index); -} - static void proxy_session_free (proxy_session_t *ps) { @@ -115,7 +200,7 @@ proxy_session_postponed_free_rpc (void *arg) clib_spinlock_lock_if_init (&pm->sessions_lock); ps = proxy_session_get (ps_index); - segment_manager_dealloc_fifos (ps->server_rx_fifo, ps->server_tx_fifo); + segment_manager_dealloc_fifos (ps->po.rx_fifo, ps->po.tx_fifo); proxy_session_free (ps); clib_spinlock_unlock_if_init (&pm->sessions_lock); @@ -126,54 +211,79 @@ proxy_session_postponed_free_rpc (void *arg) static void proxy_session_postponed_free (proxy_session_t *ps) { - session_send_rpc_evt_to_thread (ps->po_thread_index, + /* Passive open session handle has been invalidated so we don't have thread + * index at this point */ + session_send_rpc_evt_to_thread (ps->po.rx_fifo->master_thread_index, proxy_session_postponed_free_rpc, uword_to_pointer (ps->ps_index, void *)); } static void +proxy_session_close_po (proxy_session_t *ps) +{ + vnet_disconnect_args_t _a = {}, *a = &_a; + proxy_main_t *pm = &proxy_main; + + ASSERT (!vlib_num_workers () || + CLIB_SPINLOCK_IS_LOCKED (&pm->sessions_lock)); + + a->handle = ps->po.session_handle; + a->app_index = pm->server_app_index; + vnet_disconnect_session (a); + + ps->po_disconnected = 1; +} + +static void +proxy_session_close_ao (proxy_session_t *ps) +{ + vnet_disconnect_args_t _a = {}, *a = &_a; + proxy_main_t *pm = &proxy_main; + + ASSERT (!vlib_num_workers () || + CLIB_SPINLOCK_IS_LOCKED (&pm->sessions_lock)); + + a->handle = ps->ao.session_handle; + a->app_index = pm->active_open_app_index; + vnet_disconnect_session (a); + + ps->ao_disconnected = 1; +} + +static void proxy_try_close_session (session_t * s, int is_active_open) { proxy_main_t *pm = &proxy_main; - proxy_session_t *ps = 0; - vnet_disconnect_args_t _a, *a = &_a; + proxy_session_side_ctx_t *sc; + proxy_session_t *ps; + proxy_worker_t *wrk; + + wrk = proxy_worker_get (s->thread_index); + sc = proxy_session_side_ctx_get (wrk, s->opaque); clib_spinlock_lock_if_init (&pm->sessions_lock); - ps = proxy_session_get (s->opaque); + ps = proxy_session_get (sc->ps_index); if (is_active_open) { - a->handle = ps->vpp_active_open_handle; - a->app_index = pm->active_open_app_index; - vnet_disconnect_session (a); - ps->ao_disconnected = 1; + proxy_session_close_ao (ps); if (!ps->po_disconnected) { - ASSERT (ps->vpp_server_handle != SESSION_INVALID_HANDLE); - a->handle = ps->vpp_server_handle; - a->app_index = pm->server_app_index; - vnet_disconnect_session (a); - ps->po_disconnected = 1; + ASSERT (ps->po.session_handle != SESSION_INVALID_HANDLE); + proxy_session_close_po (ps); } } else { - a->handle = ps->vpp_server_handle; - a->app_index = pm->server_app_index; - vnet_disconnect_session (a); - ps->po_disconnected = 1; + proxy_session_close_po (ps); if (!ps->ao_disconnected && !ps->active_open_establishing) { /* Proxy session closed before active open */ - if (ps->vpp_active_open_handle != SESSION_INVALID_HANDLE) - { - a->handle = ps->vpp_active_open_handle; - a->app_index = pm->active_open_app_index; - vnet_disconnect_session (a); - } + if (ps->ao.session_handle != SESSION_INVALID_HANDLE) + proxy_session_close_ao (ps); ps->ao_disconnected = 1; } } @@ -181,29 +291,63 @@ proxy_try_close_session (session_t * s, int is_active_open) } static void +proxy_try_side_ctx_cleanup (session_t *s) +{ + proxy_main_t *pm = &proxy_main; + proxy_session_t *ps; + proxy_session_side_ctx_t *sc; + proxy_worker_t *wrk; + + wrk = proxy_worker_get (s->thread_index); + sc = proxy_session_side_ctx_get (wrk, s->opaque); + if (sc->state == PROXY_SC_S_CREATED) + return; + + clib_spinlock_lock_if_init (&pm->sessions_lock); + + ps = proxy_session_get (sc->ps_index); + + if (!ps->po_disconnected) + proxy_session_close_po (ps); + + if (!ps->ao_disconnected) + proxy_session_close_ao (ps); + + clib_spinlock_unlock_if_init (&pm->sessions_lock); +} + +static void proxy_try_delete_session (session_t * s, u8 is_active_open) { proxy_main_t *pm = &proxy_main; proxy_session_t *ps = 0; + proxy_session_side_ctx_t *sc; + proxy_worker_t *wrk; + u32 ps_index; + + wrk = proxy_worker_get (s->thread_index); + sc = proxy_session_side_ctx_get (wrk, s->opaque); + ps_index = sc->ps_index; + + proxy_session_side_ctx_free (wrk, sc); clib_spinlock_lock_if_init (&pm->sessions_lock); - ps = proxy_session_get (s->opaque); + ps = proxy_session_get (ps_index); if (is_active_open) { - ps->vpp_active_open_handle = SESSION_INVALID_HANDLE; + ps->ao.session_handle = SESSION_INVALID_HANDLE; /* Revert master thread index change on connect notification */ - ps->server_rx_fifo->master_thread_index = ps->po_thread_index; + ps->po.rx_fifo->master_thread_index = + ps->po.tx_fifo->master_thread_index; /* Passive open already cleaned up */ - if (ps->vpp_server_handle == SESSION_INVALID_HANDLE) + if (ps->po.session_handle == SESSION_INVALID_HANDLE) { - ASSERT (s->rx_fifo->refcnt == 1); - /* The two sides of the proxy on different threads */ - if (ps->po_thread_index != s->thread_index) + if (ps->po.tx_fifo->master_thread_index != s->thread_index) { /* This is not the right thread to delete the fifos */ s->rx_fifo = 0; @@ -211,14 +355,17 @@ proxy_try_delete_session (session_t * s, u8 is_active_open) proxy_session_postponed_free (ps); } else - proxy_session_free (ps); + { + ASSERT (s->rx_fifo->refcnt == 1); + proxy_session_free (ps); + } } } else { - ps->vpp_server_handle = SESSION_INVALID_HANDLE; + ps->po.session_handle = SESSION_INVALID_HANDLE; - if (ps->vpp_active_open_handle == SESSION_INVALID_HANDLE) + if (ps->ao.session_handle == SESSION_INVALID_HANDLE) { if (!ps->active_open_establishing) proxy_session_free (ps); @@ -275,16 +422,26 @@ static int proxy_accept_callback (session_t * s) { proxy_main_t *pm = &proxy_main; + proxy_session_side_ctx_t *sc; proxy_session_t *ps; + proxy_worker_t *wrk; + transport_proto_t tp = session_get_transport_proto (s); + + wrk = proxy_worker_get (s->thread_index); + sc = proxy_session_side_ctx_alloc (wrk); + s->opaque = sc->sc_index; clib_spinlock_lock_if_init (&pm->sessions_lock); ps = proxy_session_alloc (); - ps->vpp_server_handle = session_handle (s); - ps->vpp_active_open_handle = SESSION_INVALID_HANDLE; - ps->po_thread_index = s->thread_index; - s->opaque = ps->ps_index; + ps->po.session_handle = session_handle (s); + ps->po.rx_fifo = s->rx_fifo; + ps->po.tx_fifo = s->tx_fifo; + + ps->ao.session_handle = SESSION_INVALID_HANDLE; + sc->ps_index = ps->ps_index; + sc->is_http = tp == TRANSPORT_PROTO_HTTP ? 1 : 0; clib_spinlock_unlock_if_init (&pm->sessions_lock); @@ -325,92 +482,167 @@ proxy_transport_needs_crypto (transport_proto_t proto) return proto == TRANSPORT_PROTO_TLS; } -static int -proxy_rx_callback (session_t * s) +static void +proxy_session_start_connect (proxy_session_side_ctx_t *sc, session_t *s) { + int actual_transfer __attribute__ ((unused)); + vnet_connect_args_t _a = {}, *a = &_a; proxy_main_t *pm = &proxy_main; - u32 thread_index = vlib_get_thread_index (); - svm_fifo_t *ao_tx_fifo; + u32 max_dequeue, ps_index; proxy_session_t *ps; - - ASSERT (s->thread_index == thread_index); + transport_proto_t tp = session_get_transport_proto (s); clib_spinlock_lock_if_init (&pm->sessions_lock); - ps = proxy_session_get (s->opaque); + ps = proxy_session_get (sc->ps_index); - if (PREDICT_TRUE (ps->vpp_active_open_handle != SESSION_INVALID_HANDLE)) + /* maybe we were already here */ + if (ps->active_open_establishing) { clib_spinlock_unlock_if_init (&pm->sessions_lock); + return; + } - ao_tx_fifo = s->rx_fifo; + ps->active_open_establishing = 1; + ps_index = ps->ps_index; - /* - * Send event for active open tx fifo - */ - if (svm_fifo_set_event (ao_tx_fifo)) + clib_spinlock_unlock_if_init (&pm->sessions_lock); + + if (tp == TRANSPORT_PROTO_HTTP) + { + http_msg_t msg; + u8 *target_buf = 0; + http_uri_t target_uri; + http_header_t *resp_headers = 0; + session_endpoint_cfg_t target_sep = SESSION_ENDPOINT_CFG_NULL; + int rv; + + rv = svm_fifo_dequeue (s->rx_fifo, sizeof (msg), (u8 *) &msg); + ASSERT (rv == sizeof (msg)); + + if (msg.type != HTTP_MSG_REQUEST) + { + proxy_send_http_resp (s, HTTP_STATUS_INTERNAL_ERROR, 0); + return; + } + if (msg.method_type != HTTP_REQ_CONNECT) + { + http_add_header (&resp_headers, + http_header_name_token (HTTP_HEADER_ALLOW), + http_token_lit ("CONNECT")); + proxy_send_http_resp (s, HTTP_STATUS_METHOD_NOT_ALLOWED, + resp_headers); + vec_free (resp_headers); + return; + } + + if (msg.data.target_form != HTTP_TARGET_AUTHORITY_FORM || + msg.data.target_path_len == 0) { - u32 ao_thread_index = ao_tx_fifo->master_thread_index; - u32 ao_session_index = ao_tx_fifo->shr->master_session_index; - if (session_send_io_evt_to_thread_custom (&ao_session_index, - ao_thread_index, - SESSION_IO_EVT_TX)) - clib_warning ("failed to enqueue tx evt"); + proxy_send_http_resp (s, HTTP_STATUS_BAD_REQUEST, 0); + return; } - if (svm_fifo_max_enqueue (ao_tx_fifo) <= TCP_MSS) - svm_fifo_add_want_deq_ntf (ao_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); + /* read target uri */ + target_buf = vec_new (u8, msg.data.target_path_len); + rv = svm_fifo_peek (s->rx_fifo, msg.data.target_path_offset, + msg.data.target_path_len, target_buf); + ASSERT (rv == msg.data.target_path_len); + svm_fifo_dequeue_drop (s->rx_fifo, msg.data.len); + rv = http_parse_authority_form_target (target_buf, &target_uri); + vec_free (target_buf); + if (rv) + { + proxy_send_http_resp (s, HTTP_STATUS_BAD_REQUEST, 0); + return; + } + target_sep.is_ip4 = target_uri.is_ip4; + target_sep.ip = target_uri.ip; + target_sep.port = target_uri.port; + target_sep.transport_proto = TRANSPORT_PROTO_TCP; + clib_memcpy (&a->sep_ext, &target_sep, sizeof (target_sep)); } else { - vnet_connect_args_t _a, *a = &_a; - svm_fifo_t *tx_fifo, *rx_fifo; - u32 max_dequeue, ps_index; - int actual_transfer __attribute__ ((unused)); + max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo); + if (PREDICT_FALSE (max_dequeue == 0)) + return; - rx_fifo = s->rx_fifo; - tx_fifo = s->tx_fifo; + max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue); + actual_transfer = + svm_fifo_peek (s->rx_fifo, 0 /* relative_offset */, max_dequeue, + pm->rx_buf[s->thread_index]); - ASSERT (rx_fifo->master_thread_index == thread_index); - ASSERT (tx_fifo->master_thread_index == thread_index); + /* Expectation is that here actual data just received is parsed and based + * on its contents, the destination and parameters of the connect to the + * upstream are decided + */ - max_dequeue = svm_fifo_max_dequeue_cons (s->rx_fifo); + clib_memcpy (&a->sep_ext, &pm->client_sep, sizeof (pm->client_sep)); + } - if (PREDICT_FALSE (max_dequeue == 0)) - { - clib_spinlock_unlock_if_init (&pm->sessions_lock); - return 0; - } + a->api_context = ps_index; + a->app_index = pm->active_open_app_index; - max_dequeue = clib_min (pm->rcv_buffer_size, max_dequeue); - actual_transfer = svm_fifo_peek (rx_fifo, 0 /* relative_offset */ , - max_dequeue, pm->rx_buf[thread_index]); + if (proxy_transport_needs_crypto (a->sep.transport_proto)) + { + transport_endpt_ext_cfg_t *ext_cfg = session_endpoint_add_ext_cfg ( + &a->sep_ext, TRANSPORT_ENDPT_EXT_CFG_CRYPTO, + sizeof (transport_endpt_crypto_cfg_t)); + ext_cfg->crypto.ckpair_index = pm->ckpair_index; + } - /* $$$ your message in this space: parse url, etc. */ + proxy_program_connect (a); +} - clib_memset (a, 0, sizeof (*a)); +static int +proxy_rx_callback (session_t *s) +{ + proxy_session_side_ctx_t *sc; + svm_fifo_t *ao_tx_fifo; + proxy_session_t *ps; + proxy_worker_t *wrk; - ps->server_rx_fifo = rx_fifo; - ps->server_tx_fifo = tx_fifo; - ps->active_open_establishing = 1; - ps_index = ps->ps_index; + ASSERT (s->thread_index == vlib_get_thread_index ()); - clib_spinlock_unlock_if_init (&pm->sessions_lock); + wrk = proxy_worker_get (s->thread_index); + sc = proxy_session_side_ctx_get (wrk, s->opaque); - clib_memcpy (&a->sep_ext, &pm->client_sep, sizeof (pm->client_sep)); - a->api_context = ps_index; - a->app_index = pm->active_open_app_index; + if (PREDICT_FALSE (sc->state < PROXY_SC_S_ESTABLISHED)) + { + proxy_main_t *pm = &proxy_main; - if (proxy_transport_needs_crypto (a->sep.transport_proto)) + if (sc->state == PROXY_SC_S_CREATED) { - session_endpoint_alloc_ext_cfg (&a->sep_ext, - TRANSPORT_ENDPT_EXT_CFG_CRYPTO); - a->sep_ext.ext_cfg->crypto.ckpair_index = pm->ckpair_index; + proxy_session_start_connect (sc, s); + sc->state = PROXY_SC_S_CONNECTING; + return 0; } - proxy_call_main_thread (a); + clib_spinlock_lock_if_init (&pm->sessions_lock); + + ps = proxy_session_get (sc->ps_index); + sc->pair = ps->ao; + + clib_spinlock_unlock_if_init (&pm->sessions_lock); + + if (sc->pair.session_handle == SESSION_INVALID_HANDLE) + return 0; + + sc->state = PROXY_SC_S_ESTABLISHED; } + ao_tx_fifo = s->rx_fifo; + + /* + * Send event for active open tx fifo + */ + if (svm_fifo_set_event (ao_tx_fifo)) + session_program_tx_io_evt (sc->pair.session_handle, SESSION_IO_EVT_TX); + + if (svm_fifo_max_enqueue (ao_tx_fifo) <= TCP_MSS) + svm_fifo_add_want_deq_ntf (ao_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); + return 0; } @@ -418,20 +650,20 @@ static void proxy_force_ack (void *handlep) { transport_connection_t *tc; - session_t *ao_s; + session_t *s; - ao_s = session_get_from_handle (pointer_to_uword (handlep)); - if (session_get_transport_proto (ao_s) != TRANSPORT_PROTO_TCP) + s = session_get_from_handle (pointer_to_uword (handlep)); + if (session_get_transport_proto (s) != TRANSPORT_PROTO_TCP) return; - tc = session_get_transport (ao_s); + tc = session_get_transport (s); tcp_send_ack ((tcp_connection_t *) tc); } static int proxy_tx_callback (session_t * proxy_s) { - proxy_main_t *pm = &proxy_main; - proxy_session_t *ps; + proxy_session_side_ctx_t *sc; + proxy_worker_t *wrk; u32 min_free; min_free = clib_min (svm_fifo_size (proxy_s->tx_fifo) >> 3, 128 << 10); @@ -441,21 +673,17 @@ proxy_tx_callback (session_t * proxy_s) return 0; } - clib_spinlock_lock_if_init (&pm->sessions_lock); - - ps = proxy_session_get (proxy_s->opaque); - - if (ps->vpp_active_open_handle == SESSION_INVALID_HANDLE) - goto unlock; + wrk = proxy_worker_get (proxy_s->thread_index); + sc = proxy_session_side_ctx_get (wrk, proxy_s->opaque); + if (sc->state < PROXY_SC_S_ESTABLISHED) + return 0; /* Force ack on active open side to update rcv wnd. Make sure it's done on * the right thread */ - void *arg = uword_to_pointer (ps->vpp_active_open_handle, void *); - session_send_rpc_evt_to_thread (ps->server_rx_fifo->master_thread_index, - proxy_force_ack, arg); - -unlock: - clib_spinlock_unlock_if_init (&pm->sessions_lock); + void *arg = uword_to_pointer (sc->pair.session_handle, void *); + session_send_rpc_evt_to_thread ( + session_thread_from_handle (sc->pair.session_handle), proxy_force_ack, + arg); return 0; } @@ -464,7 +692,10 @@ static void proxy_cleanup_callback (session_t * s, session_cleanup_ntf_t ntf) { if (ntf == SESSION_CLEANUP_TRANSPORT) - return; + { + proxy_try_side_ctx_cleanup (s); + return; + } proxy_try_delete_session (s, 0 /* is_active_open */ ); } @@ -490,10 +721,17 @@ active_open_alloc_session_fifos (session_t *s) clib_spinlock_lock_if_init (&pm->sessions_lock); + /* Active open opaque is pointing at proxy session */ ps = proxy_session_get (s->opaque); - txf = ps->server_rx_fifo; - rxf = ps->server_tx_fifo; + if (ps->po_disconnected) + { + clib_spinlock_unlock_if_init (&pm->sessions_lock); + return SESSION_E_ALLOC; + } + + txf = ps->po.rx_fifo; + rxf = ps->po.tx_fifo; /* * Reset the active-open tx-fifo master indices so the active-open session @@ -524,31 +762,43 @@ active_open_connected_callback (u32 app_index, u32 opaque, { proxy_main_t *pm = &proxy_main; proxy_session_t *ps; - u8 thread_index = vlib_get_thread_index (); - - /* - * Setup proxy session handle. - */ - clib_spinlock_lock_if_init (&pm->sessions_lock); - - ps = proxy_session_get (opaque); + proxy_worker_t *wrk; + proxy_session_side_ctx_t *sc; + session_t *po_s; + transport_proto_t tp; /* Connection failed */ if (err) { - vnet_disconnect_args_t _a, *a = &_a; + clib_spinlock_lock_if_init (&pm->sessions_lock); - a->handle = ps->vpp_server_handle; - a->app_index = pm->server_app_index; - vnet_disconnect_session (a); - ps->po_disconnected = 1; - } - else - { - ps->vpp_active_open_handle = session_handle (s); - ps->active_open_establishing = 0; + ps = proxy_session_get (opaque); + po_s = session_get_from_handle (ps->po.session_handle); + tp = session_get_transport_proto (po_s); + if (tp == TRANSPORT_PROTO_HTTP) + { + proxy_send_http_resp (po_s, HTTP_STATUS_BAD_GATEWAY, 0); + } + ps->ao_disconnected = 1; + proxy_session_close_po (ps); + + clib_spinlock_unlock_if_init (&pm->sessions_lock); + + return 0; } + wrk = proxy_worker_get (s->thread_index); + + clib_spinlock_lock_if_init (&pm->sessions_lock); + + ps = proxy_session_get (opaque); + + ps->ao.rx_fifo = s->rx_fifo; + ps->ao.tx_fifo = s->tx_fifo; + ps->ao.session_handle = session_handle (s); + + ps->active_open_establishing = 0; + /* Passive open session was already closed! */ if (ps->po_disconnected) { @@ -558,21 +808,136 @@ active_open_connected_callback (u32 app_index, u32 opaque, return -1; } - s->opaque = opaque; + po_s = session_get_from_handle (ps->po.session_handle); + tp = session_get_transport_proto (po_s); + + sc = proxy_session_side_ctx_alloc (wrk); + sc->pair = ps->po; + sc->ps_index = ps->ps_index; clib_spinlock_unlock_if_init (&pm->sessions_lock); - /* - * Send event for active open tx fifo - */ - ASSERT (s->thread_index == thread_index); - if (svm_fifo_set_event (s->tx_fifo)) - session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX); + sc->state = PROXY_SC_S_ESTABLISHED; + s->opaque = sc->sc_index; + sc->is_http = tp == TRANSPORT_PROTO_HTTP ? 1 : 0; + + if (tp == TRANSPORT_PROTO_HTTP) + { + proxy_send_http_resp (po_s, HTTP_STATUS_OK, 0); + } + else + { + /* + * Send event for active open tx fifo + */ + ASSERT (s->thread_index == vlib_get_thread_index ()); + if (svm_fifo_set_event (s->tx_fifo)) + session_program_tx_io_evt (session_handle (s), SESSION_IO_EVT_TX); + } return 0; } static void +active_open_migrate_po_fixup_rpc (void *arg) +{ + u32 ps_index = pointer_to_uword (arg); + proxy_session_side_ctx_t *po_sc; + proxy_main_t *pm = &proxy_main; + session_handle_t po_sh; + proxy_worker_t *wrk; + proxy_session_t *ps; + session_t *po_s; + + wrk = proxy_worker_get (vlib_get_thread_index ()); + + clib_spinlock_lock_if_init (&pm->sessions_lock); + + ps = proxy_session_get (ps_index); + + po_s = session_get_from_handle (ps->po.session_handle); + po_s->rx_fifo = ps->po.rx_fifo; + po_s->tx_fifo = ps->po.tx_fifo; + + po_sc = proxy_session_side_ctx_get (wrk, po_s->opaque); + po_sc->pair = ps->ao; + po_sh = ps->po.session_handle; + + clib_spinlock_unlock_if_init (&pm->sessions_lock); + + session_program_tx_io_evt (po_sh, SESSION_IO_EVT_TX); +} + +static void +active_open_migrate_rpc (void *arg) +{ + u32 ps_index = pointer_to_uword (arg); + proxy_main_t *pm = &proxy_main; + proxy_session_side_ctx_t *sc; + proxy_worker_t *wrk; + proxy_session_t *ps; + session_t *s; + + wrk = proxy_worker_get (vlib_get_thread_index ()); + sc = proxy_session_side_ctx_alloc (wrk); + + clib_spinlock_lock_if_init (&pm->sessions_lock); + + ps = proxy_session_get (ps_index); + sc->ps_index = ps->ps_index; + + s = session_get_from_handle (ps->ao.session_handle); + s->opaque = sc->sc_index; + s->flags &= ~SESSION_F_IS_MIGRATING; + + /* Fixup passive open session because of migration and zc */ + ps->ao.rx_fifo = ps->po.tx_fifo = s->rx_fifo; + ps->ao.tx_fifo = ps->po.rx_fifo = s->tx_fifo; + + ps->po.tx_fifo->shr->master_session_index = + session_index_from_handle (ps->po.session_handle); + ps->po.tx_fifo->master_thread_index = + session_thread_from_handle (ps->po.session_handle); + + sc->pair = ps->po; + + clib_spinlock_unlock_if_init (&pm->sessions_lock); + + session_send_rpc_evt_to_thread ( + session_thread_from_handle (sc->pair.session_handle), + active_open_migrate_po_fixup_rpc, uword_to_pointer (sc->ps_index, void *)); +} + +static void +active_open_migrate_callback (session_t *s, session_handle_t new_sh) +{ + proxy_main_t *pm = &proxy_main; + proxy_session_side_ctx_t *sc; + proxy_session_t *ps; + proxy_worker_t *wrk; + + wrk = proxy_worker_get (s->thread_index); + sc = proxy_session_side_ctx_get (wrk, s->opaque); + + /* NOTE: this is just an example. ZC makes this migration rather + * tedious. Probably better approaches could be found */ + clib_spinlock_lock_if_init (&pm->sessions_lock); + + ps = proxy_session_get (sc->ps_index); + ps->ao.session_handle = new_sh; + ps->ao.rx_fifo = 0; + ps->ao.tx_fifo = 0; + + clib_spinlock_unlock_if_init (&pm->sessions_lock); + + session_send_rpc_evt_to_thread (session_thread_from_handle (new_sh), + active_open_migrate_rpc, + uword_to_pointer (sc->ps_index, void *)); + + proxy_session_side_ctx_free (wrk, sc); +} + +static void active_open_reset_callback (session_t * s) { proxy_try_close_session (s, 1 /* is_active_open */ ); @@ -618,10 +983,8 @@ active_open_rx_callback (session_t * s) static int active_open_tx_callback (session_t * ao_s) { - proxy_main_t *pm = &proxy_main; - transport_connection_t *tc; - proxy_session_t *ps; - session_t *proxy_s; + proxy_session_side_ctx_t *sc; + proxy_worker_t *wrk; u32 min_free; min_free = clib_min (svm_fifo_size (ao_s->tx_fifo) >> 3, 128 << 10); @@ -631,23 +994,27 @@ active_open_tx_callback (session_t * ao_s) return 0; } - clib_spinlock_lock_if_init (&pm->sessions_lock); + wrk = proxy_worker_get (ao_s->thread_index); + sc = proxy_session_side_ctx_get (wrk, ao_s->opaque); - ps = proxy_session_get_if_valid (ao_s->opaque); - if (!ps) - goto unlock; - - if (ps->vpp_server_handle == ~0) - goto unlock; - - proxy_s = session_get_from_handle (ps->vpp_server_handle); - - /* Force ack on proxy side to update rcv wnd */ - tc = session_get_transport (proxy_s); - tcp_send_ack ((tcp_connection_t *) tc); + if (sc->state < PROXY_SC_S_ESTABLISHED) + return 0; -unlock: - clib_spinlock_unlock_if_init (&pm->sessions_lock); + if (sc->is_http) + { + /* notify HTTP transport */ + session_t *po = session_get_from_handle (sc->pair.session_handle); + session_send_io_evt_to_thread_custom ( + &po->session_index, po->thread_index, SESSION_IO_EVT_RX); + } + else + { + /* Force ack on proxy side to update rcv wnd */ + void *arg = uword_to_pointer (sc->pair.session_handle, void *); + session_send_rpc_evt_to_thread ( + session_thread_from_handle (sc->pair.session_handle), proxy_force_ack, + arg); + } return 0; } @@ -664,6 +1031,7 @@ active_open_cleanup_callback (session_t * s, session_cleanup_ntf_t ntf) static session_cb_vft_t active_open_clients = { .session_reset_callback = active_open_reset_callback, .session_connected_callback = active_open_connected_callback, + .session_migrate_callback = active_open_migrate_callback, .session_accept_callback = active_open_create_callback, .session_disconnect_callback = active_open_disconnect_callback, .session_cleanup_callback = active_open_cleanup_callback, @@ -756,22 +1124,33 @@ proxy_server_listen () { proxy_main_t *pm = &proxy_main; vnet_listen_args_t _a, *a = &_a; - int rv; + int rv, need_crypto; clib_memset (a, 0, sizeof (*a)); a->app_index = pm->server_app_index; clib_memcpy (&a->sep_ext, &pm->server_sep, sizeof (pm->server_sep)); - if (proxy_transport_needs_crypto (a->sep.transport_proto)) + /* Make sure listener is marked connected for transports like udp */ + a->sep_ext.transport_flags = TRANSPORT_CFG_F_CONNECTED; + need_crypto = proxy_transport_needs_crypto (a->sep.transport_proto); + if (need_crypto) + { + transport_endpt_ext_cfg_t *ext_cfg = session_endpoint_add_ext_cfg ( + &a->sep_ext, TRANSPORT_ENDPT_EXT_CFG_CRYPTO, + sizeof (transport_endpt_crypto_cfg_t)); + ext_cfg->crypto.ckpair_index = pm->ckpair_index; + } + /* set http timeout for connect-proxy */ + if (pm->server_sep.transport_proto == TRANSPORT_PROTO_HTTP) { - session_endpoint_alloc_ext_cfg (&a->sep_ext, - TRANSPORT_ENDPT_EXT_CFG_CRYPTO); - a->sep_ext.ext_cfg->crypto.ckpair_index = pm->ckpair_index; + transport_endpt_ext_cfg_t *ext_cfg = session_endpoint_add_ext_cfg ( + &a->sep_ext, TRANSPORT_ENDPT_EXT_CFG_HTTP, sizeof (ext_cfg->opaque)); + ext_cfg->opaque = pm->idle_timeout; } rv = vnet_listen (a); - if (a->sep_ext.ext_cfg) - clib_mem_free (a->sep_ext.ext_cfg); + if (need_crypto) + session_endpoint_free_ext_cfgs (&a->sep_ext); return rv; } @@ -797,15 +1176,25 @@ proxy_server_create (vlib_main_t * vm) { vlib_thread_main_t *vtm = vlib_get_thread_main (); proxy_main_t *pm = &proxy_main; + proxy_worker_t *wrk; u32 num_threads; int i; + if (vlib_num_workers ()) + clib_spinlock_init (&pm->sessions_lock); + num_threads = 1 /* main thread */ + vtm->n_threads; vec_validate (pm->rx_buf, num_threads - 1); for (i = 0; i < num_threads; i++) vec_validate (pm->rx_buf[i], pm->rcv_buffer_size); + vec_validate (pm->workers, vlib_num_workers ()); + vec_foreach (wrk, pm->workers) + { + clib_spinlock_init (&wrk->pending_connects_lock); + } + proxy_server_add_ckpair (); if (proxy_server_attach ()) @@ -813,11 +1202,6 @@ proxy_server_create (vlib_main_t * vm) clib_warning ("failed to attach server app"); return -1; } - if (proxy_server_listen ()) - { - clib_warning ("failed to start listening"); - return -1; - } if (active_open_attach ()) { clib_warning ("failed to attach active open app"); @@ -849,9 +1233,6 @@ proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input, pm->private_segment_count = 0; pm->segment_size = 512 << 20; - if (vlib_num_workers ()) - clib_spinlock_init (&pm->sessions_lock); - if (!unformat_user (input, unformat_line_input, line_input)) return 0; @@ -883,6 +1264,8 @@ proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input, vec_add1 (server_uri, 0); else if (unformat (line_input, "client-uri %s", &client_uri)) vec_add1 (client_uri, 0); + else if (unformat (line_input, "idle-timeout %d", &pm->idle_timeout)) + ; else { error = clib_error_return (0, "unknown input `%U'", @@ -897,35 +1280,45 @@ proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input, default_server_uri); server_uri = format (0, "%s%c", default_server_uri, 0); } - if (!client_uri) - { - clib_warning ("No client-uri provided, Using default: %s", - default_client_uri); - client_uri = format (0, "%s%c", default_client_uri, 0); - } - if (parse_uri ((char *) server_uri, &pm->server_sep)) { error = clib_error_return (0, "Invalid server uri %v", server_uri); goto done; } - if (parse_uri ((char *) client_uri, &pm->client_sep)) + + /* http proxy get target within request */ + if (pm->server_sep.transport_proto != TRANSPORT_PROTO_HTTP) { - error = clib_error_return (0, "Invalid client uri %v", client_uri); - goto done; + if (!client_uri) + { + clib_warning ("No client-uri provided, Using default: %s", + default_client_uri); + client_uri = format (0, "%s%c", default_client_uri, 0); + } + if (parse_uri ((char *) client_uri, &pm->client_sep)) + { + error = clib_error_return (0, "Invalid client uri %v", client_uri); + goto done; + } } - vnet_session_enable_disable (vm, 1 /* turn on session and transport */ ); - - rv = proxy_server_create (vm); - switch (rv) + if (pm->server_app_index == APP_INVALID_INDEX) { - case 0: - break; - default: - error = clib_error_return (0, "server_create returned %d", rv); + session_enable_disable_args_t args = { .is_en = 1, + .rt_engine_type = + RT_BACKEND_ENGINE_RULE_TABLE }; + vnet_session_enable_disable (vm, &args); + rv = proxy_server_create (vm); + if (rv) + { + error = clib_error_return (0, "server_create returned %d", rv); + goto done; + } } + if (proxy_server_listen ()) + error = clib_error_return (0, "failed to start listening"); + done: unformat_free (line_input); vec_free (client_uri); @@ -933,14 +1326,14 @@ done: return error; } -VLIB_CLI_COMMAND (proxy_create_command, static) = -{ +VLIB_CLI_COMMAND (proxy_create_command, static) = { .path = "test proxy server", - .short_help = "test proxy server [server-uri <tcp://ip/port>]" - "[client-uri <tcp://ip/port>][fifo-size <nn>[k|m]]" - "[max-fifo-size <nn>[k|m]][high-watermark <nn>]" - "[low-watermark <nn>][rcv-buf-size <nn>][prealloc-fifos <nn>]" - "[private-segment-size <mem>][private-segment-count <nn>]", + .short_help = "test proxy server [server-uri <proto://ip/port>]" + "[client-uri <tcp://ip/port>][fifo-size <nn>[k|m]]" + "[max-fifo-size <nn>[k|m]][high-watermark <nn>]" + "[low-watermark <nn>][rcv-buf-size <nn>][prealloc-fifos <nn>]" + "[private-segment-size <mem>][private-segment-count <nn>]" + "[idle-timeout <nn>]", .function = proxy_server_create_command_fn, }; @@ -950,6 +1343,8 @@ proxy_main_init (vlib_main_t * vm) proxy_main_t *pm = &proxy_main; pm->server_client_index = ~0; pm->active_open_client_index = ~0; + pm->server_app_index = APP_INVALID_INDEX; + pm->idle_timeout = 600; /* connect-proxy default idle timeout 10 minutes */ return 0; } |