From 30e79c2e388a98160a3660f4f03103890c9b1b7c Mon Sep 17 00:00:00 2001 From: Florin Coras Date: Wed, 2 Jan 2019 19:31:22 -0800 Subject: vcl/session: add api for changing session app worker In case of multi process apps, after forking, the parent may decide to close part or all of the sessions it shares with the child. Because the sessions have fifos allocated in the parent's segment manager, they must be moved to the child's segment manager. Change-Id: I85b4c8c8545005724023ee14043647719cef61dd Signed-off-by: Florin Coras --- src/vnet/session/application.c | 50 +++++++++++++++++++ src/vnet/session/application.h | 3 ++ src/vnet/session/application_interface.h | 48 +++++++++++++----- src/vnet/session/session.h | 5 +- src/vnet/session/session_node.c | 85 +++++++++++++++++++++++++++++++- 5 files changed, 176 insertions(+), 15 deletions(-) (limited to 'src/vnet/session') diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c index 19c8fa2f2e0..85b5f939427 100644 --- a/src/vnet/session/application.c +++ b/src/vnet/session/application.c @@ -724,6 +724,48 @@ app_worker_stop_listen (app_worker_t * app_wrk, session_handle_t handle) return 0; } +int +app_worker_own_session (app_worker_t * app_wrk, stream_session_t * s) +{ + segment_manager_t *sm; + svm_fifo_t *rxf, *txf; + + s->app_wrk_index = app_wrk->wrk_index; + + rxf = s->server_rx_fifo; + txf = s->server_tx_fifo; + + if (!rxf || !txf) + return 0; + + s->server_rx_fifo = 0; + s->server_tx_fifo = 0; + + sm = app_worker_get_or_alloc_connect_segment_manager (app_wrk); + if (session_alloc_fifos (sm, s)) + return -1; + + if (!svm_fifo_is_empty (rxf)) + { + clib_memcpy_fast (s->server_rx_fifo->data, rxf->data, rxf->nitems); + s->server_rx_fifo->head = rxf->head; + s->server_rx_fifo->tail = rxf->tail; + s->server_rx_fifo->cursize = rxf->cursize; + } + + if (!svm_fifo_is_empty (txf)) + { + clib_memcpy_fast (s->server_tx_fifo->data, txf->data, txf->nitems); + s->server_tx_fifo->head = txf->head; + s->server_tx_fifo->tail = txf->tail; + s->server_tx_fifo->cursize = txf->cursize; + } + + segment_manager_dealloc_fifos (rxf->segment_index, rxf, txf); + + return 0; +} + /** * Start listening local transport endpoint for requested transport. * @@ -889,6 +931,14 @@ app_worker_get_connect_segment_manager (app_worker_t * app) return segment_manager_get (app->connects_seg_manager); } +segment_manager_t * +app_worker_get_or_alloc_connect_segment_manager (app_worker_t * app_wrk) +{ + if (app_wrk->connects_seg_manager == (u32) ~ 0) + app_worker_alloc_connects_segment_manager (app_wrk); + return segment_manager_get (app_wrk->connects_seg_manager); +} + segment_manager_t * app_worker_get_listen_segment_manager (app_worker_t * app, stream_session_t * listener) diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h index e33f2ff797e..1d2064df62e 100644 --- a/src/vnet/session/application.h +++ b/src/vnet/session/application.h @@ -225,12 +225,15 @@ int app_worker_alloc_and_init (application_t * app, app_worker_t ** wrk); app_worker_t *app_worker_get (u32 wrk_index); app_worker_t *app_worker_get_if_valid (u32 wrk_index); application_t *app_worker_get_app (u32 wrk_index); +int app_worker_own_session (app_worker_t * app_wrk, stream_session_t * s); void app_worker_free (app_worker_t * app_wrk); int app_worker_open_session (app_worker_t * app, session_endpoint_t * tep, u32 api_context); segment_manager_t *app_worker_get_listen_segment_manager (app_worker_t *, stream_session_t *); segment_manager_t *app_worker_get_connect_segment_manager (app_worker_t *); +segment_manager_t + * app_worker_get_or_alloc_connect_segment_manager (app_worker_t *); int app_worker_alloc_connects_segment_manager (app_worker_t * app); int app_worker_add_segment_notify (u32 app_or_wrk, u64 segment_handle); u32 app_worker_n_listeners (app_worker_t * app); diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h index a156c82a745..9c48faa8abc 100644 --- a/src/vnet/session/application_interface.h +++ b/src/vnet/session/application_interface.h @@ -220,9 +220,9 @@ typedef struct session_bound_msg_ u8 lcl_is_ip4; u8 lcl_ip[16]; u16 lcl_port; - u64 rx_fifo; - u64 tx_fifo; - u64 vpp_evt_q; + uword rx_fifo; + uword tx_fifo; + uword vpp_evt_q; u32 segment_size; u8 segment_name_length; u8 segment_name[128]; @@ -233,12 +233,12 @@ typedef struct session_accepted_msg_ u32 context; u64 listener_handle; u64 handle; - u64 server_rx_fifo; - u64 server_tx_fifo; + uword server_rx_fifo; + uword server_tx_fifo; u64 segment_handle; - u64 vpp_event_queue_address; - u64 server_event_queue_address; - u64 client_event_queue_address; + uword vpp_event_queue_address; + uword server_event_queue_address; + uword client_event_queue_address; u16 port; u8 is_ip4; u8 ip[16]; @@ -260,12 +260,12 @@ typedef struct session_connected_msg_ u32 context; i32 retval; u64 handle; - u64 server_rx_fifo; - u64 server_tx_fifo; + uword server_rx_fifo; + uword server_tx_fifo; u64 segment_handle; - u64 vpp_event_queue_address; - u64 client_event_queue_address; - u64 server_event_queue_address; + uword vpp_event_queue_address; + uword client_event_queue_address; + uword server_event_queue_address; u32 segment_size; u8 segment_name_length; u8 segment_name[64]; @@ -302,6 +302,28 @@ typedef struct session_reset_reply_msg_ u64 handle; } __clib_packed session_reset_reply_msg_t; +typedef struct session_req_worker_update_msg_ +{ + u64 session_handle; +} __clib_packed session_req_worker_update_msg_t; + +/* NOTE: using u16 for wrk indices because message needs to fit in 18B */ +typedef struct session_worker_update_msg_ +{ + u32 client_index; + u16 wrk_index; + u16 req_wrk_index; + u64 handle; +} __clib_packed session_worker_update_msg_t; + +typedef struct session_worker_update_reply_msg_ +{ + u64 handle; + uword rx_fifo; + uword tx_fifo; + u64 segment_handle; +} __clib_packed session_worker_update_reply_msg_t; + typedef struct app_session_event_ { svm_msg_q_msg_t msg; diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index e3c73000edf..cf1b3e99f4d 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -49,7 +49,10 @@ typedef enum SESSION_CTRL_EVT_DISCONNECTED, SESSION_CTRL_EVT_DISCONNECTED_REPLY, SESSION_CTRL_EVT_RESET, - SESSION_CTRL_EVT_RESET_REPLY + SESSION_CTRL_EVT_RESET_REPLY, + SESSION_CTRL_EVT_REQ_WORKER_UPDATE, + SESSION_CTRL_EVT_WORKER_UPDATE, + SESSION_CTRL_EVT_WORKER_UPDATE_REPLY, } session_evt_type_t; static inline const char * diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index 98965f334af..880f16388b8 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -173,7 +173,7 @@ session_mq_disconnected_handler (void *data) svm_msg_q_unlock (app_wrk->event_queue); evt = svm_msg_q_msg_data (app_wrk->event_queue, msg); clib_memset (evt, 0, sizeof (*evt)); - evt->event_type = SESSION_CTRL_EVT_DISCONNECTED; + evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY; rmp = (session_disconnected_reply_msg_t *) evt->data; rmp->handle = mp->handle; rmp->context = mp->context; @@ -207,6 +207,86 @@ session_mq_disconnected_reply_handler (void *data) } } +static void +session_mq_worker_update_handler (void *data) +{ + session_worker_update_msg_t *mp = (session_worker_update_msg_t *) data; + session_worker_update_reply_msg_t *rmp; + svm_msg_q_msg_t _msg, *msg = &_msg; + app_worker_t *app_wrk; + u32 owner_app_wrk_map; + session_event_t *evt; + stream_session_t *s; + application_t *app; + + app = application_lookup (mp->client_index); + if (!app) + return; + if (!(s = session_get_from_handle_if_valid (mp->handle))) + { + clib_warning ("invalid handle %llu", mp->handle); + return; + } + app_wrk = app_worker_get (s->app_wrk_index); + if (app_wrk->app_index != app->app_index) + { + clib_warning ("app %u does not own session %llu", app->app_index, + mp->handle); + return; + } + owner_app_wrk_map = app_wrk->wrk_map_index; + app_wrk = application_get_worker (app, mp->wrk_index); + + /* This needs to come from the new owner */ + if (mp->req_wrk_index == owner_app_wrk_map) + { + session_req_worker_update_msg_t *wump; + + svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue, + SESSION_MQ_CTRL_EVT_RING, + SVM_Q_WAIT, msg); + svm_msg_q_unlock (app_wrk->event_queue); + evt = svm_msg_q_msg_data (app_wrk->event_queue, msg); + clib_memset (evt, 0, sizeof (*evt)); + evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE; + wump = (session_req_worker_update_msg_t *) evt->data; + wump->session_handle = mp->handle; + svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT); + return; + } + + app_worker_own_session (app_wrk, s); + + /* + * Send reply + */ + svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue, + SESSION_MQ_CTRL_EVT_RING, + SVM_Q_WAIT, msg); + svm_msg_q_unlock (app_wrk->event_queue); + evt = svm_msg_q_msg_data (app_wrk->event_queue, msg); + clib_memset (evt, 0, sizeof (*evt)); + evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY; + rmp = (session_worker_update_reply_msg_t *) evt->data; + rmp->handle = mp->handle; + rmp->rx_fifo = pointer_to_uword (s->server_rx_fifo); + rmp->tx_fifo = pointer_to_uword (s->server_tx_fifo); + rmp->segment_handle = session_segment_handle (s); + svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT); + + /* + * Retransmit messages that may have been lost + */ + if (!svm_fifo_is_empty (s->server_tx_fifo)) + session_send_io_evt_to_thread (s->server_tx_fifo, FIFO_EVENT_APP_TX); + + if (!svm_fifo_is_empty (s->server_rx_fifo)) + app_worker_lock_and_send_event (app_wrk, s, FIFO_EVENT_APP_RX); + + if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING) + app->cb_fns.session_disconnect_callback (s); +} + vlib_node_registration_t session_queue_node; typedef struct @@ -936,6 +1016,9 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, case SESSION_CTRL_EVT_RESET_REPLY: session_mq_reset_reply_handler (e->data); break; + case SESSION_CTRL_EVT_WORKER_UPDATE: + session_mq_worker_update_handler (e->data); + break; default: clib_warning ("unhandled event type %d", e->event_type); } -- cgit 1.2.3-korg