diff options
author | Florin Coras <fcoras@cisco.com> | 2018-04-09 09:24:52 -0700 |
---|---|---|
committer | Damjan Marion <dmarion.lists@gmail.com> | 2018-04-18 07:23:46 +0000 |
commit | 7fb0fe1f6972a7a35146fa9115b866ba29a6fbb7 (patch) | |
tree | 46f1236450ae918383bf56204b98a68199d28501 /src/vnet/session | |
parent | 684d08c7e5378af5310346e9219a79ef1d901084 (diff) |
udp/session: refactor to support dgram mode
- adds session layer support for datagram based protocols
- updates udp to work in pure connectionless and datagram mode. The
existing connected mode is now 'accessible' for apps as a dummy UDPC,
as in, connected udp, protocol.
- updates udp_echo, echo client, echo server code to work in datagram
mode.
Change-Id: I2960c0d2d246cb166005f545794ec31fe0d546dd
Signed-off-by: Florin Coras <fcoras@cisco.com>
Diffstat (limited to 'src/vnet/session')
-rw-r--r-- | src/vnet/session/application.c | 13 | ||||
-rw-r--r-- | src/vnet/session/application_interface.h | 157 | ||||
-rw-r--r-- | src/vnet/session/session.api | 28 | ||||
-rw-r--r-- | src/vnet/session/session.c | 95 | ||||
-rw-r--r-- | src/vnet/session/session.h | 40 | ||||
-rwxr-xr-x | src/vnet/session/session_api.c | 41 | ||||
-rwxr-xr-x | src/vnet/session/session_cli.c | 5 | ||||
-rw-r--r-- | src/vnet/session/session_node.c | 134 | ||||
-rw-r--r-- | src/vnet/session/stream_session.h | 27 | ||||
-rw-r--r-- | src/vnet/session/transport.c | 16 | ||||
-rw-r--r-- | src/vnet/session/transport.h | 19 | ||||
-rw-r--r-- | src/vnet/session/transport_interface.h | 6 |
12 files changed, 474 insertions, 107 deletions
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c index 1c46e786e13..68bbd59098e 100644 --- a/src/vnet/session/application.c +++ b/src/vnet/session/application.c @@ -445,13 +445,9 @@ application_start_listen (application_t * srv, session_endpoint_t * sep, s = listen_session_new (0, sst); s->app_index = srv->index; - if (stream_session_listen (s, sep)) - goto err; - /* Allocate segment manager. All sessions derived out of a listen session * have fifos allocated by the same segment manager. */ - sm = application_alloc_segment_manager (srv); - if (sm == 0) + if (!(sm = application_alloc_segment_manager (srv))) goto err; /* Add to app's listener table. Useful to find all child listeners @@ -459,6 +455,13 @@ application_start_listen (application_t * srv, session_endpoint_t * sep, handle = listen_session_get_handle (s); hash_set (srv->listeners_table, handle, segment_manager_index (sm)); + if (stream_session_listen (s, sep)) + { + segment_manager_del (sm); + hash_unset (srv->listeners_table, handle); + goto err; + } + *res = handle; return 0; diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h index 5dc237f6e7b..5fd218533dc 100644 --- a/src/vnet/session/application_interface.h +++ b/src/vnet/session/application_interface.h @@ -173,6 +173,163 @@ extern const u32 test_srv_crt_rsa_len; extern const char test_srv_key_rsa[]; extern const u32 test_srv_key_rsa_len; +typedef struct app_session_transport_ +{ + ip46_address_t rmt_ip; /**< remote ip */ + ip46_address_t lcl_ip; /**< local ip */ + u16 rmt_port; /**< remote port */ + u16 lcl_port; /**< local port */ + u8 is_ip4; /**< set if uses ip4 networking */ +} app_session_transport_t; + +typedef struct app_session_ +{ + svm_fifo_t *rx_fifo; /**< rx fifo */ + svm_fifo_t *tx_fifo; /**< tx fifo */ + session_type_t session_type; /**< session type */ + volatile u8 session_state; /**< session state */ + u32 session_index; /**< index in owning pool */ + app_session_transport_t transport; /**< transport info */ + svm_queue_t *vpp_evt_q; /**< vpp event queue for session */ + u8 is_dgram; /**< set if it works in dgram mode */ +} app_session_t; + +always_inline int +app_send_dgram_raw (svm_fifo_t * f, app_session_transport_t * at, + svm_queue_t * vpp_evt_q, u8 * data, u32 len, u8 noblock) +{ + u32 max_enqueue, actual_write; + session_dgram_hdr_t hdr; + session_fifo_event_t evt; + int rv; + + max_enqueue = svm_fifo_max_enqueue (f); + if (svm_fifo_max_enqueue (f) <= sizeof (session_dgram_hdr_t)) + return 0; + + max_enqueue -= sizeof (session_dgram_hdr_t); + actual_write = clib_min (len, max_enqueue); + hdr.data_length = actual_write; + hdr.data_offset = 0; + clib_memcpy (&hdr.rmt_ip, &at->rmt_ip, sizeof (ip46_address_t)); + hdr.is_ip4 = at->is_ip4; + hdr.rmt_port = at->rmt_port; + clib_memcpy (&hdr.lcl_ip, &at->lcl_ip, sizeof (ip46_address_t)); + hdr.lcl_port = at->lcl_port; + rv = svm_fifo_enqueue_nowait (f, sizeof (hdr), (u8 *) & hdr); + if (rv <= 0) + return 0; + + ASSERT (rv == sizeof (hdr)); + + if ((rv = svm_fifo_enqueue_nowait (f, actual_write, data)) > 0) + { + if (svm_fifo_set_event (f)) + { + evt.fifo = f; + evt.event_type = FIFO_EVENT_APP_TX; + svm_queue_add (vpp_evt_q, (u8 *) & evt, noblock); + } + } + return rv; +} + +always_inline int +app_send_dgram (app_session_t * s, u8 * data, u32 len, u8 noblock) +{ + return app_send_dgram_raw (s->tx_fifo, &s->transport, s->vpp_evt_q, data, + len, noblock); +} + +always_inline int +app_send_stream_raw (svm_fifo_t * f, svm_queue_t * vpp_evt_q, u8 * data, + u32 len, u8 noblock) +{ + session_fifo_event_t evt; + int rv; + + if ((rv = svm_fifo_enqueue_nowait (f, len, data)) > 0) + { + if (svm_fifo_set_event (f)) + { + evt.fifo = f; + evt.event_type = FIFO_EVENT_APP_TX; + svm_queue_add (vpp_evt_q, (u8 *) & evt, noblock); + } + } + return rv; +} + +always_inline int +app_send_stream (app_session_t * s, u8 * data, u32 len, u8 noblock) +{ + return app_send_stream_raw (s->tx_fifo, s->vpp_evt_q, data, len, noblock); +} + +always_inline int +app_send (app_session_t * s, u8 * data, u32 len, u8 noblock) +{ + if (s->is_dgram) + return app_send_dgram (s, data, len, noblock); + return app_send_stream (s, data, len, noblock); +} + +always_inline int +app_recv_dgram_raw (svm_fifo_t * f, u8 * buf, u32 len, + app_session_transport_t * at, u8 clear_evt) +{ + session_dgram_pre_hdr_t ph; + u32 max_deq; + int rv; + + if (clear_evt) + svm_fifo_unset_event (f); + max_deq = svm_fifo_max_dequeue (f); + if (max_deq < sizeof (session_dgram_hdr_t)) + return 0; + + svm_fifo_peek (f, 0, sizeof (ph), (u8 *) & ph); + ASSERT (ph.data_length >= ph.data_offset); + if (!ph.data_offset) + svm_fifo_peek (f, sizeof (ph), sizeof (*at), (u8 *) at); + len = clib_min (len, ph.data_length - ph.data_offset); + rv = svm_fifo_peek (f, ph.data_offset + SESSION_CONN_HDR_LEN, len, buf); + ph.data_offset += rv; + if (ph.data_offset == ph.data_length) + svm_fifo_dequeue_drop (f, ph.data_length + SESSION_CONN_HDR_LEN); + else + svm_fifo_overwrite_head (f, (u8 *) & ph, sizeof (ph)); + return rv; +} + +always_inline int +app_recv_dgram (app_session_t * s, u8 * buf, u32 len) +{ + return app_recv_dgram_raw (s->rx_fifo, buf, len, &s->transport, 1); +} + +always_inline int +app_recv_stream_raw (svm_fifo_t * f, u8 * buf, u32 len, u8 clear_evt) +{ + if (clear_evt) + svm_fifo_unset_event (f); + return svm_fifo_dequeue_nowait (f, len, buf); +} + +always_inline int +app_recv_stream (app_session_t * s, u8 * buf, u32 len) +{ + return app_recv_stream_raw (s->rx_fifo, buf, len, 1); +} + +always_inline int +app_recv (app_session_t * s, u8 * data, u32 len) +{ + if (s->is_dgram) + return app_recv_dgram (s, data, len); + return app_recv_stream (s, data, len); +} + #endif /* __included_uri_h__ */ /* diff --git a/src/vnet/session/session.api b/src/vnet/session/session.api index bf88e82f336..98748d8fbbb 100644 --- a/src/vnet/session/session.api +++ b/src/vnet/session/session.api @@ -13,7 +13,7 @@ * limitations under the License. */ -option version = "1.0.2"; +option version = "1.0.3"; /** \brief client->vpp, attach application to session layer @param client_index - opaque cookie to identify the sender @@ -119,13 +119,25 @@ autoreply define unmap_segment { "tcp://::/0/80" [ipv6] etc. @param options - socket options, fifo sizes, etc. */ -autoreply define bind_uri { +define bind_uri { u32 client_index; u32 context; u32 accept_cookie; u8 uri[128]; }; +define bind_uri_reply { + u32 context; + u64 handle; + i32 retval; + u64 rx_fifo; + u64 tx_fifo; + u8 lcl_is_ip4; + u8 lcl_ip[16]; + u16 lcl_port; + u64 vpp_evt_q; +}; + /** \brief Unbind a given URI @param client_index - opaque cookie to identify the sender @param context - sender context, to match reply w/ request @@ -314,8 +326,12 @@ autoreply define connect_sock { @param context - sender context, to match reply w/ request @param handle - bind handle @param retval - return code for the request - @param event_queue_address - vpp event queue address or 0 if this - connection shouldn't send events + @param lcl_is_ip4 - local ip address type + @param lcl_ip6 - local ip address + @param lcl_port - local port + @param rx_fifo - rx fifo address if allocated (connectionless) + @param tx_fifo - tx fifo address if allocated (connectionless) + @param vpp_evt_q - vpp event queue address (connectionless) @param segment_name_length - length of segment name @param segment_name - name of segment client needs to attach to */ @@ -323,10 +339,12 @@ define bind_sock_reply { u32 context; u64 handle; i32 retval; - u64 server_event_queue_address; u8 lcl_is_ip4; u8 lcl_ip[16]; u16 lcl_port; + u64 rx_fifo; + u64 tx_fifo; + u64 vpp_evt_q; u32 segment_size; u8 segment_name_length; u8 segment_name[128]; diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index d258b82c983..dfc967b12dc 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -355,14 +355,19 @@ session_enqueue_stream_connection (transport_connection_t * tc, return enqueued; } + int -session_enqueue_dgram_connection (stream_session_t * s, vlib_buffer_t * b, - u8 proto, u8 queue_event) +session_enqueue_dgram_connection (stream_session_t * s, + session_dgram_hdr_t * hdr, + vlib_buffer_t * b, u8 proto, u8 queue_event) { int enqueued = 0, rv, in_order_off; - if (svm_fifo_max_enqueue (s->server_rx_fifo) < b->current_length) - return -1; + ASSERT (svm_fifo_max_enqueue (s->server_rx_fifo) + >= b->current_length + sizeof (*hdr)); + + svm_fifo_enqueue_nowait (s->server_rx_fifo, sizeof (session_dgram_hdr_t), + (u8 *) hdr); enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, b->current_length, vlib_buffer_get_current (b)); if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0)) @@ -530,6 +535,16 @@ session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index) return errors; } +int +session_manager_flush_all_enqueue_events (u8 transport_proto) +{ + vlib_thread_main_t *vtm = vlib_get_thread_main (); + int i, errors = 0; + for (i = 0; i < 1 + vtm->n_threads; i++) + errors += session_manager_flush_enqueue_events (transport_proto, i); + return errors; +} + /** * Init fifo tail and head pointers * @@ -825,7 +840,7 @@ session_open_cl (u32 app_index, session_endpoint_t * rmt, u32 opaque) if (session_alloc_and_init (sm, tc, 1, &s)) return -1; s->app_index = app->index; - s->session_state = SESSION_STATE_CONNECTING_READY; + s->session_state = SESSION_STATE_OPENED; /* Tell the app about the new event fifo for this session */ app->cb_fns.session_connected_callback (app->index, opaque, s, 0); @@ -841,10 +856,6 @@ session_open_vc (u32 app_index, session_endpoint_t * rmt, u32 opaque) u64 handle; int rv; - /* TODO until udp is fixed */ - if (rmt->transport_proto == TRANSPORT_PROTO_UDP) - return session_open_cl (app_index, rmt, opaque); - tep = session_endpoint_to_transport (rmt); rv = tp_vfts[rmt->transport_proto].open (tep); if (rv < 0) @@ -912,14 +923,6 @@ session_open (u32 app_index, session_endpoint_t * rmt, u32 opaque) return session_open_srv_fns[tst] (app_index, rmt, opaque); } -/** - * Ask transport to listen on local transport endpoint. - * - * @param s Session for which listen will be called. Note that unlike - * established sessions, listen sessions are not associated to a - * thread. - * @param tep Local endpoint to be listened on. - */ int session_listen_vc (stream_session_t * s, session_endpoint_t * sep) { @@ -948,6 +951,40 @@ session_listen_vc (stream_session_t * s, session_endpoint_t * sep) } int +session_listen_cl (stream_session_t * s, session_endpoint_t * sep) +{ + transport_connection_t *tc; + application_t *server; + segment_manager_t *sm; + u32 tci; + + /* Transport bind/listen */ + tci = tp_vfts[sep->transport_proto].bind (s->session_index, + session_endpoint_to_transport + (sep)); + + if (tci == (u32) ~ 0) + return -1; + + /* Attach transport to session */ + s->connection_index = tci; + tc = tp_vfts[sep->transport_proto].get_listener (tci); + + /* Weird but handle it ... */ + if (tc == 0) + return -1; + + server = application_get (s->app_index); + sm = application_get_listen_segment_manager (server, s); + if (session_alloc_fifos (sm, s)) + return -1; + + /* Add to the main lookup table */ + session_lookup_add_connection (tc, s->session_index); + return 0; +} + +int session_listen_app (stream_session_t * s, session_endpoint_t * sep) { session_endpoint_extended_t esep; @@ -965,11 +1002,19 @@ typedef int (*session_listen_service_fn) (stream_session_t *, static session_listen_service_fn session_listen_srv_fns[TRANSPORT_N_SERVICES] = { session_listen_vc, - session_listen_vc, + session_listen_cl, session_listen_app, }; /* *INDENT-ON* */ +/** + * Ask transport to listen on local transport endpoint. + * + * @param s Session for which listen will be called. Note that unlike + * established sessions, listen sessions are not associated to a + * thread. + * @param tep Local endpoint to be listened on. + */ int stream_session_listen (stream_session_t * s, session_endpoint_t * sep) { @@ -1125,7 +1170,8 @@ session_manager_get_evt_q_segment (void) static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = { session_tx_fifo_peek_and_snd, session_tx_fifo_dequeue_and_snd, - session_tx_fifo_dequeue_internal + session_tx_fifo_dequeue_internal, + session_tx_fifo_dequeue_and_snd }; /* *INDENT-ON* */ @@ -1228,11 +1274,12 @@ session_manager_main_enable (vlib_main_t * vm) vec_validate (smm->peekers_rw_locks, num_threads - 1); for (i = 0; i < TRANSPORT_N_PROTO; i++) - for (j = 0; j < num_threads; j++) - { - vec_validate (smm->session_to_enqueue[i], num_threads - 1); - vec_validate (smm->current_enqueue_epoch[i], num_threads - 1); - } + { + vec_validate (smm->current_enqueue_epoch[i], num_threads - 1); + vec_validate (smm->session_to_enqueue[i], num_threads - 1); + for (j = 0; j < num_threads; j++) + smm->current_enqueue_epoch[i][j] = 1; + } for (i = 0; i < num_threads; i++) { diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index 0b53f61a10a..9d534aec19b 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -204,6 +204,31 @@ struct _session_manager_main }; +typedef struct session_dgram_pre_hdr_ +{ + u32 data_length; + u32 data_offset; +} session_dgram_pre_hdr_t; + +/* *INDENT-OFF* */ +typedef CLIB_PACKED (struct session_dgram_header_ +{ + u32 data_length; + u32 data_offset; + ip46_address_t rmt_ip; + ip46_address_t lcl_ip; + u16 rmt_port; + u16 lcl_port; + u8 is_ip4; +}) session_dgram_hdr_t; +/* *INDENT-ON* */ + +#define SESSION_CONN_ID_LEN 37 +#define SESSION_CONN_HDR_LEN 45 + +STATIC_ASSERT (sizeof (session_dgram_hdr_t) == (SESSION_CONN_ID_LEN + 8), + "session conn id wrong length"); + extern session_manager_main_t session_manager_main; extern vlib_node_registration_t session_queue_node; @@ -342,6 +367,14 @@ session_has_transport (stream_session_t * s) return (session_get_transport_proto (s) != TRANSPORT_PROTO_NONE); } +always_inline transport_service_type_t +session_transport_service_type (stream_session_t * s) +{ + transport_proto_t tp; + tp = session_get_transport_proto (s); + return transport_protocol_service_type (tp); +} + /** * Acquires a lock that blocks a session pool from expanding. * @@ -442,8 +475,10 @@ int session_enqueue_stream_connection (transport_connection_t * tc, vlib_buffer_t * b, u32 offset, u8 queue_event, u8 is_in_order); -int session_enqueue_dgram_connection (stream_session_t * s, vlib_buffer_t * b, - u8 proto, u8 queue_event); +int session_enqueue_dgram_connection (stream_session_t * s, + session_dgram_hdr_t * hdr, + vlib_buffer_t * b, u8 proto, + u8 queue_event); int stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer, u32 offset, u32 max_bytes); u32 stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes); @@ -490,6 +525,7 @@ session_manager_get_vpp_event_queue (u32 thread_index) } int session_manager_flush_enqueue_events (u8 proto, u32 thread_index); +int session_manager_flush_all_enqueue_events (u8 transport_proto); always_inline u64 listen_session_get_handle (stream_session_t * s) diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c index 2a74a196201..67c42faf692 100755 --- a/src/vnet/session/session_api.c +++ b/src/vnet/session/session_api.c @@ -480,9 +480,12 @@ done: static void vl_api_bind_uri_t_handler (vl_api_bind_uri_t * mp) { - vl_api_bind_uri_reply_t *rmp; + transport_connection_t *tc = 0; vnet_bind_args_t _a, *a = &_a; - application_t *app; + vl_api_bind_uri_reply_t *rmp; + stream_session_t *s; + application_t *app = 0; + svm_queue_t *vpp_evt_q; int rv; if (session_manager_is_enabled () == 0) @@ -505,7 +508,30 @@ vl_api_bind_uri_t_handler (vl_api_bind_uri_t * mp) } done: - REPLY_MACRO (VL_API_BIND_URI_REPLY); + + /* *INDENT-OFF* */ + REPLY_MACRO2 (VL_API_BIND_URI_REPLY, ({ + if (!rv) + { + rmp->handle = a->handle; + rmp->lcl_is_ip4 = tc->is_ip4; + rmp->lcl_port = tc->lcl_port; + if (app && application_has_global_scope (app)) + { + s = listen_session_get_from_handle (a->handle); + tc = listen_session_get_transport (s); + clib_memcpy (rmp->lcl_ip, &tc->lcl_ip, sizeof(tc->lcl_ip)); + if (session_transport_service_type (s) == TRANSPORT_SERVICE_CL) + { + rmp->rx_fifo = pointer_to_uword (s->server_rx_fifo); + rmp->tx_fifo = pointer_to_uword (s->server_tx_fifo); + vpp_evt_q = session_manager_get_vpp_event_queue (0); + rmp->vpp_evt_q = pointer_to_uword (vpp_evt_q); + } + } + } + })); + /* *INDENT-ON* */ } static void @@ -733,6 +759,7 @@ vl_api_bind_sock_t_handler (vl_api_bind_sock_t * mp) stream_session_t *s; transport_connection_t *tc = 0; ip46_address_t *ip46; + svm_queue_t *vpp_evt_q; if (session_manager_is_enabled () == 0) { @@ -775,8 +802,14 @@ done: { s = listen_session_get_from_handle (a->handle); tc = listen_session_get_transport (s); - rmp->lcl_is_ip4 = tc->is_ip4; clib_memcpy (rmp->lcl_ip, &tc->lcl_ip, sizeof (tc->lcl_ip)); + if (session_transport_service_type (s) == TRANSPORT_SERVICE_CL) + { + rmp->rx_fifo = pointer_to_uword (s->server_rx_fifo); + rmp->tx_fifo = pointer_to_uword (s->server_tx_fifo); + vpp_evt_q = session_manager_get_vpp_event_queue (0); + rmp->vpp_evt_q = pointer_to_uword (vpp_evt_q); + } } } })); diff --git a/src/vnet/session/session_cli.c b/src/vnet/session/session_cli.c index 52833554a53..201f6f1d66e 100755 --- a/src/vnet/session/session_cli.c +++ b/src/vnet/session/session_cli.c @@ -23,6 +23,9 @@ format_stream_session_fifos (u8 * s, va_list * args) session_fifo_event_t _e, *e = &_e; u8 found; + if (!ss->server_rx_fifo || !ss->server_tx_fifo) + return s; + s = format (s, " Rx fifo: %U", format_svm_fifo, ss->server_rx_fifo, 1); if (verbose > 2 && ss->server_rx_fifo->has_event) { @@ -76,6 +79,8 @@ format_stream_session (u8 * s, va_list * args) { s = format (s, "%-40U%v", tp_vft->format_listener, ss->connection_index, str); + if (verbose > 1) + s = format (s, "\n%U", format_stream_session_fifos, ss, verbose); } else if (ss->session_state == SESSION_STATE_CONNECTING) { diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index b8f429eb1d7..14716965547 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -70,7 +70,7 @@ session_tx_fifo_chain_tail (session_manager_main_t * smm, vlib_main_t * vm, vlib_buffer_t * b0, u32 bi0, u8 n_bufs_per_seg, u32 left_from_seg, u32 * left_to_snd0, u16 * n_bufs, u32 * tx_offset, u16 deq_per_buf, - u8 peek_data) + u8 peek_data, transport_tx_fn_type_t tx_type) { vlib_buffer_t *chain_b0, *prev_b0; u32 chain_bi0, to_deq; @@ -102,7 +102,23 @@ session_tx_fifo_chain_tail (session_manager_main_t * smm, vlib_main_t * vm, } else { - n_bytes_read = svm_fifo_dequeue_nowait (fifo, len_to_deq0, data0); + if (tx_type == TRANSPORT_TX_DGRAM) + { + session_dgram_hdr_t *hdr; + u16 deq_now; + hdr = (session_dgram_hdr_t *) svm_fifo_head (fifo); + deq_now = clib_min (hdr->data_length - hdr->data_offset, + len_to_deq0); + n_bytes_read = svm_fifo_peek (fifo, hdr->data_offset, deq_now, + data0); + ASSERT (n_bytes_read > 0); + + hdr->data_offset += n_bytes_read; + if (hdr->data_offset == hdr->data_length) + svm_fifo_dequeue_drop (fifo, hdr->data_length); + } + else + n_bytes_read = svm_fifo_dequeue_nowait (fifo, len_to_deq0, data0); } ASSERT (n_bytes_read == len_to_deq0); chain_b0->current_length = n_bytes_read; @@ -145,12 +161,35 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, int i, n_bytes_read; u32 n_bytes_per_buf, deq_per_buf, deq_per_first_buf; u32 bufs_alloc, bufs_now; + session_dgram_hdr_t hdr; next_index = next0 = smm->session_type_to_next[s0->session_type]; - tp = session_get_transport_proto (s0); transport_vft = transport_protocol_get_vft (tp); - tc0 = transport_vft->get_connection (s0->connection_index, thread_index); + if (peek_data) + { + if (PREDICT_FALSE (s0->session_state < SESSION_STATE_READY)) + { + /* Can retransmit for closed sessions but can't send new data if + * session is not ready or closed */ + vec_add1 (smm->pending_event_vector[thread_index], *e0); + return 0; + } + tc0 = + transport_vft->get_connection (s0->connection_index, thread_index); + } + else + { + if (s0->session_state == SESSION_STATE_LISTENING) + tc0 = transport_vft->get_listener (s0->connection_index); + else + { + if (PREDICT_FALSE (s0->session_state == SESSION_STATE_CLOSED)) + return 0; + tc0 = transport_vft->get_connection (s0->connection_index, + thread_index); + } + } /* Make sure we have space to send and there's something to dequeue */ snd_mss0 = transport_vft->send_mss (tc0); @@ -168,20 +207,26 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, /* Check how much we can pull. */ max_dequeue0 = svm_fifo_max_dequeue (s0->server_tx_fifo); - if (peek_data) { /* Offset in rx fifo from where to peek data */ tx_offset = transport_vft->tx_fifo_offset (tc0); if (PREDICT_FALSE (tx_offset >= max_dequeue0)) - max_dequeue0 = 0; - else - max_dequeue0 -= tx_offset; + return 0; + max_dequeue0 -= tx_offset; } - - /* Nothing to read return */ - if (max_dequeue0 == 0) - return 0; + else + { + if (transport_vft->tx_type == TRANSPORT_TX_DGRAM) + { + if (max_dequeue0 < sizeof (hdr)) + return 0; + svm_fifo_peek (s0->server_tx_fifo, 0, sizeof (hdr), (u8 *) & hdr); + ASSERT (hdr.data_length > hdr.data_offset); + max_dequeue0 = hdr.data_length - hdr.data_offset; + } + } + ASSERT (max_dequeue0 > 0); /* Ensure we're not writing more than transport window allows */ if (max_dequeue0 < snd_space0) @@ -286,14 +331,42 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, } else { - n_bytes_read = svm_fifo_dequeue_nowait (s0->server_tx_fifo, - len_to_deq0, data0); - if (n_bytes_read <= 0) - goto dequeue_fail; + if (transport_vft->tx_type == TRANSPORT_TX_DGRAM) + { + svm_fifo_t *f = s0->server_tx_fifo; + u16 deq_now; + u32 offset; + + ASSERT (hdr.data_length > hdr.data_offset); + deq_now = clib_min (hdr.data_length - hdr.data_offset, + len_to_deq0); + offset = hdr.data_offset + SESSION_CONN_HDR_LEN; + n_bytes_read = svm_fifo_peek (f, offset, deq_now, data0); + if (PREDICT_FALSE (n_bytes_read <= 0)) + goto dequeue_fail; + + if (s0->session_state == SESSION_STATE_LISTENING) + { + ip_copy (&tc0->rmt_ip, &hdr.rmt_ip, tc0->is_ip4); + tc0->rmt_port = hdr.rmt_port; + } + hdr.data_offset += n_bytes_read; + if (hdr.data_offset == hdr.data_length) + { + offset = hdr.data_length + SESSION_CONN_HDR_LEN; + svm_fifo_dequeue_drop (f, offset); + } + } + else + { + n_bytes_read = svm_fifo_dequeue_nowait (s0->server_tx_fifo, + len_to_deq0, data0); + if (n_bytes_read <= 0) + goto dequeue_fail; + } } b0->current_length = n_bytes_read; - left_to_snd0 -= n_bytes_read; *n_tx_packets = *n_tx_packets + 1; @@ -307,7 +380,8 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, s0->server_tx_fifo, b0, bi0, n_bufs_per_seg, left_for_seg, &left_to_snd0, &n_bufs, &tx_offset, - deq_per_buf, peek_data); + deq_per_buf, peek_data, + transport_vft->tx_type); } /* Ask transport to push header after current_length and @@ -345,12 +419,18 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, /* If we couldn't dequeue all bytes mark as partially read */ if (max_len_to_snd0 < max_dequeue0) + if (svm_fifo_set_event (s0->server_tx_fifo)) + vec_add1 (smm->pending_event_vector[thread_index], *e0); + + if (!peek_data && transport_vft->tx_type == TRANSPORT_TX_DGRAM) { - /* If we don't already have new event */ - if (svm_fifo_set_event (s0->server_tx_fifo)) - { - vec_add1 (smm->pending_event_vector[thread_index], *e0); - } + /* Fix dgram pre header */ + if (max_len_to_snd0 < max_dequeue0) + svm_fifo_overwrite_head (s0->server_tx_fifo, (u8 *) & hdr, + sizeof (session_dgram_pre_hdr_t)); + /* More data needs to be read */ + else if (svm_fifo_max_dequeue (s0->server_tx_fifo) > 0) + vec_add1 (smm->pending_event_vector[thread_index], *e0); } return 0; @@ -360,7 +440,6 @@ dequeue_fail: * read, return buff to free list and return */ clib_warning ("dequeue fail"); - if (svm_fifo_set_event (s0->server_tx_fifo)) { vec_add1 (smm->pending_event_vector[thread_index], *e0); @@ -638,13 +717,6 @@ skip_dequeue: clib_warning ("It's dead, Jim!"); continue; } - /* Can retransmit for closed sessions but can't do anything if - * session is not ready or closed */ - if (PREDICT_FALSE (s0->session_state < SESSION_STATE_READY)) - { - vec_add1 (smm->pending_event_vector[my_thread_index], *e0); - continue; - } /* Spray packets in per session type frames, since they go to * different nodes */ rv = (smm->session_tx_fns[s0->session_type]) (vm, node, smm, e0, s0, diff --git a/src/vnet/session/stream_session.h b/src/vnet/session/stream_session.h index b7a5eee4b12..9e0e4d98990 100644 --- a/src/vnet/session/stream_session.h +++ b/src/vnet/session/stream_session.h @@ -31,30 +31,19 @@ typedef enum SESSION_STATE_CONNECTING, SESSION_STATE_ACCEPTING, SESSION_STATE_READY, - SESSION_STATE_CONNECTING_READY, + SESSION_STATE_OPENED, SESSION_STATE_CLOSED, SESSION_STATE_N_STATES, } stream_session_state_t; -/* TODO convert to macro once cleanup completed */ -typedef struct app_session_ +typedef struct generic_session_ { - /** fifo pointers. Once allocated, these do not move */ - svm_fifo_t *server_rx_fifo; - svm_fifo_t *server_tx_fifo; - - /** Type */ - session_type_t session_type; - - /** State */ - volatile u8 session_state; - - /** Session index in owning pool */ - u32 session_index; - - /** Application index */ - u32 app_index; -} app_session_t; + svm_fifo_t *rx_fifo; /**< rx fifo */ + svm_fifo_t *tx_fifo; /**< tx fifo */ + session_type_t session_type; /**< session type */ + volatile u8 session_state; /**< session state */ + u32 session_index; /**< index in owning pool */ +} generic_session_t; typedef struct _stream_session_t { diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c index 797bdad1eaa..20b912929b4 100644 --- a/src/vnet/session/transport.c +++ b/src/vnet/session/transport.c @@ -57,6 +57,9 @@ format_transport_proto (u8 * s, va_list * args) case TRANSPORT_PROTO_SCTP: s = format (s, "SCTP"); break; + case TRANSPORT_PROTO_UDPC: + s = format (s, "UDPC"); + break; } return s; } @@ -76,6 +79,9 @@ format_transport_proto_short (u8 * s, va_list * args) case TRANSPORT_PROTO_SCTP: s = format (s, "S"); break; + case TRANSPORT_PROTO_UDPC: + s = format (s, "U"); + break; } return s; } @@ -100,6 +106,10 @@ unformat_transport_proto (unformat_input_t * input, va_list * args) *proto = TRANSPORT_PROTO_TLS; else if (unformat (input, "TLS")) *proto = TRANSPORT_PROTO_TLS; + else if (unformat (input, "udpc")) + *proto = TRANSPORT_PROTO_UDPC; + else if (unformat (input, "UDPC")) + *proto = TRANSPORT_PROTO_UDPC; else return 0; return 1; @@ -185,6 +195,12 @@ transport_protocol_get_vft (transport_proto_t transport_proto) return &tp_vfts[transport_proto]; } +transport_service_type_t +transport_protocol_service_type (transport_proto_t tp) +{ + return tp_vfts[tp].service_type; +} + #define PORT_MASK ((1 << 16)- 1) void diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h index ed9eb02754e..8340fd859ac 100644 --- a/src/vnet/session/transport.h +++ b/src/vnet/session/transport.h @@ -35,10 +35,10 @@ typedef struct _transport_connection { ip46_address_t rmt_ip; /**< Remote IP */ ip46_address_t lcl_ip; /**< Local IP */ - u16 lcl_port; /**< Local port */ u16 rmt_port; /**< Remote port */ - u8 proto; /**< Protocol id */ + u16 lcl_port; /**< Local port */ u8 is_ip4; /**< Flag if IP4 connection */ + u8 proto; /**< Protocol id */ u32 fib_index; /**< Network namespace */ }; /* @@ -88,6 +88,7 @@ typedef enum _transport_proto TRANSPORT_PROTO_SCTP, TRANSPORT_PROTO_NONE, TRANSPORT_PROTO_TLS, + TRANSPORT_PROTO_UDPC, TRANSPORT_N_PROTO } transport_proto_t; @@ -99,7 +100,7 @@ uword unformat_transport_proto (unformat_input_t * input, va_list * args); _(u32, sw_if_index) /**< interface endpoint is associated with */ \ _(ip46_address_t, ip) /**< ip address */ \ _(u32, fib_index) /**< fib table endpoint is associated with */ \ - _(u8, is_ip4) /**< 1 if ip4 */ \ + _(u8, is_ip4) /**< set if ip4 */ \ _(u16, port) /**< port in net order */ \ typedef struct _transport_endpoint @@ -125,18 +126,6 @@ transport_endpoint_fib_proto (transport_endpoint_t * tep) return tep->is_ip4 ? FIB_PROTOCOL_IP4 : FIB_PROTOCOL_IP6; } -always_inline u8 -transport_is_stream (u8 proto) -{ - return ((proto == TRANSPORT_PROTO_TCP) || (proto == TRANSPORT_PROTO_SCTP)); -} - -always_inline u8 -transport_is_dgram (u8 proto) -{ - return (proto == TRANSPORT_PROTO_UDP); -} - int transport_alloc_local_port (u8 proto, ip46_address_t * ip); int transport_alloc_local_endpoint (u8 proto, transport_endpoint_t * rmt, ip46_address_t * lcl_addr, diff --git a/src/vnet/session/transport_interface.h b/src/vnet/session/transport_interface.h index 04a5ff263b1..f21e483c715 100644 --- a/src/vnet/session/transport_interface.h +++ b/src/vnet/session/transport_interface.h @@ -23,7 +23,8 @@ typedef enum transport_dequeue_type_ { TRANSPORT_TX_PEEK, /**< reliable transport protos */ TRANSPORT_TX_DEQUEUE, /**< unreliable transport protos */ - TRANSPORT_TX_INTERNAL, /**< apps acting as transports */ + TRANSPORT_TX_INTERNAL, /**< apps acting as transports */ + TRANSPORT_TX_DGRAM, /**< datagram mode */ TRANSPORT_TX_N_FNS } transport_tx_fn_type_t; @@ -31,7 +32,7 @@ typedef enum transport_service_type_ { TRANSPORT_SERVICE_VC, /**< virtual circuit service */ TRANSPORT_SERVICE_CL, /**< connectionless service */ - TRANSPORT_SERVICE_APP, /**< app transport service */ + TRANSPORT_SERVICE_APP, /**< app transport service */ TRANSPORT_N_SERVICES } transport_service_type_t; @@ -96,6 +97,7 @@ void transport_register_protocol (transport_proto_t transport_proto, const transport_proto_vft_t * vft, fib_protocol_t fib_proto, u32 output_node); transport_proto_vft_t *transport_protocol_get_vft (transport_proto_t tp); +transport_service_type_t transport_protocol_service_type (transport_proto_t); void transport_update_time (f64 time_now, u8 thread_index); void transport_enable_disable (vlib_main_t * vm, u8 is_en); |