diff options
Diffstat (limited to 'src/vnet/session')
-rw-r--r-- | src/vnet/session/session.c | 22 | ||||
-rw-r--r-- | src/vnet/session/session.h | 11 | ||||
-rw-r--r-- | src/vnet/session/session_node.c | 108 | ||||
-rw-r--r-- | src/vnet/session/transport.c | 6 | ||||
-rw-r--r-- | src/vnet/session/transport_interface.h | 1 |
5 files changed, 94 insertions, 54 deletions
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index a8a9c66ac81..66cad2acff3 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -1103,6 +1103,28 @@ stream_session_cleanup (stream_session_t * s) s->thread_index); } +transport_service_type_t +session_transport_service_type (stream_session_t * s) +{ + transport_proto_t tp; + tp = session_get_transport_proto (s); + return transport_protocol_service_type (tp); +} + +transport_tx_fn_type_t +session_transport_tx_fn_type (stream_session_t * s) +{ + transport_proto_t tp; + tp = session_get_transport_proto (s); + return transport_protocol_tx_fn_type (tp); +} + +u8 +session_tx_is_dgram (stream_session_t * s) +{ + return (session_transport_tx_fn_type (s) == TRANSPORT_TX_DGRAM); +} + /** * Allocate event queues in the shared-memory segment * diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index c5779b4ffd1..d6cc2cb3327 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -143,6 +143,7 @@ typedef struct session_tx_context_ u16 deq_per_first_buf; u16 deq_per_buf; u16 snd_mss; + u16 n_segs_per_evt; u8 n_bufs_per_seg; CLIB_CACHE_LINE_ALIGN_MARK (cacheline1); session_dgram_hdr_t hdr; @@ -389,13 +390,9 @@ session_has_transport (stream_session_t * s) return (session_get_transport_proto (s) != TRANSPORT_PROTO_NONE); } -always_inline transport_service_type_t -session_transport_service_type (stream_session_t * s) -{ - transport_proto_t tp; - tp = session_get_transport_proto (s); - return transport_protocol_service_type (tp); -} +transport_service_type_t session_transport_service_type (stream_session_t *); +transport_tx_fn_type_t session_transport_tx_fn_type (stream_session_t *); +u8 session_tx_is_dgram (stream_session_t * s); /** * Acquires a lock that blocks a session pool from expanding. diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index 269e2fb591e..07cca6d46e7 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -159,23 +159,14 @@ session_output_try_get_buffers (vlib_main_t * vm, session_manager_main_t * smm, u32 thread_index, u16 * n_bufs, u32 wanted) { - u32 bufs_alloc = 0, bufs_now; - vec_validate_aligned (smm->tx_buffers[thread_index], *n_bufs + wanted - 1, + u32 n_alloc; + vec_validate_aligned (smm->tx_buffers[thread_index], wanted - 1, CLIB_CACHE_LINE_BYTES); - do - { - bufs_now = - vlib_buffer_alloc (vm, - &smm->tx_buffers[thread_index][*n_bufs + - bufs_alloc], - wanted - bufs_alloc); - bufs_alloc += bufs_now; - } - while (bufs_now > 0 && ((bufs_alloc + *n_bufs < wanted))); - - *n_bufs += bufs_alloc; + n_alloc = vlib_buffer_alloc (vm, &smm->tx_buffers[thread_index][*n_bufs], + wanted - *n_bufs); + *n_bufs += n_alloc; _vec_len (smm->tx_buffers[thread_index]) = *n_bufs; - return bufs_alloc; + return n_alloc; } always_inline void @@ -295,7 +286,7 @@ session_tx_get_transport (session_tx_context_t * ctx, u8 peek_data) always_inline void session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx, - u8 peek_data) + u32 max_segs, u8 peek_data) { u32 n_bytes_per_buf, n_bytes_per_seg; ctx->max_dequeue = svm_fifo_max_dequeue (ctx->s->server_tx_fifo); @@ -342,11 +333,19 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx, ctx->max_len_to_snd = ctx->snd_space; } + /* Check if we're tx constrained by the node */ + ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->snd_mss); + if (ctx->n_segs_per_evt > max_segs) + { + ctx->n_segs_per_evt = max_segs; + ctx->max_len_to_snd = max_segs * ctx->snd_mss; + } + n_bytes_per_buf = vlib_buffer_free_list_buffer_size (vm, VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX); ASSERT (n_bytes_per_buf > MAX_HDRS_LEN); n_bytes_per_seg = MAX_HDRS_LEN + ctx->snd_mss; - ctx->n_bufs_per_seg = ceil ((double) n_bytes_per_seg / n_bytes_per_buf); + ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf); ctx->deq_per_buf = clib_min (ctx->snd_mss, n_bytes_per_buf); ctx->deq_per_first_buf = clib_min (ctx->snd_mss, n_bytes_per_buf - MAX_HDRS_LEN); @@ -359,8 +358,8 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, u8 peek_data) { u32 next_index, next0, next1, next2, next3, *to_next, n_left_to_next; - u32 n_trace = vlib_get_trace_count (vm, node), n_packets = 0, pbi; - u32 n_bufs_per_frame, thread_index = s->thread_index; + u32 n_trace = vlib_get_trace_count (vm, node), n_bufs_needed = 0; + u32 thread_index = s->thread_index, n_left, pbi; session_manager_main_t *smm = &session_manager_main; session_tx_context_t *ctx = &smm->ctx[thread_index]; transport_proto_t tp; @@ -392,22 +391,23 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, svm_fifo_unset_event (s->server_tx_fifo); /* Check how much we can pull. */ - session_tx_set_dequeue_params (vm, ctx, peek_data); + session_tx_set_dequeue_params (vm, ctx, VLIB_FRAME_SIZE - *n_tx_packets, + peek_data); + if (PREDICT_FALSE (!ctx->max_len_to_snd)) return 0; n_bufs = vec_len (smm->tx_buffers[thread_index]); - ctx->left_to_snd = ctx->max_len_to_snd; + n_bufs_needed = ctx->n_segs_per_evt * ctx->n_bufs_per_seg; /* * Make sure we have at least one full frame of buffers ready */ - n_bufs_per_frame = ctx->n_bufs_per_seg * VLIB_FRAME_SIZE; - if (n_bufs < n_bufs_per_frame) + if (n_bufs < n_bufs_needed) { session_output_try_get_buffers (vm, smm, thread_index, &n_bufs, - n_bufs_per_frame); - if (PREDICT_FALSE (n_bufs < n_bufs_per_frame)) + ctx->n_bufs_per_seg * VLIB_FRAME_SIZE); + if (PREDICT_FALSE (n_bufs < n_bufs_needed)) { vec_add1 (smm->pending_event_vector[thread_index], *e); return -1; @@ -418,9 +418,16 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, * Write until we fill up a frame */ vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next); - while (ctx->left_to_snd && n_left_to_next) + if (PREDICT_FALSE (ctx->n_segs_per_evt > n_left_to_next)) { - while (ctx->left_to_snd > 3 * ctx->snd_mss && n_left_to_next >= 4) + ctx->n_segs_per_evt = n_left_to_next; + ctx->max_len_to_snd = ctx->snd_mss * n_left_to_next; + } + ctx->left_to_snd = ctx->max_len_to_snd; + n_left = ctx->n_segs_per_evt; + while (n_left) + { + while (n_left >= 4) { vlib_buffer_t *b0, *b1; u32 bi0, bi1; @@ -446,7 +453,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, to_next += 2; n_left_to_next -= 2; - n_packets += 2; + n_left -= 2; VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0); VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b1); @@ -463,7 +470,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, n_left_to_next, bi0, bi1, next0, next1); } - while (ctx->left_to_snd && n_left_to_next) + while (n_left) { vlib_buffer_t *b0; u32 bi0; @@ -479,7 +486,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, to_next += 1; n_left_to_next -= 1; - n_packets += 1; + n_left -= 1; VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0); if (PREDICT_FALSE (n_trace > 0)) @@ -490,10 +497,11 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, } } _vec_len (smm->tx_buffers[thread_index]) = n_bufs; - *n_tx_packets += n_packets; + *n_tx_packets += ctx->n_segs_per_evt; vlib_put_next_frame (vm, node, next_index, n_left_to_next); /* If we couldn't dequeue all bytes mark as partially read */ + ASSERT (ctx->left_to_snd == 0); if (ctx->max_len_to_snd < ctx->max_dequeue) if (svm_fifo_set_event (s->server_tx_fifo)) vec_add1 (smm->pending_event_vector[thread_index], *e); @@ -687,31 +695,31 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, svm_queue_t *q; application_t *app; int n_tx_packets = 0; - u32 my_thread_index = vm->thread_index; + u32 thread_index = vm->thread_index; int i, rv; f64 now = vlib_time_now (vm); void (*fp) (void *); - SESSION_EVT_DBG (SESSION_EVT_POLL_GAP_TRACK, smm, my_thread_index); + SESSION_EVT_DBG (SESSION_EVT_POLL_GAP_TRACK, smm, thread_index); /* * Update transport time */ - transport_update_time (now, my_thread_index); + transport_update_time (now, thread_index); /* * Get vpp queue events */ - q = smm->vpp_event_queues[my_thread_index]; + q = smm->vpp_event_queues[thread_index]; if (PREDICT_FALSE (q == 0)) return 0; - my_fifo_events = smm->free_event_vector[my_thread_index]; + my_fifo_events = smm->free_event_vector[thread_index]; /* min number of events we can dequeue without blocking */ n_to_dequeue = q->cursize; - my_pending_event_vector = smm->pending_event_vector[my_thread_index]; - pending_disconnects = smm->pending_disconnects[my_thread_index]; + my_pending_event_vector = smm->pending_event_vector[thread_index]; + pending_disconnects = smm->pending_disconnects[thread_index]; if (!n_to_dequeue && !vec_len (my_pending_event_vector) && !vec_len (pending_disconnects)) @@ -746,11 +754,11 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, pthread_mutex_unlock (&q->mutex); vec_append (my_fifo_events, my_pending_event_vector); - vec_append (my_fifo_events, smm->pending_disconnects[my_thread_index]); + vec_append (my_fifo_events, smm->pending_disconnects[thread_index]); _vec_len (my_pending_event_vector) = 0; - smm->pending_event_vector[my_thread_index] = my_pending_event_vector; - _vec_len (smm->pending_disconnects[my_thread_index]) = 0; + smm->pending_event_vector[thread_index] = my_pending_event_vector; + _vec_len (smm->pending_disconnects[thread_index]) = 0; skip_dequeue: n_events = vec_len (my_fifo_events); @@ -760,17 +768,22 @@ skip_dequeue: session_fifo_event_t *e0; e0 = &my_fifo_events[i]; - switch (e0->event_type) { case FIFO_EVENT_APP_TX: - s0 = session_event_get_session (e0, my_thread_index); + if (n_tx_packets == VLIB_FRAME_SIZE) + { + vec_add1 (smm->pending_event_vector[thread_index], *e0); + break; + } + s0 = session_event_get_session (e0, thread_index); if (PREDICT_FALSE (!s0)) { clib_warning ("It's dead, Jim!"); continue; } + /* Spray packets in per session type frames, since they go to * different nodes */ rv = (smm->session_tx_fns[s0->session_type]) (vm, node, e0, s0, @@ -784,18 +797,19 @@ skip_dequeue: } break; case FIFO_EVENT_DISCONNECT: - /* Make sure disconnects run after the pending list is drained */ + /* Make sure stream disconnects run after the pending list is drained */ s0 = session_get_from_handle (e0->session_handle); if (!e0->postponed || svm_fifo_max_dequeue (s0->server_tx_fifo)) { e0->postponed = 1; - vec_add1 (smm->pending_disconnects[my_thread_index], *e0); + vec_add1 (smm->pending_disconnects[thread_index], *e0); continue; } + stream_session_disconnect_transport (s0); break; case FIFO_EVENT_BUILTIN_RX: - s0 = session_event_get_session (e0, my_thread_index); + s0 = session_event_get_session (e0, thread_index); if (PREDICT_FALSE (!s0)) continue; svm_fifo_unset_event (s0->server_rx_fifo); @@ -813,7 +827,7 @@ skip_dequeue: } _vec_len (my_fifo_events) = 0; - smm->free_event_vector[my_thread_index] = my_fifo_events; + smm->free_event_vector[thread_index] = my_fifo_events; vlib_node_increment_counter (vm, session_queue_node.index, SESSION_QUEUE_ERROR_TX, n_tx_packets); diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c index 20b912929b4..b3d42d0f7da 100644 --- a/src/vnet/session/transport.c +++ b/src/vnet/session/transport.c @@ -201,6 +201,12 @@ transport_protocol_service_type (transport_proto_t tp) return tp_vfts[tp].service_type; } +transport_tx_fn_type_t +transport_protocol_tx_fn_type (transport_proto_t tp) +{ + return tp_vfts[tp].tx_type; +} + #define PORT_MASK ((1 << 16)- 1) void diff --git a/src/vnet/session/transport_interface.h b/src/vnet/session/transport_interface.h index f21e483c715..745a7db444c 100644 --- a/src/vnet/session/transport_interface.h +++ b/src/vnet/session/transport_interface.h @@ -98,6 +98,7 @@ void transport_register_protocol (transport_proto_t transport_proto, fib_protocol_t fib_proto, u32 output_node); transport_proto_vft_t *transport_protocol_get_vft (transport_proto_t tp); transport_service_type_t transport_protocol_service_type (transport_proto_t); +transport_tx_fn_type_t transport_protocol_tx_fn_type (transport_proto_t tp); void transport_update_time (f64 time_now, u8 thread_index); void transport_enable_disable (vlib_main_t * vm, u8 is_en); |