diff options
author | Florin Coras <fcoras@cisco.com> | 2020-04-08 01:55:39 +0000 |
---|---|---|
committer | Florin Coras <fcoras@cisco.com> | 2020-04-08 22:24:41 +0000 |
commit | e759bb543c38a682dc31492a3a0a58669a811538 (patch) | |
tree | b3fa09028b32fb3e50f60d2e6de0ccbaf29b35ad | |
parent | 8324c55f95dd5ddbf1f5f9c47907204a12e152ef (diff) |
udp: cleanup input node
Type: refactor
Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: Ida9daefc20a161b36d6f36c56267123c7f2efc01
-rw-r--r-- | src/plugins/gtpu/test/test_gtpu.py | 4 | ||||
-rw-r--r-- | src/vnet/session/session.c | 30 | ||||
-rw-r--r-- | src/vnet/session/session.h | 2 | ||||
-rw-r--r-- | src/vnet/udp/udp.c | 4 | ||||
-rw-r--r-- | src/vnet/udp/udp.h | 3 | ||||
-rw-r--r-- | src/vnet/udp/udp_error.def | 20 | ||||
-rw-r--r-- | src/vnet/udp/udp_input.c | 359 |
7 files changed, 245 insertions, 177 deletions
diff --git a/src/plugins/gtpu/test/test_gtpu.py b/src/plugins/gtpu/test/test_gtpu.py index 667d61c0143..2daafe26713 100644 --- a/src/plugins/gtpu/test/test_gtpu.py +++ b/src/plugins/gtpu/test/test_gtpu.py @@ -43,7 +43,7 @@ class TestGtpuUDP(VppTestCase): self.pg_start() err = self.statistics.get_counter( - '/err/ip4-udp-lookup/no listener for dst port')[0] + '/err/ip4-udp-lookup/No listener for dst port')[0] if enabled: self.assertEqual(err, self.ip4_err) @@ -62,7 +62,7 @@ class TestGtpuUDP(VppTestCase): self.pg_start() err = self.statistics.get_counter( - '/err/ip6-udp-lookup/no listener for dst port')[0] + '/err/ip6-udp-lookup/No listener for dst port')[0] if enabled: self.assertEqual(err, self.ip6_err) diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 37c5d915008..9d531240f27 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -1085,6 +1085,36 @@ session_stream_accept (transport_connection_t * tc, u32 listener_index, } int +session_dgram_accept (transport_connection_t * tc, u32 listener_index, + u32 thread_index) +{ + app_worker_t *app_wrk; + session_t *s; + int rv; + + s = session_alloc_for_connection (tc); + s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index; + + if ((rv = app_worker_init_accepted (s))) + { + session_free (s); + return rv; + } + + app_wrk = app_worker_get (s->app_wrk_index); + if ((rv = app_worker_accept_notify (app_wrk, s))) + { + session_free_w_fifos (s); + return rv; + } + + s->session_state = SESSION_STATE_READY; + session_lookup_add_connection (tc, session_handle (s)); + + return 0; +} + +int session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) { transport_connection_t *tc; diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index 5e6e4060e55..956bff068b4 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -466,6 +466,8 @@ void session_transport_closed_notify (transport_connection_t * tc); void session_transport_reset_notify (transport_connection_t * tc); int session_stream_accept (transport_connection_t * tc, u32 listener_index, u32 thread_index, u8 notify); +int session_dgram_accept (transport_connection_t * tc, u32 listener_index, + u32 thread_index); /** * Initialize session layer for given transport proto and ip version * diff --git a/src/vnet/udp/udp.c b/src/vnet/udp/udp.c index 1a5621fb52c..9c427a4a982 100644 --- a/src/vnet/udp/udp.c +++ b/src/vnet/udp/udp.c @@ -240,7 +240,7 @@ udp_push_header (transport_connection_t * tc, vlib_buffer_t * b) udp_connection_t *uc; vlib_main_t *vm = vlib_get_main (); - uc = udp_get_connection_from_transport (tc); + uc = udp_connection_from_transport (tc); vlib_buffer_push_udp (b, uc->c_lcl_port, uc->c_rmt_port, 1); if (tc->is_ip4) @@ -307,7 +307,7 @@ udp_session_send_params (transport_connection_t * tconn, { udp_connection_t *uc; - uc = udp_get_connection_from_transport (tconn); + uc = udp_connection_from_transport (tconn); /* No constraint on TX window */ sp->snd_space = ~0; diff --git a/src/vnet/udp/udp.h b/src/vnet/udp/udp.h index b8ddbc2208d..0cb085b1d72 100644 --- a/src/vnet/udp/udp.h +++ b/src/vnet/udp/udp.h @@ -202,7 +202,7 @@ vnet_get_udp_main () } always_inline udp_connection_t * -udp_get_connection_from_transport (transport_connection_t * tc) +udp_connection_from_transport (transport_connection_t * tc) { return ((udp_connection_t *) tc); } @@ -213,6 +213,7 @@ udp_connection_index (udp_connection_t * uc) return (uc - udp_main.connections[uc->c_thread_index]); } +void udp_connection_free (udp_connection_t * uc); udp_connection_t *udp_connection_alloc (u32 thread_index); /** diff --git a/src/vnet/udp/udp_error.def b/src/vnet/udp/udp_error.def index e9baa334383..776d94a8ec1 100644 --- a/src/vnet/udp/udp_error.def +++ b/src/vnet/udp/udp_error.def @@ -15,13 +15,13 @@ * limitations under the License. */ -udp_error (NONE, "no error") -udp_error (NO_LISTENER, "no listener for dst port") -udp_error (LENGTH_ERROR, "UDP packets with length errors") -udp_error (PUNT, "no listener punt") -udp_error (ENQUEUED, "UDP packets enqueued") -udp_error (FIFO_FULL, "UDP fifo full") -udp_error (NOT_READY, "UDP connection not ready") -udp_error (LISTENER, "UDP connected session") -udp_error (CREATE_SESSION, "Failed to create UDP session") -udp_error (EVENT_FIFO_FULL, "UDP session event fifo full") +udp_error (NONE, "No error") +udp_error (NO_LISTENER, "No listener for dst port") +udp_error (LENGTH_ERROR, "Packets with length errors") +udp_error (PUNT, "No listener punt") +udp_error (ENQUEUED, "Packets enqueued") +udp_error (FIFO_FULL, "Fifo full") +udp_error (NOT_READY, "Connection not ready") +udp_error (ACCEPT, "Accepted session") +udp_error (CREATE_SESSION, "Failed to create session") +udp_error (MQ_FULL, "Application msg queue full") diff --git a/src/vnet/udp/udp_input.c b/src/vnet/udp/udp_input.c index 1c55ee57aa3..0a4af605c18 100644 --- a/src/vnet/udp/udp_input.c +++ b/src/vnet/udp/udp_input.c @@ -67,236 +67,271 @@ typedef enum always_inline void udp_input_inc_counter (vlib_main_t * vm, u8 is_ip4, u8 evt, u8 val) { - if (PREDICT_TRUE (!val)) - return; - if (is_ip4) vlib_node_increment_counter (vm, udp4_input_node.index, evt, val); else vlib_node_increment_counter (vm, udp6_input_node.index, evt, val); } +#define udp_store_err_counters(vm, is_ip4, cnts) \ +{ \ + int i; \ + for (i = 0; i < UDP_N_ERROR; i++) \ + if (cnts[i]) \ + udp_input_inc_counter(vm, is_ip4, i, cnts[i]); \ +} + +#define udp_inc_err_counter(cnts, err, val) \ +{ \ + cnts[err] += val; \ +} + +static void +udp_trace_buffer (vlib_main_t * vm, vlib_node_runtime_t * node, + vlib_buffer_t * b, session_t * s, u16 error0) +{ + udp_input_trace_t *t; + + if (PREDICT_TRUE (!(b->flags & VLIB_BUFFER_IS_TRACED))) + return; + + t = vlib_add_trace (vm, node, b, sizeof (*t)); + t->connection = s ? s->connection_index : ~0; + t->disposition = error0; + t->thread_index = s->thread_index; +} + +static udp_connection_t * +udp_connection_accept (udp_connection_t * listener, session_dgram_hdr_t * hdr, + u32 thread_index) +{ + udp_connection_t *uc; + + uc = udp_connection_alloc (thread_index); + ip_copy (&uc->c_lcl_ip, &hdr->lcl_ip, hdr->is_ip4); + ip_copy (&uc->c_rmt_ip, &hdr->rmt_ip, hdr->is_ip4); + uc->c_lcl_port = hdr->lcl_port; + uc->c_rmt_port = hdr->rmt_port; + uc->c_is_ip4 = hdr->is_ip4; + uc->c_fib_index = listener->c_fib_index; + uc->mss = listener->mss; + uc->flags |= UDP_CONN_F_CONNECTED; + + if (session_dgram_accept (&uc->connection, listener->c_s_index, + listener->c_thread_index)) + { + udp_connection_free (uc); + return 0; + } + udp_connection_share_port (clib_net_to_host_u16 + (uc->c_lcl_port), uc->c_is_ip4); + return uc; +} + +static void +udp_connection_enqueue (udp_connection_t * uc0, session_t * s0, + session_dgram_hdr_t * hdr0, u32 thread_index, + vlib_buffer_t * b, u8 queue_event, u32 * error0) +{ + int wrote0; + + clib_spinlock_lock (&uc0->rx_lock); + + if (svm_fifo_max_enqueue_prod (s0->rx_fifo) + < hdr0->data_length + sizeof (session_dgram_hdr_t)) + { + *error0 = UDP_ERROR_FIFO_FULL; + goto unlock_rx_lock; + } + + /* If session is owned by another thread and rx event needed, + * enqueue event now while we still have the peeker lock */ + if (s0->thread_index != thread_index) + { + wrote0 = session_enqueue_dgram_connection (s0, hdr0, b, + TRANSPORT_PROTO_UDP, + /* queue event */ 0); + if (queue_event && !svm_fifo_has_event (s0->rx_fifo)) + session_enqueue_notify (s0); + } + else + { + wrote0 = session_enqueue_dgram_connection (s0, hdr0, b, + TRANSPORT_PROTO_UDP, + queue_event); + } + ASSERT (wrote0 > 0); + +unlock_rx_lock: + + clib_spinlock_unlock (&uc0->rx_lock); +} + +always_inline session_t * +udp_parse_and_lookup_buffer (vlib_buffer_t * b, session_dgram_hdr_t * hdr, + u8 is_ip4) +{ + udp_header_t *udp; + u32 fib_index; + session_t *s; + + /* udp_local hands us a pointer to the udp data */ + udp = (udp_header_t *) (vlib_buffer_get_current (b) - sizeof (*udp)); + fib_index = vnet_buffer (b)->ip.fib_index; + + hdr->data_offset = 0; + hdr->lcl_port = udp->dst_port; + hdr->rmt_port = udp->src_port; + hdr->is_ip4 = is_ip4; + + if (is_ip4) + { + ip4_header_t *ip4; + + /* TODO: must fix once udp_local does ip options correctly */ + ip4 = (ip4_header_t *) (((u8 *) udp) - sizeof (*ip4)); + ip_set (&hdr->lcl_ip, &ip4->dst_address, 1); + ip_set (&hdr->rmt_ip, &ip4->src_address, 1); + hdr->data_length = clib_net_to_host_u16 (ip4->length); + hdr->data_length -= sizeof (ip4_header_t) + sizeof (udp_header_t); + s = session_lookup_safe4 (fib_index, &ip4->dst_address, + &ip4->src_address, udp->dst_port, + udp->src_port, TRANSPORT_PROTO_UDP); + } + else + { + ip6_header_t *ip60; + + ip60 = (ip6_header_t *) (((u8 *) udp) - sizeof (*ip60)); + ip_set (&hdr->lcl_ip, &ip60->dst_address, 0); + ip_set (&hdr->rmt_ip, &ip60->src_address, 0); + hdr->data_length = clib_net_to_host_u16 (ip60->payload_length); + hdr->data_length -= sizeof (udp_header_t); + s = session_lookup_safe6 (fib_index, &ip60->dst_address, + &ip60->src_address, udp->dst_port, + udp->src_port, TRANSPORT_PROTO_UDP); + } + + if (PREDICT_TRUE (!(b->flags & VLIB_BUFFER_NEXT_PRESENT))) + b->current_length = hdr->data_length; + else + b->total_length_not_including_first_buffer = hdr->data_length + - b->current_length; + + return s; +} + always_inline uword udp46_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node, vlib_frame_t * frame, u8 is_ip4) { - u32 n_left_from, *from; - u32 errors, *first_buffer; - u32 my_thread_index = vm->thread_index; + u32 n_left_from, *from, errors, *first_buffer; + vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **b; + u16 err_counters[UDP_N_ERROR] = { 0 }; + u32 thread_index = vm->thread_index; from = first_buffer = vlib_frame_vector_args (frame); n_left_from = frame->n_vectors; + vlib_get_buffers (vm, from, bufs, n_left_from); + + b = bufs; while (n_left_from > 0) { - u32 bi0, fib_index0, data_len; - vlib_buffer_t *b0; u32 error0 = UDP_ERROR_ENQUEUED; - udp_header_t *udp0; - ip4_header_t *ip40; - ip6_header_t *ip60; - u8 *data0; - session_t *s0; - udp_connection_t *uc0, *child0, *new_uc0; - transport_connection_t *tc0; - int wrote0; - void *rmt_addr, *lcl_addr; session_dgram_hdr_t hdr0; - u8 queue_event = 1; - - /* speculatively enqueue b0 to the current next frame */ - bi0 = from[0]; - from += 1; - n_left_from -= 1; - - b0 = vlib_get_buffer (vm, bi0); - - /* udp_local hands us a pointer to the udp data */ - data0 = vlib_buffer_get_current (b0); - udp0 = (udp_header_t *) (data0 - sizeof (*udp0)); - fib_index0 = vnet_buffer (b0)->ip.fib_index; - - if (is_ip4) - { - /* TODO: must fix once udp_local does ip options correctly */ - ip40 = (ip4_header_t *) (((u8 *) udp0) - sizeof (*ip40)); - s0 = session_lookup_safe4 (fib_index0, &ip40->dst_address, - &ip40->src_address, udp0->dst_port, - udp0->src_port, TRANSPORT_PROTO_UDP); - lcl_addr = &ip40->dst_address; - rmt_addr = &ip40->src_address; - data_len = clib_net_to_host_u16 (ip40->length); - data_len -= sizeof (ip4_header_t) + sizeof (udp_header_t); - } - else - { - ip60 = (ip6_header_t *) (((u8 *) udp0) - sizeof (*ip60)); - s0 = session_lookup_safe6 (fib_index0, &ip60->dst_address, - &ip60->src_address, udp0->dst_port, - udp0->src_port, TRANSPORT_PROTO_UDP); - lcl_addr = &ip60->dst_address; - rmt_addr = &ip60->src_address; - data_len = clib_net_to_host_u16 (ip60->payload_length); - data_len -= sizeof (udp_header_t); - } + udp_connection_t *uc0; + session_t *s0; + s0 = udp_parse_and_lookup_buffer (b[0], &hdr0, is_ip4); if (PREDICT_FALSE (!s0)) { error0 = UDP_ERROR_NO_LISTENER; - goto trace0; + goto done; } + /* + * If session exists pool peeker lock is taken at this point unless + * the session is already on the right thread or is a listener + */ + if (s0->session_state == SESSION_STATE_OPENED) { - /* TODO optimization: move cl session to right thread - * However, since such a move would affect the session handle, - * which we pass 'raw' to the app, we'd also have notify the - * app of the change or change the way we pass handles to apps. - */ - tc0 = session_get_transport (s0); - uc0 = udp_get_connection_from_transport (tc0); + u8 queue_event = 1; + uc0 = udp_connection_from_transport (session_get_transport (s0)); if (uc0->flags & UDP_CONN_F_CONNECTED) { - if (s0->thread_index != vlib_get_thread_index ()) + if (s0->thread_index != thread_index) { /* * Clone the transport. It will be cleaned up with the * session once we notify the session layer. */ - new_uc0 = - udp_connection_clone_safe (s0->connection_index, - s0->thread_index); - ASSERT (s0->session_index == new_uc0->c_s_index); + uc0 = udp_connection_clone_safe (s0->connection_index, + s0->thread_index); + ASSERT (s0->session_index == uc0->c_s_index); /* - * Drop the 'lock' on pool resize + * Drop the peeker lock on pool resize and ask session + * layer for a new session. */ session_pool_remove_peeker (s0->thread_index); - session_dgram_connect_notify (&new_uc0->connection, + session_dgram_connect_notify (&uc0->connection, s0->thread_index, &s0); - tc0 = &new_uc0->connection; - uc0 = new_uc0; queue_event = 0; } else s0->session_state = SESSION_STATE_READY; } + udp_connection_enqueue (uc0, s0, &hdr0, thread_index, b[0], + queue_event, &error0); + session_pool_remove_peeker (s0->thread_index); } else if (s0->session_state == SESSION_STATE_READY) { - tc0 = session_get_transport (s0); - uc0 = udp_get_connection_from_transport (tc0); + uc0 = udp_connection_from_transport (session_get_transport (s0)); + udp_connection_enqueue (uc0, s0, &hdr0, thread_index, b[0], 1, + &error0); } else if (s0->session_state == SESSION_STATE_LISTENING) { - tc0 = listen_session_get_transport (s0); - uc0 = udp_get_connection_from_transport (tc0); + uc0 = udp_connection_from_transport (session_get_transport (s0)); if (uc0->flags & UDP_CONN_F_CONNECTED) { - child0 = udp_connection_alloc (my_thread_index); - if (is_ip4) - { - ip_set (&child0->c_lcl_ip, &ip40->dst_address, 1); - ip_set (&child0->c_rmt_ip, &ip40->src_address, 1); - } - else - { - ip_set (&child0->c_lcl_ip, &ip60->dst_address, 0); - ip_set (&child0->c_rmt_ip, &ip60->src_address, 0); - } - child0->c_lcl_port = udp0->dst_port; - child0->c_rmt_port = udp0->src_port; - child0->c_is_ip4 = is_ip4; - child0->c_fib_index = tc0->fib_index; - child0->mss = uc0->mss; - child0->flags |= UDP_CONN_F_CONNECTED; - - if (session_stream_accept (&child0->connection, - tc0->s_index, tc0->thread_index, 1)) + uc0 = udp_connection_accept (uc0, &hdr0, thread_index); + if (!uc0) { error0 = UDP_ERROR_CREATE_SESSION; - goto trace0; + goto done; } - s0 = session_get (child0->c_s_index, child0->c_thread_index); - s0->session_state = SESSION_STATE_READY; - tc0 = &child0->connection; - uc0 = udp_get_connection_from_transport (tc0); - udp_connection_share_port (clib_net_to_host_u16 - (uc0->c_lcl_port), uc0->c_is_ip4); - error0 = UDP_ERROR_LISTENER; + s0 = session_get (uc0->c_s_index, uc0->c_thread_index); + error0 = UDP_ERROR_ACCEPT; } + udp_connection_enqueue (uc0, s0, &hdr0, thread_index, b[0], 1, + &error0); } else { error0 = UDP_ERROR_NOT_READY; - goto trace0; + session_pool_remove_peeker (s0->thread_index); } + done: - if (svm_fifo_max_enqueue_prod (s0->rx_fifo) - < data_len + sizeof (session_dgram_hdr_t)) - { - error0 = UDP_ERROR_FIFO_FULL; - goto trace0; - } - - hdr0.data_length = data_len; - if (PREDICT_TRUE (!(b0->flags & VLIB_BUFFER_NEXT_PRESENT))) - b0->current_length = data_len; - else - b0->total_length_not_including_first_buffer = data_len - - b0->current_length; - - hdr0.data_offset = 0; - ip_set (&hdr0.lcl_ip, lcl_addr, is_ip4); - ip_set (&hdr0.rmt_ip, rmt_addr, is_ip4); - hdr0.lcl_port = udp0->dst_port; - hdr0.rmt_port = udp0->src_port; - hdr0.is_ip4 = is_ip4; - - clib_spinlock_lock (&uc0->rx_lock); - /* If session is owned by another thread and rx event needed, - * enqueue event now while we still have the peeker lock */ - if (s0->thread_index != my_thread_index) - { - wrote0 = session_enqueue_dgram_connection (s0, &hdr0, b0, - TRANSPORT_PROTO_UDP, - /* queue event */ 0); - if (queue_event && !svm_fifo_has_event (s0->rx_fifo)) - session_enqueue_notify (s0); - } - else - { - wrote0 = session_enqueue_dgram_connection (s0, &hdr0, b0, - TRANSPORT_PROTO_UDP, - queue_event); - } - clib_spinlock_unlock (&uc0->rx_lock); - ASSERT (wrote0 > 0); - - if (s0->session_state != SESSION_STATE_LISTENING) - session_pool_remove_peeker (s0->thread_index); - - trace0: + b += 1; + n_left_from -= 1; - b0->error = node->errors[error0]; + udp_inc_err_counter (err_counters, error0, 1); - if (PREDICT_FALSE ((node->flags & VLIB_NODE_FLAG_TRACE) - && (b0->flags & VLIB_BUFFER_IS_TRACED))) - { - udp_input_trace_t *t = vlib_add_trace (vm, node, b0, - sizeof (*t)); - - t->connection = s0 ? s0->connection_index : ~0; - t->disposition = error0; - t->thread_index = my_thread_index; - } + if (PREDICT_FALSE (node->flags & VLIB_NODE_FLAG_TRACE)) + udp_trace_buffer (vm, node, b[0], s0, error0); } vlib_buffer_free (vm, first_buffer, frame->n_vectors); errors = session_main_flush_enqueue_events (TRANSPORT_PROTO_UDP, - my_thread_index); - udp_input_inc_counter (vm, is_ip4, UDP_ERROR_EVENT_FIFO_FULL, errors); + thread_index); + err_counters[UDP_ERROR_MQ_FULL] = errors; + udp_store_err_counters (vm, is_ip4, err_counters); return frame->n_vectors; } |