author     Florin Coras <fcoras@cisco.com>           2018-07-04 04:15:05 -0700
committer  Damjan Marion <dmarion@me.com>            2018-07-17 09:02:17 +0000
commit     3c2fed5145d9e40a9ecd178c2866c813eddc6203 (patch)
tree       7ff2408f3b1c4a52fb6d7cd091508de1ce950e5f /src/vnet/session
parent     5da96a77a84ae5414debbc46d390464d51010113 (diff)
session: use msg queue for events
Change-Id: I3c58367eec2243fe19b75be78a175c5261863e9e
Signed-off-by: Florin Coras <fcoras@cisco.com>
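
This change moves the session layer's event queues from `svm_queue_t` (flat element copies guarded by a pthread mutex) to `svm_msg_q_t` message queues: producers allocate fixed-size messages from per-queue rings living in shared memory, fill them in place, and enqueue only small message headers. The sketch below condenses the locked producer pattern the diff introduces in several places; the function name is illustrative and the queue is assumed to already be configured with a `SESSION_MQ_IO_EVT_RING` ring.

```c
#include <svm/message_queue.h>
#include <vnet/session/session.h>

/* Locked producer pattern: check ring space, allocate a message from
 * the io-event ring, write the event payload in shared memory, then
 * enqueue just the message header. A sketch, not code from the commit. */
static int
mq_send_rx_evt_sketch (svm_msg_q_t * mq, svm_fifo_t * f)
{
  session_fifo_event_t *evt;
  svm_msg_q_msg_t msg;

  svm_msg_q_lock (mq);
  if (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING))
    {
      svm_msg_q_unlock (mq);
      return -1;		/* drop rather than block the caller */
    }
  msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
  evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
  evt->fifo = f;
  evt->event_type = FIFO_EVENT_APP_RX;
  svm_msg_q_add_w_lock (mq, &msg);	/* enqueues the msg header only */
  svm_msg_q_unlock (mq);
  return 0;
}
```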
Diffstat (limited to 'src/vnet/session')
-rw-r--r--  src/vnet/session/application.c            141
-rw-r--r--  src/vnet/session/application.h               7
-rw-r--r--  src/vnet/session/application_interface.h    83
-rw-r--r--  src/vnet/session/segment_manager.c          43
-rw-r--r--  src/vnet/session/segment_manager.h           9
-rw-r--r--  src/vnet/session/session.c                 192
-rw-r--r--  src/vnet/session/session.h                  47
-rwxr-xr-x  src/vnet/session/session_api.c               8
-rw-r--r--  src/vnet/session/session_node.c             73
9 files changed, 424 insertions(+), 179 deletions(-)
```diff
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index 60019dd7961..6041b49712d 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -807,6 +807,143 @@ application_get_segment_manager_properties (u32 app_index)
   return &app->sm_properties;
 }
 
+static inline int
+app_enqueue_evt (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, u8 lock)
+{
+  if (PREDICT_TRUE (!svm_msg_q_is_full (mq)))
+    {
+      if (lock)
+	{
+	  svm_msg_q_add_w_lock (mq, msg);
+	  svm_msg_q_unlock (mq);
+	}
+      else if (svm_msg_q_add (mq, msg, SVM_Q_WAIT))
+	{
+	  clib_warning ("msg q add returned");
+	  if (lock)
+	    svm_msg_q_unlock (mq);
+	  return -1;
+	}
+    }
+  else
+    {
+      clib_warning ("evt q full");
+      svm_msg_q_free_msg (mq, msg);
+      if (lock)
+	svm_msg_q_unlock (mq);
+      return -1;
+    }
+  return 0;
+}
+
+static inline int
+app_send_io_evt_rx (application_t * app, stream_session_t * s, u8 lock)
+{
+  session_fifo_event_t *evt;
+  svm_msg_q_msg_t msg;
+  svm_msg_q_t *mq;
+
+  if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
+    {
+      /* Session is closed so app will never clean up. Flush rx fifo */
+      svm_fifo_dequeue_drop_all (s->server_rx_fifo);
+      return 0;
+    }
+
+  /* Built-in app? Hand event to the callback... */
+  if (app->cb_fns.builtin_app_rx_callback)
+    return app->cb_fns.builtin_app_rx_callback (s);
+
+  /* If no need for event, return */
+  if (!svm_fifo_set_event (s->server_rx_fifo))
+    return 0;
+
+  mq = app->event_queue;
+  if (lock)
+    svm_msg_q_lock (mq);
+
+  if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
+    {
+      clib_warning ("evt q rings full");
+      if (lock)
+	svm_msg_q_unlock (mq);
+      return -1;
+    }
+
+  msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+  ASSERT (!svm_msg_q_msg_is_invalid (&msg));
+
+  evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+  evt->fifo = s->server_rx_fifo;
+  evt->event_type = FIFO_EVENT_APP_RX;
+
+  return app_enqueue_evt (mq, &msg, lock);
+}
+
+static inline int
+app_send_io_evt_tx (application_t * app, stream_session_t * s, u8 lock)
+{
+  svm_msg_q_t *mq;
+  session_fifo_event_t *evt;
+  svm_msg_q_msg_t msg;
+
+  if (application_is_builtin (app))
+    return 0;
+
+  mq = app->event_queue;
+  if (lock)
+    svm_msg_q_lock (mq);
+
+  if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
+    {
+      clib_warning ("evt q rings full");
+      if (lock)
+	svm_msg_q_unlock (mq);
+      return -1;
+    }
+
+  msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+  ASSERT (!svm_msg_q_msg_is_invalid (&msg));
+
+  evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+  evt->event_type = FIFO_EVENT_APP_TX;
+  evt->fifo = s->server_tx_fifo;
+
+  return app_enqueue_evt (mq, &msg, lock);
+}
+
+/* *INDENT-OFF* */
+typedef int (app_send_evt_handler_fn) (application_t *app,
+				       stream_session_t *s,
+				       u8 lock);
+static app_send_evt_handler_fn * const app_send_evt_handler_fns[2] = {
+    app_send_io_evt_rx,
+    app_send_io_evt_tx,
+};
+/* *INDENT-ON* */
+
+/**
+ * Send event to application
+ *
+ * Logic from queue perspective is non-blocking. That is, if there's
+ * not enough space to enqueue a message, we return. However, if the lock
+ * flag is set, we do wait for queue mutex.
+ */
+int
+application_send_event (application_t * app, stream_session_t * s,
+			u8 evt_type)
+{
+  ASSERT (app && evt_type <= FIFO_EVENT_APP_TX);
+  return app_send_evt_handler_fns[evt_type] (app, s, 0 /* lock */ );
+}
+
+int
+application_lock_and_send_event (application_t * app, stream_session_t * s,
+				 u8 evt_type)
+{
+  return app_send_evt_handler_fns[evt_type] (app, s, 1 /* lock */ );
+}
+
 local_session_t *
 application_alloc_local_session (application_t * app)
 {
@@ -949,14 +1086,14 @@ application_local_session_connect (u32 table_index, application_t * client,
   svm_fifo_segment_private_t *seg;
   segment_manager_t *sm;
   local_session_t *ls;
-  svm_queue_t *sq, *cq;
+  svm_msg_q_t *sq, *cq;
 
   ls = application_alloc_local_session (server);
 
   props = application_segment_manager_properties (server);
   cprops = application_segment_manager_properties (client);
   evt_q_elts = props->evt_q_size + cprops->evt_q_size;
-  evt_q_sz = evt_q_elts * sizeof (session_fifo_event_t);
+  evt_q_sz = segment_manager_evt_q_expected_size (evt_q_elts);
   seg_size = props->rx_fifo_size + props->tx_fifo_size + evt_q_sz + margin;
 
   has_transport = session_has_transport ((stream_session_t *) ll);
```
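
Both new entry points funnel through the `app_send_evt_handler_fns` dispatch table, which relies on `FIFO_EVENT_APP_RX` and `FIFO_EVENT_APP_TX` occupying the first two slots of the event enum (the `ASSERT` guards this). A minimal caller sketch, mirroring how `session_enqueue_notify` uses the pair later in this commit; the helper name and `is_cl` flag are illustrative, not from the commit:

```c
#include <vnet/session/application.h>

/* Hypothetical caller: connectionless transports may have several
 * threads producing events for one app queue, so they take the locked
 * path; connection-oriented ones use the plain, non-blocking path. */
static int
notify_app_rx_sketch (application_t * app, stream_session_t * s, u8 is_cl)
{
  if (is_cl)
    return application_lock_and_send_event (app, s, FIFO_EVENT_APP_RX);
  return application_send_event (app, s, FIFO_EVENT_APP_RX);
}
```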
```diff
diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h
index aad7089ba6f..f6c81275826 100644
--- a/src/vnet/session/application.h
+++ b/src/vnet/session/application.h
@@ -72,7 +72,7 @@ typedef struct _application
   u32 ns_index;
 
   /** Application listens for events on this svm queue */
-  svm_queue_t *event_queue;
+  svm_msg_q_t *event_queue;
 
   /*
    * Callbacks: shoulder-taps for the server/client
@@ -207,6 +207,11 @@ int application_local_session_disconnect_w_index (u32 app_index, u32 ls_index);
 void application_local_sessions_del (application_t * app);
 
+int application_send_event (application_t * app, stream_session_t * s,
+			    u8 evt);
+int application_lock_and_send_event (application_t * app,
+				     stream_session_t * s, u8 evt_type);
+
 always_inline u32
 local_session_id (local_session_t * ll)
 {
```

```diff
diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h
index 2c171487e70..50c043493f2 100644
--- a/src/vnet/session/application_interface.h
+++ b/src/vnet/session/application_interface.h
@@ -136,7 +136,7 @@ typedef enum
   _(IS_BUILTIN, "Application is builtin")			\
   _(IS_PROXY, "Application is proxying")			\
   _(USE_GLOBAL_SCOPE, "App can use global session scope")	\
-  _(USE_LOCAL_SCOPE, "App can use local session scope")
+  _(USE_LOCAL_SCOPE, "App can use local session scope")	\
 
 typedef enum _app_options
 {
@@ -187,7 +187,7 @@ typedef struct app_session_transport_
   _(volatile u8, session_state)	/**< session state */		\
   _(u32, session_index)		/**< index in owning pool */	\
   _(app_session_transport_t, transport)	/**< transport info */	\
-  _(svm_queue_t, *vpp_evt_q)	/**< vpp event queue */		\
+  _(svm_msg_q_t, *vpp_evt_q)	/**< vpp event queue */		\
   _(u8, is_dgram)		/**< flag for dgram mode */	\
 
 typedef struct
 {
@@ -197,13 +197,73 @@ typedef struct
 #undef _
 } app_session_t;
 
+/**
+ * Send fifo io event to vpp worker thread
+ *
+ * Because there may be multiple writers to one of vpp's queues, this
+ * protects message allocation and enqueueing.
+ *
+ * @param mq		vpp message queue
+ * @param f		fifo for which the event is sent
+ * @param evt_type	type of event
+ * @param noblock	flag to indicate is request is blocking or not
+ * @return		0 if success, negative integer otherwise
+ */
+static inline int
+app_send_io_evt_to_vpp (svm_msg_q_t * mq, svm_fifo_t * f, u8 evt_type,
+			u8 noblock)
+{
+  session_fifo_event_t *evt;
+  svm_msg_q_msg_t msg;
+
+  if (noblock)
+    {
+      if (svm_msg_q_try_lock (mq))
+	return -1;
+      if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
+	{
+	  svm_msg_q_unlock (mq);
+	  return -2;
+	}
+      msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+      if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (&msg)))
+	{
+	  svm_msg_q_unlock (mq);
+	  return -2;
+	}
+      evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+      evt->fifo = f;
+      evt->event_type = evt_type;
+      svm_msg_q_add_w_lock (mq, &msg);
+      svm_msg_q_unlock (mq);
+      return 0;
+    }
+  else
+    {
+      svm_msg_q_lock (mq);
+      msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+      while (svm_msg_q_msg_is_invalid (&msg))
+	{
+	  svm_msg_q_wait (mq);
+	  msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+	}
+      evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+      evt->fifo = f;
+      evt->event_type = evt_type;
+      if (svm_msg_q_is_full (mq))
+	svm_msg_q_wait (mq);
+      svm_msg_q_add_w_lock (mq, &msg);
+      svm_msg_q_unlock (mq);
+      return 0;
+    }
+}
+
 always_inline int
 app_send_dgram_raw (svm_fifo_t * f, app_session_transport_t * at,
-		    svm_queue_t * vpp_evt_q, u8 * data, u32 len, u8 noblock)
+		    svm_msg_q_t * vpp_evt_q, u8 * data, u32 len, u8 noblock)
 {
   u32 max_enqueue, actual_write;
   session_dgram_hdr_t hdr;
-  session_fifo_event_t evt;
   int rv;
 
   max_enqueue = svm_fifo_max_enqueue (f);
@@ -225,11 +285,7 @@ app_send_dgram_raw (svm_fifo_t * f, app_session_transport_t * at,
   if ((rv = svm_fifo_enqueue_nowait (f, actual_write, data)) > 0)
     {
       if (svm_fifo_set_event (f))
-	{
-	  evt.fifo = f;
-	  evt.event_type = FIFO_EVENT_APP_TX;
-	  svm_queue_add (vpp_evt_q, (u8 *) & evt, noblock);
-	}
+	app_send_io_evt_to_vpp (vpp_evt_q, f, FIFO_EVENT_APP_TX, noblock);
     }
   ASSERT (rv);
   return rv;
@@ -243,20 +299,15 @@ app_send_dgram (app_session_t * s, u8 * data, u32 len, u8 noblock)
 }
 
 always_inline int
-app_send_stream_raw (svm_fifo_t * f, svm_queue_t * vpp_evt_q, u8 * data,
+app_send_stream_raw (svm_fifo_t * f, svm_msg_q_t * vpp_evt_q, u8 * data,
		     u32 len, u8 noblock)
 {
-  session_fifo_event_t evt;
   int rv;
 
   if ((rv = svm_fifo_enqueue_nowait (f, len, data)) > 0)
     {
       if (svm_fifo_set_event (f))
-	{
-	  evt.fifo = f;
-	  evt.event_type = FIFO_EVENT_APP_TX;
-	  svm_queue_add (vpp_evt_q, (u8 *) & evt, noblock);
-	}
+	app_send_io_evt_to_vpp (vpp_evt_q, f, FIFO_EVENT_APP_TX, noblock);
     }
   return rv;
 }
```
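
On the application side, `app_send_io_evt_to_vpp` is what external apps call after writing to a tx fifo; `app_send_stream_raw` above shows the canonical sequence. A condensed sketch of that send path — the helper name is mine, but every call in it appears in this header:

```c
#include <vnet/session/application_interface.h>

/* Condensed tx path: enqueue data, and only if the fifo transitioned
 * from "no event pending" to "event pending" notify vpp. noblock=1
 * means a contended or full queue drops the notification (negative
 * return from app_send_io_evt_to_vpp) instead of sleeping on vpp's
 * queue mutex. */
static inline int
app_tx_sketch (svm_fifo_t * f, svm_msg_q_t * vpp_evt_q, u8 * data, u32 len)
{
  int rv;

  if ((rv = svm_fifo_enqueue_nowait (f, len, data)) > 0
      && svm_fifo_set_event (f))
    app_send_io_evt_to_vpp (vpp_evt_q, f, FIFO_EVENT_APP_TX, 1 /* noblock */);
  return rv;
}
```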
```diff
diff --git a/src/vnet/session/segment_manager.c b/src/vnet/session/segment_manager.c
index 31bb0c3aab5..b00bcd5bbc8 100644
--- a/src/vnet/session/segment_manager.c
+++ b/src/vnet/session/segment_manager.c
@@ -599,25 +599,52 @@ segment_manager_dealloc_fifos (u32 segment_index, svm_fifo_t * rx_fifo,
   segment_manager_segment_reader_unlock (sm);
 }
 
+u32
+segment_manager_evt_q_expected_size (u32 q_len)
+{
+  u32 fifo_evt_size, notif_q_size, q_hdrs;
+  u32 msg_q_sz, fifo_evt_ring_sz, session_ntf_ring_sz;
+
+  fifo_evt_size = 1 << max_log2 (sizeof (session_fifo_event_t));
+  notif_q_size = clib_max (16, q_len >> 4);
+
+  msg_q_sz = q_len * sizeof (svm_msg_q_msg_t);
+  fifo_evt_ring_sz = q_len * fifo_evt_size;
+  session_ntf_ring_sz = notif_q_size * 256;
+  q_hdrs = sizeof (svm_queue_t) + sizeof (svm_msg_q_t);
+
+  return (msg_q_sz + fifo_evt_ring_sz + session_ntf_ring_sz + q_hdrs);
+}
+
 /**
  * Allocates shm queue in the first segment
  *
  * Must be called with lock held
  */
-svm_queue_t *
+svm_msg_q_t *
 segment_manager_alloc_queue (svm_fifo_segment_private_t * segment,
			     u32 queue_size)
 {
-  ssvm_shared_header_t *sh;
-  svm_queue_t *q;
+  u32 fifo_evt_size, session_evt_size = 256, notif_q_size;
+  svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
+  svm_msg_q_t *q;
   void *oldheap;
 
-  sh = segment->ssvm.sh;
+  fifo_evt_size = sizeof (session_fifo_event_t);
+  notif_q_size = clib_max (16, queue_size >> 4);
+  /* *INDENT-OFF* */
+  svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
+    {queue_size, fifo_evt_size, 0},
+    {notif_q_size, session_evt_size, 0}
+  };
+  /* *INDENT-ON* */
+  cfg->consumer_pid = 0;
+  cfg->n_rings = 2;
+  cfg->q_nitems = queue_size;
+  cfg->ring_cfgs = rc;
 
-  oldheap = ssvm_push_heap (sh);
-  q = svm_queue_init (queue_size, sizeof (session_fifo_event_t),
-		      0 /* consumer pid */ ,
-		      0 /* signal when queue non-empty */ );
+  oldheap = ssvm_push_heap (segment->ssvm.sh);
+  q = svm_msg_q_alloc (cfg);
   ssvm_pop_heap (oldheap);
   return q;
 }
```
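
To see what the estimate produces, here is a worked instance of `segment_manager_evt_q_expected_size()`; the two struct sizes are illustrative assumptions (they depend on the build), while the formulas are the function's own:

```c
/* segment_manager_evt_q_expected_size (128), assuming
 * sizeof (session_fifo_event_t) rounds up to 16 bytes and
 * sizeof (svm_msg_q_msg_t) == 8 (both build-dependent assumptions):
 *
 *   fifo_evt_size       = 1 << max_log2 (16)      = 16
 *   notif_q_size        = clib_max (16, 128 >> 4) = 16
 *   msg_q_sz            = 128 * 8                 = 1024
 *   fifo_evt_ring_sz    = 128 * 16                = 2048
 *   session_ntf_ring_sz = 16 * 256                = 4096
 *   total               = 7168 bytes + queue headers, i.e. roughly
 *                         7 kB reserved in the segment per queue.
 */
```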
```diff
diff --git a/src/vnet/session/segment_manager.h b/src/vnet/session/segment_manager.h
index 62e5e97e703..73cb4827a8b 100644
--- a/src/vnet/session/segment_manager.h
+++ b/src/vnet/session/segment_manager.h
@@ -17,7 +17,7 @@
 
 #include <vnet/vnet.h>
 #include <svm/svm_fifo_segment.h>
-#include <svm/queue.h>
+#include <svm/message_queue.h>
 #include <vlibmemory/api.h>
 #include <vppinfra/lock.h>
 #include <vppinfra/valloc.h>
@@ -61,7 +61,7 @@ typedef struct _segment_manager
   /**
    * App event queue allocated in first segment
    */
-  svm_queue_t *event_queue;
+  svm_msg_q_t *event_queue;
 } segment_manager_t;
 
 #define segment_manager_foreach_segment_w_lock(VAR, SM, BODY)	\
@@ -114,7 +114,7 @@ segment_manager_index (segment_manager_t * sm)
   return sm - segment_manager_main.segment_managers;
 }
 
-always_inline svm_queue_t *
+always_inline svm_msg_q_t *
 segment_manager_event_queue (segment_manager_t * sm)
 {
   return sm->event_queue;
@@ -152,7 +152,8 @@ int segment_manager_try_alloc_fifos (svm_fifo_segment_private_t * fs,
				     svm_fifo_t ** tx_fifo);
 void segment_manager_dealloc_fifos (u32 segment_index, svm_fifo_t * rx_fifo,
				    svm_fifo_t * tx_fifo);
-svm_queue_t *segment_manager_alloc_queue (svm_fifo_segment_private_t * fs,
+u32 segment_manager_evt_q_expected_size (u32 q_size);
+svm_msg_q_t *segment_manager_alloc_queue (svm_fifo_segment_private_t * fs,
					  u32 queue_size);
 void segment_manager_dealloc_queue (segment_manager_t * sm, svm_queue_t * q);
 void segment_manager_app_detach (segment_manager_t * sm);
```
```diff
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 26bc70e6fd7..38a0521af94 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -27,49 +27,90 @@ session_manager_main_t session_manager_main;
 
 extern transport_proto_vft_t *tp_vfts;
 
-static void
-session_send_evt_to_thread (u64 session_handle, fifo_event_type_t evt_type,
-			    u32 thread_index, void *fp, void *rpc_args)
+static inline int
+session_send_evt_to_thread (void *data, void *args, u32 thread_index,
+			    session_evt_type_t evt_type)
 {
-  session_fifo_event_t evt = { {0}, };
-  svm_queue_t *q;
+  session_fifo_event_t *evt;
+  svm_msg_q_msg_t msg;
+  svm_msg_q_t *mq;
   u32 tries = 0, max_tries;
 
-  evt.event_type = evt_type;
-  if (evt_type == FIFO_EVENT_RPC)
-    {
-      evt.rpc_args.fp = fp;
-      evt.rpc_args.arg = rpc_args;
-    }
-  else
-    evt.session_handle = session_handle;
-
-  q = session_manager_get_vpp_event_queue (thread_index);
-  while (svm_queue_add (q, (u8 *) & evt, 1))
+  mq = session_manager_get_vpp_event_queue (thread_index);
+  while (svm_msg_q_try_lock (mq))
     {
       max_tries = vlib_get_current_process (vlib_get_main ())? 1e6 : 3;
       if (tries++ == max_tries)
	{
	  SESSION_DBG ("failed to enqueue evt");
-	  break;
+	  return -1;
	}
     }
+  if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
+    {
+      svm_msg_q_unlock (mq);
+      return -2;
+    }
+  msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+  if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (&msg)))
+    {
+      svm_msg_q_unlock (mq);
+      return -2;
+    }
+  evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+  evt->event_type = evt_type;
+  switch (evt_type)
+    {
+    case FIFO_EVENT_RPC:
+      evt->rpc_args.fp = data;
+      evt->rpc_args.arg = args;
+      break;
+    case FIFO_EVENT_APP_TX:
+    case FIFO_EVENT_BUILTIN_RX:
+      evt->fifo = data;
+      break;
+    case FIFO_EVENT_DISCONNECT:
+      evt->session_handle = session_handle ((stream_session_t *) data);
+      break;
+    default:
+      clib_warning ("evt unhandled!");
+      svm_msg_q_unlock (mq);
+      return -1;
+    }
+
+  svm_msg_q_add_w_lock (mq, &msg);
+  svm_msg_q_unlock (mq);
+  return 0;
 }
 
-void
-session_send_session_evt_to_thread (u64 session_handle,
-				    fifo_event_type_t evt_type,
-				    u32 thread_index)
+int
+session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
+{
+  return session_send_evt_to_thread (f, 0, f->master_thread_index, evt_type);
+}
+
+int
+session_send_io_evt_to_thread_custom (svm_fifo_t * f, u32 thread_index,
+				      session_evt_type_t evt_type)
 {
-  session_send_evt_to_thread (session_handle, evt_type, thread_index, 0, 0);
+  return session_send_evt_to_thread (f, 0, thread_index, evt_type);
+}
+
+int
+session_send_ctrl_evt_to_thread (stream_session_t * s,
+				 session_evt_type_t evt_type)
+{
+  /* only event supported for now is disconnect */
+  ASSERT (evt_type == FIFO_EVENT_DISCONNECT);
+  return session_send_evt_to_thread (s, 0, s->thread_index,
+				     FIFO_EVENT_DISCONNECT);
 }
 
 void
 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
 {
   if (thread_index != vlib_get_thread_index ())
-    session_send_evt_to_thread (0, FIFO_EVENT_RPC, thread_index, fp,
-				rpc_args);
+    session_send_evt_to_thread (fp, rpc_args, thread_index, FIFO_EVENT_RPC);
   else
     {
       void (*fnp) (void *) = fp;
@@ -440,24 +481,15 @@ stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
 /**
  * Notify session peer that new data has been enqueued.
  *
- * @param s Stream session for which the event is to be generated.
- * @param block Flag to indicate if call should block if event queue is full.
+ * @param s	Stream session for which the event is to be generated.
+ * @param lock	Flag to indicate if call should lock message queue.
  *
- * @return 0 on succes or negative number if failed to send notification.
+ * @return 0 on success or negative number if failed to send notification.
  */
-static int
-session_enqueue_notify (stream_session_t * s, u8 block)
+static inline int
+session_enqueue_notify (stream_session_t * s, u8 lock)
 {
   application_t *app;
-  session_fifo_event_t evt;
-  svm_queue_t *q;
-
-  if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
-    {
-      /* Session is closed so app will never clean up. Flush rx fifo */
-      svm_fifo_dequeue_drop_all (s->server_rx_fifo);
-      return 0;
-    }
 
   app = application_get_if_valid (s->app_index);
   if (PREDICT_FALSE (app == 0))
@@ -466,68 +498,32 @@ session_enqueue_notify (stream_session_t * s, u8 block)
       return 0;
     }
 
-  /* Built-in app? Hand event to the callback... */
-  if (app->cb_fns.builtin_app_rx_callback)
-    return app->cb_fns.builtin_app_rx_callback (s);
-
-  /* If no event, send one */
-  if (svm_fifo_set_event (s->server_rx_fifo))
-    {
-      /* Fabricate event */
-      evt.fifo = s->server_rx_fifo;
-      evt.event_type = FIFO_EVENT_APP_RX;
-
-      /* Add event to server's event queue */
-      q = app->event_queue;
-
-      /* Based on request block (or not) for lack of space */
-      if (block || PREDICT_TRUE (q->cursize < q->maxsize))
-	svm_queue_add (app->event_queue, (u8 *) & evt,
-		       0 /* do wait for mutex */ );
-      else
-	{
-	  clib_warning ("fifo full");
-	  return -1;
-	}
-    }
-
   /* *INDENT-OFF* */
   SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
-      ed->data[0] = evt.event_type;
+      ed->data[0] = FIFO_EVENT_APP_RX;
       ed->data[1] = svm_fifo_max_dequeue (s->server_rx_fifo);
   }));
   /* *INDENT-ON* */
 
-  return 0;
+  if (lock)
+    return application_lock_and_send_event (app, s, FIFO_EVENT_APP_RX);
+
+  return application_send_event (app, s, FIFO_EVENT_APP_RX);
 }
 
 int
 session_dequeue_notify (stream_session_t * s)
 {
   application_t *app;
-  svm_queue_t *q;
 
   app = application_get_if_valid (s->app_index);
   if (PREDICT_FALSE (!app))
     return -1;
 
-  if (application_is_builtin (app))
-    return 0;
+  if (session_transport_service_type (s) == TRANSPORT_SERVICE_CL)
+    return application_lock_and_send_event (app, s, FIFO_EVENT_APP_RX);
 
-  q = app->event_queue;
-  if (PREDICT_TRUE (q->cursize < q->maxsize))
-    {
-      session_fifo_event_t evt = {
-	.event_type = FIFO_EVENT_APP_TX,
-	.fifo = s->server_tx_fifo
-      };
-      svm_queue_add (app->event_queue, (u8 *) & evt, SVM_Q_WAIT);
-    }
-  else
-    {
-      return -1;
-    }
-  return 0;
+  return application_send_event (app, s, FIFO_EVENT_APP_TX);
 }
 
 /**
@@ -542,16 +538,24 @@ int
 session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index)
 {
   session_manager_main_t *smm = &session_manager_main;
-  u32 *indices;
+  transport_service_type_t tp_service;
+  int i, errors = 0, lock;
   stream_session_t *s;
-  int i, errors = 0;
+  u32 *indices;
 
   indices = smm->session_to_enqueue[transport_proto][thread_index];
+  tp_service = transport_protocol_service_type (transport_proto);
+  lock = tp_service == TRANSPORT_SERVICE_CL;
 
   for (i = 0; i < vec_len (indices); i++)
     {
       s = session_get_if_valid (indices[i], thread_index);
-      if (s == 0 || session_enqueue_notify (s, 0 /* don't block */ ))
+      if (PREDICT_FALSE (!s))
+	{
+	  errors++;
+	  continue;
+	}
+      if (PREDICT_FALSE (session_enqueue_notify (s, lock)))
	errors++;
     }
 
@@ -1118,9 +1122,7 @@ stream_session_disconnect (stream_session_t * s)
       evt->event_type = FIFO_EVENT_DISCONNECT;
     }
   else
-    session_send_session_evt_to_thread (session_handle (s),
-					FIFO_EVENT_DISCONNECT,
-					s->thread_index);
+    session_send_ctrl_evt_to_thread (s, FIFO_EVENT_DISCONNECT);
 }
 
 /**
@@ -1231,8 +1233,18 @@ session_vpp_event_queues_allocate (session_manager_main_t * smm)
 
   for (i = 0; i < vec_len (smm->vpp_event_queues); i++)
     {
-      smm->vpp_event_queues[i] = svm_queue_init (evt_q_length, evt_size,
-						 vpp_pid, 0);
+      svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
+      u32 notif_q_size = clib_max (16, evt_q_length >> 4);
+      svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
+	{evt_q_length, evt_size, 0},
+	{notif_q_size, 256, 0}
+      };
+      cfg->consumer_pid = 0;
+      cfg->n_rings = 2;
+      cfg->q_nitems = evt_q_length;
+      cfg->ring_cfgs = rc;
+      smm->vpp_event_queues[i] = svm_msg_q_alloc (cfg);
     }
 
   if (smm->evt_qs_use_memfd_seg)
```
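
The three typed wrappers hide the enqueue mechanics from callers; transport code that wants the owning worker to re-run a session's tx path only needs the fifo, since `session_send_io_evt_to_thread` picks the target queue from `f->master_thread_index`. A hypothetical caller (the helper name is mine):

```c
#include <vnet/session/session.h>

/* Hypothetical: (re)schedule the tx path for s on its owning thread. */
static void
retry_tx_sketch (stream_session_t * s)
{
  if (session_send_io_evt_to_thread (s->server_tx_fifo, FIFO_EVENT_APP_TX))
    clib_warning ("tx evt enqueue failed");	/* mq contended or ring full */
}
```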
```diff
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index fe3477b63e3..879b3823e5d 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -38,10 +38,10 @@ typedef enum
   FIFO_EVENT_DISCONNECT,
   FIFO_EVENT_BUILTIN_RX,
   FIFO_EVENT_RPC,
-} fifo_event_type_t;
+} session_evt_type_t;
 
 static inline const char *
-fifo_event_type_str (fifo_event_type_t et)
+fifo_event_type_str (session_evt_type_t et)
 {
   switch (et)
     {
@@ -62,6 +62,13 @@ fifo_event_type_str (session_evt_type_t et)
     }
 }
 
+typedef enum
+{
+  SESSION_MQ_IO_EVT_RING,
+  SESSION_MQ_CTRL_EVT_RING,
+  SESSION_MQ_N_RINGS
+} session_mq_rings_e;
+
 #define foreach_session_input_error					\
 _(NO_SESSION, "No session drops")					\
 _(NO_LISTENER, "No listener for dst port drops")			\
@@ -86,23 +93,30 @@ typedef struct
 {
   void *fp;
   void *arg;
-} rpc_args_t;
+} session_rpc_args_t;
 
 typedef u64 session_handle_t;
 
 /* *INDENT-OFF* */
-typedef CLIB_PACKED (struct {
+typedef struct
+{
+  u8 event_type;
+  u8 postponed;
   union
+  {
+    svm_fifo_t *fifo;
+    session_handle_t session_handle;
+    session_rpc_args_t rpc_args;
+    struct
     {
-      svm_fifo_t * fifo;
-      session_handle_t session_handle;
-      rpc_args_t rpc_args;
+      u8 data[0];
     };
-  u8 event_type;
-  u8 postponed;
-}) session_fifo_event_t;
+  };
+} __clib_packed session_fifo_event_t;
 /* *INDENT-ON* */
 
+#define SESSION_MSG_NULL { }
+
 typedef struct session_dgram_pre_hdr_
 {
   u32 data_length;
@@ -193,7 +207,7 @@ struct _session_manager_main
   session_tx_context_t *ctx;
 
   /** vpp fifo event queue */
-  svm_queue_t **vpp_event_queues;
+  svm_msg_q_t **vpp_event_queues;
 
   /** Event queues memfd segment initialized only if so configured */
   ssvm_private_t evt_qs_segment;
@@ -533,9 +547,12 @@ int stream_session_stop_listen (stream_session_t * s);
 void stream_session_disconnect (stream_session_t * s);
 void stream_session_disconnect_transport (stream_session_t * s);
 void stream_session_cleanup (stream_session_t * s);
-void session_send_session_evt_to_thread (u64 session_handle,
-					 fifo_event_type_t evt_type,
-					 u32 thread_index);
+int session_send_io_evt_to_thread (svm_fifo_t * f,
+				   session_evt_type_t evt_type);
+int session_send_io_evt_to_thread_custom (svm_fifo_t * f, u32 thread_index,
+					  session_evt_type_t evt_type);
+void session_send_rpc_evt_to_thread (u32 thread_index, void *fp,
+				     void *rpc_args);
 ssvm_private_t *session_manager_get_evt_q_segment (void);
 
 u8 *format_stream_session (u8 * s, va_list * args);
@@ -549,7 +566,7 @@ void session_register_transport (transport_proto_t transport_proto,
 
 clib_error_t *vnet_session_enable_disable (vlib_main_t * vm, u8 is_en);
 
-always_inline svm_queue_t *
+always_inline svm_msg_q_t *
 session_manager_get_vpp_event_queue (u32 thread_index)
 {
   return session_manager_main.vpp_event_queues[thread_index];
```
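
The restructured `session_fifo_event_t` now travels by value through the `SESSION_MQ_IO_EVT_RING` ring, whose element size is derived from `sizeof (session_fifo_event_t)` in `segment_manager_alloc_queue`, while the ctrl ring reserves 256-byte elements. A compile-time guard in that spirit (hypothetical; the commit does not add one):

```c
/* Events placed on the 256-byte ctrl ring must fit its element size;
 * the io ring is sized from the struct itself, so it always fits. */
STATIC_ASSERT (sizeof (session_fifo_event_t) <= 256,
	       "session event must fit a ctrl ring element");
```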
```diff
diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c
index 1a41dbd9e08..f9fddea6148 100755
--- a/src/vnet/session/session_api.c
+++ b/src/vnet/session/session_api.c
@@ -155,7 +155,7 @@ send_session_accept_callback (stream_session_t * s)
   vl_api_registration_t *reg;
   transport_connection_t *tc;
   stream_session_t *listener;
-  svm_queue_t *vpp_queue;
+  svm_msg_q_t *vpp_queue;
 
   reg = vl_mem_api_client_index_to_registration (server->api_client_index);
   if (!reg)
@@ -300,7 +300,7 @@ send_session_connected_callback (u32 app_index, u32 api_context,
   vl_api_connect_session_reply_t *mp;
   transport_connection_t *tc;
   vl_api_registration_t *reg;
-  svm_queue_t *vpp_queue;
+  svm_msg_q_t *vpp_queue;
   application_t *app;
 
   app = application_get (app_index);
@@ -485,7 +485,7 @@ vl_api_bind_uri_t_handler (vl_api_bind_uri_t * mp)
   vl_api_bind_uri_reply_t *rmp;
   stream_session_t *s;
   application_t *app = 0;
-  svm_queue_t *vpp_evt_q;
+  svm_msg_q_t *vpp_evt_q;
   int rv;
 
   if (session_manager_is_enabled () == 0)
@@ -759,7 +759,7 @@ vl_api_bind_sock_t_handler (vl_api_bind_sock_t * mp)
   stream_session_t *s;
   transport_connection_t *tc = 0;
   ip46_address_t *ip46;
-  svm_queue_t *vpp_evt_q;
+  svm_msg_q_t *vpp_evt_q;
 
   if (session_manager_is_enabled () == 0)
     {
```
```diff
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index 85fd28db7a9..350282bd902 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -575,15 +575,14 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
		       vlib_frame_t * frame)
 {
   session_manager_main_t *smm = vnet_get_session_manager_main ();
+  u32 thread_index = vm->thread_index, n_to_dequeue, n_events;
   session_fifo_event_t *pending_events, *e;
   session_fifo_event_t *fifo_events;
-  u32 n_to_dequeue, n_events;
-  svm_queue_t *q;
-  application_t *app;
-  int n_tx_packets = 0;
-  u32 thread_index = vm->thread_index;
-  int i, rv;
+  svm_msg_q_msg_t _msg, *msg = &_msg;
   f64 now = vlib_time_now (vm);
+  int n_tx_packets = 0, i, rv;
+  application_t *app;
+  svm_msg_q_t *mq;
   void (*fp) (void *);
 
   SESSION_EVT_DBG (SESSION_EVT_POLL_GAP_TRACK, smm, thread_index);
@@ -594,16 +593,11 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   transport_update_time (now, thread_index);
 
   /*
-   * Get vpp queue events
+   * Get vpp queue events that we can dequeue without blocking
    */
-  q = smm->vpp_event_queues[thread_index];
-  if (PREDICT_FALSE (q == 0))
-    return 0;
-
+  mq = smm->vpp_event_queues[thread_index];
   fifo_events = smm->free_event_vector[thread_index];
-
-  /* min number of events we can dequeue without blocking */
-  n_to_dequeue = q->cursize;
+  n_to_dequeue = svm_msg_q_size (mq);
   pending_events = smm->pending_event_vector[thread_index];
 
   if (!n_to_dequeue && !vec_len (pending_events)
@@ -624,21 +618,19 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
     }
 
   /* See you in the next life, don't be late
-   * XXX: we may need priorities here
-   */
-  if (pthread_mutex_trylock (&q->mutex))
+   * XXX: we may need priorities here */
+  if (svm_msg_q_try_lock (mq))
     return 0;
 
   for (i = 0; i < n_to_dequeue; i++)
     {
       vec_add2 (fifo_events, e, 1);
-      svm_queue_sub_raw (q, (u8 *) e);
+      svm_msg_q_sub_w_lock (mq, msg);
+      clib_memcpy (e, svm_msg_q_msg_data (mq, msg), sizeof (*e));
+      svm_msg_q_free_msg (mq, msg);
     }
 
-  /* The other side of the connection is not polling */
-  if (q->cursize < (q->maxsize / 8))
-    (void) pthread_cond_broadcast (&q->condvar);
-  pthread_mutex_unlock (&q->mutex);
+  svm_msg_q_unlock (mq);
 
   vec_append (fifo_events, pending_events);
   vec_append (fifo_events, smm->pending_disconnects[thread_index]);
@@ -760,19 +752,20 @@ dump_thread_0_event_queue (void)
   vlib_main_t *vm = &vlib_global_main;
   u32 my_thread_index = vm->thread_index;
   session_fifo_event_t _e, *e = &_e;
+  svm_msg_q_ring_t *ring;
   stream_session_t *s0;
+  svm_msg_q_msg_t *msg;
+  svm_msg_q_t *mq;
   int i, index;
-  i8 *headp;
-
-  svm_queue_t *q;
-  q = smm->vpp_event_queues[my_thread_index];
-  index = q->head;
+  mq = smm->vpp_event_queues[my_thread_index];
+  index = mq->q->head;
 
-  for (i = 0; i < q->cursize; i++)
+  for (i = 0; i < mq->q->cursize; i++)
     {
-      headp = (i8 *) (&q->data[0] + q->elsize * index);
-      clib_memcpy (e, headp, q->elsize);
+      msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index);
+      ring = svm_msg_q_ring (mq, msg->ring_index);
+      clib_memcpy (e, svm_msg_q_msg_data (mq, msg), ring->elsize);
 
       switch (e->event_type)
	{
@@ -805,7 +798,7 @@ dump_thread_0_event_queue (void)
 
       index++;
 
-      if (index == q->maxsize)
+      if (index == mq->q->maxsize)
	index = 0;
     }
 }
@@ -844,10 +837,11 @@ u8
 session_node_lookup_fifo_event (svm_fifo_t * f, session_fifo_event_t * e)
 {
   session_manager_main_t *smm = vnet_get_session_manager_main ();
-  svm_queue_t *q;
+  svm_msg_q_t *mq;
   session_fifo_event_t *pending_event_vector, *evt;
   int i, index, found = 0;
-  i8 *headp;
+  svm_msg_q_msg_t *msg;
+  svm_msg_q_ring_t *ring;
   u8 thread_index;
 
   ASSERT (e);
@@ -855,16 +849,17 @@ session_node_lookup_fifo_event (svm_fifo_t * f, session_fifo_event_t * e)
   /*
    * Search evt queue
    */
-  q = smm->vpp_event_queues[thread_index];
-  index = q->head;
-  for (i = 0; i < q->cursize; i++)
+  mq = smm->vpp_event_queues[thread_index];
+  index = mq->q->head;
+  for (i = 0; i < mq->q->cursize; i++)
     {
-      headp = (i8 *) (&q->data[0] + q->elsize * index);
-      clib_memcpy (e, headp, q->elsize);
+      msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index);
+      ring = svm_msg_q_ring (mq, msg->ring_index);
+      clib_memcpy (e, svm_msg_q_msg_data (mq, msg), ring->elsize);
       found = session_node_cmp_event (e, f);
       if (found)
	return 1;
-      if (++index == q->maxsize)
+      if (++index == mq->q->maxsize)
	index = 0;
     }
   /*
```
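
For completeness, the consumer side condensed from `session_queue_node_fn`: drain whatever is already queued without ever blocking on the producers, copying each payload out of its shared-memory ring before releasing the slot. The helper name and the `max` bound are mine; every queue call appears in the diff above.

```c
#include <vnet/session/session.h>

/* Drain up to 'max' events from a vpp message queue without blocking. */
static u32
mq_drain_sketch (svm_msg_q_t * mq, session_fifo_event_t * out, u32 max)
{
  svm_msg_q_msg_t msg;
  u32 i, n;

  if (svm_msg_q_try_lock (mq))
    return 0;			/* a producer holds the mutex; retry later */
  n = clib_min (svm_msg_q_size (mq), max);
  for (i = 0; i < n; i++)
    {
      svm_msg_q_sub_w_lock (mq, &msg);
      clib_memcpy (&out[i], svm_msg_q_msg_data (mq, &msg), sizeof (out[0]));
      svm_msg_q_free_msg (mq, &msg);	/* slot goes back to its ring */
    }
  svm_msg_q_unlock (mq);
  return n;
}
```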