diff options
author | Florin Coras <fcoras@cisco.com> | 2018-08-12 23:50:53 -0700 |
---|---|---|
committer | Damjan Marion <dmarion@me.com> | 2018-08-24 19:05:25 +0000 |
commit | 1553197f9a1a3258b6954adeb9536bbe0191683d (patch) | |
tree | 91f26e2e23da3971741238194474d5e5ab0ae792 /src/vnet/session/session.c | |
parent | f8b8586b699bae9e786726f2697c3e642d904c61 (diff) |
session: add support for multiple app workers
Refactor session layer to support multiple workers per application.
Change-Id: Ie67354688d396449d14bbbb8c56050206e307cd8
Signed-off-by: Florin Coras <fcoras@cisco.com>
Diffstat (limited to 'src/vnet/session/session.c')
-rw-r--r-- | src/vnet/session/session.c | 92 |
1 files changed, 53 insertions, 39 deletions
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 9790ec25edd..57ac384a519 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -488,12 +488,12 @@ stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes) static inline int session_enqueue_notify (stream_session_t * s, u8 lock) { - application_t *app; + app_worker_t *app; - app = application_get_if_valid (s->app_index); + app = app_worker_get_if_valid (s->app_wrk_index); if (PREDICT_FALSE (!app)) { - TCP_DBG ("invalid s->app_index = %d", s->app_index); + SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index); return 0; } @@ -513,9 +513,9 @@ session_enqueue_notify (stream_session_t * s, u8 lock) int session_dequeue_notify (stream_session_t * s) { - application_t *app; + app_worker_t *app; - app = application_get_if_valid (s->app_index); + app = app_worker_get_if_valid (s->app_wrk_index); if (PREDICT_FALSE (!app)) return -1; @@ -596,6 +596,7 @@ session_stream_connect_notify (transport_connection_t * tc, u8 is_fail) u32 opaque = 0, new_ti, new_si; stream_session_t *new_s = 0; segment_manager_t *sm; + app_worker_t *app_wrk; application_t *app; u8 alloc_fifos; int error = 0; @@ -615,17 +616,18 @@ session_stream_connect_notify (transport_connection_t * tc, u8 is_fail) /* Get the app's index from the handle we stored when opening connection * and the opaque (api_context for external apps) from transport session * index */ - app = application_get_if_valid (handle >> 32); - if (!app) + app_wrk = app_worker_get_if_valid (handle >> 32); + if (!app_wrk) return -1; opaque = tc->s_index; + app = application_get (app_wrk->app_index); /* * Allocate new session with fifos (svm segments are allocated if needed) */ if (!is_fail) { - sm = application_get_connect_segment_manager (app); + sm = app_worker_get_connect_segment_manager (app_wrk); alloc_fifos = !application_is_builtin_proxy (app); if (session_alloc_and_init (sm, tc, alloc_fifos, &new_s)) { @@ -634,7 +636,7 @@ session_stream_connect_notify (transport_connection_t * tc, u8 is_fail) } else { - new_s->app_index = app->index; + new_s->app_wrk_index = app_wrk->wrk_index; new_si = new_s->session_index; new_ti = new_s->thread_index; } @@ -643,8 +645,8 @@ session_stream_connect_notify (transport_connection_t * tc, u8 is_fail) /* * Notify client application */ - if (app->cb_fns.session_connected_callback (app->index, opaque, new_s, - is_fail)) + if (app->cb_fns.session_connected_callback (app_wrk->wrk_index, opaque, + new_s, is_fail)) { SESSION_DBG ("failed to notify app"); if (!is_fail) @@ -731,12 +733,16 @@ session_dgram_connect_notify (transport_connection_t * tc, void stream_session_accept_notify (transport_connection_t * tc) { - application_t *server; + app_worker_t *app_wrk; + application_t *app; stream_session_t *s; s = session_get (tc->s_index, tc->thread_index); - server = application_get (s->app_index); - server->cb_fns.session_accept_callback (s); + app_wrk = app_worker_get_if_valid (s->app_wrk_index); + if (!app_wrk) + return; + app = application_get (app_wrk->app_index); + app->cb_fns.session_accept_callback (s); } /** @@ -749,14 +755,17 @@ stream_session_accept_notify (transport_connection_t * tc) void stream_session_disconnect_notify (transport_connection_t * tc) { - application_t *server; + app_worker_t *app_wrk; + application_t *app; stream_session_t *s; s = session_get (tc->s_index, tc->thread_index); s->session_state = SESSION_STATE_CLOSING; - server = application_get_if_valid (s->app_index); - if (server) - server->cb_fns.session_disconnect_callback (s); + app_wrk = app_worker_get_if_valid (s->app_wrk_index); + if (!app_wrk) + return; + app = application_get (app_wrk->app_index); + app->cb_fns.session_disconnect_callback (s); } /** @@ -806,10 +815,12 @@ void stream_session_reset_notify (transport_connection_t * tc) { stream_session_t *s; + app_worker_t *app_wrk; application_t *app; s = session_get (tc->s_index, tc->thread_index); s->session_state = SESSION_STATE_CLOSED; - app = application_get (s->app_index); + app_wrk = app_worker_get (s->app_wrk_index); + app = application_get (app_wrk->app_index); app->cb_fns.session_reset_callback (s); } @@ -820,38 +831,40 @@ int stream_session_accept (transport_connection_t * tc, u32 listener_index, u8 notify) { - application_t *server; stream_session_t *s, *listener; + app_worker_t *app_wrk; segment_manager_t *sm; int rv; /* Find the server */ listener = listen_session_get (listener_index); - server = application_get (listener->app_index); + app_wrk = app_worker_get (listener->app_wrk_index); - sm = application_get_listen_segment_manager (server, listener); + sm = app_worker_get_listen_segment_manager (app_wrk, listener); if ((rv = session_alloc_and_init (sm, tc, 1, &s))) return rv; - s->app_index = server->index; + s->app_wrk_index = app_wrk->wrk_index; s->listener_index = listener_index; s->session_state = SESSION_STATE_ACCEPTING; /* Shoulder-tap the server */ if (notify) { - server->cb_fns.session_accept_callback (s); + application_t *app = application_get (app_wrk->app_index); + app->cb_fns.session_accept_callback (s); } return 0; } int -session_open_cl (u32 app_index, session_endpoint_t * rmt, u32 opaque) +session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) { transport_connection_t *tc; transport_endpoint_t *tep; segment_manager_t *sm; + app_worker_t *app_wrk; stream_session_t *s; application_t *app; int rv; @@ -868,22 +881,23 @@ session_open_cl (u32 app_index, session_endpoint_t * rmt, u32 opaque) /* For dgram type of service, allocate session and fifos now. */ - app = application_get (app_index); - sm = application_get_connect_segment_manager (app); + app_wrk = app_worker_get (app_wrk_index); + sm = app_worker_get_connect_segment_manager (app_wrk); if (session_alloc_and_init (sm, tc, 1, &s)) return -1; - s->app_index = app->index; + s->app_wrk_index = app_wrk->wrk_index; s->session_state = SESSION_STATE_OPENED; /* Tell the app about the new event fifo for this session */ - app->cb_fns.session_connected_callback (app->index, opaque, s, 0); + app = application_get (app_wrk->app_index); + app->cb_fns.session_connected_callback (app_wrk->wrk_index, opaque, s, 0); return 0; } int -session_open_vc (u32 app_index, session_endpoint_t * rmt, u32 opaque) +session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) { transport_connection_t *tc; transport_endpoint_t *tep; @@ -907,7 +921,7 @@ session_open_vc (u32 app_index, session_endpoint_t * rmt, u32 opaque) * is needed when the connect notify comes and we have to notify the * external app */ - handle = (((u64) app_index) << 32) | (u64) tc->c_index; + handle = (((u64) app_wrk_index) << 32) | (u64) tc->c_index; session_lookup_add_half_open (tc, handle); /* Store api_context (opaque) for when the reply comes. Not the nicest @@ -918,10 +932,10 @@ session_open_vc (u32 app_index, session_endpoint_t * rmt, u32 opaque) } int -session_open_app (u32 app_index, session_endpoint_t * rmt, u32 opaque) +session_open_app (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) { session_endpoint_extended_t *sep = (session_endpoint_extended_t *) rmt; - sep->app_index = app_index; + sep->app_wrk_index = app_wrk_index; sep->opaque = opaque; return tp_vfts[rmt->transport_proto].open ((transport_endpoint_t *) sep); @@ -951,10 +965,10 @@ static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = { * on open completion. */ int -session_open (u32 app_index, session_endpoint_t * rmt, u32 opaque) +session_open (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) { transport_service_type_t tst = tp_vfts[rmt->transport_proto].service_type; - return session_open_srv_fns[tst] (app_index, rmt, opaque); + return session_open_srv_fns[tst] (app_wrk_index, rmt, opaque); } int @@ -988,7 +1002,7 @@ int session_listen_cl (stream_session_t * s, session_endpoint_t * sep) { transport_connection_t *tc; - application_t *server; + app_worker_t *server; segment_manager_t *sm; u32 tci; @@ -1008,8 +1022,8 @@ session_listen_cl (stream_session_t * s, session_endpoint_t * sep) if (tc == 0) return -1; - server = application_get (s->app_index); - sm = application_get_listen_segment_manager (server, s); + server = app_worker_get (s->app_wrk_index); + sm = app_worker_get_listen_segment_manager (server, s); if (session_alloc_fifos (sm, s)) return -1; @@ -1023,7 +1037,7 @@ session_listen_app (stream_session_t * s, session_endpoint_t * sep) { session_endpoint_extended_t esep; clib_memcpy (&esep, sep, sizeof (*sep)); - esep.app_index = s->app_index; + esep.app_wrk_index = s->app_wrk_index; return tp_vfts[sep->transport_proto].bind (s->session_index, (transport_endpoint_t *) & esep); |