diff options
-rw-r--r-- | src/vnet/session/session.c | 7 | ||||
-rw-r--r-- | src/vnet/session/session.h | 46 | ||||
-rw-r--r-- | src/vnet/session/session_node.c | 124 | ||||
-rw-r--r-- | src/vppinfra/llist.h | 38 |
4 files changed, 126 insertions, 89 deletions
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 1c8b7fb4be4..318e01dbf54 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -155,11 +155,10 @@ session_program_transport_close (session_t * s) if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index) { wrk = session_main_get_worker (s->thread_index); - elt = session_evt_elt_alloc (wrk); + elt = session_evt_alloc_ctrl (wrk); clib_memset (&elt->evt, 0, sizeof (session_event_t)); elt->evt.session_handle = session_handle (s); elt->evt.event_type = SESSION_CTRL_EVT_CLOSE; - session_evt_add_pending_disconnects (wrk, elt); } else session_send_ctrl_evt_to_thread (s, SESSION_CTRL_EVT_CLOSE); @@ -1404,11 +1403,9 @@ session_manager_main_enable (vlib_main_t * vm) for (i = 0; i < num_threads; i++) { wrk = &smm->wrk[i]; + wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list); wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list); wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list); - wrk->postponed_head = clib_llist_make_head (wrk->event_elts, evt_list); - wrk->disconnects_head = clib_llist_make_head (wrk->event_elts, - evt_list); wrk->vm = vlib_mains[i]; wrk->last_vlib_time = vlib_time_now (vlib_mains[i]); diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index 8f7bd6c999c..5af824ade60 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -96,18 +96,15 @@ typedef struct session_worker_ /** Pool of session event list elements */ session_evt_elt_t *event_elts; + /** Head of control events list */ + clib_llist_index_t ctrl_head; + /** Head of list of elements */ clib_llist_index_t new_head; /** Head of list of pending events */ clib_llist_index_t old_head; - /** Head of list of postponed events */ - clib_llist_index_t postponed_head; - - /** Head of list of disconnect events */ - clib_llist_index_t disconnects_head; - /** Peekers rw lock */ clib_rwlock_t peekers_rw_locks; @@ -203,44 +200,21 @@ session_evt_elt_free (session_worker_t * wrk, session_evt_elt_t * elt) pool_put (wrk->event_elts, elt); } -static inline session_evt_elt_t * -session_evt_old_head (session_worker_t * wrk) -{ - return pool_elt_at_index (wrk->event_elts, wrk->old_head); -} - -static inline session_evt_elt_t * -session_evt_postponed_head (session_worker_t * wrk) -{ - return pool_elt_at_index (wrk->event_elts, wrk->postponed_head); -} - -static inline session_evt_elt_t * -session_evt_pending_disconnects_head (session_worker_t * wrk) -{ - return pool_elt_at_index (wrk->event_elts, wrk->disconnects_head); -} - static inline void session_evt_add_old (session_worker_t * wrk, session_evt_elt_t * elt) { clib_llist_add_tail (wrk->event_elts, evt_list, elt, - session_evt_old_head (wrk)); -} - -static inline void -session_evt_add_postponed (session_worker_t * wrk, session_evt_elt_t * elt) -{ - clib_llist_add_tail (wrk->event_elts, evt_list, elt, - session_evt_postponed_head (wrk)); + pool_elt_at_index (wrk->event_elts, wrk->old_head)); } -static inline void -session_evt_add_pending_disconnects (session_worker_t * wrk, - session_evt_elt_t * elt) +static inline session_evt_elt_t * +session_evt_alloc_ctrl (session_worker_t * wrk) { + session_evt_elt_t *elt; + elt = session_evt_elt_alloc (wrk); clib_llist_add_tail (wrk->event_elts, evt_list, elt, - session_evt_pending_disconnects_head (wrk)); + pool_elt_at_index (wrk->event_elts, wrk->ctrl_head)); + return elt; } static inline session_evt_elt_t * diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index 1bb5cb2717f..35704b75a0a 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -665,7 +665,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, } ctx->snd_space = transport_connection_snd_space (ctx->tc, - wrk->vm->clib_time. + vm->clib_time. last_cpu_time, ctx->snd_mss); @@ -693,6 +693,8 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, if (n_bufs) vlib_buffer_free (vm, wrk->tx_buffers, n_bufs); session_evt_add_old (wrk, elt); + vlib_node_increment_counter (wrk->vm, node->node_index, + SESSION_QUEUE_ERROR_NO_BUFFER, 1); return SESSION_TX_NO_BUFFERS; } @@ -854,41 +856,29 @@ session_event_dispatch (session_worker_t * wrk, vlib_node_runtime_t * node, { session_main_t *smm = &session_main; app_worker_t *app_wrk; + clib_llist_index_t ei; void (*fp) (void *); session_event_t *e; session_t *s; - int rv; + ei = clib_llist_entry_index (wrk->event_elts, elt); e = &elt->evt; + switch (e->event_type) { case SESSION_IO_EVT_TX_FLUSH: case SESSION_IO_EVT_TX: - /* Don't try to send more that one frame per dispatch cycle */ - if (*n_tx_packets == VLIB_FRAME_SIZE) - { - session_evt_add_postponed (wrk, elt); - return; - } - s = session_event_get_session (e, thread_index); if (PREDICT_FALSE (!s)) { - clib_warning ("session was freed!"); + clib_warning ("session %u was freed!", e->session_index); break; } CLIB_PREFETCH (s->tx_fifo, 2 * CLIB_CACHE_LINE_BYTES, LOAD); wrk->ctx.s = s; /* Spray packets in per session type frames, since they go to * different nodes */ - rv = (smm->session_tx_fns[s->session_type]) (wrk, node, elt, - n_tx_packets); - if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS)) - { - vlib_node_increment_counter (wrk->vm, node->node_index, - SESSION_QUEUE_ERROR_NO_BUFFER, 1); - break; - } + (smm->session_tx_fns[s->session_type]) (wrk, node, elt, n_tx_packets); break; case SESSION_IO_EVT_RX: s = session_event_get_session (e, thread_index); @@ -941,6 +931,11 @@ session_event_dispatch (session_worker_t * wrk, vlib_node_runtime_t * node, default: clib_warning ("unhandled event type %d", e->event_type); } + + /* Regrab elements in case pool moved */ + elt = pool_elt_at_index (wrk->event_elts, ei); + if (!clib_llist_elt_is_linked (elt, evt_list)) + session_evt_elt_free (wrk, elt); } static uword @@ -950,10 +945,11 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, session_main_t *smm = vnet_get_session_main (); u32 thread_index = vm->thread_index, n_to_dequeue; session_worker_t *wrk = &smm->wrk[thread_index]; - session_evt_elt_t *elt, *new_he, *new_te, *old_he; - session_evt_elt_t *disconnects_he, *postponed_he; + session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he; svm_msg_q_msg_t _msg, *msg = &_msg; + clib_llist_index_t old_ti; int i, n_tx_packets = 0; + session_event_t *evt; svm_msg_q_t *mq; SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk); @@ -965,12 +961,9 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, */ transport_update_time (wrk->last_vlib_time, thread_index); - /* Make sure postponed events are handled first */ - new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head); - new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he); - - postponed_he = pool_elt_at_index (wrk->event_elts, wrk->postponed_head); - clib_llist_splice (wrk->event_elts, evt_list, new_te, postponed_he); + /* + * Dequeue and handle new events + */ /* Try to dequeue what is available. Don't wait for lock. * XXX: we may need priorities here */ @@ -980,44 +973,78 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, { for (i = 0; i < n_to_dequeue; i++) { - elt = session_evt_elt_alloc (wrk); svm_msg_q_sub_w_lock (mq, msg); + evt = svm_msg_q_msg_data (mq, msg); + if (evt->event_type > SESSION_IO_EVT_BUILTIN_TX) + elt = session_evt_alloc_ctrl (wrk); + else + elt = session_evt_alloc_new (wrk); /* Works because reply messages are smaller than a session evt. * If we ever need to support bigger messages this needs to be * fixed */ - clib_memcpy_fast (&elt->evt, svm_msg_q_msg_data (mq, msg), - sizeof (elt->evt)); + clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt)); svm_msg_q_free_msg (mq, msg); - new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head); - clib_llist_add_tail (wrk->event_elts, evt_list, elt, new_he); } svm_msg_q_unlock (mq); } - old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head); - disconnects_he = pool_elt_at_index (wrk->event_elts, wrk->disconnects_head); + /* + * Handle control events + */ - new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he); - clib_llist_splice (wrk->event_elts, evt_list, new_te, old_he); - new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he); - clib_llist_splice (wrk->event_elts, evt_list, new_te, disconnects_he); + ctrl_he = pool_elt_at_index (wrk->event_elts, wrk->ctrl_head); - while (!clib_llist_is_empty (wrk->event_elts, evt_list, new_he)) + /* *INDENT-OFF* */ + clib_llist_foreach_safe (wrk->event_elts, evt_list, ctrl_he, elt, ({ + clib_llist_remove (wrk->event_elts, evt_list, elt); + session_event_dispatch (wrk, node, elt, thread_index, &n_tx_packets); + })); + /* *INDENT-ON* */ + + /* + * Handle the new io events. + */ + + new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head); + + /* *INDENT-OFF* */ + clib_llist_foreach_safe (wrk->event_elts, evt_list, new_he, elt, ({ + session_evt_type_t et; + + et = elt->evt.event_type; + clib_llist_remove (wrk->event_elts, evt_list, elt); + + /* Postpone tx events if we can't handle them this dispatch cycle */ + if (n_tx_packets >= VLIB_FRAME_SIZE + && (et == SESSION_IO_EVT_TX || et == SESSION_IO_EVT_TX_FLUSH)) + { + clib_llist_add (wrk->event_elts, evt_list, elt, new_he); + continue; + } + + session_event_dispatch (wrk, node, elt, thread_index, &n_tx_packets); + })); + /* *INDENT-ON* */ + + /* + * Handle the old io events + */ + + old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head); + old_ti = clib_llist_prev_index (old_he, evt_list); + + while (!clib_llist_is_empty (wrk->event_elts, evt_list, old_he)) { clib_llist_index_t ei; - clib_llist_pop_first (wrk->event_elts, evt_list, elt, new_he); + clib_llist_pop_first (wrk->event_elts, evt_list, elt, old_he); ei = clib_llist_entry_index (wrk->event_elts, elt); - session_event_dispatch (wrk, node, elt, thread_index, &n_tx_packets); - /* Regrab elements in case pool moved */ - elt = pool_elt_at_index (wrk->event_elts, ei); - if (!clib_llist_elt_is_linked (elt, evt_list)) - session_evt_elt_free (wrk, elt); - - new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head); - } + old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head); + if (n_tx_packets >= VLIB_FRAME_SIZE || ei == old_ti) + break; + }; vlib_node_increment_counter (vm, session_queue_node.index, SESSION_QUEUE_ERROR_TX, n_tx_packets); @@ -1166,7 +1193,8 @@ session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e) /* *INDENT-OFF* */ clib_llist_foreach (wrk->event_elts, evt_list, - session_evt_old_head (wrk), elt, ({ + pool_elt_at_index (wrk->event_elts, wrk->old_head), + elt, ({ found = session_node_cmp_event (&elt->evt, f); if (found) { diff --git a/src/vppinfra/llist.h b/src/vppinfra/llist.h index d521a725fc3..25fb95ff66e 100644 --- a/src/vppinfra/llist.h +++ b/src/vppinfra/llist.h @@ -52,6 +52,22 @@ typedef struct clib_llist_anchor */ #define clib_llist_entry_index(LP,E) ((E) - (LP)) /** + * Get prev list entry index + * + * @param E pool entry + * @name list anchor name + * @return previous index + */ +#define clib_llist_prev_index(E,name) _lprev(E,name) +/** + * Get next list entry index + * + * @param E pool entry + * @name list anchor name + * @return next index + */ +#define clib_llist_next_index(E,name) _lnext(E,name) +/** * Get next pool entry * * @param LP linked list pool @@ -248,6 +264,28 @@ do { \ } \ } while (0) /** + * Walk list starting at head safe + * + * @param LP linked list pool + * @param name list anchor name + * @param HI head index + * @param EI entry index iterator + * @param body code to be executed + */ +#define clib_llist_foreach_safe(LP,name,H,E,body) \ +do { \ + clib_llist_index_t _ll_var (HI) = clib_llist_entry_index (LP, H); \ + clib_llist_index_t _ll_var (EI), _ll_var (NI); \ + _ll_var (EI) = _lnext ((H),name); \ + while (_ll_var (EI) != _ll_var (HI)) \ + { \ + (E) = pool_elt_at_index (LP, _ll_var (EI)); \ + _ll_var (NI) = _lnext ((E),name); \ + do { body; } while (0); \ + _ll_var (EI) = _ll_var (NI); \ + } \ +} while (0) +/** * Walk list starting at head in reverse order * * @param LP linked list pool |