diff options
Diffstat (limited to 'src/vnet/session')
-rw-r--r-- | src/vnet/session/application_local.c | 2 | ||||
-rw-r--r-- | src/vnet/session/session.c | 25 | ||||
-rw-r--r-- | src/vnet/session/session.h | 22 | ||||
-rw-r--r-- | src/vnet/session/session_node.c | 23 | ||||
-rw-r--r-- | src/vnet/session/session_types.h | 1 | ||||
-rw-r--r-- | src/vnet/session/transport.h | 6 |
6 files changed, 71 insertions, 8 deletions
diff --git a/src/vnet/session/application_local.c b/src/vnet/session/application_local.c index 4a268c7d30b..7d8fb468f5a 100644 --- a/src/vnet/session/application_local.c +++ b/src/vnet/session/application_local.c @@ -467,7 +467,7 @@ format_ct_connection_id (u8 * s, va_list * args) } static int -ct_custom_tx (void *session) +ct_custom_tx (void *session, u32 max_burst_size) { session_t *s = (session_t *) session; if (session_has_transport (s)) diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 477215e3d34..1c8b7fb4be4 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -118,6 +118,31 @@ session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args) } } +void +session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio) +{ + session_t *s; + + s = session_get (tc->s_index, tc->thread_index); + ASSERT (s->thread_index == vlib_get_thread_index ()); + if (!(s->flags & SESSION_F_CUSTOM_TX)) + { + s->flags |= SESSION_F_CUSTOM_TX; + if (svm_fifo_set_event (s->tx_fifo)) + { + session_worker_t *wrk; + session_evt_elt_t *elt; + wrk = session_main_get_worker (tc->thread_index); + if (has_prio) + elt = session_evt_alloc_new (wrk); + else + elt = session_evt_alloc_old (wrk); + elt->evt.session_index = tc->s_index; + elt->evt.event_type = SESSION_IO_EVT_TX; + } + } +} + static void session_program_transport_close (session_t * s) { diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index a3f2a01929b..8f7bd6c999c 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -243,6 +243,26 @@ session_evt_add_pending_disconnects (session_worker_t * wrk, session_evt_pending_disconnects_head (wrk)); } +static inline session_evt_elt_t * +session_evt_alloc_new (session_worker_t * wrk) +{ + session_evt_elt_t *elt; + elt = session_evt_elt_alloc (wrk); + clib_llist_add_tail (wrk->event_elts, evt_list, elt, + pool_elt_at_index (wrk->event_elts, wrk->new_head)); + return elt; +} + +static inline session_evt_elt_t * +session_evt_alloc_old (session_worker_t * wrk) +{ + session_evt_elt_t *elt; + elt = session_evt_elt_alloc (wrk); + clib_llist_add_tail (wrk->event_elts, evt_list, elt, + pool_elt_at_index (wrk->event_elts, wrk->old_head)); + return elt; +} + always_inline u8 session_is_valid (u32 si, u8 thread_index) { @@ -394,6 +414,8 @@ void session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args); void session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp, void *rpc_args); +void session_add_self_custom_tx_evt (transport_connection_t * tc, + u8 has_prio); transport_connection_t *session_get_transport (session_t * s); void session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl); diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index 999776f02bd..1bb5cb2717f 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -618,7 +618,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, session_evt_elt_t * elt, int *n_tx_packets, u8 peek_data) { - u32 next_index, next0, next1, *to_next, n_left_to_next; + u32 next_index, next0, next1, *to_next, n_left_to_next, max_burst; u32 n_trace, n_bufs_needed = 0, n_left, pbi; session_tx_context_t *ctx = &wrk->ctx; session_main_t *smm = &session_main; @@ -637,6 +637,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, next_index = smm->session_type_to_next[ctx->s->session_type]; next0 = next1 = next_index; + max_burst = VLIB_FRAME_SIZE - *n_tx_packets; tp = session_get_transport_proto (ctx->s); ctx->transport_vft = transport_protocol_get_vft (tp); @@ -649,6 +650,20 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, ctx->transport_vft->flush_data (ctx->tc); } + if (ctx->s->flags & SESSION_F_CUSTOM_TX) + { + u32 n_custom_tx; + ctx->s->flags &= ~SESSION_F_CUSTOM_TX; + n_custom_tx = ctx->transport_vft->custom_tx (ctx->tc, max_burst); + *n_tx_packets += n_custom_tx; + max_burst -= n_custom_tx; + if (!max_burst) + { + session_evt_add_old (wrk, elt); + return SESSION_TX_OK; + } + } + ctx->snd_space = transport_connection_snd_space (ctx->tc, wrk->vm->clib_time. last_cpu_time, @@ -664,8 +679,7 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk, svm_fifo_unset_event (ctx->s->tx_fifo); /* Check how much we can pull. */ - session_tx_set_dequeue_params (vm, ctx, VLIB_FRAME_SIZE - *n_tx_packets, - peek_data); + session_tx_set_dequeue_params (vm, ctx, max_burst, peek_data); if (PREDICT_FALSE (!ctx->max_len_to_snd)) return SESSION_TX_NO_DATA; @@ -823,7 +837,8 @@ session_tx_fifo_dequeue_internal (session_worker_t * wrk, if (PREDICT_FALSE (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)) return 0; svm_fifo_unset_event (s->tx_fifo); - return transport_custom_tx (session_get_transport_proto (s), s); + return transport_custom_tx (session_get_transport_proto (s), s, + VLIB_FRAME_SIZE - *n_tx_packets); } always_inline session_t * diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h index 25b61166b11..9a5bc768808 100644 --- a/src/vnet/session/session_types.h +++ b/src/vnet/session/session_types.h @@ -142,6 +142,7 @@ typedef enum session_flags_ { SESSION_F_RX_EVT = 1, SESSION_F_PROXY = (1 << 1), + SESSION_F_CUSTOM_TX = (1 << 2), } session_flags_t; typedef struct session_ diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h index 6e2feb02ba4..058a9aee34c 100644 --- a/src/vnet/session/transport.h +++ b/src/vnet/session/transport.h @@ -52,7 +52,7 @@ typedef struct _transport_proto_vft u32 (*tx_fifo_offset) (transport_connection_t * tc); void (*update_time) (f64 time_now, u8 thread_index); void (*flush_data) (transport_connection_t *tconn); - int (*custom_tx) (void *session); + int (*custom_tx) (void *session, u32 max_burst_size); int (*app_rx_evt) (transport_connection_t *tconn); /* @@ -127,9 +127,9 @@ transport_get_half_open (transport_proto_t tp, u32 conn_index) } static inline int -transport_custom_tx (transport_proto_t tp, void *s) +transport_custom_tx (transport_proto_t tp, void *s, u32 max_burst_size) { - return tp_vfts[tp].custom_tx (s); + return tp_vfts[tp].custom_tx (s, max_burst_size); } static inline int |