diff options
Diffstat (limited to 'src/vnet')
-rw-r--r-- | src/vnet/session-apps/echo_server.c | 6 | ||||
-rw-r--r-- | src/vnet/session/application_interface.h | 20 | ||||
-rw-r--r-- | src/vnet/session/application_local.c | 134 | ||||
-rw-r--r-- | src/vnet/session/application_local.h | 6 | ||||
-rw-r--r-- | src/vnet/session/application_worker.c | 5 | ||||
-rw-r--r-- | src/vnet/session/segment_manager.c | 10 | ||||
-rw-r--r-- | src/vnet/session/session.c | 21 | ||||
-rw-r--r-- | src/vnet/session/session.h | 5 | ||||
-rwxr-xr-x | src/vnet/session/session_api.c | 87 | ||||
-rw-r--r-- | src/vnet/session/session_node.c | 22 | ||||
-rw-r--r-- | src/vnet/session/session_types.h | 2 |
11 files changed, 158 insertions, 160 deletions
diff --git a/src/vnet/session-apps/echo_server.c b/src/vnet/session-apps/echo_server.c index 2762347d441..941c16d0e08 100644 --- a/src/vnet/session-apps/echo_server.c +++ b/src/vnet/session-apps/echo_server.c @@ -240,14 +240,16 @@ echo_server_rx_callback (session_t * s) n_written = app_send_stream_raw (tx_fifo, esm->vpp_queue[thread_index], esm->rx_buf[thread_index], - actual_transfer, SESSION_IO_EVT_TX, 0); + actual_transfer, SESSION_IO_EVT_TX, + 1 /* do_evt */ , 0); } else { n_written = app_send_dgram_raw (tx_fifo, &at, esm->vpp_queue[s->thread_index], esm->rx_buf[thread_index], - actual_transfer, SESSION_IO_EVT_TX, 0); + actual_transfer, SESSION_IO_EVT_TX, + 1 /* do_evt */ , 0); } if (n_written != max_transfer) diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h index 935a352a436..d4dfeec54dc 100644 --- a/src/vnet/session/application_interface.h +++ b/src/vnet/session/application_interface.h @@ -283,8 +283,6 @@ typedef struct session_accepted_msg_ uword server_tx_fifo; u64 segment_handle; uword vpp_event_queue_address; - uword server_event_queue_address; - uword client_event_queue_address; u16 port; u8 is_ip4; u8 ip[16]; @@ -309,9 +307,10 @@ typedef struct session_connected_msg_ uword server_rx_fifo; uword server_tx_fifo; u64 segment_handle; + uword ct_rx_fifo; + uword ct_tx_fifo; + u64 ct_segment_handle; uword vpp_event_queue_address; - uword client_event_queue_address; - uword server_event_queue_address; u32 segment_size; u8 segment_name_length; u8 segment_name[64]; @@ -454,7 +453,7 @@ app_send_io_evt_to_vpp (svm_msg_q_t * mq, svm_fifo_t * f, u8 evt_type, always_inline int app_send_dgram_raw (svm_fifo_t * f, app_session_transport_t * at, svm_msg_q_t * vpp_evt_q, u8 * data, u32 len, u8 evt_type, - u8 noblock) + u8 do_evt, u8 noblock) { u32 max_enqueue, actual_write; session_dgram_hdr_t hdr; @@ -478,7 +477,7 @@ app_send_dgram_raw (svm_fifo_t * f, app_session_transport_t * at, if ((rv = svm_fifo_enqueue_nowait (f, actual_write, data)) > 0) { - if (svm_fifo_set_event (f)) + if (do_evt && svm_fifo_set_event (f)) app_send_io_evt_to_vpp (vpp_evt_q, f, evt_type, noblock); } ASSERT (rv); @@ -489,18 +488,19 @@ always_inline int app_send_dgram (app_session_t * s, u8 * data, u32 len, u8 noblock) { return app_send_dgram_raw (s->tx_fifo, &s->transport, s->vpp_evt_q, data, - len, SESSION_IO_EVT_TX, noblock); + len, SESSION_IO_EVT_TX, 1 /* do_evt */ , + noblock); } always_inline int app_send_stream_raw (svm_fifo_t * f, svm_msg_q_t * vpp_evt_q, u8 * data, - u32 len, u8 evt_type, u8 noblock) + u32 len, u8 evt_type, u8 do_evt, u8 noblock) { int rv; if ((rv = svm_fifo_enqueue_nowait (f, len, data)) > 0) { - if (svm_fifo_set_event (f)) + if (do_evt && svm_fifo_set_event (f)) app_send_io_evt_to_vpp (vpp_evt_q, f, evt_type, noblock); } return rv; @@ -510,7 +510,7 @@ always_inline int app_send_stream (app_session_t * s, u8 * data, u32 len, u8 noblock) { return app_send_stream_raw (s->tx_fifo, s->vpp_evt_q, data, len, - SESSION_IO_EVT_TX, noblock); + SESSION_IO_EVT_TX, 1 /* do_evt */ , noblock); } always_inline int diff --git a/src/vnet/session/application_local.c b/src/vnet/session/application_local.c index 9378e5e6380..745b202f580 100644 --- a/src/vnet/session/application_local.c +++ b/src/vnet/session/application_local.c @@ -16,9 +16,28 @@ #include <vnet/session/application_local.h> #include <vnet/session/session.h> -ct_connection_t *connections; +static ct_connection_t *connections; -ct_connection_t * +static void +ct_enable_disable_main_pre_input_node (u8 is_add) +{ + u32 n_conns; + + n_conns = pool_elts (connections); + if (n_conns > 2) + return; + + if (n_conns > 0 && is_add) + vlib_node_set_state (vlib_get_main (), + session_queue_pre_input_node.index, + VLIB_NODE_STATE_POLLING); + else if (n_conns == 0) + vlib_node_set_state (vlib_get_main (), + session_queue_pre_input_node.index, + VLIB_NODE_STATE_DISABLED); +} + +static ct_connection_t * ct_connection_alloc (void) { ct_connection_t *ct; @@ -31,7 +50,7 @@ ct_connection_alloc (void) return ct; } -ct_connection_t * +static ct_connection_t * ct_connection_get (u32 ct_index) { if (pool_is_free_index (connections, ct_index)) @@ -39,7 +58,7 @@ ct_connection_get (u32 ct_index) return pool_elt_at_index (connections, ct_index); } -void +static void ct_connection_free (ct_connection_t * ct) { if (CLIB_DEBUG) @@ -110,8 +129,14 @@ ct_session_connect_notify (session_t * ss) cs->session_state = SESSION_STATE_CONNECTING; cs->app_wrk_index = client_wrk->wrk_index; cs->connection_index = cct->c_c_index; + cs->t_app_index = client_wrk->app_index; cct->c_s_index = cs->session_index; + cct->client_rx_fifo = ss->tx_fifo; + cct->client_tx_fifo = ss->rx_fifo; + + cct->client_rx_fifo->refcnt++; + cct->client_tx_fifo->refcnt++; /* This will allocate fifos for the session. They won't be used for * exchanging data but they will be used to close the connection if @@ -135,47 +160,25 @@ ct_session_connect_notify (session_t * ss) return 0; } -static void -ct_session_fix_eventds (svm_msg_q_t * sq, svm_msg_q_t * cq) -{ - int fd; - - /* - * segment manager initializes only the producer eventds, since vpp is - * typically the producer. But for local sessions, we also pass to the - * apps the mqs they listen on for events from peer apps, so they are also - * consumer fds. - */ - fd = svm_msg_q_get_producer_eventfd (sq); - svm_msg_q_set_consumer_eventfd (sq, fd); - fd = svm_msg_q_get_producer_eventfd (cq); - svm_msg_q_set_consumer_eventfd (cq, fd); -} - -int +static int ct_init_local_session (app_worker_t * client_wrk, app_worker_t * server_wrk, ct_connection_t * ct, session_t * ls, session_t * ll) { - u32 seg_size, evt_q_sz, evt_q_elts, margin = 16 << 10; - u32 round_rx_fifo_sz, round_tx_fifo_sz, sm_index; - segment_manager_properties_t *props, *cprops; + u32 round_rx_fifo_sz, round_tx_fifo_sz, sm_index, seg_size; + segment_manager_properties_t *props; svm_fifo_segment_private_t *seg; - application_t *server, *client; + application_t *server; segment_manager_t *sm; - svm_msg_q_t *sq, *cq; + u32 margin = 16 << 10; u64 segment_handle; int seg_index, rv; server = application_get (server_wrk->app_index); - client = application_get (client_wrk->app_index); props = application_segment_manager_properties (server); - cprops = application_segment_manager_properties (client); - evt_q_elts = props->evt_q_size + cprops->evt_q_size; - evt_q_sz = segment_manager_evt_q_expected_size (evt_q_elts); round_rx_fifo_sz = 1 << max_log2 (props->rx_fifo_size); round_tx_fifo_sz = 1 << max_log2 (props->tx_fifo_size); - seg_size = round_rx_fifo_sz + round_tx_fifo_sz + evt_q_sz + margin; + seg_size = round_rx_fifo_sz + round_tx_fifo_sz + margin; sm = app_worker_get_listen_segment_manager (server_wrk, ll); seg_index = segment_manager_add_segment (sm, seg_size); @@ -185,14 +188,7 @@ ct_init_local_session (app_worker_t * client_wrk, app_worker_t * server_wrk, return seg_index; } seg = segment_manager_get_segment_w_lock (sm, seg_index); - sq = segment_manager_alloc_queue (seg, props); - cq = segment_manager_alloc_queue (seg, cprops); - - if (props->use_mq_eventfd) - ct_session_fix_eventds (sq, cq); - ct->server_evt_q = pointer_to_uword (sq); - ct->client_evt_q = pointer_to_uword (cq); rv = segment_manager_try_alloc_fifos (seg, props->rx_fifo_size, props->tx_fifo_size, &ls->rx_fifo, &ls->tx_fifo); @@ -204,8 +200,8 @@ ct_init_local_session (app_worker_t * client_wrk, app_worker_t * server_wrk, } sm_index = segment_manager_index (sm); - ls->rx_fifo->ct_session_index = ls->session_index; - ls->tx_fifo->ct_session_index = ls->session_index; + ls->rx_fifo->master_session_index = ls->session_index; + ls->tx_fifo->master_session_index = ls->session_index; ls->rx_fifo->segment_manager = sm_index; ls->tx_fifo->segment_manager = sm_index; ls->rx_fifo->segment_index = seg_index; @@ -228,7 +224,7 @@ failed: return rv; } -int +static int ct_connect (app_worker_t * client_wrk, session_t * ll, session_endpoint_cfg_t * sep) { @@ -255,6 +251,7 @@ ct_connect (app_worker_t * client_wrk, session_t * ll, cct->c_is_ip4 = sep->is_ip4; clib_memcpy (&cct->c_rmt_ip, &sep->ip, sizeof (sep->ip)); cct->actual_tp = ll_ct->actual_tp; + cct->is_client = 1; /* * Init server transport @@ -285,6 +282,7 @@ ct_connect (app_worker_t * client_wrk, session_t * ll, server_wrk = application_listener_select_worker (ll); ss->app_wrk_index = server_wrk->wrk_index; + ss->t_app_index = server_wrk->app_index; sct->c_s_index = ss->session_index; sct->server_wrk = ss->app_wrk_index; @@ -306,14 +304,12 @@ ct_connect (app_worker_t * client_wrk, session_t * ll, return -1; } - cct->client_evt_q = sct->client_evt_q; - cct->server_evt_q = sct->server_evt_q; cct->segment_handle = sct->segment_handle; - + ct_enable_disable_main_pre_input_node (1 /* is_add */ ); return 0; } -u32 +static u32 ct_start_listen (u32 app_listener_index, transport_endpoint_t * tep) { session_endpoint_cfg_t *sep; @@ -326,25 +322,27 @@ ct_start_listen (u32 app_listener_index, transport_endpoint_t * tep) clib_memcpy (&ct->c_lcl_ip, &sep->ip, sizeof (sep->ip)); ct->c_lcl_port = sep->port; ct->actual_tp = sep->transport_proto; + ct_enable_disable_main_pre_input_node (1 /* is_add */ ); return ct->c_c_index; } -u32 +static u32 ct_stop_listen (u32 ct_index) { ct_connection_t *ct; ct = ct_connection_get (ct_index); ct_connection_free (ct); + ct_enable_disable_main_pre_input_node (0 /* is_add */ ); return 0; } -transport_connection_t * +static transport_connection_t * ct_listener_get (u32 ct_index) { return (transport_connection_t *) ct_connection_get (ct_index); } -int +static int ct_session_connect (transport_endpoint_cfg_t * tep) { session_endpoint_cfg_t *sep_ext; @@ -407,10 +405,11 @@ global_scope: return 1; } -void +static void ct_session_close (u32 ct_index, u32 thread_index) { ct_connection_t *ct, *peer_ct; + app_worker_t *app_wrk; session_t *s; ct = ct_connection_get (ct_index); @@ -422,13 +421,18 @@ ct_session_close (u32 ct_index, u32 thread_index) } s = session_get (ct->c_s_index, 0); - app_worker_del_segment_notify (app_worker_get (s->app_wrk_index), - ct->segment_handle); + app_wrk = app_worker_get_if_valid (s->app_wrk_index); + if (app_wrk) + app_worker_del_segment_notify (app_wrk, ct->segment_handle); session_free_w_fifos (s); + if (ct->is_client) + segment_manager_dealloc_fifos (ct->client_rx_fifo, ct->client_tx_fifo); + ct_connection_free (ct); + ct_enable_disable_main_pre_input_node (0 /* is_add */ ); } -transport_connection_t * +static transport_connection_t * ct_session_get (u32 ct_index, u32 thread_index) { return (transport_connection_t *) ct_connection_get (ct_index); @@ -460,7 +464,7 @@ format_ct_connection_id (u8 * s, va_list * args) return s; } -u8 * +static u8 * format_ct_listener (u8 * s, va_list * args) { u32 tc_index = va_arg (*args, u32); @@ -472,7 +476,7 @@ format_ct_listener (u8 * s, va_list * args) return s; } -u8 * +static u8 * format_ct_connection (u8 * s, va_list * args) { ct_connection_t *ct = va_arg (*args, ct_connection_t *); @@ -492,7 +496,7 @@ format_ct_connection (u8 * s, va_list * args) return s; } -u8 * +static u8 * format_ct_session (u8 * s, va_list * args) { u32 ct_index = va_arg (*args, u32); @@ -526,11 +530,29 @@ const static transport_proto_vft_t cut_thru_proto = { }; /* *INDENT-ON* */ +int +ct_session_tx (session_t * s) +{ + ct_connection_t *ct, *peer_ct; + session_t *peer_s; + + ct = (ct_connection_t *) session_get_transport (s); + peer_ct = ct_connection_get (ct->peer_index); + if (!peer_ct) + return -1; + peer_s = session_get (peer_ct->c_s_index, 0); + if (peer_s->session_state >= SESSION_STATE_TRANSPORT_CLOSING) + return 0; + return session_enqueue_notify (peer_s); +} + static clib_error_t * ct_transport_init (vlib_main_t * vm) { transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto, FIB_PROTOCOL_IP4, ~0); + transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto, + FIB_PROTOCOL_IP6, ~0); return 0; } diff --git a/src/vnet/session/application_local.h b/src/vnet/session/application_local.h index 5d6e6c1ec7b..7b937d32f0b 100644 --- a/src/vnet/session/application_local.h +++ b/src/vnet/session/application_local.h @@ -27,16 +27,18 @@ typedef struct ct_connection_ u32 server_wrk; u32 transport_listener_index; transport_proto_t actual_tp; - u64 server_evt_q; - u64 client_evt_q; u32 client_opaque; u32 peer_index; u64 segment_handle; + svm_fifo_t *client_rx_fifo; + svm_fifo_t *client_tx_fifo; + u8 is_client; } ct_connection_t; session_t *ct_session_get_peer (session_t * s); void ct_session_endpoint (session_t * ll, session_endpoint_t * sep); int ct_session_connect_notify (session_t * ls); +int ct_session_tx (session_t * s); #endif /* SRC_VNET_SESSION_APPLICATION_LOCAL_H_ */ diff --git a/src/vnet/session/application_worker.c b/src/vnet/session/application_worker.c index 9dfa3aa0243..7c888882093 100644 --- a/src/vnet/session/application_worker.c +++ b/src/vnet/session/application_worker.c @@ -543,7 +543,7 @@ app_send_io_evt_rx (app_worker_t * app_wrk, session_t * s, u8 lock) return app->cb_fns.builtin_app_rx_callback (s); } - if (svm_fifo_has_event (s->rx_fifo) || svm_fifo_is_empty (s->rx_fifo)) + if (svm_fifo_has_event (s->rx_fifo)) return 0; mq = app_wrk->event_queue; @@ -608,9 +608,8 @@ app_send_io_evt_tx (app_worker_t * app_wrk, session_t * s, u8 lock) typedef int (app_send_evt_handler_fn) (app_worker_t *app, session_t *s, u8 lock); -static app_send_evt_handler_fn * const app_send_evt_handler_fns[3] = { +static app_send_evt_handler_fn * const app_send_evt_handler_fns[2] = { app_send_io_evt_rx, - 0, app_send_io_evt_tx, }; /* *INDENT-ON* */ diff --git a/src/vnet/session/segment_manager.c b/src/vnet/session/segment_manager.c index b7467bbbd43..25b641de167 100644 --- a/src/vnet/session/segment_manager.c +++ b/src/vnet/session/segment_manager.c @@ -408,12 +408,10 @@ segment_manager_del_sessions (segment_manager_t * sm) */ while (fifo) { - if (fifo->ct_session_index != SVM_FIFO_INVALID_SESSION_INDEX) - session = session_get (fifo->ct_session_index, 0); - else - session = session_get (fifo->master_session_index, - fifo->master_thread_index); - vec_add1 (handles, session_handle (session)); + session = session_get_if_valid (fifo->master_session_index, + fifo->master_thread_index); + if (session) + vec_add1 (handles, session_handle (session)); fifo = fifo->next; } diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 57567926a1c..6e24d562f98 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -124,13 +124,6 @@ session_program_transport_close (session_t * s) session_worker_t *wrk; session_event_t *evt; - if (!session_has_transport (s)) - { - /* Polling may not be enabled on main thread so close now */ - session_transport_close (s); - return; - } - /* If we are in the handler thread, or being called with the worker barrier * held, just append a new event to pending disconnects vector. */ if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index) @@ -483,7 +476,7 @@ session_notify_subscribers (u32 app_index, session_t * s, * @return 0 on success or negative number if failed to send notification. */ static inline int -session_enqueue_notify (session_t * s) +session_enqueue_notify_inline (session_t * s) { app_worker_t *app_wrk; @@ -513,6 +506,12 @@ session_enqueue_notify (session_t * s) } int +session_enqueue_notify (session_t * s) +{ + return session_enqueue_notify_inline (s); +} + +int session_dequeue_notify (session_t * s) { app_worker_t *app_wrk; @@ -560,7 +559,11 @@ session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index) errors++; continue; } - if (PREDICT_FALSE (session_enqueue_notify (s))) + + if (svm_fifo_is_empty (s->rx_fifo)) + continue; + + if (PREDICT_FALSE (session_enqueue_notify_inline (s))) errors++; } diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index a7c91949eb3..cea1b375108 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -187,6 +187,7 @@ typedef struct session_main_ extern session_main_t session_main; extern vlib_node_registration_t session_queue_node; extern vlib_node_registration_t session_queue_process_node; +extern vlib_node_registration_t session_queue_pre_input_node; #define SESSION_Q_PROCESS_FLUSH_FRAMES 1 #define SESSION_Q_PROCESS_STOP 2 @@ -196,6 +197,9 @@ session_is_valid (u32 si, u8 thread_index) { session_t *s; s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si); + if (s->session_state == SESSION_STATE_CLOSED) + return 1; + if (s->thread_index != thread_index || s->session_index != si) return 0; return 1; @@ -331,6 +335,7 @@ void session_transport_close (session_t * s); void session_transport_cleanup (session_t * s); int session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type); +int session_enqueue_notify (session_t * s); int session_dequeue_notify (session_t * s); int session_send_io_evt_to_thread_custom (void *data, u32 thread_index, session_evt_type_t evt_type); diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c index 8ee25a9a2ce..525f63799be 100755 --- a/src/vnet/session/session_api.c +++ b/src/vnet/session/session_api.c @@ -150,54 +150,6 @@ send_del_segment_callback (u32 api_client_index, u64 segment_handle) } static int -send_app_cut_through_registration_add (u32 api_client_index, - u32 wrk_map_index, u64 mq_addr, - u64 peer_mq_addr) -{ - vl_api_app_cut_through_registration_add_t *mp; - vl_api_registration_t *reg; - svm_msg_q_t *mq, *peer_mq; - int fds[2]; - - reg = vl_mem_api_client_index_to_registration (api_client_index); - if (!reg) - { - clib_warning ("no registration: %u", api_client_index); - return -1; - } - - mp = vl_mem_api_alloc_as_if_client_w_reg (reg, sizeof (*mp)); - clib_memset (mp, 0, sizeof (*mp)); - mp->_vl_msg_id = - clib_host_to_net_u16 (VL_API_APP_CUT_THROUGH_REGISTRATION_ADD); - - mp->evt_q_address = mq_addr; - mp->peer_evt_q_address = peer_mq_addr; - mp->wrk_index = wrk_map_index; - - mq = uword_to_pointer (mq_addr, svm_msg_q_t *); - peer_mq = uword_to_pointer (peer_mq_addr, svm_msg_q_t *); - - if (svm_msg_q_get_producer_eventfd (mq) != -1) - { - mp->fd_flags |= SESSION_FD_F_MQ_EVENTFD; - mp->n_fds = 2; - /* app will overwrite exactly the fds we pass here. So - * when we swap mq with peer_mq (accept vs connect) the - * fds will still be valid */ - fds[0] = svm_msg_q_get_consumer_eventfd (mq); - fds[1] = svm_msg_q_get_producer_eventfd (peer_mq); - } - - vl_msg_api_send_shmem (reg->vl_input_queue, (u8 *) & mp); - - if (mp->n_fds != 0) - session_send_fds (reg, fds, mp->n_fds); - - return 0; -} - -static int mq_try_lock_and_alloc_msg (svm_msg_q_t * app_mq, svm_msg_q_msg_t * msg) { int rv; @@ -268,25 +220,17 @@ mq_send_session_accepted_cb (session_t * s) } else { - u8 main_thread = vlib_num_workers ()? 1 : 0; ct_connection_t *ct; ct = (ct_connection_t *) session_get_transport (s); - send_app_cut_through_registration_add (app_wrk->api_client_index, - app_wrk->wrk_map_index, - ct->server_evt_q, - ct->client_evt_q); - listener = listen_session_get (s->listener_index); al = app_listener_get (app, listener->al_index); mp->listener_handle = app_listener_handle (al); mp->is_ip4 = session_type_is_ip4 (listener->session_type); mp->handle = session_handle (s); mp->port = ct->c_rmt_port; - vpp_queue = session_main_get_vpp_event_queue (main_thread); + vpp_queue = session_main_get_vpp_event_queue (0); mp->vpp_event_queue_address = pointer_to_uword (vpp_queue); - mp->client_event_queue_address = ct->client_evt_q; - mp->server_event_queue_address = ct->server_evt_q; } svm_msg_q_add_and_unlock (app_mq, msg); @@ -415,26 +359,22 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context, } else { - u8 main_thread = vlib_num_workers ()? 1 : 0; ct_connection_t *cct; session_t *ss; cct = (ct_connection_t *) session_get_transport (s); - send_app_cut_through_registration_add (app_wrk->api_client_index, - app_wrk->wrk_map_index, - cct->client_evt_q, - cct->server_evt_q); - mp->handle = session_handle (s); mp->lcl_port = cct->c_lcl_port; - vpp_mq = session_main_get_vpp_event_queue (main_thread); + mp->is_ip4 = cct->c_is_ip4; + vpp_mq = session_main_get_vpp_event_queue (0); mp->vpp_event_queue_address = pointer_to_uword (vpp_mq); - mp->client_event_queue_address = cct->client_evt_q; - mp->server_event_queue_address = cct->server_evt_q; + mp->server_rx_fifo = pointer_to_uword (s->rx_fifo); + mp->server_tx_fifo = pointer_to_uword (s->tx_fifo); + mp->segment_handle = session_segment_handle (s); ss = ct_session_get_peer (s); - mp->server_rx_fifo = pointer_to_uword (ss->tx_fifo); - mp->server_tx_fifo = pointer_to_uword (ss->rx_fifo); - mp->segment_handle = session_segment_handle (ss); + mp->ct_rx_fifo = pointer_to_uword (ss->tx_fifo); + mp->ct_tx_fifo = pointer_to_uword (ss->rx_fifo); + mp->ct_segment_handle = session_segment_handle (ss); } done: @@ -505,11 +445,20 @@ done: return 0; } +static int +mq_app_tx_callback (session_t * s) +{ + if (session_has_transport (s)) + return 0; + return ct_session_tx (s); +} + static session_cb_vft_t session_mq_cb_vft = { .session_accept_callback = mq_send_session_accepted_cb, .session_disconnect_callback = mq_send_session_disconnected_cb, .session_connected_callback = mq_send_session_connected_cb, .session_reset_callback = mq_send_session_reset_cb, + .builtin_app_tx_callback = mq_app_tx_callback, .add_segment_callback = send_add_segment_callback, .del_segment_callback = send_del_segment_callback, }; diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index d0936c7e13d..db5123b8b2d 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -74,9 +74,9 @@ session_mq_accepted_reply_handler (void *data) if (!session_has_transport (s)) { + s->session_state = SESSION_STATE_READY; if (ct_session_connect_notify (s)) return; - s->session_state = SESSION_STATE_READY; } else { @@ -1234,6 +1234,26 @@ VLIB_REGISTER_NODE (session_queue_process_node) = }; /* *INDENT-ON* */ +static_always_inline uword +session_queue_pre_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node, + vlib_frame_t * frame) +{ + session_main_t *sm = &session_main; + if (!sm->wrk[0].vpp_event_queue) + return 0; + return session_queue_node_fn (vm, node, frame); +} + +/* *INDENT-OFF* */ +VLIB_REGISTER_NODE (session_queue_pre_input_node) = +{ + .function = session_queue_pre_input_inline, + .type = VLIB_NODE_TYPE_PRE_INPUT, + .name = "session-queue-main", + .state = VLIB_NODE_STATE_DISABLED, +}; +/* *INDENT-ON* */ + /* * fd.io coding-style-patch-verification: ON * diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h index c4240ab420a..9e51d69db42 100644 --- a/src/vnet/session/session_types.h +++ b/src/vnet/session/session_types.h @@ -278,9 +278,7 @@ session_parse_handle (session_handle_t handle, u32 * index, typedef enum { SESSION_IO_EVT_RX, - SESSION_IO_EVT_CT_RX, SESSION_IO_EVT_TX, - SESSION_IO_EVT_CT_TX, SESSION_IO_EVT_TX_FLUSH, SESSION_IO_EVT_BUILTIN_RX, SESSION_IO_EVT_BUILTIN_TX, |