aboutsummaryrefslogtreecommitdiffstats
path: root/src/vnet/session/session_node.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/vnet/session/session_node.c')
-rw-r--r--src/vnet/session/session_node.c85
1 files changed, 84 insertions, 1 deletions
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index 98965f334af..880f16388b8 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -173,7 +173,7 @@ session_mq_disconnected_handler (void *data)
svm_msg_q_unlock (app_wrk->event_queue);
evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
clib_memset (evt, 0, sizeof (*evt));
- evt->event_type = SESSION_CTRL_EVT_DISCONNECTED;
+ evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY;
rmp = (session_disconnected_reply_msg_t *) evt->data;
rmp->handle = mp->handle;
rmp->context = mp->context;
@@ -207,6 +207,86 @@ session_mq_disconnected_reply_handler (void *data)
}
}
+static void
+session_mq_worker_update_handler (void *data)
+{
+ session_worker_update_msg_t *mp = (session_worker_update_msg_t *) data;
+ session_worker_update_reply_msg_t *rmp;
+ svm_msg_q_msg_t _msg, *msg = &_msg;
+ app_worker_t *app_wrk;
+ u32 owner_app_wrk_map;
+ session_event_t *evt;
+ stream_session_t *s;
+ application_t *app;
+
+ app = application_lookup (mp->client_index);
+ if (!app)
+ return;
+ if (!(s = session_get_from_handle_if_valid (mp->handle)))
+ {
+ clib_warning ("invalid handle %llu", mp->handle);
+ return;
+ }
+ app_wrk = app_worker_get (s->app_wrk_index);
+ if (app_wrk->app_index != app->app_index)
+ {
+ clib_warning ("app %u does not own session %llu", app->app_index,
+ mp->handle);
+ return;
+ }
+ owner_app_wrk_map = app_wrk->wrk_map_index;
+ app_wrk = application_get_worker (app, mp->wrk_index);
+
+ /* This needs to come from the new owner */
+ if (mp->req_wrk_index == owner_app_wrk_map)
+ {
+ session_req_worker_update_msg_t *wump;
+
+ svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
+ SESSION_MQ_CTRL_EVT_RING,
+ SVM_Q_WAIT, msg);
+ svm_msg_q_unlock (app_wrk->event_queue);
+ evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
+ clib_memset (evt, 0, sizeof (*evt));
+ evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE;
+ wump = (session_req_worker_update_msg_t *) evt->data;
+ wump->session_handle = mp->handle;
+ svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT);
+ return;
+ }
+
+ app_worker_own_session (app_wrk, s);
+
+ /*
+ * Send reply
+ */
+ svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
+ SESSION_MQ_CTRL_EVT_RING,
+ SVM_Q_WAIT, msg);
+ svm_msg_q_unlock (app_wrk->event_queue);
+ evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
+ clib_memset (evt, 0, sizeof (*evt));
+ evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
+ rmp = (session_worker_update_reply_msg_t *) evt->data;
+ rmp->handle = mp->handle;
+ rmp->rx_fifo = pointer_to_uword (s->server_rx_fifo);
+ rmp->tx_fifo = pointer_to_uword (s->server_tx_fifo);
+ rmp->segment_handle = session_segment_handle (s);
+ svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT);
+
+ /*
+ * Retransmit messages that may have been lost
+ */
+ if (!svm_fifo_is_empty (s->server_tx_fifo))
+ session_send_io_evt_to_thread (s->server_tx_fifo, FIFO_EVENT_APP_TX);
+
+ if (!svm_fifo_is_empty (s->server_rx_fifo))
+ app_worker_lock_and_send_event (app_wrk, s, FIFO_EVENT_APP_RX);
+
+ if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
+ app->cb_fns.session_disconnect_callback (s);
+}
+
vlib_node_registration_t session_queue_node;
typedef struct
@@ -936,6 +1016,9 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
case SESSION_CTRL_EVT_RESET_REPLY:
session_mq_reset_reply_handler (e->data);
break;
+ case SESSION_CTRL_EVT_WORKER_UPDATE:
+ session_mq_worker_update_handler (e->data);
+ break;
default:
clib_warning ("unhandled event type %d", e->event_type);
}