aboutsummaryrefslogtreecommitdiffstats
path: root/src/vnet/session/session.c
diff options
context:
space:
mode:
authorFlorin Coras <fcoras@cisco.com>2018-08-12 23:50:53 -0700
committerDamjan Marion <dmarion@me.com>2018-08-24 19:05:25 +0000
commit1553197f9a1a3258b6954adeb9536bbe0191683d (patch)
tree91f26e2e23da3971741238194474d5e5ab0ae792 /src/vnet/session/session.c
parentf8b8586b699bae9e786726f2697c3e642d904c61 (diff)
session: add support for multiple app workers
Refactor session layer to support multiple workers per application. Change-Id: Ie67354688d396449d14bbbb8c56050206e307cd8 Signed-off-by: Florin Coras <fcoras@cisco.com>
Diffstat (limited to 'src/vnet/session/session.c')
-rw-r--r--src/vnet/session/session.c92
1 files changed, 53 insertions, 39 deletions
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 9790ec25edd..57ac384a519 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -488,12 +488,12 @@ stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
static inline int
session_enqueue_notify (stream_session_t * s, u8 lock)
{
- application_t *app;
+ app_worker_t *app;
- app = application_get_if_valid (s->app_index);
+ app = app_worker_get_if_valid (s->app_wrk_index);
if (PREDICT_FALSE (!app))
{
- TCP_DBG ("invalid s->app_index = %d", s->app_index);
+ SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
return 0;
}
@@ -513,9 +513,9 @@ session_enqueue_notify (stream_session_t * s, u8 lock)
int
session_dequeue_notify (stream_session_t * s)
{
- application_t *app;
+ app_worker_t *app;
- app = application_get_if_valid (s->app_index);
+ app = app_worker_get_if_valid (s->app_wrk_index);
if (PREDICT_FALSE (!app))
return -1;
@@ -596,6 +596,7 @@ session_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
u32 opaque = 0, new_ti, new_si;
stream_session_t *new_s = 0;
segment_manager_t *sm;
+ app_worker_t *app_wrk;
application_t *app;
u8 alloc_fifos;
int error = 0;
@@ -615,17 +616,18 @@ session_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
/* Get the app's index from the handle we stored when opening connection
* and the opaque (api_context for external apps) from transport session
* index */
- app = application_get_if_valid (handle >> 32);
- if (!app)
+ app_wrk = app_worker_get_if_valid (handle >> 32);
+ if (!app_wrk)
return -1;
opaque = tc->s_index;
+ app = application_get (app_wrk->app_index);
/*
* Allocate new session with fifos (svm segments are allocated if needed)
*/
if (!is_fail)
{
- sm = application_get_connect_segment_manager (app);
+ sm = app_worker_get_connect_segment_manager (app_wrk);
alloc_fifos = !application_is_builtin_proxy (app);
if (session_alloc_and_init (sm, tc, alloc_fifos, &new_s))
{
@@ -634,7 +636,7 @@ session_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
}
else
{
- new_s->app_index = app->index;
+ new_s->app_wrk_index = app_wrk->wrk_index;
new_si = new_s->session_index;
new_ti = new_s->thread_index;
}
@@ -643,8 +645,8 @@ session_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
/*
* Notify client application
*/
- if (app->cb_fns.session_connected_callback (app->index, opaque, new_s,
- is_fail))
+ if (app->cb_fns.session_connected_callback (app_wrk->wrk_index, opaque,
+ new_s, is_fail))
{
SESSION_DBG ("failed to notify app");
if (!is_fail)
@@ -731,12 +733,16 @@ session_dgram_connect_notify (transport_connection_t * tc,
void
stream_session_accept_notify (transport_connection_t * tc)
{
- application_t *server;
+ app_worker_t *app_wrk;
+ application_t *app;
stream_session_t *s;
s = session_get (tc->s_index, tc->thread_index);
- server = application_get (s->app_index);
- server->cb_fns.session_accept_callback (s);
+ app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+ if (!app_wrk)
+ return;
+ app = application_get (app_wrk->app_index);
+ app->cb_fns.session_accept_callback (s);
}
/**
@@ -749,14 +755,17 @@ stream_session_accept_notify (transport_connection_t * tc)
void
stream_session_disconnect_notify (transport_connection_t * tc)
{
- application_t *server;
+ app_worker_t *app_wrk;
+ application_t *app;
stream_session_t *s;
s = session_get (tc->s_index, tc->thread_index);
s->session_state = SESSION_STATE_CLOSING;
- server = application_get_if_valid (s->app_index);
- if (server)
- server->cb_fns.session_disconnect_callback (s);
+ app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+ if (!app_wrk)
+ return;
+ app = application_get (app_wrk->app_index);
+ app->cb_fns.session_disconnect_callback (s);
}
/**
@@ -806,10 +815,12 @@ void
stream_session_reset_notify (transport_connection_t * tc)
{
stream_session_t *s;
+ app_worker_t *app_wrk;
application_t *app;
s = session_get (tc->s_index, tc->thread_index);
s->session_state = SESSION_STATE_CLOSED;
- app = application_get (s->app_index);
+ app_wrk = app_worker_get (s->app_wrk_index);
+ app = application_get (app_wrk->app_index);
app->cb_fns.session_reset_callback (s);
}
@@ -820,38 +831,40 @@ int
stream_session_accept (transport_connection_t * tc, u32 listener_index,
u8 notify)
{
- application_t *server;
stream_session_t *s, *listener;
+ app_worker_t *app_wrk;
segment_manager_t *sm;
int rv;
/* Find the server */
listener = listen_session_get (listener_index);
- server = application_get (listener->app_index);
+ app_wrk = app_worker_get (listener->app_wrk_index);
- sm = application_get_listen_segment_manager (server, listener);
+ sm = app_worker_get_listen_segment_manager (app_wrk, listener);
if ((rv = session_alloc_and_init (sm, tc, 1, &s)))
return rv;
- s->app_index = server->index;
+ s->app_wrk_index = app_wrk->wrk_index;
s->listener_index = listener_index;
s->session_state = SESSION_STATE_ACCEPTING;
/* Shoulder-tap the server */
if (notify)
{
- server->cb_fns.session_accept_callback (s);
+ application_t *app = application_get (app_wrk->app_index);
+ app->cb_fns.session_accept_callback (s);
}
return 0;
}
int
-session_open_cl (u32 app_index, session_endpoint_t * rmt, u32 opaque)
+session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
{
transport_connection_t *tc;
transport_endpoint_t *tep;
segment_manager_t *sm;
+ app_worker_t *app_wrk;
stream_session_t *s;
application_t *app;
int rv;
@@ -868,22 +881,23 @@ session_open_cl (u32 app_index, session_endpoint_t * rmt, u32 opaque)
/* For dgram type of service, allocate session and fifos now.
*/
- app = application_get (app_index);
- sm = application_get_connect_segment_manager (app);
+ app_wrk = app_worker_get (app_wrk_index);
+ sm = app_worker_get_connect_segment_manager (app_wrk);
if (session_alloc_and_init (sm, tc, 1, &s))
return -1;
- s->app_index = app->index;
+ s->app_wrk_index = app_wrk->wrk_index;
s->session_state = SESSION_STATE_OPENED;
/* Tell the app about the new event fifo for this session */
- app->cb_fns.session_connected_callback (app->index, opaque, s, 0);
+ app = application_get (app_wrk->app_index);
+ app->cb_fns.session_connected_callback (app_wrk->wrk_index, opaque, s, 0);
return 0;
}
int
-session_open_vc (u32 app_index, session_endpoint_t * rmt, u32 opaque)
+session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
{
transport_connection_t *tc;
transport_endpoint_t *tep;
@@ -907,7 +921,7 @@ session_open_vc (u32 app_index, session_endpoint_t * rmt, u32 opaque)
* is needed when the connect notify comes and we have to notify the
* external app
*/
- handle = (((u64) app_index) << 32) | (u64) tc->c_index;
+ handle = (((u64) app_wrk_index) << 32) | (u64) tc->c_index;
session_lookup_add_half_open (tc, handle);
/* Store api_context (opaque) for when the reply comes. Not the nicest
@@ -918,10 +932,10 @@ session_open_vc (u32 app_index, session_endpoint_t * rmt, u32 opaque)
}
int
-session_open_app (u32 app_index, session_endpoint_t * rmt, u32 opaque)
+session_open_app (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
{
session_endpoint_extended_t *sep = (session_endpoint_extended_t *) rmt;
- sep->app_index = app_index;
+ sep->app_wrk_index = app_wrk_index;
sep->opaque = opaque;
return tp_vfts[rmt->transport_proto].open ((transport_endpoint_t *) sep);
@@ -951,10 +965,10 @@ static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
* on open completion.
*/
int
-session_open (u32 app_index, session_endpoint_t * rmt, u32 opaque)
+session_open (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
{
transport_service_type_t tst = tp_vfts[rmt->transport_proto].service_type;
- return session_open_srv_fns[tst] (app_index, rmt, opaque);
+ return session_open_srv_fns[tst] (app_wrk_index, rmt, opaque);
}
int
@@ -988,7 +1002,7 @@ int
session_listen_cl (stream_session_t * s, session_endpoint_t * sep)
{
transport_connection_t *tc;
- application_t *server;
+ app_worker_t *server;
segment_manager_t *sm;
u32 tci;
@@ -1008,8 +1022,8 @@ session_listen_cl (stream_session_t * s, session_endpoint_t * sep)
if (tc == 0)
return -1;
- server = application_get (s->app_index);
- sm = application_get_listen_segment_manager (server, s);
+ server = app_worker_get (s->app_wrk_index);
+ sm = app_worker_get_listen_segment_manager (server, s);
if (session_alloc_fifos (sm, s))
return -1;
@@ -1023,7 +1037,7 @@ session_listen_app (stream_session_t * s, session_endpoint_t * sep)
{
session_endpoint_extended_t esep;
clib_memcpy (&esep, sep, sizeof (*sep));
- esep.app_index = s->app_index;
+ esep.app_wrk_index = s->app_wrk_index;
return tp_vfts[sep->transport_proto].bind (s->session_index,
(transport_endpoint_t *) & esep);