diff options
Diffstat (limited to 'src/vnet')
-rw-r--r-- | src/vnet/session/session.c | 25 | ||||
-rw-r--r-- | src/vnet/session/session.h | 82 | ||||
-rw-r--r-- | src/vnet/session/session_node.c | 104 |
3 files changed, 145 insertions, 66 deletions
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index e7542aef901..9769c013e2f 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -122,18 +122,19 @@ static void session_program_transport_close (session_t * s) { u32 thread_index = vlib_get_thread_index (); + session_evt_elt_t *elt; session_worker_t *wrk; - session_event_t *evt; /* If we are in the handler thread, or being called with the worker barrier * held, just append a new event to pending disconnects vector. */ if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index) { wrk = session_main_get_worker (s->thread_index); - vec_add2 (wrk->pending_disconnects, evt, 1); - clib_memset (evt, 0, sizeof (*evt)); - evt->session_handle = session_handle (s); - evt->event_type = SESSION_CTRL_EVT_CLOSE; + elt = session_evt_elt_alloc (wrk); + clib_memset (&elt->evt, 0, sizeof (session_event_t)); + elt->evt.session_handle = session_handle (s); + elt->evt.event_type = SESSION_CTRL_EVT_CLOSE; + session_evt_add_pending_disconnects (wrk, elt); } else session_send_ctrl_evt_to_thread (s, SESSION_CTRL_EVT_CLOSE); @@ -1380,15 +1381,11 @@ session_manager_main_enable (vlib_main_t * vm) for (i = 0; i < num_threads; i++) { wrk = &smm->wrk[i]; - vec_validate (wrk->free_event_vector, 128); - _vec_len (wrk->free_event_vector) = 0; - vec_validate (wrk->pending_event_vector, 128); - _vec_len (wrk->pending_event_vector) = 0; - vec_validate (wrk->pending_disconnects, 128); - _vec_len (wrk->pending_disconnects) = 0; - vec_validate (wrk->postponed_event_vector, 128); - _vec_len (wrk->postponed_event_vector) = 0; - + wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list); + wrk->pending_head = clib_llist_make_head (wrk->event_elts, evt_list); + wrk->postponed_head = clib_llist_make_head (wrk->event_elts, evt_list); + wrk->disconnects_head = clib_llist_make_head (wrk->event_elts, + evt_list); wrk->last_vlib_time = vlib_time_now (vlib_mains[i]); wrk->dispatch_period = 500e-6; diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index d1268188962..73c6dc82988 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -15,6 +15,7 @@ #ifndef __included_session_h__ #define __included_session_h__ +#include <vppinfra/llist.h> #include <vnet/session/session_types.h> #include <vnet/session/session_lookup.h> #include <vnet/session/session_debug.h> @@ -61,6 +62,12 @@ typedef struct session_tx_context_ session_dgram_hdr_t hdr; } session_tx_context_t; +typedef struct session_evt_elt +{ + clib_llist_anchor_t evt_list; + session_event_t evt; +} session_evt_elt_t; + typedef struct session_worker_ { CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); @@ -86,17 +93,20 @@ typedef struct session_worker_ /** Vector of tx buffer free lists */ u32 *tx_buffers; - /** Vector of partially read events */ - session_event_t *free_event_vector; + /** Pool of session event list elements */ + session_evt_elt_t *event_elts; - /** Vector of active event vectors */ - session_event_t *pending_event_vector; + /** Head of list of elements */ + clib_llist_index_t new_head; - /** Vector of postponed disconnects */ - session_event_t *pending_disconnects; + /** Head of list of pending events */ + clib_llist_index_t pending_head; - /** Vector of postponed events */ - session_event_t *postponed_event_vector; + /** Head of list of postponed events */ + clib_llist_index_t postponed_head; + + /** Head of list of disconnect events */ + clib_llist_index_t disconnects_head; /** Peekers rw lock */ clib_rwlock_t peekers_rw_locks; @@ -108,7 +118,7 @@ typedef struct session_worker_ typedef int (session_fifo_rx_fn) (vlib_main_t * vm, vlib_node_runtime_t * node, session_worker_t * wrk, - session_event_t * e, int *n_tx_pkts); + session_evt_elt_t * e, int *n_tx_pkts); extern session_fifo_rx_fn session_tx_fifo_peek_and_snd; extern session_fifo_rx_fn session_tx_fifo_dequeue_and_snd; @@ -186,6 +196,60 @@ extern vlib_node_registration_t session_queue_pre_input_node; #define SESSION_Q_PROCESS_FLUSH_FRAMES 1 #define SESSION_Q_PROCESS_STOP 2 +static inline session_evt_elt_t * +session_evt_elt_alloc (session_worker_t * wrk) +{ + session_evt_elt_t *elt; + pool_get (wrk->event_elts, elt); + return elt; +} + +static inline void +session_evt_elt_free (session_worker_t * wrk, session_evt_elt_t * elt) +{ + pool_put (wrk->event_elts, elt); +} + +static inline session_evt_elt_t * +session_evt_pending_head (session_worker_t * wrk) +{ + return pool_elt_at_index (wrk->event_elts, wrk->pending_head); +} + +static inline session_evt_elt_t * +session_evt_postponed_head (session_worker_t * wrk) +{ + return pool_elt_at_index (wrk->event_elts, wrk->postponed_head); +} + +static inline session_evt_elt_t * +session_evt_pending_disconnects_head (session_worker_t * wrk) +{ + return pool_elt_at_index (wrk->event_elts, wrk->disconnects_head); +} + +static inline void +session_evt_add_pending (session_worker_t * wrk, session_evt_elt_t * elt) +{ + clib_llist_add_tail (wrk->event_elts, evt_list, elt, + session_evt_pending_head (wrk)); +} + +static inline void +session_evt_add_postponed (session_worker_t * wrk, session_evt_elt_t * elt) +{ + clib_llist_add_tail (wrk->event_elts, evt_list, elt, + session_evt_postponed_head (wrk)); +} + +static inline void +session_evt_add_pending_disconnects (session_worker_t * wrk, + session_evt_elt_t * elt) +{ + clib_llist_add_tail (wrk->event_elts, evt_list, elt, + session_evt_pending_disconnects_head (wrk)); +} + always_inline u8 session_is_valid (u32 si, u8 thread_index) { diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index 6c432070a0d..ca6663c0b01 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -624,13 +624,14 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx, always_inline int session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, session_worker_t * wrk, - session_event_t * e, int *n_tx_packets, + session_evt_elt_t * elt, int *n_tx_packets, u8 peek_data) { u32 next_index, next0, next1, *to_next, n_left_to_next, n_left, pbi; u32 n_trace = vlib_get_trace_count (vm, node), n_bufs_needed = 0; session_main_t *smm = &session_main; session_tx_context_t *ctx = &wrk->ctx; + session_event_t *e = &elt->evt; transport_proto_t tp; vlib_buffer_t *pb; u16 n_bufs, rv; @@ -638,7 +639,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, if (PREDICT_FALSE ((rv = session_tx_not_ready (ctx->s, peek_data)))) { if (rv < 2) - vec_add1 (wrk->pending_event_vector, *e); + session_evt_add_pending (wrk, elt); return SESSION_TX_NO_DATA; } @@ -662,7 +663,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, ctx->snd_mss); if (ctx->snd_space == 0 || ctx->snd_mss == 0) { - vec_add1 (wrk->pending_event_vector, *e); + session_evt_add_pending (wrk, elt); return SESSION_TX_NO_DATA; } @@ -684,7 +685,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, { if (n_bufs) vlib_buffer_free (vm, wrk->tx_buffers, n_bufs); - vec_add1 (wrk->pending_event_vector, *e); + session_evt_add_pending (wrk, elt); return SESSION_TX_NO_BUFFERS; } @@ -780,7 +781,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, ASSERT (ctx->left_to_snd == 0); if (ctx->max_len_to_snd < ctx->max_dequeue) if (svm_fifo_set_event (ctx->s->tx_fifo)) - vec_add1 (wrk->pending_event_vector, *e); + session_evt_add_pending (wrk, elt); if (!peek_data && ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM) @@ -792,7 +793,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, /* More data needs to be read */ else if (svm_fifo_max_dequeue_cons (ctx->s->tx_fifo) > 0) if (svm_fifo_set_event (ctx->s->tx_fifo)) - vec_add1 (wrk->pending_event_vector, *e); + session_evt_add_pending (wrk, elt); if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, ctx->max_len_to_snd)) session_dequeue_notify (ctx->s); @@ -803,7 +804,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, int session_tx_fifo_peek_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node, session_worker_t * wrk, - session_event_t * e, int *n_tx_pkts) + session_evt_elt_t * e, int *n_tx_pkts) { return session_tx_fifo_read_and_snd_i (vm, node, wrk, e, n_tx_pkts, 1); } @@ -811,7 +812,7 @@ session_tx_fifo_peek_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node, int session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node, session_worker_t * wrk, - session_event_t * e, int *n_tx_pkts) + session_evt_elt_t * e, int *n_tx_pkts) { return session_tx_fifo_read_and_snd_i (vm, node, wrk, e, n_tx_pkts, 0); } @@ -820,7 +821,7 @@ int session_tx_fifo_dequeue_internal (vlib_main_t * vm, vlib_node_runtime_t * node, session_worker_t * wrk, - session_event_t * e, int *n_tx_pkts) + session_evt_elt_t * e, int *n_tx_pkts) { session_t *s = wrk->ctx.s; @@ -853,9 +854,10 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, vlib_frame_t * frame) { session_main_t *smm = vnet_get_session_main (); - u32 thread_index = vm->thread_index, n_to_dequeue, n_events; + u32 thread_index = vm->thread_index, n_to_dequeue; session_worker_t *wrk = &smm->wrk[thread_index]; - session_event_t *e, *fifo_events; + session_evt_elt_t *elt, *new_he, *new_te, *pending_he; + session_evt_elt_t *disconnects_he, *postponed_he; svm_msg_q_msg_t _msg, *msg = &_msg; f64 now = vlib_time_now (vm); int n_tx_packets = 0, i, rv; @@ -874,9 +876,11 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0); /* Make sure postponed events are handled first */ - fifo_events = wrk->free_event_vector; - vec_append (fifo_events, wrk->postponed_event_vector); - _vec_len (wrk->postponed_event_vector) = 0; + new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head); + new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he); + + postponed_he = pool_elt_at_index (wrk->event_elts, wrk->postponed_head); + clib_llist_splice (wrk->event_elts, evt_list, new_te, postponed_he); /* Try to dequeue what is available. Don't wait for lock. * XXX: we may need priorities here */ @@ -886,33 +890,38 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, { for (i = 0; i < n_to_dequeue; i++) { - vec_add2 (fifo_events, e, 1); + elt = session_evt_elt_alloc (wrk); svm_msg_q_sub_w_lock (mq, msg); /* Works because reply messages are smaller than a session evt. * If we ever need to support bigger messages this needs to be * fixed */ - clib_memcpy_fast (e, svm_msg_q_msg_data (mq, msg), sizeof (*e)); + clib_memcpy_fast (&elt->evt, svm_msg_q_msg_data (mq, msg), + sizeof (elt->evt)); svm_msg_q_free_msg (mq, msg); + new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head); + clib_llist_add_tail (wrk->event_elts, evt_list, elt, new_he); } svm_msg_q_unlock (mq); } - vec_append (fifo_events, wrk->pending_event_vector); - vec_append (fifo_events, wrk->pending_disconnects); - - _vec_len (wrk->pending_event_vector) = 0; - _vec_len (wrk->pending_disconnects) = 0; + pending_he = pool_elt_at_index (wrk->event_elts, wrk->pending_head); + postponed_he = pool_elt_at_index (wrk->event_elts, wrk->postponed_head); + disconnects_he = pool_elt_at_index (wrk->event_elts, wrk->disconnects_head); - n_events = vec_len (fifo_events); - if (PREDICT_FALSE (!n_events)) - return 0; + new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he); + clib_llist_splice (wrk->event_elts, evt_list, new_te, pending_he); + new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he); + clib_llist_splice (wrk->event_elts, evt_list, new_te, disconnects_he); - for (i = 0; i < n_events; i++) + while (!clib_llist_is_empty (wrk->event_elts, evt_list, new_he)) { + clib_llist_index_t ei; session_event_t *e; session_t *s; - e = &fifo_events[i]; + clib_llist_pop_first (wrk->event_elts, evt_list, elt, new_he); + ei = clib_llist_entry_index (wrk->event_elts, elt); + e = &elt->evt; switch (e->event_type) { case SESSION_IO_EVT_TX_FLUSH: @@ -920,27 +929,27 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, /* Don't try to send more that one frame per dispatch cycle */ if (n_tx_packets == VLIB_FRAME_SIZE) { - vec_add1 (wrk->postponed_event_vector, *e); - break; + session_evt_add_postponed (wrk, elt); + continue; } s = session_event_get_session (e, thread_index); if (PREDICT_FALSE (!s)) { clib_warning ("session was freed!"); - continue; + break; } CLIB_PREFETCH (s->tx_fifo, 2 * CLIB_CACHE_LINE_BYTES, LOAD); wrk->ctx.s = s; /* Spray packets in per session type frames, since they go to * different nodes */ - rv = (smm->session_tx_fns[s->session_type]) (vm, node, wrk, e, + rv = (smm->session_tx_fns[s->session_type]) (vm, node, wrk, elt, &n_tx_packets); if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS)) { vlib_node_increment_counter (vm, node->node_index, SESSION_QUEUE_ERROR_NO_BUFFER, 1); - continue; + break; } break; case SESSION_IO_EVT_RX: @@ -964,7 +973,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, && svm_fifo_max_dequeue_cons (s->tx_fifo))) { e->postponed += 1; - vec_add1 (wrk->pending_disconnects, *e); + session_evt_add_pending (wrk, elt); continue; } @@ -973,7 +982,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, case SESSION_IO_EVT_BUILTIN_RX: s = session_event_get_session (e, thread_index); if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING)) - continue; + break; svm_fifo_unset_event (s->rx_fifo); app_wrk = app_worker_get (s->app_wrk_index); app_worker_builtin_rx (app_wrk, s); @@ -982,7 +991,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, s = session_get_from_handle_if_valid (e->session_handle); wrk->ctx.s = s; if (PREDICT_TRUE (s != 0)) - session_tx_fifo_dequeue_internal (vm, node, wrk, e, + session_tx_fifo_dequeue_internal (vm, node, wrk, elt, &n_tx_packets); break; case SESSION_CTRL_EVT_RPC: @@ -1009,10 +1018,15 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, default: clib_warning ("unhandled event type %d", e->event_type); } + + /* Regrab elements in case pool moved */ + elt = pool_elt_at_index (wrk->event_elts, ei); + if (!clib_llist_elt_is_linked (elt, evt_list)) + session_evt_elt_free (wrk, elt); + + new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head); } - _vec_len (fifo_events) = 0; - wrk->free_event_vector = fifo_events; wrk->last_tx_packets = n_tx_packets; vlib_node_increment_counter (vm, session_queue_node.index, @@ -1128,7 +1142,7 @@ session_node_cmp_event (session_event_t * e, svm_fifo_t * f) u8 session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e) { - session_event_t *pending_event_vector, *evt; + session_evt_elt_t *elt; session_worker_t *wrk; int i, index, found = 0; svm_msg_q_msg_t *msg; @@ -1159,16 +1173,20 @@ session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e) /* * Search pending events vector */ - pending_event_vector = wrk->pending_event_vector; - vec_foreach (evt, pending_event_vector) - { - found = session_node_cmp_event (evt, f); + + /* *INDENT-OFF* */ + clib_llist_foreach (wrk->event_elts, evt_list, + session_evt_pending_head (wrk), elt, ({ + found = session_node_cmp_event (&elt->evt, f); if (found) { - clib_memcpy_fast (e, evt, sizeof (*evt)); + clib_memcpy_fast (e, &elt->evt, sizeof (*e)); break; } - } + + })); + /* *INDENT-ON* */ + return found; } |