summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatus Fabian <matfabia@cisco.com>2024-12-30 20:40:51 +0100
committerMatus Fabian <matfabia@cisco.com>2024-12-30 20:42:37 +0100
commite210d413d53081f8603af16f203fb92e08127cdc (patch)
treeb60aac85839a9339011e815b7192b204127caa73
parentd11d31653d948c9f3ac80afd69f111657b007f0f (diff)
session: proxy session migration fix
Type: fix Change-Id: I487ee4e69d8885f46d7a4af2c66a710da66108c5 Signed-off-by: Matus Fabian <matfabia@cisco.com>
-rw-r--r--extras/hs-test/infra/suite_vpp_udp_proxy.go9
-rw-r--r--extras/hs-test/proxy_test.go23
-rw-r--r--src/plugins/hs_apps/proxy.c53
-rw-r--r--src/vnet/session/application_worker.c5
-rw-r--r--src/vnet/session/session.c13
5 files changed, 82 insertions, 21 deletions
diff --git a/extras/hs-test/infra/suite_vpp_udp_proxy.go b/extras/hs-test/infra/suite_vpp_udp_proxy.go
index 6a65a0be5ff..2290aeec6a2 100644
--- a/extras/hs-test/infra/suite_vpp_udp_proxy.go
+++ b/extras/hs-test/infra/suite_vpp_udp_proxy.go
@@ -63,6 +63,11 @@ func (s *VppUdpProxySuite) SetupTest() {
s.Interfaces.Server.Ip4AddressString(),
s.Interfaces.Server.HwAddress)
vpp.Vppctl(arp)
+ arp = fmt.Sprintf("set ip neighbor %s %s %s",
+ s.Interfaces.Client.Peer.Name(),
+ s.Interfaces.Client.Ip4AddressString(),
+ s.Interfaces.Client.HwAddress)
+ vpp.Vppctl(arp)
if *DryRun {
s.LogStartedContainers()
@@ -127,7 +132,7 @@ func (s *VppUdpProxySuite) ClientSendReceive(toSend []byte, rcvBuffer []byte) (i
}
defer proxiedConn.Close()
- err = proxiedConn.SetReadDeadline(time.Now().Add(time.Second * 5))
+ err = proxiedConn.SetDeadline(time.Now().Add(time.Second * 5))
if err != nil {
return 0, err
}
@@ -173,7 +178,7 @@ var _ = Describe("VppUdpProxySuite", Ordered, ContinueOnFailure, func() {
}
})
-var _ = Describe("VppUdpProxySuiteSolo", Ordered, ContinueOnFailure, func() {
+var _ = Describe("VppUdpProxySuiteSolo", Ordered, ContinueOnFailure, Serial, func() {
var s VppUdpProxySuite
BeforeAll(func() {
s.SetupSuite()
diff --git a/extras/hs-test/proxy_test.go b/extras/hs-test/proxy_test.go
index 3afdc3103a3..d371de46cbb 100644
--- a/extras/hs-test/proxy_test.go
+++ b/extras/hs-test/proxy_test.go
@@ -25,6 +25,7 @@ func init() {
RegisterVppProxySoloTests(VppProxyHttpGetTcpMTTest, VppProxyHttpPutTcpMTTest, VppProxyTcpIperfMTTest,
VppProxyUdpIperfMTTest, VppConnectProxyStressTest, VppConnectProxyStressMTTest)
RegisterVppUdpProxyTests(VppProxyUdpTest)
+ RegisterVppUdpProxySoloTests(VppProxyUdpMigrationMTTest)
RegisterEnvoyProxyTests(EnvoyProxyHttpGetTcpTest, EnvoyProxyHttpPutTcpTest)
RegisterNginxProxyTests(NginxMirroringTest)
RegisterNginxProxySoloTests(MirrorMultiThreadTest)
@@ -350,3 +351,25 @@ func VppProxyUdpTest(s *VppUdpProxySuite) {
s.AssertNil(err, fmt.Sprint(err))
s.AssertEqual([]byte("hello"), b[:n])
}
+
+func VppProxyUdpMigrationMTTest(s *VppUdpProxySuite) {
+ remoteServerConn := s.StartEchoServer()
+ defer remoteServerConn.Close()
+
+ vppProxy := s.Containers.VppProxy.VppInstance
+ cmd := fmt.Sprintf("test proxy server fifo-size 512k server-uri udp://%s/%d", s.VppProxyAddr(), s.ProxyPort())
+ cmd += fmt.Sprintf(" client-uri udp://%s/%d", s.ServerAddr(), s.ServerPort())
+ s.Log(vppProxy.Vppctl(cmd))
+
+ b := make([]byte, 1500)
+
+ n, err := s.ClientSendReceive([]byte("hello"), b)
+ s.AssertNil(err, fmt.Sprint(err))
+ s.AssertEqual([]byte("hello"), b[:n])
+
+ n, err = s.ClientSendReceive([]byte("world"), b)
+ s.AssertNil(err, fmt.Sprint(err))
+ s.AssertEqual([]byte("world"), b[:n])
+
+ s.Log(s.Containers.VppProxy.VppInstance.Vppctl("show session verbose 2"))
+}
diff --git a/src/plugins/hs_apps/proxy.c b/src/plugins/hs_apps/proxy.c
index 9d5b94959fb..82b904f9f1b 100644
--- a/src/plugins/hs_apps/proxy.c
+++ b/src/plugins/hs_apps/proxy.c
@@ -26,6 +26,14 @@ proxy_main_t proxy_main;
#define TCP_MSS 1460
+#define PROXY_DEBUG 0
+
+#if PROXY_DEBUG
+#define PROXY_DBG(_fmt, _args...) clib_warning (_fmt, ##_args)
+#else
+#define PROXY_DBG(_fmt, _args...)
+#endif
+
static proxy_session_side_ctx_t *
proxy_session_side_ctx_alloc (proxy_worker_t *wrk)
{
@@ -200,6 +208,8 @@ proxy_session_postponed_free_rpc (void *arg)
clib_spinlock_lock_if_init (&pm->sessions_lock);
+ PROXY_DBG ("[%u] ps %u postponed free", vlib_get_thread_index (), ps_index);
+
ps = proxy_session_get (ps_index);
segment_manager_dealloc_fifos (ps->po.rx_fifo, ps->po.tx_fifo);
proxy_session_free (ps);
@@ -262,6 +272,9 @@ proxy_try_close_session (session_t * s, int is_active_open)
wrk = proxy_worker_get (s->thread_index);
sc = proxy_session_side_ctx_get (wrk, s->opaque);
+ PROXY_DBG ("[%u] ps %u close (is ao %u)", vlib_get_thread_index (),
+ sc->ps_index, is_active_open);
+
clib_spinlock_lock_if_init (&pm->sessions_lock);
ps = proxy_session_get (sc->ps_index);
@@ -304,6 +317,8 @@ proxy_try_side_ctx_cleanup (session_t *s)
if (sc->state == PROXY_SC_S_CREATED)
return;
+ PROXY_DBG ("[%u] ps %u side ctx cleanup", vlib_get_thread_index (),
+ sc->ps_index);
clib_spinlock_lock_if_init (&pm->sessions_lock);
ps = proxy_session_get (sc->ps_index);
@@ -330,6 +345,9 @@ proxy_try_delete_session (session_t * s, u8 is_active_open)
sc = proxy_session_side_ctx_get (wrk, s->opaque);
ps_index = sc->ps_index;
+ PROXY_DBG ("[%u] ps %u delete (is ao %u)", vlib_get_thread_index (),
+ sc->ps_index, is_active_open);
+
proxy_session_side_ctx_free (wrk, sc);
clib_spinlock_lock_if_init (&pm->sessions_lock);
@@ -436,6 +454,8 @@ proxy_accept_callback (session_t * s)
ps = proxy_session_alloc ();
+ PROXY_DBG ("[%u] ps %u new", vlib_get_thread_index (), ps->ps_index);
+
ps->po.session_handle = session_handle (s);
ps->po.rx_fifo = s->rx_fifo;
ps->po.tx_fifo = s->tx_fifo;
@@ -614,6 +634,8 @@ proxy_rx_callback (session_t *s)
if (sc->state == PROXY_SC_S_CREATED)
{
+ PROXY_DBG ("[%u] ps %u start connect", vlib_get_thread_index (),
+ sc->ps_index);
proxy_session_start_connect (sc, s);
sc->state = PROXY_SC_S_CONNECTING;
return 0;
@@ -719,6 +741,8 @@ active_open_alloc_session_fifos (session_t *s)
svm_fifo_t *rxf, *txf;
proxy_session_t *ps;
+ PROXY_DBG ("[%u] ps %u ao alloc fifos", vlib_get_thread_index (), s->opaque);
+
clib_spinlock_lock_if_init (&pm->sessions_lock);
/* Active open opaque is pointing at proxy session */
@@ -788,6 +812,8 @@ active_open_connected_callback (u32 app_index, u32 opaque,
/* Connection failed */
if (err)
{
+ PROXY_DBG ("[%u] ps %u connect failed: %d", vlib_get_thread_index (),
+ opaque, err);
clib_spinlock_lock_if_init (&pm->sessions_lock);
ps = proxy_session_get (opaque);
@@ -806,6 +832,8 @@ active_open_connected_callback (u32 app_index, u32 opaque,
return 0;
}
+ PROXY_DBG ("[%u] ps %u connected", vlib_get_thread_index (), opaque);
+
wrk = proxy_worker_get (s->thread_index);
clib_spinlock_lock_if_init (&pm->sessions_lock);
@@ -867,6 +895,9 @@ active_open_migrate_po_fixup_rpc (void *arg)
proxy_session_t *ps;
session_t *po_s;
+ PROXY_DBG ("[%u] ps %u migrate (po fixup)", vlib_get_thread_index (),
+ ps_index);
+
wrk = proxy_worker_get (vlib_get_thread_index ());
clib_spinlock_lock_if_init (&pm->sessions_lock);
@@ -874,8 +905,6 @@ active_open_migrate_po_fixup_rpc (void *arg)
ps = proxy_session_get (ps_index);
po_s = session_get_from_handle (ps->po.session_handle);
- po_s->rx_fifo = ps->po.rx_fifo;
- po_s->tx_fifo = ps->po.tx_fifo;
po_sc = proxy_session_side_ctx_get (wrk, po_s->opaque);
po_sc->pair = ps->ao;
@@ -896,6 +925,9 @@ active_open_migrate_rpc (void *arg)
proxy_session_t *ps;
session_t *s;
+ PROXY_DBG ("[%u] ps %u migrate (alloc new sc)", vlib_get_thread_index (),
+ ps_index);
+
wrk = proxy_worker_get (vlib_get_thread_index ());
sc = proxy_session_side_ctx_alloc (wrk);
@@ -908,15 +940,6 @@ active_open_migrate_rpc (void *arg)
s->opaque = sc->sc_index;
s->flags &= ~SESSION_F_IS_MIGRATING;
- /* Fixup passive open session because of migration and zc */
- ps->ao.rx_fifo = ps->po.tx_fifo = s->rx_fifo;
- ps->ao.tx_fifo = ps->po.rx_fifo = s->tx_fifo;
-
- ps->po.tx_fifo->shr->master_session_index =
- session_index_from_handle (ps->po.session_handle);
- ps->po.tx_fifo->master_thread_index =
- session_thread_from_handle (ps->po.session_handle);
-
sc->pair = ps->po;
clib_spinlock_unlock_if_init (&pm->sessions_lock);
@@ -937,14 +960,18 @@ active_open_migrate_callback (session_t *s, session_handle_t new_sh)
wrk = proxy_worker_get (s->thread_index);
sc = proxy_session_side_ctx_get (wrk, s->opaque);
+ PROXY_DBG ("[%u] ps %u migrate (free sc)", vlib_get_thread_index (),
+ sc->ps_index);
+
/* NOTE: this is just an example. ZC makes this migration rather
* tedious. Probably better approaches could be found */
clib_spinlock_lock_if_init (&pm->sessions_lock);
ps = proxy_session_get (sc->ps_index);
ps->ao.session_handle = new_sh;
- ps->ao.rx_fifo = 0;
- ps->ao.tx_fifo = 0;
+ ps->ao.tx_fifo->shr->master_session_index =
+ session_index_from_handle (new_sh);
+ ps->ao.tx_fifo->master_thread_index = session_thread_from_handle (new_sh);
clib_spinlock_unlock_if_init (&pm->sessions_lock);
diff --git a/src/vnet/session/application_worker.c b/src/vnet/session/application_worker.c
index f056aad6c3c..cae340cd64e 100644
--- a/src/vnet/session/application_worker.c
+++ b/src/vnet/session/application_worker.c
@@ -455,7 +455,10 @@ app_worker_init_connected (app_worker_t * app_wrk, session_t * s)
/* Allocate fifos for session, unless the app is a builtin proxy */
if (application_is_builtin_proxy (app))
- return app->cb_fns.proxy_alloc_session_fifos (s);
+ {
+ s->flags |= SESSION_F_PROXY;
+ return app->cb_fns.proxy_alloc_session_fifos (s);
+ }
sm = app_worker_get_connect_segment_manager (app_wrk);
return app_worker_alloc_session_fifos (sm, s);
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index be2c5dc4df7..cc0e89fd1e2 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -1030,10 +1030,13 @@ session_switch_pool (void *cb_args)
if (!app_wrk)
goto app_closed;
- /* Cleanup fifo segment slice state for fifos */
- sm = app_worker_get_connect_segment_manager (app_wrk);
- segment_manager_detach_fifo (sm, &s->rx_fifo);
- segment_manager_detach_fifo (sm, &s->tx_fifo);
+ if (!(s->flags & SESSION_F_PROXY))
+ {
+ /* Cleanup fifo segment slice state for fifos */
+ sm = app_worker_get_connect_segment_manager (app_wrk);
+ segment_manager_detach_fifo (sm, &s->rx_fifo);
+ segment_manager_detach_fifo (sm, &s->tx_fifo);
+ }
/* Check if session closed during migration */
if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
@@ -1079,7 +1082,7 @@ session_dgram_connect_notify (transport_connection_t * tc,
session_lookup_add_connection (tc, session_handle (new_s));
app_wrk = app_worker_get_if_valid (new_s->app_wrk_index);
- if (app_wrk)
+ if (app_wrk && !(new_s->flags & SESSION_F_PROXY))
{
/* New set of fifos attached to the same shared memory */
sm = app_worker_get_connect_segment_manager (app_wrk);