author     hanlin <hanlin_wang@163.com>              2020-07-13 11:09:15 +0800
committer  Florin Coras <florin.coras@gmail.com>     2020-08-12 01:57:18 +0000
commit     a3a489691d7f2813702fae2d915120743b860d62 (patch)
tree       23a22a25ebe051be37244c45fc3a0732cc296e0d /src/vcl/vcl_locked.c
parent     40c07ce7a78af69f7354222d4663a65cd5572049 (diff)
vcl: support multi-threads with session migration
Currently, multi-threaded applications are limited to one dispatch thread
and multiple worker threads, where only the dispatch thread is a vcl worker
and can interact with epoll.

This patch registers every thread as a vcl worker, so each thread can
interact with epoll. It also adds session migration, e.g., a socket created
in thread A can be used (bind, connect, etc.) in thread B, as in the sketch
below.
Type: feature
Signed-off-by: hanlin <hanlin_wang@163.com>
Change-Id: Iab0b43a33466968c1423d7d20faf1460c8589d91
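To illustrate what this enables, here is a hypothetical application sketch
(plain POSIX sockets and pthreads, assumed to run under the usual VCL
LD_PRELOAD setup; the port and setup details are illustrative and not part
of this patch). The socket is created in the main thread and then bound and
listened on from a second thread; with per-thread vcl workers this triggers
a session migration instead of requiring all calls to go through the
dispatch thread.

#include <pthread.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>

static void *
worker_thread (void *arg)
{
  int fd = *(int *) arg;
  struct sockaddr_in addr;

  memset (&addr, 0, sizeof (addr));
  addr.sin_family = AF_INET;
  addr.sin_port = htons (12345);
  addr.sin_addr.s_addr = htonl (INADDR_ANY);

  /* bind/listen happen in a different thread than socket(); with
   * per-thread vcl workers this is handled via session migration */
  bind (fd, (struct sockaddr *) &addr, sizeof (addr));
  listen (fd, 10);
  return 0;
}

int
main (void)
{
  pthread_t t;
  int fd = socket (AF_INET, SOCK_STREAM, 0);    /* created in thread A */

  pthread_create (&t, 0, worker_thread, &fd);   /* used in thread B */
  pthread_join (t, 0);
  close (fd);
  return 0;
}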
Diffstat (limited to 'src/vcl/vcl_locked.c')
-rw-r--r--  src/vcl/vcl_locked.c | 245
1 file changed, 229 insertions, 16 deletions
diff --git a/src/vcl/vcl_locked.c b/src/vcl/vcl_locked.c
index fc48618429b..02da8cd2cf5 100644
--- a/src/vcl/vcl_locked.c
+++ b/src/vcl/vcl_locked.c
@@ -31,6 +31,9 @@ typedef struct vcl_locked_session_
   u32 worker_index;
   u32 vls_index;
   u32 shared_data_index;
+  /** VCL session owned by different workers because of migration */
+  u32 owner_vcl_wrk_index;
+  uword *vcl_wrk_index_to_session_index;
 } vcl_locked_session_t;
 
 typedef struct vls_worker_
@@ -38,7 +41,6 @@ typedef struct vls_worker_
   vcl_locked_session_t *vls_pool;
   uword *session_index_to_vlsh_table;
   u32 wrk_index;
-  volatile int rpc_done;
 } vls_worker_t;
 
 typedef struct vls_local_
@@ -68,6 +70,7 @@ vls_main_t *vlsm;
 typedef enum vls_rpc_msg_type_
 {
   VLS_RPC_CLONE_AND_SHARE,
+  VLS_RPC_SESS_CLEANUP,
 } vls_rpc_msg_type_e;
 
 typedef struct vls_rpc_msg_
@@ -79,14 +82,35 @@ typedef struct vls_rpc_msg_
 typedef struct vls_clone_and_share_msg_
 {
   u32 vls_index;                /**< vls to be shared */
-  u32 origin_vls_wrk;           /**< worker that initiated the rpc */
+  u32 session_index;            /**< vcl session to be shared */
+  u32 origin_vls_wrk;           /**< vls worker that initiated the rpc */
   u32 origin_vls_index;         /**< vls session of the originator */
+  u32 origin_vcl_wrk;           /**< vcl worker that initiated the rpc */
+  u32 origin_session_index;     /**< vcl session of the originator */
 } vls_clone_and_share_msg_t;
 
+typedef struct vls_sess_cleanup_msg_
+{
+  u32 session_index;            /**< vcl session to be cleaned */
+  u32 origin_vcl_wrk;           /**< worker that initiated the rpc */
+} vls_sess_cleanup_msg_t;
+
+void vls_send_session_cleanup_rpc (vcl_worker_t * wrk,
+                                   u32 dst_wrk_index, u32 dst_session_index);
+void vls_send_clone_and_share_rpc (vcl_worker_t * wrk,
+                                   vcl_locked_session_t * vls,
+                                   u32 session_index, u32 vls_wrk_index,
+                                   u32 dst_wrk_index, u32 dst_vls_index,
+                                   u32 dst_session_index);
+
+
 static inline u32
 vls_get_worker_index (void)
 {
-  return vcl_get_worker_index ();
+  if (vls_mt_wrk_supported ())
+    return vlsl->vls_wrk_index;
+  else
+    return vcl_get_worker_index ();
 }
 
 static u32
@@ -190,7 +214,10 @@ static void
 vls_mt_add (void)
 {
   vlsl->vls_mt_n_threads += 1;
-  vcl_set_worker_index (vlsl->vls_wrk_index);
+  if (vls_mt_wrk_supported ())
+    vls_register_vcl_worker ();
+  else
+    vcl_set_worker_index (vlsl->vls_wrk_index);
 }
 
 static inline void
@@ -305,6 +332,12 @@ vls_alloc (vcl_session_handle_t sh)
   vls->shared_data_index = ~0;
   hash_set (wrk->session_index_to_vlsh_table, vls->session_index,
             vls->vls_index);
+  if (vls_mt_wrk_supported ())
+    {
+      hash_set (vls->vcl_wrk_index_to_session_index, vls->worker_index,
+                vls->session_index);
+      vls->owner_vcl_wrk_index = vls->worker_index;
+    }
   clib_spinlock_init (&vls->lock);
   vls_table_wunlock ();
@@ -556,7 +589,8 @@ vls_unshare_session (vcl_locked_session_t * vls, vcl_worker_t * wrk)
   u32 n_subscribers;
   vcl_session_t *s;
 
-  ASSERT (vls->shared_data_index != ~0);
+  if (vls->shared_data_index == ~0)
+    return 0;
 
   s = vcl_session_get (wrk, vls->session_index);
   if (s->session_state == STATE_LISTEN)
@@ -792,10 +826,76 @@ vls_mt_rel_locks (int locks_acq)
     vls_mt_create_unlock ();
 }
 
+static void
+vls_session_migrate (vcl_locked_session_t * vls)
+{
+  u32 wrk_index = vcl_get_worker_index ();
+  vcl_worker_t *wrk;
+  u32 src_sid, sid;
+  vcl_session_t *session;
+  uword *p;
+
+  if (!vls_mt_wrk_supported ())
+    return;
+
+  if (PREDICT_TRUE (vls->worker_index == wrk_index))
+    return;
+  if ((p = hash_get (vls->vcl_wrk_index_to_session_index, wrk_index)))
+    {
+      vls->worker_index = wrk_index;
+      vls->session_index = (u32) p[0];
+      return;
+    }
+
+  /* migrate from orignal vls */
+  if (!(p = hash_get (vls->vcl_wrk_index_to_session_index,
+                      vls->owner_vcl_wrk_index)))
+    {
+      VERR ("session in owner worker(%u) is free", vls->owner_vcl_wrk_index);
+      ASSERT (0);
+      return;
+    }
+
+  src_sid = (u32) p[0];
+  wrk = vcl_worker_get_current ();
+  session = vcl_session_alloc (wrk);
+  sid = session->session_index;
+  vls_send_clone_and_share_rpc (wrk, vls, sid, vls_get_worker_index (),
+                                vls->owner_vcl_wrk_index, vls->vls_index,
+                                src_sid);
+  session->session_index = sid;
+  vls->worker_index = wrk_index;
+  vls->session_index = sid;
+  hash_set (vls->vcl_wrk_index_to_session_index, wrk_index, sid);
+  VDBG (1, "migrate session of worker (session): %u (%u) -> %u (%u)",
+        vls->owner_vcl_wrk_index, src_sid, wrk_index, sid);
+
+  if (PREDICT_FALSE (session->is_vep && session->vep.next_sh != ~0))
+    {
+      /* TODO: rollback? */
+      VERR ("can't migrate nonempty epoll session");
+      ASSERT (0);
+      return;
+    }
+  else if (PREDICT_FALSE (!session->is_vep &&
+                          session->session_state != STATE_CLOSED))
+    {
+      /* TODO: rollback? */
+      VERR ("migrate NOT supported, session_status (%u)",
+            session->session_state);
+      ASSERT (0);
+      return;
+    }
+}
+
 #define vls_mt_guard(_vls, _op)                                         \
   int _locks_acq = 0;                                                   \
   if (PREDICT_FALSE (vcl_get_worker_index () == ~0))                    \
     vls_mt_add ();                                                      \
+  if (PREDICT_FALSE (_vls && vls_mt_wrk_supported () &&                 \
+                     ((vcl_locked_session_t *)_vls)->worker_index !=    \
+                     vcl_get_worker_index ()))                          \
+    vls_session_migrate (_vls);                                         \
   if (PREDICT_FALSE (vlsl->vls_mt_n_threads > 1))                       \
     vls_mt_acq_locks (_vls, _op, &_locks_acq);                          \
@@ -893,6 +993,7 @@ vls_attr (vls_handle_t vlsh, uint32_t op, void *buffer, uint32_t * buflen)
   if (!(vls = vls_get_w_dlock (vlsh)))
     return VPPCOM_EBADFD;
+  vls_session_migrate (vls);
   rv = vppcom_session_attr (vls_to_sh_tu (vls), op, buffer, buflen);
   vls_get_and_unlock (vlsh);
   return rv;
 }
@@ -948,6 +1049,9 @@ vls_mp_checks (vcl_locked_session_t * vls, int is_add)
   vcl_session_t *s;
   u32 owner_wrk;
 
+  if (vls_mt_wrk_supported ())
+    return;
+
   s = vcl_session_get (wrk, vls->session_index);
   switch (s->session_state)
     {
@@ -1024,6 +1128,27 @@ vls_create (uint8_t proto, uint8_t is_nonblocking)
   return vlsh;
 }
 
+static void
+vls_migrate_session_close (vcl_locked_session_t * vls)
+{
+  u32 session_index, wrk_index;
+  vcl_worker_t *wrk = vcl_worker_get_current ();
+
+  if (!vls_mt_wrk_supported ())
+    return;
+
+  /* *INDENT-OFF* */
+  hash_foreach (wrk_index, session_index, vls->vcl_wrk_index_to_session_index,
+  ({
+    if (vcl_get_worker_index () != wrk_index)
+      {
+        vls_send_session_cleanup_rpc (wrk, wrk_index, session_index);
+      }
+  }));
+  /* *INDENT-ON* */
+  hash_free (vls->vcl_wrk_index_to_session_index);
+}
+
 int
 vls_close (vls_handle_t vlsh)
 {
@@ -1039,13 +1164,15 @@ vls_close (vls_handle_t vlsh)
       return VPPCOM_EBADFD;
     }
 
-  vls_mt_guard (0, VLS_MT_OP_SPOOL);
+  vls_mt_guard (vls, VLS_MT_OP_SPOOL);
 
   if (vls_is_shared (vls))
     rv = vls_unshare_session (vls, vcl_worker_get_current ());
   else
     rv = vppcom_session_close (vls_to_sh (vls));
 
+  vls_migrate_session_close (vls);
+
   vls_free (vls);
   vls_mt_unguard ();
@@ -1101,6 +1228,7 @@ vls_epoll_ctl (vls_handle_t ep_vlsh, int op, vls_handle_t vlsh,
   vls_table_rlock ();
   ep_vls = vls_get_and_lock (ep_vlsh);
   vls = vls_get_and_lock (vlsh);
+  vls_session_migrate (ep_vls);
   ep_sh = vls_to_sh (ep_vls);
   sh = vls_to_sh (vls);
@@ -1388,21 +1516,42 @@ vls_clone_and_share_rpc_handler (void *args)
   vls_clone_and_share_msg_t *msg = (vls_clone_and_share_msg_t *) args;
   vls_worker_t *wrk = vls_worker_get_current (), *dst_wrk;
   vcl_locked_session_t *vls, *dst_vls;
-  vcl_worker_t *dst_vcl_wrk;
+  vcl_worker_t *vcl_wrk = vcl_worker_get_current (), *dst_vcl_wrk;
   vcl_session_t *s, *dst_s;
 
   vls = vls_session_get (wrk, msg->vls_index);
-  vls_init_share_session (wrk, vls);
-  s = vcl_session_get (vcl_worker_get_current (), vls->session_index);
+  if (!vls_mt_wrk_supported ())
+    vls_init_share_session (wrk, vls);
+
+  s = vcl_session_get (vcl_wrk, msg->session_index);
   dst_wrk = vls_worker_get (msg->origin_vls_wrk);
-  dst_vcl_wrk = vcl_worker_get (msg->origin_vls_wrk);
+  dst_vcl_wrk = vcl_worker_get (msg->origin_vcl_wrk);
   dst_vls = vls_session_get (dst_wrk, msg->origin_vls_index);
   dst_vls->shared_data_index = vls->shared_data_index;
-  dst_s = vcl_session_get (dst_vcl_wrk, dst_vls->session_index);
+  dst_s = vcl_session_get (dst_vcl_wrk, msg->origin_session_index);
   clib_memcpy (dst_s, s, sizeof (*s));
+  dst_vcl_wrk->rpc_done = 1;
+
+  VDBG (1, "proces session clone of worker (session): %u (%u) -> %u (%u)",
+        vcl_wrk->wrk_index, msg->session_index, dst_vcl_wrk->wrk_index,
+        msg->origin_session_index);
+}
+
+static void
+vls_session_cleanup_rpc_handler (void *args)
+{
+  vls_sess_cleanup_msg_t *msg = (vls_sess_cleanup_msg_t *) args;
+  vcl_worker_t *wrk = vcl_worker_get_current ();
+  vcl_worker_t *dst_wrk = vcl_worker_get (msg->origin_vcl_wrk);
+
+  vppcom_session_close (vcl_session_handle_from_index (msg->session_index));
+  dst_wrk->rpc_done = 1;
+
+  VDBG (1, "proces session cleanup of worker (session): %u (%u) from %u ()",
+        wrk->wrk_index, msg->session_index, dst_wrk->wrk_index);
 }
 
 static void
@@ -1414,29 +1563,68 @@ vls_rpc_handler (void *args)
     case VLS_RPC_CLONE_AND_SHARE:
       vls_clone_and_share_rpc_handler (msg->data);
       break;
+    case VLS_RPC_SESS_CLEANUP:
+      vls_session_cleanup_rpc_handler (msg->data);
+      break;
     default:
       break;
     }
 }
 
 void
-vls_send_clone_and_share_rpc (vls_worker_t * wrk, vcl_locked_session_t * vls,
-                              u32 dst_wrk_index, u32 dst_vls_index)
+vls_send_clone_and_share_rpc (vcl_worker_t * wrk,
+                              vcl_locked_session_t * vls, u32 session_index,
+                              u32 vls_wrk_index, u32 dst_wrk_index,
+                              u32 dst_vls_index, u32 dst_session_index)
 {
   u8 data[sizeof (u8) + sizeof (vls_clone_and_share_msg_t)];
   vls_clone_and_share_msg_t *msg;
   vls_rpc_msg_t *rpc;
+  int ret;
 
   rpc = (vls_rpc_msg_t *) & data;
   rpc->type = VLS_RPC_CLONE_AND_SHARE;
   msg = (vls_clone_and_share_msg_t *) & rpc->data;
-  msg->origin_vls_wrk = wrk->wrk_index;
+  msg->origin_vls_wrk = vls_wrk_index;
   msg->origin_vls_index = vls->vls_index;
+  msg->origin_vcl_wrk = wrk->wrk_index;
+  msg->origin_session_index = session_index;
   msg->vls_index = dst_vls_index;
+  msg->session_index = dst_session_index;
+
+  wrk->rpc_done = 0;
+  ret = vcl_send_worker_rpc (dst_wrk_index, rpc, sizeof (data));
+
+  VDBG (1,
+        "send session clone of worker (session): %u (%u) -> %u (%u), ret=%d",
+        dst_wrk_index, msg->session_index, msg->origin_vcl_wrk,
+        msg->origin_session_index, ret);
+  while (!ret && !wrk->rpc_done)
+    ;
+}
+
+void
+vls_send_session_cleanup_rpc (vcl_worker_t * wrk,
+                              u32 dst_wrk_index, u32 dst_session_index)
+{
+  u8 data[sizeof (u8) + sizeof (vls_sess_cleanup_msg_t)];
+  vls_sess_cleanup_msg_t *msg;
+  vls_rpc_msg_t *rpc;
+  int ret;
+
+  rpc = (vls_rpc_msg_t *) & data;
+  rpc->type = VLS_RPC_SESS_CLEANUP;
+  msg = (vls_sess_cleanup_msg_t *) & rpc->data;
+  msg->origin_vcl_wrk = wrk->wrk_index;
+  msg->session_index = dst_session_index;
 
   wrk->rpc_done = 0;
-  vcl_send_worker_rpc (dst_wrk_index, rpc, sizeof (data));
-  while (!wrk->rpc_done)
+  ret = vcl_send_worker_rpc (dst_wrk_index, rpc, sizeof (data));
+
+  VDBG (1,
        "send session cleanup of worker (session): %u (%u) from %u (), ret=%d",
+        dst_wrk_index, msg->session_index, msg->origin_vcl_wrk, ret);
+  while (!ret && !wrk->rpc_done)
     ;
 }
@@ -1470,6 +1658,31 @@ vls_use_eventfd (void)
   return vcm->cfg.use_mq_eventfd;
 }
 
+unsigned char
+vls_mt_wrk_supported (void)
+{
+  return vcm->cfg.mt_supported;
+}
+
+int
+vls_use_real_epoll (void)
+{
+  if (vcl_get_worker_index () == ~0)
+    return 0;
+
+  return vcl_worker_get_current ()->vcl_needs_real_epoll;
+}
+
+void
+vls_register_vcl_worker (void)
+{
+  if (vppcom_worker_register () != VPPCOM_OK)
+    {
+      VERR ("failed to register worker");
+      return;
+    }
+}
+
 /*
  * fd.io coding-style-patch-verification: ON
 *
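The cross-worker RPCs added above follow a simple synchronous pattern: the
sender clears its rpc_done flag, posts the message to the destination
worker, and spins until the handler running on that worker sets the flag.
Below is a minimal standalone sketch of that pattern; the names
(rpc_slot_t, worker_main, the payload field) are illustrative only and are
not part of the VCL API.

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

typedef struct
{
  atomic_int has_msg;           /* message posted, waiting to be handled */
  atomic_int rpc_done;          /* handler finished processing */
  int payload;                  /* e.g. the session index to clean up */
} rpc_slot_t;

static rpc_slot_t slot;

/* destination "worker": waits for a message, handles it, signals completion */
static void *
worker_main (void *arg)
{
  (void) arg;
  while (!atomic_load (&slot.has_msg))
    ;
  printf ("worker: cleaning up session %d\n", slot.payload);
  atomic_store (&slot.rpc_done, 1);
  return 0;
}

int
main (void)
{
  pthread_t t;
  pthread_create (&t, 0, worker_main, 0);

  /* sender side, mirroring vls_send_session_cleanup_rpc: clear the done
   * flag, post the message, then busy-wait until the handler flips it */
  atomic_store (&slot.rpc_done, 0);
  slot.payload = 42;
  atomic_store (&slot.has_msg, 1);
  while (!atomic_load (&slot.rpc_done))
    ;

  pthread_join (t, 0);
  return 0;
}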