diff options
author | Matus Fabian <matfabia@cisco.com> | 2024-12-30 20:40:51 +0100 |
---|---|---|
committer | Matus Fabian <matfabia@cisco.com> | 2024-12-30 20:42:37 +0100 |
commit | e210d413d53081f8603af16f203fb92e08127cdc (patch) | |
tree | b60aac85839a9339011e815b7192b204127caa73 | |
parent | d11d31653d948c9f3ac80afd69f111657b007f0f (diff) |
session: proxy session migration fix
Type: fix
Change-Id: I487ee4e69d8885f46d7a4af2c66a710da66108c5
Signed-off-by: Matus Fabian <matfabia@cisco.com>
-rw-r--r-- | extras/hs-test/infra/suite_vpp_udp_proxy.go | 9 | ||||
-rw-r--r-- | extras/hs-test/proxy_test.go | 23 | ||||
-rw-r--r-- | src/plugins/hs_apps/proxy.c | 53 | ||||
-rw-r--r-- | src/vnet/session/application_worker.c | 5 | ||||
-rw-r--r-- | src/vnet/session/session.c | 13 |
5 files changed, 82 insertions, 21 deletions
diff --git a/extras/hs-test/infra/suite_vpp_udp_proxy.go b/extras/hs-test/infra/suite_vpp_udp_proxy.go index 6a65a0be5ff..2290aeec6a2 100644 --- a/extras/hs-test/infra/suite_vpp_udp_proxy.go +++ b/extras/hs-test/infra/suite_vpp_udp_proxy.go @@ -63,6 +63,11 @@ func (s *VppUdpProxySuite) SetupTest() { s.Interfaces.Server.Ip4AddressString(), s.Interfaces.Server.HwAddress) vpp.Vppctl(arp) + arp = fmt.Sprintf("set ip neighbor %s %s %s", + s.Interfaces.Client.Peer.Name(), + s.Interfaces.Client.Ip4AddressString(), + s.Interfaces.Client.HwAddress) + vpp.Vppctl(arp) if *DryRun { s.LogStartedContainers() @@ -127,7 +132,7 @@ func (s *VppUdpProxySuite) ClientSendReceive(toSend []byte, rcvBuffer []byte) (i } defer proxiedConn.Close() - err = proxiedConn.SetReadDeadline(time.Now().Add(time.Second * 5)) + err = proxiedConn.SetDeadline(time.Now().Add(time.Second * 5)) if err != nil { return 0, err } @@ -173,7 +178,7 @@ var _ = Describe("VppUdpProxySuite", Ordered, ContinueOnFailure, func() { } }) -var _ = Describe("VppUdpProxySuiteSolo", Ordered, ContinueOnFailure, func() { +var _ = Describe("VppUdpProxySuiteSolo", Ordered, ContinueOnFailure, Serial, func() { var s VppUdpProxySuite BeforeAll(func() { s.SetupSuite() diff --git a/extras/hs-test/proxy_test.go b/extras/hs-test/proxy_test.go index 3afdc3103a3..d371de46cbb 100644 --- a/extras/hs-test/proxy_test.go +++ b/extras/hs-test/proxy_test.go @@ -25,6 +25,7 @@ func init() { RegisterVppProxySoloTests(VppProxyHttpGetTcpMTTest, VppProxyHttpPutTcpMTTest, VppProxyTcpIperfMTTest, VppProxyUdpIperfMTTest, VppConnectProxyStressTest, VppConnectProxyStressMTTest) RegisterVppUdpProxyTests(VppProxyUdpTest) + RegisterVppUdpProxySoloTests(VppProxyUdpMigrationMTTest) RegisterEnvoyProxyTests(EnvoyProxyHttpGetTcpTest, EnvoyProxyHttpPutTcpTest) RegisterNginxProxyTests(NginxMirroringTest) RegisterNginxProxySoloTests(MirrorMultiThreadTest) @@ -350,3 +351,25 @@ func VppProxyUdpTest(s *VppUdpProxySuite) { s.AssertNil(err, fmt.Sprint(err)) s.AssertEqual([]byte("hello"), b[:n]) } + +func VppProxyUdpMigrationMTTest(s *VppUdpProxySuite) { + remoteServerConn := s.StartEchoServer() + defer remoteServerConn.Close() + + vppProxy := s.Containers.VppProxy.VppInstance + cmd := fmt.Sprintf("test proxy server fifo-size 512k server-uri udp://%s/%d", s.VppProxyAddr(), s.ProxyPort()) + cmd += fmt.Sprintf(" client-uri udp://%s/%d", s.ServerAddr(), s.ServerPort()) + s.Log(vppProxy.Vppctl(cmd)) + + b := make([]byte, 1500) + + n, err := s.ClientSendReceive([]byte("hello"), b) + s.AssertNil(err, fmt.Sprint(err)) + s.AssertEqual([]byte("hello"), b[:n]) + + n, err = s.ClientSendReceive([]byte("world"), b) + s.AssertNil(err, fmt.Sprint(err)) + s.AssertEqual([]byte("world"), b[:n]) + + s.Log(s.Containers.VppProxy.VppInstance.Vppctl("show session verbose 2")) +} diff --git a/src/plugins/hs_apps/proxy.c b/src/plugins/hs_apps/proxy.c index 9d5b94959fb..82b904f9f1b 100644 --- a/src/plugins/hs_apps/proxy.c +++ b/src/plugins/hs_apps/proxy.c @@ -26,6 +26,14 @@ proxy_main_t proxy_main; #define TCP_MSS 1460 +#define PROXY_DEBUG 0 + +#if PROXY_DEBUG +#define PROXY_DBG(_fmt, _args...) clib_warning (_fmt, ##_args) +#else +#define PROXY_DBG(_fmt, _args...) +#endif + static proxy_session_side_ctx_t * proxy_session_side_ctx_alloc (proxy_worker_t *wrk) { @@ -200,6 +208,8 @@ proxy_session_postponed_free_rpc (void *arg) clib_spinlock_lock_if_init (&pm->sessions_lock); + PROXY_DBG ("[%u] ps %u postponed free", vlib_get_thread_index (), ps_index); + ps = proxy_session_get (ps_index); segment_manager_dealloc_fifos (ps->po.rx_fifo, ps->po.tx_fifo); proxy_session_free (ps); @@ -262,6 +272,9 @@ proxy_try_close_session (session_t * s, int is_active_open) wrk = proxy_worker_get (s->thread_index); sc = proxy_session_side_ctx_get (wrk, s->opaque); + PROXY_DBG ("[%u] ps %u close (is ao %u)", vlib_get_thread_index (), + sc->ps_index, is_active_open); + clib_spinlock_lock_if_init (&pm->sessions_lock); ps = proxy_session_get (sc->ps_index); @@ -304,6 +317,8 @@ proxy_try_side_ctx_cleanup (session_t *s) if (sc->state == PROXY_SC_S_CREATED) return; + PROXY_DBG ("[%u] ps %u side ctx cleanup", vlib_get_thread_index (), + sc->ps_index); clib_spinlock_lock_if_init (&pm->sessions_lock); ps = proxy_session_get (sc->ps_index); @@ -330,6 +345,9 @@ proxy_try_delete_session (session_t * s, u8 is_active_open) sc = proxy_session_side_ctx_get (wrk, s->opaque); ps_index = sc->ps_index; + PROXY_DBG ("[%u] ps %u delete (is ao %u)", vlib_get_thread_index (), + sc->ps_index, is_active_open); + proxy_session_side_ctx_free (wrk, sc); clib_spinlock_lock_if_init (&pm->sessions_lock); @@ -436,6 +454,8 @@ proxy_accept_callback (session_t * s) ps = proxy_session_alloc (); + PROXY_DBG ("[%u] ps %u new", vlib_get_thread_index (), ps->ps_index); + ps->po.session_handle = session_handle (s); ps->po.rx_fifo = s->rx_fifo; ps->po.tx_fifo = s->tx_fifo; @@ -614,6 +634,8 @@ proxy_rx_callback (session_t *s) if (sc->state == PROXY_SC_S_CREATED) { + PROXY_DBG ("[%u] ps %u start connect", vlib_get_thread_index (), + sc->ps_index); proxy_session_start_connect (sc, s); sc->state = PROXY_SC_S_CONNECTING; return 0; @@ -719,6 +741,8 @@ active_open_alloc_session_fifos (session_t *s) svm_fifo_t *rxf, *txf; proxy_session_t *ps; + PROXY_DBG ("[%u] ps %u ao alloc fifos", vlib_get_thread_index (), s->opaque); + clib_spinlock_lock_if_init (&pm->sessions_lock); /* Active open opaque is pointing at proxy session */ @@ -788,6 +812,8 @@ active_open_connected_callback (u32 app_index, u32 opaque, /* Connection failed */ if (err) { + PROXY_DBG ("[%u] ps %u connect failed: %d", vlib_get_thread_index (), + opaque, err); clib_spinlock_lock_if_init (&pm->sessions_lock); ps = proxy_session_get (opaque); @@ -806,6 +832,8 @@ active_open_connected_callback (u32 app_index, u32 opaque, return 0; } + PROXY_DBG ("[%u] ps %u connected", vlib_get_thread_index (), opaque); + wrk = proxy_worker_get (s->thread_index); clib_spinlock_lock_if_init (&pm->sessions_lock); @@ -867,6 +895,9 @@ active_open_migrate_po_fixup_rpc (void *arg) proxy_session_t *ps; session_t *po_s; + PROXY_DBG ("[%u] ps %u migrate (po fixup)", vlib_get_thread_index (), + ps_index); + wrk = proxy_worker_get (vlib_get_thread_index ()); clib_spinlock_lock_if_init (&pm->sessions_lock); @@ -874,8 +905,6 @@ active_open_migrate_po_fixup_rpc (void *arg) ps = proxy_session_get (ps_index); po_s = session_get_from_handle (ps->po.session_handle); - po_s->rx_fifo = ps->po.rx_fifo; - po_s->tx_fifo = ps->po.tx_fifo; po_sc = proxy_session_side_ctx_get (wrk, po_s->opaque); po_sc->pair = ps->ao; @@ -896,6 +925,9 @@ active_open_migrate_rpc (void *arg) proxy_session_t *ps; session_t *s; + PROXY_DBG ("[%u] ps %u migrate (alloc new sc)", vlib_get_thread_index (), + ps_index); + wrk = proxy_worker_get (vlib_get_thread_index ()); sc = proxy_session_side_ctx_alloc (wrk); @@ -908,15 +940,6 @@ active_open_migrate_rpc (void *arg) s->opaque = sc->sc_index; s->flags &= ~SESSION_F_IS_MIGRATING; - /* Fixup passive open session because of migration and zc */ - ps->ao.rx_fifo = ps->po.tx_fifo = s->rx_fifo; - ps->ao.tx_fifo = ps->po.rx_fifo = s->tx_fifo; - - ps->po.tx_fifo->shr->master_session_index = - session_index_from_handle (ps->po.session_handle); - ps->po.tx_fifo->master_thread_index = - session_thread_from_handle (ps->po.session_handle); - sc->pair = ps->po; clib_spinlock_unlock_if_init (&pm->sessions_lock); @@ -937,14 +960,18 @@ active_open_migrate_callback (session_t *s, session_handle_t new_sh) wrk = proxy_worker_get (s->thread_index); sc = proxy_session_side_ctx_get (wrk, s->opaque); + PROXY_DBG ("[%u] ps %u migrate (free sc)", vlib_get_thread_index (), + sc->ps_index); + /* NOTE: this is just an example. ZC makes this migration rather * tedious. Probably better approaches could be found */ clib_spinlock_lock_if_init (&pm->sessions_lock); ps = proxy_session_get (sc->ps_index); ps->ao.session_handle = new_sh; - ps->ao.rx_fifo = 0; - ps->ao.tx_fifo = 0; + ps->ao.tx_fifo->shr->master_session_index = + session_index_from_handle (new_sh); + ps->ao.tx_fifo->master_thread_index = session_thread_from_handle (new_sh); clib_spinlock_unlock_if_init (&pm->sessions_lock); diff --git a/src/vnet/session/application_worker.c b/src/vnet/session/application_worker.c index f056aad6c3c..cae340cd64e 100644 --- a/src/vnet/session/application_worker.c +++ b/src/vnet/session/application_worker.c @@ -455,7 +455,10 @@ app_worker_init_connected (app_worker_t * app_wrk, session_t * s) /* Allocate fifos for session, unless the app is a builtin proxy */ if (application_is_builtin_proxy (app)) - return app->cb_fns.proxy_alloc_session_fifos (s); + { + s->flags |= SESSION_F_PROXY; + return app->cb_fns.proxy_alloc_session_fifos (s); + } sm = app_worker_get_connect_segment_manager (app_wrk); return app_worker_alloc_session_fifos (sm, s); diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index be2c5dc4df7..cc0e89fd1e2 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -1030,10 +1030,13 @@ session_switch_pool (void *cb_args) if (!app_wrk) goto app_closed; - /* Cleanup fifo segment slice state for fifos */ - sm = app_worker_get_connect_segment_manager (app_wrk); - segment_manager_detach_fifo (sm, &s->rx_fifo); - segment_manager_detach_fifo (sm, &s->tx_fifo); + if (!(s->flags & SESSION_F_PROXY)) + { + /* Cleanup fifo segment slice state for fifos */ + sm = app_worker_get_connect_segment_manager (app_wrk); + segment_manager_detach_fifo (sm, &s->rx_fifo); + segment_manager_detach_fifo (sm, &s->tx_fifo); + } /* Check if session closed during migration */ if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING) @@ -1079,7 +1082,7 @@ session_dgram_connect_notify (transport_connection_t * tc, session_lookup_add_connection (tc, session_handle (new_s)); app_wrk = app_worker_get_if_valid (new_s->app_wrk_index); - if (app_wrk) + if (app_wrk && !(new_s->flags & SESSION_F_PROXY)) { /* New set of fifos attached to the same shared memory */ sm = app_worker_get_connect_segment_manager (app_wrk); |