aboutsummaryrefslogtreecommitdiffstats
path: root/src/vnet
diff options
context:
space:
mode:
Diffstat (limited to 'src/vnet')
-rw-r--r--src/vnet/session/application.c50
-rw-r--r--src/vnet/session/application.h3
-rw-r--r--src/vnet/session/application_interface.h48
-rw-r--r--src/vnet/session/session.h5
-rw-r--r--src/vnet/session/session_node.c85
5 files changed, 176 insertions, 15 deletions
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index 19c8fa2f2e0..85b5f939427 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -724,6 +724,48 @@ app_worker_stop_listen (app_worker_t * app_wrk, session_handle_t handle)
return 0;
}
+int
+app_worker_own_session (app_worker_t * app_wrk, stream_session_t * s)
+{
+ segment_manager_t *sm;
+ svm_fifo_t *rxf, *txf;
+
+ s->app_wrk_index = app_wrk->wrk_index;
+
+ rxf = s->server_rx_fifo;
+ txf = s->server_tx_fifo;
+
+ if (!rxf || !txf)
+ return 0;
+
+ s->server_rx_fifo = 0;
+ s->server_tx_fifo = 0;
+
+ sm = app_worker_get_or_alloc_connect_segment_manager (app_wrk);
+ if (session_alloc_fifos (sm, s))
+ return -1;
+
+ if (!svm_fifo_is_empty (rxf))
+ {
+ clib_memcpy_fast (s->server_rx_fifo->data, rxf->data, rxf->nitems);
+ s->server_rx_fifo->head = rxf->head;
+ s->server_rx_fifo->tail = rxf->tail;
+ s->server_rx_fifo->cursize = rxf->cursize;
+ }
+
+ if (!svm_fifo_is_empty (txf))
+ {
+ clib_memcpy_fast (s->server_tx_fifo->data, txf->data, txf->nitems);
+ s->server_tx_fifo->head = txf->head;
+ s->server_tx_fifo->tail = txf->tail;
+ s->server_tx_fifo->cursize = txf->cursize;
+ }
+
+ segment_manager_dealloc_fifos (rxf->segment_index, rxf, txf);
+
+ return 0;
+}
+
/**
* Start listening local transport endpoint for requested transport.
*
@@ -890,6 +932,14 @@ app_worker_get_connect_segment_manager (app_worker_t * app)
}
segment_manager_t *
+app_worker_get_or_alloc_connect_segment_manager (app_worker_t * app_wrk)
+{
+ if (app_wrk->connects_seg_manager == (u32) ~ 0)
+ app_worker_alloc_connects_segment_manager (app_wrk);
+ return segment_manager_get (app_wrk->connects_seg_manager);
+}
+
+segment_manager_t *
app_worker_get_listen_segment_manager (app_worker_t * app,
stream_session_t * listener)
{
diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h
index e33f2ff797e..1d2064df62e 100644
--- a/src/vnet/session/application.h
+++ b/src/vnet/session/application.h
@@ -225,12 +225,15 @@ int app_worker_alloc_and_init (application_t * app, app_worker_t ** wrk);
app_worker_t *app_worker_get (u32 wrk_index);
app_worker_t *app_worker_get_if_valid (u32 wrk_index);
application_t *app_worker_get_app (u32 wrk_index);
+int app_worker_own_session (app_worker_t * app_wrk, stream_session_t * s);
void app_worker_free (app_worker_t * app_wrk);
int app_worker_open_session (app_worker_t * app, session_endpoint_t * tep,
u32 api_context);
segment_manager_t *app_worker_get_listen_segment_manager (app_worker_t *,
stream_session_t *);
segment_manager_t *app_worker_get_connect_segment_manager (app_worker_t *);
+segment_manager_t
+ * app_worker_get_or_alloc_connect_segment_manager (app_worker_t *);
int app_worker_alloc_connects_segment_manager (app_worker_t * app);
int app_worker_add_segment_notify (u32 app_or_wrk, u64 segment_handle);
u32 app_worker_n_listeners (app_worker_t * app);
diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h
index a156c82a745..9c48faa8abc 100644
--- a/src/vnet/session/application_interface.h
+++ b/src/vnet/session/application_interface.h
@@ -220,9 +220,9 @@ typedef struct session_bound_msg_
u8 lcl_is_ip4;
u8 lcl_ip[16];
u16 lcl_port;
- u64 rx_fifo;
- u64 tx_fifo;
- u64 vpp_evt_q;
+ uword rx_fifo;
+ uword tx_fifo;
+ uword vpp_evt_q;
u32 segment_size;
u8 segment_name_length;
u8 segment_name[128];
@@ -233,12 +233,12 @@ typedef struct session_accepted_msg_
u32 context;
u64 listener_handle;
u64 handle;
- u64 server_rx_fifo;
- u64 server_tx_fifo;
+ uword server_rx_fifo;
+ uword server_tx_fifo;
u64 segment_handle;
- u64 vpp_event_queue_address;
- u64 server_event_queue_address;
- u64 client_event_queue_address;
+ uword vpp_event_queue_address;
+ uword server_event_queue_address;
+ uword client_event_queue_address;
u16 port;
u8 is_ip4;
u8 ip[16];
@@ -260,12 +260,12 @@ typedef struct session_connected_msg_
u32 context;
i32 retval;
u64 handle;
- u64 server_rx_fifo;
- u64 server_tx_fifo;
+ uword server_rx_fifo;
+ uword server_tx_fifo;
u64 segment_handle;
- u64 vpp_event_queue_address;
- u64 client_event_queue_address;
- u64 server_event_queue_address;
+ uword vpp_event_queue_address;
+ uword client_event_queue_address;
+ uword server_event_queue_address;
u32 segment_size;
u8 segment_name_length;
u8 segment_name[64];
@@ -302,6 +302,28 @@ typedef struct session_reset_reply_msg_
u64 handle;
} __clib_packed session_reset_reply_msg_t;
+typedef struct session_req_worker_update_msg_
+{
+ u64 session_handle;
+} __clib_packed session_req_worker_update_msg_t;
+
+/* NOTE: using u16 for wrk indices because message needs to fit in 18B */
+typedef struct session_worker_update_msg_
+{
+ u32 client_index;
+ u16 wrk_index;
+ u16 req_wrk_index;
+ u64 handle;
+} __clib_packed session_worker_update_msg_t;
+
+typedef struct session_worker_update_reply_msg_
+{
+ u64 handle;
+ uword rx_fifo;
+ uword tx_fifo;
+ u64 segment_handle;
+} __clib_packed session_worker_update_reply_msg_t;
+
typedef struct app_session_event_
{
svm_msg_q_msg_t msg;
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index e3c73000edf..cf1b3e99f4d 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -49,7 +49,10 @@ typedef enum
SESSION_CTRL_EVT_DISCONNECTED,
SESSION_CTRL_EVT_DISCONNECTED_REPLY,
SESSION_CTRL_EVT_RESET,
- SESSION_CTRL_EVT_RESET_REPLY
+ SESSION_CTRL_EVT_RESET_REPLY,
+ SESSION_CTRL_EVT_REQ_WORKER_UPDATE,
+ SESSION_CTRL_EVT_WORKER_UPDATE,
+ SESSION_CTRL_EVT_WORKER_UPDATE_REPLY,
} session_evt_type_t;
static inline const char *
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index 98965f334af..880f16388b8 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -173,7 +173,7 @@ session_mq_disconnected_handler (void *data)
svm_msg_q_unlock (app_wrk->event_queue);
evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
clib_memset (evt, 0, sizeof (*evt));
- evt->event_type = SESSION_CTRL_EVT_DISCONNECTED;
+ evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY;
rmp = (session_disconnected_reply_msg_t *) evt->data;
rmp->handle = mp->handle;
rmp->context = mp->context;
@@ -207,6 +207,86 @@ session_mq_disconnected_reply_handler (void *data)
}
}
+static void
+session_mq_worker_update_handler (void *data)
+{
+ session_worker_update_msg_t *mp = (session_worker_update_msg_t *) data;
+ session_worker_update_reply_msg_t *rmp;
+ svm_msg_q_msg_t _msg, *msg = &_msg;
+ app_worker_t *app_wrk;
+ u32 owner_app_wrk_map;
+ session_event_t *evt;
+ stream_session_t *s;
+ application_t *app;
+
+ app = application_lookup (mp->client_index);
+ if (!app)
+ return;
+ if (!(s = session_get_from_handle_if_valid (mp->handle)))
+ {
+ clib_warning ("invalid handle %llu", mp->handle);
+ return;
+ }
+ app_wrk = app_worker_get (s->app_wrk_index);
+ if (app_wrk->app_index != app->app_index)
+ {
+ clib_warning ("app %u does not own session %llu", app->app_index,
+ mp->handle);
+ return;
+ }
+ owner_app_wrk_map = app_wrk->wrk_map_index;
+ app_wrk = application_get_worker (app, mp->wrk_index);
+
+ /* This needs to come from the new owner */
+ if (mp->req_wrk_index == owner_app_wrk_map)
+ {
+ session_req_worker_update_msg_t *wump;
+
+ svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
+ SESSION_MQ_CTRL_EVT_RING,
+ SVM_Q_WAIT, msg);
+ svm_msg_q_unlock (app_wrk->event_queue);
+ evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
+ clib_memset (evt, 0, sizeof (*evt));
+ evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE;
+ wump = (session_req_worker_update_msg_t *) evt->data;
+ wump->session_handle = mp->handle;
+ svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT);
+ return;
+ }
+
+ app_worker_own_session (app_wrk, s);
+
+ /*
+ * Send reply
+ */
+ svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
+ SESSION_MQ_CTRL_EVT_RING,
+ SVM_Q_WAIT, msg);
+ svm_msg_q_unlock (app_wrk->event_queue);
+ evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
+ clib_memset (evt, 0, sizeof (*evt));
+ evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
+ rmp = (session_worker_update_reply_msg_t *) evt->data;
+ rmp->handle = mp->handle;
+ rmp->rx_fifo = pointer_to_uword (s->server_rx_fifo);
+ rmp->tx_fifo = pointer_to_uword (s->server_tx_fifo);
+ rmp->segment_handle = session_segment_handle (s);
+ svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT);
+
+ /*
+ * Retransmit messages that may have been lost
+ */
+ if (!svm_fifo_is_empty (s->server_tx_fifo))
+ session_send_io_evt_to_thread (s->server_tx_fifo, FIFO_EVENT_APP_TX);
+
+ if (!svm_fifo_is_empty (s->server_rx_fifo))
+ app_worker_lock_and_send_event (app_wrk, s, FIFO_EVENT_APP_RX);
+
+ if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
+ app->cb_fns.session_disconnect_callback (s);
+}
+
vlib_node_registration_t session_queue_node;
typedef struct
@@ -936,6 +1016,9 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
case SESSION_CTRL_EVT_RESET_REPLY:
session_mq_reset_reply_handler (e->data);
break;
+ case SESSION_CTRL_EVT_WORKER_UPDATE:
+ session_mq_worker_update_handler (e->data);
+ break;
default:
clib_warning ("unhandled event type %d", e->event_type);
}