diff options
author | Dave Barach <dave@barachs.net> | 2017-08-07 09:35:25 -0400 |
---|---|---|
committer | Dave Wallace <dwallacelf@gmail.com> | 2017-08-10 00:13:08 +0000 |
commit | 52851e6aa9304054fd1059c8dd284abf8e532bf2 (patch) | |
tree | efe4334245044e342817b28d69ee208fba9952ec | |
parent | b639b593372612fd3a55b3137b4fdd62df775d96 (diff) |
TCP proxy prototype
- Clean up internal API client registration
- Add proxy server
- Add a reference count to the svm fifo
Change-Id: I5ace1c85497062ed412d26ae76a9e6741af1e984
Signed-off-by: Dave Barach <dave@barachs.net>
Signed-off-by: Florin Coras <fcoras@cisco.com>
-rw-r--r-- | src/svm/svm_fifo.c | 10 | ||||
-rw-r--r-- | src/svm/svm_fifo.h | 1 | ||||
-rw-r--r-- | src/svm/svm_fifo_segment.c | 5 | ||||
-rw-r--r-- | src/vlibmemory/api_common.h | 1 | ||||
-rw-r--r-- | src/vlibmemory/memory_vlib.c | 38 | ||||
-rw-r--r-- | src/vnet.am | 1 | ||||
-rw-r--r-- | src/vnet/session/application.c | 6 | ||||
-rw-r--r-- | src/vnet/session/application.h | 1 | ||||
-rw-r--r-- | src/vnet/session/application_interface.h | 4 | ||||
-rw-r--r-- | src/vnet/session/session.c | 43 | ||||
-rw-r--r-- | src/vnet/session/session.h | 8 | ||||
-rw-r--r-- | src/vnet/session/stream_session.h | 5 | ||||
-rw-r--r-- | src/vnet/tcp/builtin_client.c | 95 | ||||
-rw-r--r-- | src/vnet/tcp/builtin_proxy.c | 599 | ||||
-rw-r--r-- | src/vnet/tcp/builtin_proxy.h | 100 | ||||
-rw-r--r-- | src/vnet/tcp/builtin_server.c | 96 |
16 files changed, 801 insertions, 212 deletions
diff --git a/src/svm/svm_fifo.c b/src/svm/svm_fifo.c index fc2189c5176..e478c06e891 100644 --- a/src/svm/svm_fifo.c +++ b/src/svm/svm_fifo.c @@ -201,14 +201,20 @@ svm_fifo_create (u32 data_size_in_bytes) memset (f, 0, sizeof (*f)); f->nitems = data_size_in_bytes; f->ooos_list_head = OOO_SEGMENT_INVALID_INDEX; + f->refcnt = 1; return (f); } void svm_fifo_free (svm_fifo_t * f) { - pool_free (f->ooo_segments); - clib_mem_free (f); + ASSERT (f->refcnt > 0); + + if (--f->refcnt == 0) + { + pool_free (f->ooo_segments); + clib_mem_free (f); + } } always_inline ooo_segment_t * diff --git a/src/svm/svm_fifo.h b/src/svm/svm_fifo.h index a83cd858f83..f10b4d91a2a 100644 --- a/src/svm/svm_fifo.h +++ b/src/svm/svm_fifo.h @@ -75,6 +75,7 @@ typedef struct _svm_fifo #if SVM_FIFO_TRACE svm_fifo_trace_elem_t *trace; #endif + i8 refcnt; CLIB_CACHE_LINE_ALIGN_MARK (data); } svm_fifo_t; diff --git a/src/svm/svm_fifo_segment.c b/src/svm/svm_fifo_segment.c index 86661e516a2..c04b9d8ccba 100644 --- a/src/svm/svm_fifo_segment.c +++ b/src/svm/svm_fifo_segment.c @@ -296,6 +296,7 @@ svm_fifo_segment_alloc_fifo (svm_fifo_segment_private_t * s, memset (f, 0, sizeof (*f)); f->nitems = data_size_in_bytes; f->ooos_list_head = OOO_SEGMENT_INVALID_INDEX; + f->refcnt = 1; goto found; } /* FALLTHROUGH */ @@ -344,6 +345,10 @@ svm_fifo_segment_free_fifo (svm_fifo_segment_private_t * s, svm_fifo_t * f, svm_fifo_segment_header_t *fsh; void *oldheap; + ASSERT (f->refcnt > 0); + + if (--f->refcnt > 0) + return; sh = s->ssvm.sh; fsh = (svm_fifo_segment_header_t *) sh->opaque[0]; diff --git a/src/vlibmemory/api_common.h b/src/vlibmemory/api_common.h index 53909cc4184..19daecdfb6a 100644 --- a/src/vlibmemory/api_common.h +++ b/src/vlibmemory/api_common.h @@ -126,6 +126,7 @@ int vl_client_connect_to_vlib_no_rx_pthread (const char *svm_name, u16 vl_client_get_first_plugin_msg_id (const char *plugin_name); void vl_api_rpc_call_main_thread (void *fp, u8 * data, u32 data_length); +u32 vl_api_memclnt_create_internal (char *, unix_shared_memory_queue_t *); #endif /* included_vlibmemory_api_common_h */ diff --git a/src/vlibmemory/memory_vlib.c b/src/vlibmemory/memory_vlib.c index 004a997455e..688ce6044e0 100644 --- a/src/vlibmemory/memory_vlib.c +++ b/src/vlibmemory/memory_vlib.c @@ -137,6 +137,44 @@ vl_api_serialize_message_table (api_main_t * am, u8 * vector) } /* + * vl_api_memclnt_create_internal + */ + +u32 +vl_api_memclnt_create_internal (char *name, unix_shared_memory_queue_t * q) +{ + vl_api_registration_t **regpp; + vl_api_registration_t *regp; + svm_region_t *svm; + void *oldheap; + api_main_t *am = &api_main; + + ASSERT (vlib_get_thread_index () == 0); + pool_get (am->vl_clients, regpp); + + svm = am->vlib_rp; + + pthread_mutex_lock (&svm->mutex); + oldheap = svm_push_data_heap (svm); + *regpp = clib_mem_alloc (sizeof (vl_api_registration_t)); + + regp = *regpp; + memset (regp, 0, sizeof (*regp)); + regp->registration_type = REGISTRATION_TYPE_SHMEM; + regp->vl_api_registration_pool_index = regpp - am->vl_clients; + + regp->vl_input_queue = q; + regp->name = format (0, "%s%c", name, 0); + + pthread_mutex_unlock (&svm->mutex); + svm_pop_heap (oldheap); + return vl_msg_api_handle_from_index_and_epoch + (regp->vl_api_registration_pool_index, + am->shmem_hdr->application_restarts); +} + + +/* * vl_api_memclnt_create_t_handler */ void diff --git a/src/vnet.am b/src/vnet.am index ad84c028830..ede0376de1e 100644 --- a/src/vnet.am +++ b/src/vnet.am @@ -475,6 +475,7 @@ libvnet_la_SOURCES += \ vnet/tcp/builtin_client.c \ vnet/tcp/builtin_server.c \ vnet/tcp/builtin_http_server.c \ + vnet/tcp/builtin_proxy.c \ vnet/tcp/tcp_test.c \ vnet/tcp/tcp.c diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c index 076c66f1fff..78c41b938cf 100644 --- a/src/vnet/session/application.c +++ b/src/vnet/session/application.c @@ -367,6 +367,12 @@ app_get_name_from_reg_index (application_t * app) return app_name; } +int +application_is_proxy (application_t * app) +{ + return !(app->flags & APP_OPTIONS_FLAGS_IS_PROXY); +} + u8 * format_application_listener (u8 * s, va_list * args) { diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h index 35caae85dbb..29d37a06b18 100644 --- a/src/vnet/session/application.h +++ b/src/vnet/session/application.h @@ -116,6 +116,7 @@ segment_manager_t *application_get_listen_segment_manager (application_t * s); segment_manager_t *application_get_connect_segment_manager (application_t * app); +int application_is_proxy (application_t * app); #endif /* SRC_VNET_SESSION_APPLICATION_H_ */ diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h index ed9f89b361e..1d63f6cc102 100644 --- a/src/vnet/session/application_interface.h +++ b/src/vnet/session/application_interface.h @@ -100,6 +100,9 @@ typedef struct _vnet_connect_args /* Used for redirects */ void *mp; + + /* used for proxy connections */ + u64 server_handle; } vnet_connect_args_t; typedef struct _vnet_disconnect_args_t @@ -129,6 +132,7 @@ typedef enum _(USE_FIFO, "Use FIFO with redirects") \ _(ADD_SEGMENT, "Add segment and signal app if needed") \ _(BUILTIN_APP, "Application is builtin") \ + _(IS_PROXY, "Application is proxying") typedef enum _app_options { diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 4ba152917ac..991bcd5a53f 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -30,7 +30,7 @@ extern transport_proto_vft_t *tp_vfts; int stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc, - stream_session_t ** ret_s) + u8 alloc_fifos, stream_session_t ** ret_s) { session_manager_main_t *smm = &session_manager_main; svm_fifo_t *server_rx_fifo = 0, *server_tx_fifo = 0; @@ -43,31 +43,37 @@ stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc, ASSERT (thread_index == vlib_get_thread_index ()); - if ((rv = segment_manager_alloc_session_fifos (sm, &server_rx_fifo, - &server_tx_fifo, - &fifo_segment_index))) - return rv; - /* Create the session */ pool_get_aligned (smm->sessions[thread_index], s, CLIB_CACHE_LINE_BYTES); memset (s, 0, sizeof (*s)); - - /* Initialize backpointers */ pool_index = s - smm->sessions[thread_index]; - server_rx_fifo->master_session_index = pool_index; - server_rx_fifo->master_thread_index = thread_index; - server_tx_fifo->master_session_index = pool_index; - server_tx_fifo->master_thread_index = thread_index; + /* Allocate fifos */ + if (alloc_fifos) + { + if ((rv = segment_manager_alloc_session_fifos (sm, &server_rx_fifo, + &server_tx_fifo, + &fifo_segment_index))) + { + pool_put (smm->sessions[thread_index], s); + return rv; + } + /* Initialize backpointers */ + server_rx_fifo->master_session_index = pool_index; + server_rx_fifo->master_thread_index = thread_index; + + server_tx_fifo->master_session_index = pool_index; + server_tx_fifo->master_thread_index = thread_index; - s->server_rx_fifo = server_rx_fifo; - s->server_tx_fifo = server_tx_fifo; + s->server_rx_fifo = server_rx_fifo; + s->server_tx_fifo = server_tx_fifo; + s->svm_segment_index = fifo_segment_index; + } /* Initialize state machine, such as it is... */ s->session_type = session_type_from_proto_and_ip (tc->transport_proto, tc->is_ip4); s->session_state = SESSION_STATE_CONNECTING; - s->svm_segment_index = fifo_segment_index; s->thread_index = thread_index; s->session_index = pool_index; @@ -379,10 +385,11 @@ stream_session_connect_notify (transport_connection_t * tc, u8 is_fail) if (!is_fail) { segment_manager_t *sm; + u8 alloc_fifos; sm = application_get_connect_segment_manager (app); - + alloc_fifos = application_is_proxy (app); /* Create new session (svm segments are allocated if needed) */ - if (stream_session_create_i (sm, tc, &new_s)) + if (stream_session_create_i (sm, tc, alloc_fifos, &new_s)) { is_fail = 1; error = -1; @@ -515,7 +522,7 @@ stream_session_accept (transport_connection_t * tc, u32 listener_index, server = application_get (listener->app_index); sm = application_get_listen_segment_manager (server, listener); - if ((rv = stream_session_create_i (sm, tc, &s))) + if ((rv = stream_session_create_i (sm, tc, 1, &s))) return rv; s->app_index = server->index; diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index 538433da0f2..74d82a408dc 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -183,10 +183,10 @@ stream_session_is_valid (u32 si, u8 thread_index) stream_session_t *s; s = pool_elt_at_index (session_manager_main.sessions[thread_index], si); if (s->thread_index != thread_index || s->session_index != si - || s->server_rx_fifo->master_session_index != si - || s->server_tx_fifo->master_session_index != si - || s->server_rx_fifo->master_thread_index != thread_index - || s->server_tx_fifo->master_thread_index != thread_index) + /* || s->server_rx_fifo->master_session_index != si + || s->server_tx_fifo->master_session_index != si + || s->server_rx_fifo->master_thread_index != thread_index + || s->server_tx_fifo->master_thread_index != thread_index */ ) return 0; return 1; } diff --git a/src/vnet/session/stream_session.h b/src/vnet/session/stream_session.h index 82bbf521ce9..4c26321194d 100644 --- a/src/vnet/session/stream_session.h +++ b/src/vnet/session/stream_session.h @@ -83,8 +83,11 @@ typedef struct _stream_session_t u32 opaque2; + /** connected (server) session handle */ + u64 server_session_handle; + /** Opaque, pad to a 64-octet boundary */ - u64 opaque[2]; + u64 opaque[1]; } stream_session_t; #endif /* SRC_VNET_SESSION_STREAM_SESSION_H_ */ diff --git a/src/vnet/tcp/builtin_client.c b/src/vnet/tcp/builtin_client.c index 0cb9faa8769..938e07ba505 100644 --- a/src/vnet/tcp/builtin_client.c +++ b/src/vnet/tcp/builtin_client.c @@ -24,25 +24,6 @@ #include <vlibsocket/api.h> #include <vpp/app/version.h> -/* define message IDs */ -#include <vpp/api/vpe_msg_enum.h> - -/* define message structures */ -#define vl_typedefs -#include <vpp/api/vpe_all_api_h.h> -#undef vl_typedefs - -/* define generated endian-swappers */ -#define vl_endianfun -#include <vpp/api/vpe_all_api_h.h> -#undef vl_endianfun - -/* instantiate all the print functions we know about */ -#define vl_print(handle, ...) vlib_cli_output (handle, __VA_ARGS__) -#define vl_printfun -#include <vpp/api/vpe_all_api_h.h> -#undef vl_printfun - #define TCP_BUILTIN_CLIENT_DBG (0) static void @@ -308,87 +289,16 @@ VLIB_REGISTER_NODE (builtin_client_node) = }; /* *INDENT-ON* */ -/* So we don't get "no handler for... " msgs */ -static void -vl_api_memclnt_create_reply_t_handler (vl_api_memclnt_create_reply_t * mp) -{ - vlib_main_t *vm = vlib_get_main (); - tclient_main_t *tm = &tclient_main; - tm->my_client_index = mp->index; - vlib_process_signal_event (vm, tm->cli_node_index, 1 /* evt */ , - 0 /* data */ ); -} - static int create_api_loopback (tclient_main_t * tm) { - vlib_main_t *vm = vlib_get_main (); - vl_api_memclnt_create_t _m, *mp = &_m; - extern void vl_api_memclnt_create_t_handler (vl_api_memclnt_create_t *); api_main_t *am = &api_main; vl_shmem_hdr_t *shmem_hdr; - uword *event_data = 0, event_type; - int resolved = 0; - - /* - * Create a "loopback" API client connection - * Don't do things like this unless you know what you're doing... - */ shmem_hdr = am->shmem_hdr; tm->vl_input_queue = shmem_hdr->vl_input_queue; - memset (mp, 0, sizeof (*mp)); - mp->_vl_msg_id = VL_API_MEMCLNT_CREATE; - mp->context = 0xFEEDFACE; - mp->input_queue = pointer_to_uword (tm->vl_input_queue); - strncpy ((char *) mp->name, "tcp_clients_tester", sizeof (mp->name) - 1); - - vl_api_memclnt_create_t_handler (mp); - - /* Wait for reply */ - vlib_process_wait_for_event_or_clock (vm, 1.0); - event_type = vlib_process_get_events (vm, &event_data); - switch (event_type) - { - case 1: - resolved = 1; - break; - case ~0: - /* timed out */ - break; - default: - clib_warning ("unknown event_type %d", event_type); - } - if (!resolved) - return -1; - return 0; -} - -#define foreach_tclient_static_api_msg \ -_(MEMCLNT_CREATE_REPLY, memclnt_create_reply) \ - -static clib_error_t * -tclient_api_hookup (vlib_main_t * vm) -{ - vl_msg_api_msg_config_t _c, *c = &_c; - - /* Hook up client-side static APIs to our handlers */ -#define _(N,n) do { \ - c->id = VL_API_##N; \ - c->name = #n; \ - c->handler = vl_api_##n##_t_handler; \ - c->cleanup = vl_noop_handler; \ - c->endian = vl_api_##n##_t_endian; \ - c->print = vl_api_##n##_t_print; \ - c->size = sizeof(vl_api_##n##_t); \ - c->traced = 1; /* trace, so these msgs print */ \ - c->replay = 0; /* don't replay client create/delete msgs */ \ - c->message_bounce = 0; /* don't bounce this message */ \ - vl_msg_api_config(c);} while (0); - - foreach_tclient_static_api_msg; -#undef _ - + tm->my_client_index = + vl_api_memclnt_create_internal ("tcp_test_client", tm->vl_input_queue); return 0; } @@ -400,7 +310,6 @@ tcp_test_clients_init (vlib_main_t * vm) u32 num_threads; int i; - tclient_api_hookup (vm); if (create_api_loopback (tm)) return -1; diff --git a/src/vnet/tcp/builtin_proxy.c b/src/vnet/tcp/builtin_proxy.c new file mode 100644 index 00000000000..d8cfb11d315 --- /dev/null +++ b/src/vnet/tcp/builtin_proxy.c @@ -0,0 +1,599 @@ +/* +* Copyright (c) 2015-2017 Cisco and/or its affiliates. +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at: +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include <vnet/vnet.h> +#include <vlibmemory/api.h> +#include <vnet/session/application.h> +#include <vnet/session/application_interface.h> +#include <vnet/tcp/builtin_proxy.h> + +builtin_proxy_main_t builtin_proxy_main; + +static void +delete_proxy_session (stream_session_t * s, int is_active_open) +{ + builtin_proxy_main_t *bpm = &builtin_proxy_main; + proxy_session_t *ps = 0; + vnet_disconnect_args_t _a, *a = &_a; + stream_session_t *active_open_session = 0; + stream_session_t *server_session = 0; + uword *p; + u64 handle; + + handle = stream_session_handle (s); + + clib_spinlock_lock_if_init (&bpm->sessions_lock); + if (is_active_open) + { + active_open_session = s; + + p = hash_get (bpm->proxy_session_by_active_open_handle, handle); + if (p == 0) + { + clib_warning ("proxy session for %s handle %lld (%llx) AWOL", + is_active_open ? "active open" : "server", + handle, handle); + } + else + { + ps = pool_elt_at_index (bpm->sessions, p[0]); + if (ps->vpp_server_handle != ~0) + server_session = stream_session_get_from_handle + (ps->vpp_server_handle); + else + server_session = 0; + } + } + else + { + server_session = s; + + p = hash_get (bpm->proxy_session_by_server_handle, handle); + if (p == 0) + { + clib_warning ("proxy session for %s handle %lld (%llx) AWOL", + is_active_open ? "active open" : "server", + handle, handle); + } + else + { + ps = pool_elt_at_index (bpm->sessions, p[0]); + if (ps->vpp_server_handle != ~0) + active_open_session = stream_session_get_from_handle + (ps->vpp_server_handle); + else + active_open_session = 0; + } + } + + if (ps) + { + if (CLIB_DEBUG > 0) + memset (ps, 0xFE, sizeof (*ps)); + pool_put (bpm->sessions, ps); + } + + clib_spinlock_unlock_if_init (&bpm->sessions_lock); + + if (active_open_session) + { + a->handle = stream_session_handle (active_open_session); + a->app_index = bpm->active_open_app_index; + hash_unset (bpm->proxy_session_by_active_open_handle, + stream_session_handle (active_open_session)); + vnet_disconnect_session (a); + } + + if (server_session) + { + a->handle = stream_session_handle (server_session); + a->app_index = bpm->server_app_index; + hash_unset (bpm->proxy_session_by_server_handle, + stream_session_handle (server_session)); + vnet_disconnect_session (a); + } +} + +static int +server_accept_callback (stream_session_t * s) +{ + builtin_proxy_main_t *bpm = &builtin_proxy_main; + + s->session_state = SESSION_STATE_READY; + + clib_spinlock_lock_if_init (&bpm->sessions_lock); + + return 0; +} + +static void +server_disconnect_callback (stream_session_t * s) +{ + delete_proxy_session (s, 0 /* is_active_open */ ); +} + +static void +server_reset_callback (stream_session_t * s) +{ + clib_warning ("Reset session %U", format_stream_session, s, 2); + delete_proxy_session (s, 0 /* is_active_open */ ); +} + +static int +server_connected_callback (u32 app_index, u32 api_context, + stream_session_t * s, u8 is_fail) +{ + clib_warning ("called..."); + return -1; +} + +static int +server_add_segment_callback (u32 client_index, + const u8 * seg_name, u32 seg_size) +{ + clib_warning ("called..."); + return -1; +} + +static int +server_redirect_connect_callback (u32 client_index, void *mp) +{ + clib_warning ("called..."); + return -1; +} + +static int +server_rx_callback (stream_session_t * s) +{ + u32 max_dequeue; + int actual_transfer __attribute__ ((unused)); + svm_fifo_t *tx_fifo, *rx_fifo; + builtin_proxy_main_t *bpm = &builtin_proxy_main; + u32 thread_index = vlib_get_thread_index (); + vnet_connect_args_t _a, *a = &_a; + proxy_session_t *ps; + int proxy_index; + uword *p; + svm_fifo_t *active_open_tx_fifo; + session_fifo_event_t evt; + + ASSERT (s->thread_index == thread_index); + + clib_spinlock_lock_if_init (&bpm->sessions_lock); + p = + hash_get (bpm->proxy_session_by_server_handle, stream_session_handle (s)); + + if (PREDICT_TRUE (p != 0)) + { + clib_spinlock_unlock_if_init (&bpm->sessions_lock); + active_open_tx_fifo = s->server_rx_fifo; + + /* + * Send event for active open tx fifo + */ + if (svm_fifo_set_event (active_open_tx_fifo)) + { + evt.fifo = active_open_tx_fifo; + evt.event_type = FIFO_EVENT_APP_TX; + if (unix_shared_memory_queue_add + (bpm->active_open_event_queue[thread_index], (u8 *) & evt, + 0 /* do wait for mutex */ )) + clib_warning ("failed to enqueue tx evt"); + } + } + else + { + rx_fifo = s->server_rx_fifo; + tx_fifo = s->server_tx_fifo; + + ASSERT (rx_fifo->master_thread_index == thread_index); + ASSERT (tx_fifo->master_thread_index == thread_index); + + max_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo); + + if (PREDICT_FALSE (max_dequeue == 0)) + return 0; + + actual_transfer = svm_fifo_peek (rx_fifo, 0 /* relative_offset */ , + max_dequeue, + bpm->rx_buf[thread_index]); + + /* $$$ your message in this space: parse url, etc. */ + + memset (a, 0, sizeof (*a)); + + clib_spinlock_lock_if_init (&bpm->sessions_lock); + pool_get (bpm->sessions, ps); + memset (ps, 0, sizeof (*ps)); + ps->server_rx_fifo = rx_fifo; + ps->server_tx_fifo = tx_fifo; + ps->vpp_server_handle = stream_session_handle (s); + + proxy_index = ps - bpm->sessions; + + hash_set (bpm->proxy_session_by_server_handle, ps->vpp_server_handle, + proxy_index); + + clib_spinlock_unlock_if_init (&bpm->sessions_lock); + + a->uri = "tcp://6.0.2.2/23"; + a->api_context = proxy_index; + a->app_index = bpm->active_open_app_index; + a->mp = 0; + vnet_connect_uri (a); + } + + return 0; +} + +static session_cb_vft_t builtin_session_cb_vft = { + .session_accept_callback = server_accept_callback, + .session_disconnect_callback = server_disconnect_callback, + .session_connected_callback = server_connected_callback, + .add_segment_callback = server_add_segment_callback, + .redirect_connect_callback = server_redirect_connect_callback, + .builtin_server_rx_callback = server_rx_callback, + .session_reset_callback = server_reset_callback +}; + +static int +active_open_connected_callback (u32 app_index, u32 opaque, + stream_session_t * s, u8 is_fail) +{ + builtin_proxy_main_t *bpm = &builtin_proxy_main; + proxy_session_t *ps; + u8 thread_index = vlib_get_thread_index (); + session_fifo_event_t evt; + + if (is_fail) + { + clib_warning ("connection %d failed!", opaque); + return 0; + } + + /* + * Setup proxy session handle. + */ + clib_spinlock_lock_if_init (&bpm->sessions_lock); + + ps = pool_elt_at_index (bpm->sessions, opaque); + ps->vpp_active_open_handle = stream_session_handle (s); + + s->server_tx_fifo = ps->server_rx_fifo; + s->server_rx_fifo = ps->server_tx_fifo; + + /* + * Reset the active-open tx-fifo master indices so the active-open session + * will receive data, etc. + */ + s->server_tx_fifo->master_session_index = s->session_index; + s->server_tx_fifo->master_thread_index = s->thread_index; + + /* + * Account for the active-open session's use of the fifos + * so they won't disappear until the last session which uses + * them disappears + */ + s->server_tx_fifo->refcnt++; + s->server_rx_fifo->refcnt++; + + hash_set (bpm->proxy_session_by_active_open_handle, + ps->vpp_active_open_handle, opaque); + + clib_spinlock_unlock_if_init (&bpm->sessions_lock); + + /* + * Send event for active open tx fifo + */ + if (svm_fifo_set_event (s->server_tx_fifo)) + { + evt.fifo = s->server_tx_fifo; + evt.event_type = FIFO_EVENT_APP_TX; + if (unix_shared_memory_queue_add + (bpm->active_open_event_queue[thread_index], (u8 *) & evt, + 0 /* do wait for mutex */ )) + clib_warning ("failed to enqueue tx evt"); + } + + return 0; +} + +static void +active_open_reset_callback (stream_session_t * s) +{ + delete_proxy_session (s, 1 /* is_active_open */ ); +} + +static int +active_open_create_callback (stream_session_t * s) +{ + return 0; +} + +static void +active_open_disconnect_callback (stream_session_t * s) +{ + delete_proxy_session (s, 1 /* is_active_open */ ); +} + +static int +active_open_rx_callback (stream_session_t * s) +{ + builtin_proxy_main_t *bpm = &builtin_proxy_main; + session_fifo_event_t evt; + svm_fifo_t *server_rx_fifo; + u32 thread_index = vlib_get_thread_index (); + + server_rx_fifo = s->server_rx_fifo; + + /* + * Send event for server tx fifo + */ + if (svm_fifo_set_event (server_rx_fifo)) + { + evt.fifo = server_rx_fifo; + evt.event_type = FIFO_EVENT_APP_TX; + if (unix_shared_memory_queue_add + (bpm->server_event_queue[thread_index], (u8 *) & evt, + 0 /* do wait for mutex */ )) + clib_warning ("failed to enqueue server rx evt"); + } + + return 0; +} + +/* *INDENT-OFF* */ +static session_cb_vft_t builtin_clients = { + .session_reset_callback = active_open_reset_callback, + .session_connected_callback = active_open_connected_callback, + .session_accept_callback = active_open_create_callback, + .session_disconnect_callback = active_open_disconnect_callback, + .builtin_server_rx_callback = active_open_rx_callback +}; +/* *INDENT-ON* */ + + +static void +create_api_loopbacks (vlib_main_t * vm) +{ + builtin_proxy_main_t *bpm = &builtin_proxy_main; + api_main_t *am = &api_main; + vl_shmem_hdr_t *shmem_hdr; + + shmem_hdr = am->shmem_hdr; + bpm->vl_input_queue = shmem_hdr->vl_input_queue; + bpm->server_client_index = + vl_api_memclnt_create_internal ("proxy_server", bpm->vl_input_queue); + bpm->active_open_client_index = + vl_api_memclnt_create_internal ("proxy_active_open", bpm->vl_input_queue); +} + +static int +server_attach () +{ + builtin_proxy_main_t *bpm = &builtin_proxy_main; + u8 segment_name[128]; + u64 options[SESSION_OPTIONS_N_OPTIONS]; + vnet_app_attach_args_t _a, *a = &_a; + + memset (a, 0, sizeof (*a)); + memset (options, 0, sizeof (options)); + + a->api_client_index = bpm->server_client_index; + a->session_cb_vft = &builtin_session_cb_vft; + a->options = options; + a->options[SESSION_OPTIONS_SEGMENT_SIZE] = 512 << 20; + a->options[SESSION_OPTIONS_RX_FIFO_SIZE] = bpm->fifo_size; + a->options[SESSION_OPTIONS_TX_FIFO_SIZE] = bpm->fifo_size; + a->options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = bpm->private_segment_count; + a->options[APP_OPTIONS_PRIVATE_SEGMENT_SIZE] = bpm->private_segment_size; + a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = + bpm->prealloc_fifos ? bpm->prealloc_fifos : 1; + + a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_BUILTIN_APP; + + a->segment_name = segment_name; + a->segment_name_length = ARRAY_LEN (segment_name); + + if (vnet_application_attach (a)) + { + clib_warning ("failed to attach server"); + return -1; + } + bpm->server_app_index = a->app_index; + + return 0; +} + +static int +active_open_attach (void) +{ + builtin_proxy_main_t *bpm = &builtin_proxy_main; + vnet_app_attach_args_t _a, *a = &_a; + u8 segment_name[128]; + u32 segment_name_length; + u64 options[16]; + + segment_name_length = ARRAY_LEN (segment_name); + + memset (a, 0, sizeof (*a)); + memset (options, 0, sizeof (options)); + + a->api_client_index = bpm->active_open_client_index; + a->segment_name = segment_name; + a->segment_name_length = segment_name_length; + a->session_cb_vft = &builtin_clients; + + options[SESSION_OPTIONS_ACCEPT_COOKIE] = 0x12345678; + options[SESSION_OPTIONS_SEGMENT_SIZE] = 512 << 20; + options[SESSION_OPTIONS_RX_FIFO_SIZE] = bpm->fifo_size; + options[SESSION_OPTIONS_TX_FIFO_SIZE] = bpm->fifo_size; + options[APP_OPTIONS_PRIVATE_SEGMENT_COUNT] = bpm->private_segment_count; + options[APP_OPTIONS_PRIVATE_SEGMENT_SIZE] = bpm->private_segment_size; + options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = + bpm->prealloc_fifos ? bpm->prealloc_fifos : 1; + + options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_BUILTIN_APP + | APP_OPTIONS_FLAGS_IS_PROXY; + + a->options = options; + + if (vnet_application_attach (a)) + return -1; + + bpm->active_open_app_index = a->app_index; + + return 0; +} + +static int +server_listen () +{ + builtin_proxy_main_t *bpm = &builtin_proxy_main; + vnet_bind_args_t _a, *a = &_a; + memset (a, 0, sizeof (*a)); + a->app_index = bpm->server_app_index; + a->uri = "tcp://0.0.0.0/23"; + return vnet_bind_uri (a); +} + +static int +server_create (vlib_main_t * vm) +{ + builtin_proxy_main_t *bpm = &builtin_proxy_main; + vlib_thread_main_t *vtm = vlib_get_thread_main (); + u32 num_threads; + int i; + + if (bpm->server_client_index == (u32) ~ 0) + create_api_loopbacks (vm); + + num_threads = 1 /* main thread */ + vtm->n_threads; + vec_validate (builtin_proxy_main.server_event_queue, num_threads - 1); + vec_validate (builtin_proxy_main.active_open_event_queue, num_threads - 1); + vec_validate (bpm->rx_buf, num_threads - 1); + + for (i = 0; i < num_threads; i++) + vec_validate (bpm->rx_buf[i], bpm->rcv_buffer_size); + + if (server_attach ()) + { + clib_warning ("failed to attach server app"); + return -1; + } + if (server_listen ()) + { + clib_warning ("failed to start listening"); + return -1; + } + if (active_open_attach ()) + { + clib_warning ("failed to attach active open app"); + return -1; + } + + for (i = 0; i < num_threads; i++) + { + bpm->active_open_event_queue[i] = + session_manager_get_vpp_event_queue (i); + + ASSERT (bpm->active_open_event_queue[i]); + + bpm->server_event_queue[i] = session_manager_get_vpp_event_queue (i); + } + + return 0; +} + +static clib_error_t * +proxy_server_create_command_fn (vlib_main_t * vm, unformat_input_t * input, + vlib_cli_command_t * cmd) +{ + builtin_proxy_main_t *bpm = &builtin_proxy_main; + int rv; + u32 tmp; + + bpm->fifo_size = 64 << 10; + bpm->rcv_buffer_size = 1024; + bpm->prealloc_fifos = 0; + bpm->private_segment_count = 0; + bpm->private_segment_size = 0; + + while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT) + { + if (unformat (input, "fifo-size %d", &bpm->fifo_size)) + bpm->fifo_size <<= 10; + else if (unformat (input, "rcv-buf-size %d", &bpm->rcv_buffer_size)) + ; + else if (unformat (input, "prealloc-fifos %d", &bpm->prealloc_fifos)) + ; + else if (unformat (input, "private-segment-count %d", + &bpm->private_segment_count)) + ; + else if (unformat (input, "private-segment-size %dm", &tmp)) + bpm->private_segment_size = tmp << 20; + else if (unformat (input, "private-segment-size %dg", &tmp)) + bpm->private_segment_size = tmp << 30; + else if (unformat (input, "private-segment-size %d", &tmp)) + bpm->private_segment_size = tmp; + else + return clib_error_return (0, "unknown input `%U'", + format_unformat_error, input); + } + + vnet_session_enable_disable (vm, 1 /* turn on TCP, etc. */ ); + + rv = server_create (vm); + switch (rv) + { + case 0: + break; + default: + return clib_error_return (0, "server_create returned %d", rv); + } + + return 0; +} + +/* *INDENT-OFF* */ +VLIB_CLI_COMMAND (server_create_command, static) = +{ + .path = "test proxy server", + .short_help = "test proxy server", + .function = proxy_server_create_command_fn, +}; +/* *INDENT-ON* */ + +clib_error_t * +builtin_tcp_proxy_main_init (vlib_main_t * vm) +{ + builtin_proxy_main_t *bpm = &builtin_proxy_main; + bpm->server_client_index = ~0; + bpm->active_open_client_index = ~0; + bpm->proxy_session_by_active_open_handle = hash_create (0, sizeof (uword)); + bpm->proxy_session_by_server_handle = hash_create (0, sizeof (uword)); + + return 0; +} + +VLIB_INIT_FUNCTION (builtin_tcp_proxy_main_init); + +/* +* fd.io coding-style-patch-verification: ON +* +* Local Variables: +* eval: (c-set-style "gnu") +* End: +*/ diff --git a/src/vnet/tcp/builtin_proxy.h b/src/vnet/tcp/builtin_proxy.h new file mode 100644 index 00000000000..cf707a150cf --- /dev/null +++ b/src/vnet/tcp/builtin_proxy.h @@ -0,0 +1,100 @@ + +/* + * builtin_proxy.h - skeleton vpp engine plug-in header file + * + * Copyright (c) <current-year> <your-organization> + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __included_builtin_proxy_h__ +#define __included_builtin_proxy_h__ + +#include <vnet/vnet.h> +#include <vnet/ip/ip.h> +#include <vnet/ethernet/ethernet.h> + +#include <vppinfra/hash.h> +#include <vppinfra/error.h> +#include <vlibmemory/unix_shared_memory_queue.h> +#include <svm/svm_fifo_segment.h> +#include <vnet/session/session.h> +#include <vnet/session/application_interface.h> + +typedef struct +{ + svm_fifo_t *server_rx_fifo; + svm_fifo_t *server_tx_fifo; + + u64 vpp_server_handle; + u64 vpp_active_open_handle; +} proxy_session_t; + +typedef struct +{ + unix_shared_memory_queue_t *vl_input_queue; /**< vpe input queue */ + /** per-thread vectors */ + unix_shared_memory_queue_t **server_event_queue; + unix_shared_memory_queue_t **active_open_event_queue; + u8 **rx_buf; /**< intermediate rx buffers */ + + u32 cli_node_index; /**< cli process node index */ + u32 server_client_index; /**< server API client handle */ + u32 server_app_index; /**< server app index */ + u32 active_open_client_index; /**< active open API client handle */ + u32 active_open_app_index; /**< active open index after attach */ + + uword *proxy_session_by_server_handle; + uword *proxy_session_by_active_open_handle; + + /* + * Configuration params + */ + u8 *connect_uri; /**< URI for slave's connect */ + u32 configured_segment_size; + u32 fifo_size; + u32 private_segment_count; /**< Number of private fifo segs */ + u32 private_segment_size; /**< size of private fifo segs */ + int rcv_buffer_size; + + /* + * Test state variables + */ + proxy_session_t *sessions; /**< Session pool, shared */ + clib_spinlock_t sessions_lock; + u32 **connection_index_by_thread; + pthread_t client_thread_handle; + + /* + * Flags + */ + u8 is_init; + u8 prealloc_fifos; /**< Request fifo preallocation */ + + /* + * Convenience + */ + vlib_main_t *vlib_main; + vnet_main_t *vnet_main; + ethernet_main_t *ethernet_main; +} builtin_proxy_main_t; + +builtin_proxy_main_t builtin_proxy_main; + +#endif /* __included_builtin_proxy_h__ */ + +/* + * fd.io coding-style-patch-verification: ON + * + * Local Variables: + * eval: (c-set-style "gnu") + * End: + */ diff --git a/src/vnet/tcp/builtin_server.c b/src/vnet/tcp/builtin_server.c index 3416678ead6..2454a831ad7 100644 --- a/src/vnet/tcp/builtin_server.c +++ b/src/vnet/tcp/builtin_server.c @@ -18,25 +18,6 @@ #include <vnet/session/application.h> #include <vnet/session/application_interface.h> -/* define message IDs */ -#include <vpp/api/vpe_msg_enum.h> - -/* define message structures */ -#define vl_typedefs -#include <vpp/api/vpe_all_api_h.h> -#undef vl_typedefs - -/* define generated endian-swappers */ -#define vl_endianfun -#include <vpp/api/vpe_all_api_h.h> -#undef vl_endianfun - -/* instantiate all the print functions we know about */ -#define vl_print(handle, ...) vlib_cli_output (handle, __VA_ARGS__) -#define vl_printfun -#include <vpp/api/vpe_all_api_h.h> -#undef vl_printfun - typedef struct { /* @@ -279,46 +260,13 @@ static int create_api_loopback (vlib_main_t * vm) { builtin_server_main_t *bsm = &builtin_server_main; - vl_api_memclnt_create_t _m, *mp = &_m; - extern void vl_api_memclnt_create_t_handler (vl_api_memclnt_create_t *); api_main_t *am = &api_main; vl_shmem_hdr_t *shmem_hdr; - uword *event_data = 0, event_type; - int resolved = 0; - - /* - * Create a "loopback" API client connection - * Don't do things like this unless you know what you're doing... - */ shmem_hdr = am->shmem_hdr; bsm->vl_input_queue = shmem_hdr->vl_input_queue; - memset (mp, 0, sizeof (*mp)); - mp->_vl_msg_id = VL_API_MEMCLNT_CREATE; - mp->context = 0xFEEDFACE; - mp->input_queue = pointer_to_uword (bsm->vl_input_queue); - strncpy ((char *) mp->name, "tcp_test_server", sizeof (mp->name) - 1); - - vl_api_memclnt_create_t_handler (mp); - - /* Wait for reply */ - bsm->node_index = vlib_get_current_process (vm)->node_runtime.node_index; - vlib_process_wait_for_event_or_clock (vm, 2.0); - event_type = vlib_process_get_events (vm, &event_data); - switch (event_type) - { - case 1: - resolved = 1; - break; - case ~0: - /* timed out */ - break; - default: - clib_warning ("unknown event_type %d", event_type); - } - if (!resolved) - return -1; - + bsm->my_client_index = + vl_api_memclnt_create_internal ("tcp_test_server", bsm->vl_input_queue); return 0; } @@ -413,45 +361,6 @@ server_create (vlib_main_t * vm) return 0; } -/* Get our api client index */ -static void -vl_api_memclnt_create_reply_t_handler (vl_api_memclnt_create_reply_t * mp) -{ - vlib_main_t *vm = vlib_get_main (); - builtin_server_main_t *bsm = &builtin_server_main; - bsm->my_client_index = mp->index; - vlib_process_signal_event (vm, bsm->node_index, 1 /* evt */ , - 0 /* data */ ); -} - -#define foreach_tcp_builtin_server_api_msg \ -_(MEMCLNT_CREATE_REPLY, memclnt_create_reply) \ - -static clib_error_t * -tcp_builtin_server_api_hookup (vlib_main_t * vm) -{ - vl_msg_api_msg_config_t _c, *c = &_c; - - /* Hook up client-side static APIs to our handlers */ -#define _(N,n) do { \ - c->id = VL_API_##N; \ - c->name = #n; \ - c->handler = vl_api_##n##_t_handler; \ - c->cleanup = vl_noop_handler; \ - c->endian = vl_api_##n##_t_endian; \ - c->print = vl_api_##n##_t_print; \ - c->size = sizeof(vl_api_##n##_t); \ - c->traced = 1; /* trace, so these msgs print */ \ - c->replay = 0; /* don't replay client create/delete msgs */ \ - c->message_bounce = 0; /* don't bounce this message */ \ - vl_msg_api_config(c);} while (0); - - foreach_tcp_builtin_server_api_msg; -#undef _ - - return 0; -} - static clib_error_t * server_create_command_fn (vlib_main_t * vm, unformat_input_t * input, vlib_cli_command_t * cmd) @@ -491,7 +400,6 @@ server_create_command_fn (vlib_main_t * vm, unformat_input_t * input, format_unformat_error, input); } - tcp_builtin_server_api_hookup (vm); vnet_session_enable_disable (vm, 1 /* turn on TCP, etc. */ ); rv = server_create (vm); |