summaryrefslogtreecommitdiffstats
path: root/src/vnet/session
diff options
context:
space:
mode:
Diffstat (limited to 'src/vnet/session')
-rw-r--r--src/vnet/session/session.c25
-rw-r--r--src/vnet/session/session.h82
-rw-r--r--src/vnet/session/session_node.c104
3 files changed, 145 insertions, 66 deletions
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index e7542aef901..9769c013e2f 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -122,18 +122,19 @@ static void
session_program_transport_close (session_t * s)
{
u32 thread_index = vlib_get_thread_index ();
+ session_evt_elt_t *elt;
session_worker_t *wrk;
- session_event_t *evt;
/* If we are in the handler thread, or being called with the worker barrier
* held, just append a new event to pending disconnects vector. */
if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
{
wrk = session_main_get_worker (s->thread_index);
- vec_add2 (wrk->pending_disconnects, evt, 1);
- clib_memset (evt, 0, sizeof (*evt));
- evt->session_handle = session_handle (s);
- evt->event_type = SESSION_CTRL_EVT_CLOSE;
+ elt = session_evt_elt_alloc (wrk);
+ clib_memset (&elt->evt, 0, sizeof (session_event_t));
+ elt->evt.session_handle = session_handle (s);
+ elt->evt.event_type = SESSION_CTRL_EVT_CLOSE;
+ session_evt_add_pending_disconnects (wrk, elt);
}
else
session_send_ctrl_evt_to_thread (s, SESSION_CTRL_EVT_CLOSE);
@@ -1380,15 +1381,11 @@ session_manager_main_enable (vlib_main_t * vm)
for (i = 0; i < num_threads; i++)
{
wrk = &smm->wrk[i];
- vec_validate (wrk->free_event_vector, 128);
- _vec_len (wrk->free_event_vector) = 0;
- vec_validate (wrk->pending_event_vector, 128);
- _vec_len (wrk->pending_event_vector) = 0;
- vec_validate (wrk->pending_disconnects, 128);
- _vec_len (wrk->pending_disconnects) = 0;
- vec_validate (wrk->postponed_event_vector, 128);
- _vec_len (wrk->postponed_event_vector) = 0;
-
+ wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
+ wrk->pending_head = clib_llist_make_head (wrk->event_elts, evt_list);
+ wrk->postponed_head = clib_llist_make_head (wrk->event_elts, evt_list);
+ wrk->disconnects_head = clib_llist_make_head (wrk->event_elts,
+ evt_list);
wrk->last_vlib_time = vlib_time_now (vlib_mains[i]);
wrk->dispatch_period = 500e-6;
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index d1268188962..73c6dc82988 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -15,6 +15,7 @@
#ifndef __included_session_h__
#define __included_session_h__
+#include <vppinfra/llist.h>
#include <vnet/session/session_types.h>
#include <vnet/session/session_lookup.h>
#include <vnet/session/session_debug.h>
@@ -61,6 +62,12 @@ typedef struct session_tx_context_
session_dgram_hdr_t hdr;
} session_tx_context_t;
+typedef struct session_evt_elt
+{
+ clib_llist_anchor_t evt_list;
+ session_event_t evt;
+} session_evt_elt_t;
+
typedef struct session_worker_
{
CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
@@ -86,17 +93,20 @@ typedef struct session_worker_
/** Vector of tx buffer free lists */
u32 *tx_buffers;
- /** Vector of partially read events */
- session_event_t *free_event_vector;
+ /** Pool of session event list elements */
+ session_evt_elt_t *event_elts;
- /** Vector of active event vectors */
- session_event_t *pending_event_vector;
+ /** Head of list of elements */
+ clib_llist_index_t new_head;
- /** Vector of postponed disconnects */
- session_event_t *pending_disconnects;
+ /** Head of list of pending events */
+ clib_llist_index_t pending_head;
- /** Vector of postponed events */
- session_event_t *postponed_event_vector;
+ /** Head of list of postponed events */
+ clib_llist_index_t postponed_head;
+
+ /** Head of list of disconnect events */
+ clib_llist_index_t disconnects_head;
/** Peekers rw lock */
clib_rwlock_t peekers_rw_locks;
@@ -108,7 +118,7 @@ typedef struct session_worker_
typedef int (session_fifo_rx_fn) (vlib_main_t * vm,
vlib_node_runtime_t * node,
session_worker_t * wrk,
- session_event_t * e, int *n_tx_pkts);
+ session_evt_elt_t * e, int *n_tx_pkts);
extern session_fifo_rx_fn session_tx_fifo_peek_and_snd;
extern session_fifo_rx_fn session_tx_fifo_dequeue_and_snd;
@@ -186,6 +196,60 @@ extern vlib_node_registration_t session_queue_pre_input_node;
#define SESSION_Q_PROCESS_FLUSH_FRAMES 1
#define SESSION_Q_PROCESS_STOP 2
+static inline session_evt_elt_t *
+session_evt_elt_alloc (session_worker_t * wrk)
+{
+ session_evt_elt_t *elt;
+ pool_get (wrk->event_elts, elt);
+ return elt;
+}
+
+static inline void
+session_evt_elt_free (session_worker_t * wrk, session_evt_elt_t * elt)
+{
+ pool_put (wrk->event_elts, elt);
+}
+
+static inline session_evt_elt_t *
+session_evt_pending_head (session_worker_t * wrk)
+{
+ return pool_elt_at_index (wrk->event_elts, wrk->pending_head);
+}
+
+static inline session_evt_elt_t *
+session_evt_postponed_head (session_worker_t * wrk)
+{
+ return pool_elt_at_index (wrk->event_elts, wrk->postponed_head);
+}
+
+static inline session_evt_elt_t *
+session_evt_pending_disconnects_head (session_worker_t * wrk)
+{
+ return pool_elt_at_index (wrk->event_elts, wrk->disconnects_head);
+}
+
+static inline void
+session_evt_add_pending (session_worker_t * wrk, session_evt_elt_t * elt)
+{
+ clib_llist_add_tail (wrk->event_elts, evt_list, elt,
+ session_evt_pending_head (wrk));
+}
+
+static inline void
+session_evt_add_postponed (session_worker_t * wrk, session_evt_elt_t * elt)
+{
+ clib_llist_add_tail (wrk->event_elts, evt_list, elt,
+ session_evt_postponed_head (wrk));
+}
+
+static inline void
+session_evt_add_pending_disconnects (session_worker_t * wrk,
+ session_evt_elt_t * elt)
+{
+ clib_llist_add_tail (wrk->event_elts, evt_list, elt,
+ session_evt_pending_disconnects_head (wrk));
+}
+
always_inline u8
session_is_valid (u32 si, u8 thread_index)
{
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index 6c432070a0d..ca6663c0b01 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -624,13 +624,14 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
always_inline int
session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
session_worker_t * wrk,
- session_event_t * e, int *n_tx_packets,
+ session_evt_elt_t * elt, int *n_tx_packets,
u8 peek_data)
{
u32 next_index, next0, next1, *to_next, n_left_to_next, n_left, pbi;
u32 n_trace = vlib_get_trace_count (vm, node), n_bufs_needed = 0;
session_main_t *smm = &session_main;
session_tx_context_t *ctx = &wrk->ctx;
+ session_event_t *e = &elt->evt;
transport_proto_t tp;
vlib_buffer_t *pb;
u16 n_bufs, rv;
@@ -638,7 +639,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
if (PREDICT_FALSE ((rv = session_tx_not_ready (ctx->s, peek_data))))
{
if (rv < 2)
- vec_add1 (wrk->pending_event_vector, *e);
+ session_evt_add_pending (wrk, elt);
return SESSION_TX_NO_DATA;
}
@@ -662,7 +663,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
ctx->snd_mss);
if (ctx->snd_space == 0 || ctx->snd_mss == 0)
{
- vec_add1 (wrk->pending_event_vector, *e);
+ session_evt_add_pending (wrk, elt);
return SESSION_TX_NO_DATA;
}
@@ -684,7 +685,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
{
if (n_bufs)
vlib_buffer_free (vm, wrk->tx_buffers, n_bufs);
- vec_add1 (wrk->pending_event_vector, *e);
+ session_evt_add_pending (wrk, elt);
return SESSION_TX_NO_BUFFERS;
}
@@ -780,7 +781,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
ASSERT (ctx->left_to_snd == 0);
if (ctx->max_len_to_snd < ctx->max_dequeue)
if (svm_fifo_set_event (ctx->s->tx_fifo))
- vec_add1 (wrk->pending_event_vector, *e);
+ session_evt_add_pending (wrk, elt);
if (!peek_data
&& ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
@@ -792,7 +793,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
/* More data needs to be read */
else if (svm_fifo_max_dequeue_cons (ctx->s->tx_fifo) > 0)
if (svm_fifo_set_event (ctx->s->tx_fifo))
- vec_add1 (wrk->pending_event_vector, *e);
+ session_evt_add_pending (wrk, elt);
if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, ctx->max_len_to_snd))
session_dequeue_notify (ctx->s);
@@ -803,7 +804,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
int
session_tx_fifo_peek_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
session_worker_t * wrk,
- session_event_t * e, int *n_tx_pkts)
+ session_evt_elt_t * e, int *n_tx_pkts)
{
return session_tx_fifo_read_and_snd_i (vm, node, wrk, e, n_tx_pkts, 1);
}
@@ -811,7 +812,7 @@ session_tx_fifo_peek_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
int
session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
session_worker_t * wrk,
- session_event_t * e, int *n_tx_pkts)
+ session_evt_elt_t * e, int *n_tx_pkts)
{
return session_tx_fifo_read_and_snd_i (vm, node, wrk, e, n_tx_pkts, 0);
}
@@ -820,7 +821,7 @@ int
session_tx_fifo_dequeue_internal (vlib_main_t * vm,
vlib_node_runtime_t * node,
session_worker_t * wrk,
- session_event_t * e, int *n_tx_pkts)
+ session_evt_elt_t * e, int *n_tx_pkts)
{
session_t *s = wrk->ctx.s;
@@ -853,9 +854,10 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_frame_t * frame)
{
session_main_t *smm = vnet_get_session_main ();
- u32 thread_index = vm->thread_index, n_to_dequeue, n_events;
+ u32 thread_index = vm->thread_index, n_to_dequeue;
session_worker_t *wrk = &smm->wrk[thread_index];
- session_event_t *e, *fifo_events;
+ session_evt_elt_t *elt, *new_he, *new_te, *pending_he;
+ session_evt_elt_t *disconnects_he, *postponed_he;
svm_msg_q_msg_t _msg, *msg = &_msg;
f64 now = vlib_time_now (vm);
int n_tx_packets = 0, i, rv;
@@ -874,9 +876,11 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0);
/* Make sure postponed events are handled first */
- fifo_events = wrk->free_event_vector;
- vec_append (fifo_events, wrk->postponed_event_vector);
- _vec_len (wrk->postponed_event_vector) = 0;
+ new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
+ new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he);
+
+ postponed_he = pool_elt_at_index (wrk->event_elts, wrk->postponed_head);
+ clib_llist_splice (wrk->event_elts, evt_list, new_te, postponed_he);
/* Try to dequeue what is available. Don't wait for lock.
* XXX: we may need priorities here */
@@ -886,33 +890,38 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
{
for (i = 0; i < n_to_dequeue; i++)
{
- vec_add2 (fifo_events, e, 1);
+ elt = session_evt_elt_alloc (wrk);
svm_msg_q_sub_w_lock (mq, msg);
/* Works because reply messages are smaller than a session evt.
* If we ever need to support bigger messages this needs to be
* fixed */
- clib_memcpy_fast (e, svm_msg_q_msg_data (mq, msg), sizeof (*e));
+ clib_memcpy_fast (&elt->evt, svm_msg_q_msg_data (mq, msg),
+ sizeof (elt->evt));
svm_msg_q_free_msg (mq, msg);
+ new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
+ clib_llist_add_tail (wrk->event_elts, evt_list, elt, new_he);
}
svm_msg_q_unlock (mq);
}
- vec_append (fifo_events, wrk->pending_event_vector);
- vec_append (fifo_events, wrk->pending_disconnects);
-
- _vec_len (wrk->pending_event_vector) = 0;
- _vec_len (wrk->pending_disconnects) = 0;
+ pending_he = pool_elt_at_index (wrk->event_elts, wrk->pending_head);
+ postponed_he = pool_elt_at_index (wrk->event_elts, wrk->postponed_head);
+ disconnects_he = pool_elt_at_index (wrk->event_elts, wrk->disconnects_head);
- n_events = vec_len (fifo_events);
- if (PREDICT_FALSE (!n_events))
- return 0;
+ new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he);
+ clib_llist_splice (wrk->event_elts, evt_list, new_te, pending_he);
+ new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he);
+ clib_llist_splice (wrk->event_elts, evt_list, new_te, disconnects_he);
- for (i = 0; i < n_events; i++)
+ while (!clib_llist_is_empty (wrk->event_elts, evt_list, new_he))
{
+ clib_llist_index_t ei;
session_event_t *e;
session_t *s;
- e = &fifo_events[i];
+ clib_llist_pop_first (wrk->event_elts, evt_list, elt, new_he);
+ ei = clib_llist_entry_index (wrk->event_elts, elt);
+ e = &elt->evt;
switch (e->event_type)
{
case SESSION_IO_EVT_TX_FLUSH:
@@ -920,27 +929,27 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
/* Don't try to send more that one frame per dispatch cycle */
if (n_tx_packets == VLIB_FRAME_SIZE)
{
- vec_add1 (wrk->postponed_event_vector, *e);
- break;
+ session_evt_add_postponed (wrk, elt);
+ continue;
}
s = session_event_get_session (e, thread_index);
if (PREDICT_FALSE (!s))
{
clib_warning ("session was freed!");
- continue;
+ break;
}
CLIB_PREFETCH (s->tx_fifo, 2 * CLIB_CACHE_LINE_BYTES, LOAD);
wrk->ctx.s = s;
/* Spray packets in per session type frames, since they go to
* different nodes */
- rv = (smm->session_tx_fns[s->session_type]) (vm, node, wrk, e,
+ rv = (smm->session_tx_fns[s->session_type]) (vm, node, wrk, elt,
&n_tx_packets);
if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS))
{
vlib_node_increment_counter (vm, node->node_index,
SESSION_QUEUE_ERROR_NO_BUFFER, 1);
- continue;
+ break;
}
break;
case SESSION_IO_EVT_RX:
@@ -964,7 +973,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
&& svm_fifo_max_dequeue_cons (s->tx_fifo)))
{
e->postponed += 1;
- vec_add1 (wrk->pending_disconnects, *e);
+ session_evt_add_pending (wrk, elt);
continue;
}
@@ -973,7 +982,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
case SESSION_IO_EVT_BUILTIN_RX:
s = session_event_get_session (e, thread_index);
if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING))
- continue;
+ break;
svm_fifo_unset_event (s->rx_fifo);
app_wrk = app_worker_get (s->app_wrk_index);
app_worker_builtin_rx (app_wrk, s);
@@ -982,7 +991,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
s = session_get_from_handle_if_valid (e->session_handle);
wrk->ctx.s = s;
if (PREDICT_TRUE (s != 0))
- session_tx_fifo_dequeue_internal (vm, node, wrk, e,
+ session_tx_fifo_dequeue_internal (vm, node, wrk, elt,
&n_tx_packets);
break;
case SESSION_CTRL_EVT_RPC:
@@ -1009,10 +1018,15 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
default:
clib_warning ("unhandled event type %d", e->event_type);
}
+
+ /* Regrab elements in case pool moved */
+ elt = pool_elt_at_index (wrk->event_elts, ei);
+ if (!clib_llist_elt_is_linked (elt, evt_list))
+ session_evt_elt_free (wrk, elt);
+
+ new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
}
- _vec_len (fifo_events) = 0;
- wrk->free_event_vector = fifo_events;
wrk->last_tx_packets = n_tx_packets;
vlib_node_increment_counter (vm, session_queue_node.index,
@@ -1128,7 +1142,7 @@ session_node_cmp_event (session_event_t * e, svm_fifo_t * f)
u8
session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e)
{
- session_event_t *pending_event_vector, *evt;
+ session_evt_elt_t *elt;
session_worker_t *wrk;
int i, index, found = 0;
svm_msg_q_msg_t *msg;
@@ -1159,16 +1173,20 @@ session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e)
/*
* Search pending events vector
*/
- pending_event_vector = wrk->pending_event_vector;
- vec_foreach (evt, pending_event_vector)
- {
- found = session_node_cmp_event (evt, f);
+
+ /* *INDENT-OFF* */
+ clib_llist_foreach (wrk->event_elts, evt_list,
+ session_evt_pending_head (wrk), elt, ({
+ found = session_node_cmp_event (&elt->evt, f);
if (found)
{
- clib_memcpy_fast (e, evt, sizeof (*evt));
+ clib_memcpy_fast (e, &elt->evt, sizeof (*e));
break;
}
- }
+
+ }));
+ /* *INDENT-ON* */
+
return found;
}