diff options
Diffstat (limited to 'src/vnet/session')
-rw-r--r-- | src/vnet/session/application.c | 50 | ||||
-rw-r--r-- | src/vnet/session/application.h | 3 | ||||
-rw-r--r-- | src/vnet/session/application_interface.h | 48 | ||||
-rw-r--r-- | src/vnet/session/session.h | 5 | ||||
-rw-r--r-- | src/vnet/session/session_node.c | 85 |
5 files changed, 176 insertions, 15 deletions
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c index 19c8fa2f2e0..85b5f939427 100644 --- a/src/vnet/session/application.c +++ b/src/vnet/session/application.c @@ -724,6 +724,48 @@ app_worker_stop_listen (app_worker_t * app_wrk, session_handle_t handle) return 0; } +int +app_worker_own_session (app_worker_t * app_wrk, stream_session_t * s) +{ + segment_manager_t *sm; + svm_fifo_t *rxf, *txf; + + s->app_wrk_index = app_wrk->wrk_index; + + rxf = s->server_rx_fifo; + txf = s->server_tx_fifo; + + if (!rxf || !txf) + return 0; + + s->server_rx_fifo = 0; + s->server_tx_fifo = 0; + + sm = app_worker_get_or_alloc_connect_segment_manager (app_wrk); + if (session_alloc_fifos (sm, s)) + return -1; + + if (!svm_fifo_is_empty (rxf)) + { + clib_memcpy_fast (s->server_rx_fifo->data, rxf->data, rxf->nitems); + s->server_rx_fifo->head = rxf->head; + s->server_rx_fifo->tail = rxf->tail; + s->server_rx_fifo->cursize = rxf->cursize; + } + + if (!svm_fifo_is_empty (txf)) + { + clib_memcpy_fast (s->server_tx_fifo->data, txf->data, txf->nitems); + s->server_tx_fifo->head = txf->head; + s->server_tx_fifo->tail = txf->tail; + s->server_tx_fifo->cursize = txf->cursize; + } + + segment_manager_dealloc_fifos (rxf->segment_index, rxf, txf); + + return 0; +} + /** * Start listening local transport endpoint for requested transport. * @@ -890,6 +932,14 @@ app_worker_get_connect_segment_manager (app_worker_t * app) } segment_manager_t * +app_worker_get_or_alloc_connect_segment_manager (app_worker_t * app_wrk) +{ + if (app_wrk->connects_seg_manager == (u32) ~ 0) + app_worker_alloc_connects_segment_manager (app_wrk); + return segment_manager_get (app_wrk->connects_seg_manager); +} + +segment_manager_t * app_worker_get_listen_segment_manager (app_worker_t * app, stream_session_t * listener) { diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h index e33f2ff797e..1d2064df62e 100644 --- a/src/vnet/session/application.h +++ b/src/vnet/session/application.h @@ -225,12 +225,15 @@ int app_worker_alloc_and_init (application_t * app, app_worker_t ** wrk); app_worker_t *app_worker_get (u32 wrk_index); app_worker_t *app_worker_get_if_valid (u32 wrk_index); application_t *app_worker_get_app (u32 wrk_index); +int app_worker_own_session (app_worker_t * app_wrk, stream_session_t * s); void app_worker_free (app_worker_t * app_wrk); int app_worker_open_session (app_worker_t * app, session_endpoint_t * tep, u32 api_context); segment_manager_t *app_worker_get_listen_segment_manager (app_worker_t *, stream_session_t *); segment_manager_t *app_worker_get_connect_segment_manager (app_worker_t *); +segment_manager_t + * app_worker_get_or_alloc_connect_segment_manager (app_worker_t *); int app_worker_alloc_connects_segment_manager (app_worker_t * app); int app_worker_add_segment_notify (u32 app_or_wrk, u64 segment_handle); u32 app_worker_n_listeners (app_worker_t * app); diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h index a156c82a745..9c48faa8abc 100644 --- a/src/vnet/session/application_interface.h +++ b/src/vnet/session/application_interface.h @@ -220,9 +220,9 @@ typedef struct session_bound_msg_ u8 lcl_is_ip4; u8 lcl_ip[16]; u16 lcl_port; - u64 rx_fifo; - u64 tx_fifo; - u64 vpp_evt_q; + uword rx_fifo; + uword tx_fifo; + uword vpp_evt_q; u32 segment_size; u8 segment_name_length; u8 segment_name[128]; @@ -233,12 +233,12 @@ typedef struct session_accepted_msg_ u32 context; u64 listener_handle; u64 handle; - u64 server_rx_fifo; - u64 server_tx_fifo; + uword server_rx_fifo; + uword server_tx_fifo; u64 segment_handle; - u64 vpp_event_queue_address; - u64 server_event_queue_address; - u64 client_event_queue_address; + uword vpp_event_queue_address; + uword server_event_queue_address; + uword client_event_queue_address; u16 port; u8 is_ip4; u8 ip[16]; @@ -260,12 +260,12 @@ typedef struct session_connected_msg_ u32 context; i32 retval; u64 handle; - u64 server_rx_fifo; - u64 server_tx_fifo; + uword server_rx_fifo; + uword server_tx_fifo; u64 segment_handle; - u64 vpp_event_queue_address; - u64 client_event_queue_address; - u64 server_event_queue_address; + uword vpp_event_queue_address; + uword client_event_queue_address; + uword server_event_queue_address; u32 segment_size; u8 segment_name_length; u8 segment_name[64]; @@ -302,6 +302,28 @@ typedef struct session_reset_reply_msg_ u64 handle; } __clib_packed session_reset_reply_msg_t; +typedef struct session_req_worker_update_msg_ +{ + u64 session_handle; +} __clib_packed session_req_worker_update_msg_t; + +/* NOTE: using u16 for wrk indices because message needs to fit in 18B */ +typedef struct session_worker_update_msg_ +{ + u32 client_index; + u16 wrk_index; + u16 req_wrk_index; + u64 handle; +} __clib_packed session_worker_update_msg_t; + +typedef struct session_worker_update_reply_msg_ +{ + u64 handle; + uword rx_fifo; + uword tx_fifo; + u64 segment_handle; +} __clib_packed session_worker_update_reply_msg_t; + typedef struct app_session_event_ { svm_msg_q_msg_t msg; diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index e3c73000edf..cf1b3e99f4d 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -49,7 +49,10 @@ typedef enum SESSION_CTRL_EVT_DISCONNECTED, SESSION_CTRL_EVT_DISCONNECTED_REPLY, SESSION_CTRL_EVT_RESET, - SESSION_CTRL_EVT_RESET_REPLY + SESSION_CTRL_EVT_RESET_REPLY, + SESSION_CTRL_EVT_REQ_WORKER_UPDATE, + SESSION_CTRL_EVT_WORKER_UPDATE, + SESSION_CTRL_EVT_WORKER_UPDATE_REPLY, } session_evt_type_t; static inline const char * diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index 98965f334af..880f16388b8 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -173,7 +173,7 @@ session_mq_disconnected_handler (void *data) svm_msg_q_unlock (app_wrk->event_queue); evt = svm_msg_q_msg_data (app_wrk->event_queue, msg); clib_memset (evt, 0, sizeof (*evt)); - evt->event_type = SESSION_CTRL_EVT_DISCONNECTED; + evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY; rmp = (session_disconnected_reply_msg_t *) evt->data; rmp->handle = mp->handle; rmp->context = mp->context; @@ -207,6 +207,86 @@ session_mq_disconnected_reply_handler (void *data) } } +static void +session_mq_worker_update_handler (void *data) +{ + session_worker_update_msg_t *mp = (session_worker_update_msg_t *) data; + session_worker_update_reply_msg_t *rmp; + svm_msg_q_msg_t _msg, *msg = &_msg; + app_worker_t *app_wrk; + u32 owner_app_wrk_map; + session_event_t *evt; + stream_session_t *s; + application_t *app; + + app = application_lookup (mp->client_index); + if (!app) + return; + if (!(s = session_get_from_handle_if_valid (mp->handle))) + { + clib_warning ("invalid handle %llu", mp->handle); + return; + } + app_wrk = app_worker_get (s->app_wrk_index); + if (app_wrk->app_index != app->app_index) + { + clib_warning ("app %u does not own session %llu", app->app_index, + mp->handle); + return; + } + owner_app_wrk_map = app_wrk->wrk_map_index; + app_wrk = application_get_worker (app, mp->wrk_index); + + /* This needs to come from the new owner */ + if (mp->req_wrk_index == owner_app_wrk_map) + { + session_req_worker_update_msg_t *wump; + + svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue, + SESSION_MQ_CTRL_EVT_RING, + SVM_Q_WAIT, msg); + svm_msg_q_unlock (app_wrk->event_queue); + evt = svm_msg_q_msg_data (app_wrk->event_queue, msg); + clib_memset (evt, 0, sizeof (*evt)); + evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE; + wump = (session_req_worker_update_msg_t *) evt->data; + wump->session_handle = mp->handle; + svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT); + return; + } + + app_worker_own_session (app_wrk, s); + + /* + * Send reply + */ + svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue, + SESSION_MQ_CTRL_EVT_RING, + SVM_Q_WAIT, msg); + svm_msg_q_unlock (app_wrk->event_queue); + evt = svm_msg_q_msg_data (app_wrk->event_queue, msg); + clib_memset (evt, 0, sizeof (*evt)); + evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY; + rmp = (session_worker_update_reply_msg_t *) evt->data; + rmp->handle = mp->handle; + rmp->rx_fifo = pointer_to_uword (s->server_rx_fifo); + rmp->tx_fifo = pointer_to_uword (s->server_tx_fifo); + rmp->segment_handle = session_segment_handle (s); + svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT); + + /* + * Retransmit messages that may have been lost + */ + if (!svm_fifo_is_empty (s->server_tx_fifo)) + session_send_io_evt_to_thread (s->server_tx_fifo, FIFO_EVENT_APP_TX); + + if (!svm_fifo_is_empty (s->server_rx_fifo)) + app_worker_lock_and_send_event (app_wrk, s, FIFO_EVENT_APP_RX); + + if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING) + app->cb_fns.session_disconnect_callback (s); +} + vlib_node_registration_t session_queue_node; typedef struct @@ -936,6 +1016,9 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, case SESSION_CTRL_EVT_RESET_REPLY: session_mq_reset_reply_handler (e->data); break; + case SESSION_CTRL_EVT_WORKER_UPDATE: + session_mq_worker_update_handler (e->data); + break; default: clib_warning ("unhandled event type %d", e->event_type); } |