author     hanlin <hanlin_wang@163.com>             2020-07-13 11:09:15 +0800
committer  Florin Coras <florin.coras@gmail.com>    2020-08-12 01:57:18 +0000
commit     a3a489691d7f2813702fae2d915120743b860d62 (patch)
tree       23a22a25ebe051be37244c45fc3a0732cc296e0d /src/vcl
parent     40c07ce7a78af69f7354222d4663a65cd5572049 (diff)
vcl: support multi-threading with session migration
Currently, multi-threaded applications are supported only in the form of
one dispatch thread plus several worker threads: only the dispatch thread
is a VCL worker and can interact with epoll.

This patch registers all threads as VCL workers, so every thread can now
interact with epoll. Session migration is also supported, e.g., a socket
created in thread A can be used (bind, connect, etc.) in thread B.
Type: feature
Signed-off-by: hanlin <hanlin_wang@163.com>
Change-Id: Iab0b43a33466968c1423d7d20faf1460c8589d91
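
To make the usage model concrete, here is a minimal hypothetical sketch (not
part of this patch) of the pattern the message above describes. The names and
port are illustrative; the calls are plain POSIX and are assumed to run under
VCL's LD_PRELOAD shim with the new multi-thread option enabled, so each
pthread is registered as a VCL worker and the session created in the main
thread is migrated on first use in the second thread:

/* mt_migrate_sketch.c: hypothetical example, not part of this patch.
 * Run under the VCL LD_PRELOAD library with "multi-thread" configured;
 * the shim routes these POSIX calls through VCL/VLS. */
#include <netinet/in.h>
#include <pthread.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

static int listen_fd;		/* created in thread A (main) */

static void *
thread_b (void *arg)
{
  struct sockaddr_in sa;
  struct epoll_event ev = { .events = EPOLLIN };
  int epfd;

  /* bind/listen in thread B on a socket created in thread A; the
   * underlying vcl session is migrated to thread B's worker */
  memset (&sa, 0, sizeof (sa));
  sa.sin_family = AF_INET;
  sa.sin_addr.s_addr = htonl (INADDR_ANY);
  sa.sin_port = htons (12345);
  bind (listen_fd, (struct sockaddr *) &sa, sizeof (sa));
  listen (listen_fd, 10);

  /* every registered thread can now drive its own epoll instance */
  epfd = epoll_create1 (0);
  ev.data.fd = listen_fd;
  epoll_ctl (epfd, EPOLL_CTL_ADD, listen_fd, &ev);
  epoll_wait (epfd, &ev, 1 /* maxevents */ , 1000 /* timeout ms */ );

  close (epfd);
  return 0;
}

int
main (void)
{
  pthread_t t;

  listen_fd = socket (AF_INET, SOCK_STREAM, 0);	/* thread A creates it */
  pthread_create (&t, 0, thread_b, 0);
  pthread_join (t, 0);
  close (listen_fd);
  return 0;
}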
Diffstat (limited to 'src/vcl')
-rw-r--r--  src/vcl/ldp.c          11
-rw-r--r--  src/vcl/vcl_cfg.c       5
-rw-r--r--  src/vcl/vcl_locked.c  245
-rw-r--r--  src/vcl/vcl_locked.h    3
-rw-r--r--  src/vcl/vcl_private.c   2
-rw-r--r--  src/vcl/vcl_private.h   7
-rw-r--r--  src/vcl/vppcom.c       13
7 files changed, 261 insertions, 25 deletions
diff --git a/src/vcl/ldp.c b/src/vcl/ldp.c
index cda4425e574..fddf45cd502 100644
--- a/src/vcl/ldp.c
+++ b/src/vcl/ldp.c
@@ -2223,7 +2223,7 @@ epoll_create1 (int flags)
   if ((errno = -ldp_init ()))
     return -1;
 
-  if (ldp->vcl_needs_real_epoll)
+  if (ldp->vcl_needs_real_epoll || vls_use_real_epoll ())
     {
       /* Make sure workers have been allocated */
       if (!ldp->workers)
@@ -2425,7 +2425,7 @@ static inline int
 ldp_epoll_pwait_eventfd (int epfd, struct epoll_event *events,
                          int maxevents, int timeout, const sigset_t * sigmask)
 {
-  ldp_worker_ctx_t *ldpw = ldp_worker_get_current ();
+  ldp_worker_ctx_t *ldpw;
   int libc_epfd, rv = 0, num_ev;
   vls_handle_t ep_vlsh;
 
@@ -2438,6 +2438,11 @@ ldp_epoll_pwait_eventfd (int epfd, struct epoll_event *events,
       return -1;
     }
 
+  if (vls_mt_wrk_supported ())
+    if (PREDICT_FALSE (vppcom_worker_index () == ~0))
+      vls_register_vcl_worker ();
+
+  ldpw = ldp_worker_get_current ();
   if (epfd == ldpw->vcl_mq_epfd)
     return libc_epoll_pwait (epfd, events, maxevents, timeout, sigmask);
 
@@ -2497,7 +2502,7 @@ ldp_epoll_pwait_eventfd (int epfd, struct epoll_event *events,
       rv = vls_epoll_wait (ep_vlsh, events, maxevents, 0);
       if (rv > 0)
         goto done;
-      else if (rv < 0)
+      else if (PREDICT_FALSE (rv < 0))
         {
           errno = -rv;
           rv = -1;
diff --git a/src/vcl/vcl_cfg.c b/src/vcl/vcl_cfg.c
index 2ae940170b7..a94c874532f 100644
--- a/src/vcl/vcl_cfg.c
+++ b/src/vcl/vcl_cfg.c
@@ -498,6 +498,11 @@ vppcom_cfg_read_file (char *conf_fname)
           VCFG_DBG (0, "VCL<%d>: configured tls-engine %u (0x%x)",
                     getpid (), vcl_cfg->tls_engine, vcl_cfg->tls_engine);
         }
+      else if (unformat (line_input, "multi-thread"))
+        {
+          vcl_cfg->mt_supported = 1;
+          VCFG_DBG (0, "VCL<%d>: configured with multithread", getpid ());
+        }
       else if (unformat (line_input, "}"))
         {
           vc_cfg_input = 0;
diff --git a/src/vcl/vcl_locked.c b/src/vcl/vcl_locked.c
index fc48618429b..02da8cd2cf5 100644
--- a/src/vcl/vcl_locked.c
+++ b/src/vcl/vcl_locked.c
@@ -31,6 +31,9 @@ typedef struct vcl_locked_session_
   u32 worker_index;
   u32 vls_index;
   u32 shared_data_index;
+  /** VCL session owned by different workers because of migration */
+  u32 owner_vcl_wrk_index;
+  uword *vcl_wrk_index_to_session_index;
 } vcl_locked_session_t;
 
 typedef struct vls_worker_
@@ -38,7 +41,6 @@ typedef struct vls_worker_
   vcl_locked_session_t *vls_pool;
   uword *session_index_to_vlsh_table;
   u32 wrk_index;
-  volatile int rpc_done;
 } vls_worker_t;
 
 typedef struct vls_local_
@@ -68,6 +70,7 @@ vls_main_t *vlsm;
 typedef enum vls_rpc_msg_type_
 {
   VLS_RPC_CLONE_AND_SHARE,
+  VLS_RPC_SESS_CLEANUP,
 } vls_rpc_msg_type_e;
 
 typedef struct vls_rpc_msg_
@@ -79,14 +82,35 @@ typedef struct vls_rpc_msg_
 typedef struct vls_clone_and_share_msg_
 {
   u32 vls_index;                /**< vls to be shared */
-  u32 origin_vls_wrk;           /**< worker that initiated the rpc */
+  u32 session_index;            /**< vcl session to be shared */
+  u32 origin_vls_wrk;           /**< vls worker that initiated the rpc */
   u32 origin_vls_index;         /**< vls session of the originator */
+  u32 origin_vcl_wrk;           /**< vcl worker that initiated the rpc */
+  u32 origin_session_index;     /**< vcl session of the originator */
 } vls_clone_and_share_msg_t;
 
+typedef struct vls_sess_cleanup_msg_
+{
+  u32 session_index;            /**< vcl session to be cleaned */
+  u32 origin_vcl_wrk;           /**< worker that initiated the rpc */
+} vls_sess_cleanup_msg_t;
+
+void vls_send_session_cleanup_rpc (vcl_worker_t * wrk,
+                                   u32 dst_wrk_index, u32 dst_session_index);
+void vls_send_clone_and_share_rpc (vcl_worker_t * wrk,
+                                   vcl_locked_session_t * vls,
+                                   u32 session_index, u32 vls_wrk_index,
+                                   u32 dst_wrk_index, u32 dst_vls_index,
+                                   u32 dst_session_index);
+
 static inline u32
 vls_get_worker_index (void)
 {
-  return vcl_get_worker_index ();
+  if (vls_mt_wrk_supported ())
+    return vlsl->vls_wrk_index;
+  else
+    return vcl_get_worker_index ();
 }
 
 static u32
@@ -190,7 +214,10 @@ static void
 vls_mt_add (void)
 {
   vlsl->vls_mt_n_threads += 1;
-  vcl_set_worker_index (vlsl->vls_wrk_index);
+  if (vls_mt_wrk_supported ())
+    vls_register_vcl_worker ();
+  else
+    vcl_set_worker_index (vlsl->vls_wrk_index);
 }
 
 static inline void
@@ -305,6 +332,12 @@ vls_alloc (vcl_session_handle_t sh)
   vls->shared_data_index = ~0;
   hash_set (wrk->session_index_to_vlsh_table, vls->session_index,
             vls->vls_index);
+  if (vls_mt_wrk_supported ())
+    {
+      hash_set (vls->vcl_wrk_index_to_session_index, vls->worker_index,
+                vls->session_index);
+      vls->owner_vcl_wrk_index = vls->worker_index;
+    }
   clib_spinlock_init (&vls->lock);
 
   vls_table_wunlock ();
@@ -556,7 +589,8 @@ vls_unshare_session (vcl_locked_session_t * vls, vcl_worker_t * wrk)
   u32 n_subscribers;
   vcl_session_t *s;
 
-  ASSERT (vls->shared_data_index != ~0);
+  if (vls->shared_data_index == ~0)
+    return 0;
 
   s = vcl_session_get (wrk, vls->session_index);
   if (s->session_state == STATE_LISTEN)
@@ -792,10 +826,76 @@ vls_mt_rel_locks (int locks_acq)
     vls_mt_create_unlock ();
 }
 
+static void
+vls_session_migrate (vcl_locked_session_t * vls)
+{
+  u32 wrk_index = vcl_get_worker_index ();
+  vcl_worker_t *wrk;
+  u32 src_sid, sid;
+  vcl_session_t *session;
+  uword *p;
+
+  if (!vls_mt_wrk_supported ())
+    return;
+
+  if (PREDICT_TRUE (vls->worker_index == wrk_index))
+    return;
+
+  if ((p = hash_get (vls->vcl_wrk_index_to_session_index, wrk_index)))
+    {
+      vls->worker_index = wrk_index;
+      vls->session_index = (u32) p[0];
+      return;
+    }
+
+  /* migrate from original vls */
+  if (!(p = hash_get (vls->vcl_wrk_index_to_session_index,
+                      vls->owner_vcl_wrk_index)))
+    {
+      VERR ("session in owner worker(%u) is free", vls->owner_vcl_wrk_index);
+      ASSERT (0);
+      return;
+    }
+
+  src_sid = (u32) p[0];
+  wrk = vcl_worker_get_current ();
+  session = vcl_session_alloc (wrk);
+  sid = session->session_index;
+  vls_send_clone_and_share_rpc (wrk, vls, sid, vls_get_worker_index (),
+                                vls->owner_vcl_wrk_index, vls->vls_index,
+                                src_sid);
+  session->session_index = sid;
+  vls->worker_index = wrk_index;
+  vls->session_index = sid;
+  hash_set (vls->vcl_wrk_index_to_session_index, wrk_index, sid);
+  VDBG (1, "migrate session of worker (session): %u (%u) -> %u (%u)",
+        vls->owner_vcl_wrk_index, src_sid, wrk_index, sid);
+
+  if (PREDICT_FALSE (session->is_vep && session->vep.next_sh != ~0))
+    {
+      /* TODO: rollback? */
+      VERR ("can't migrate nonempty epoll session");
+      ASSERT (0);
+      return;
+    }
+  else if (PREDICT_FALSE (!session->is_vep &&
+                          session->session_state != STATE_CLOSED))
+    {
+      /* TODO: rollback? */
+      VERR ("migrate NOT supported, session_status (%u)",
+            session->session_state);
+      ASSERT (0);
+      return;
+    }
+}
+
 #define vls_mt_guard(_vls, _op)                                         \
   int _locks_acq = 0;                                                   \
   if (PREDICT_FALSE (vcl_get_worker_index () == ~0))                    \
     vls_mt_add ();                                                      \
+  if (PREDICT_FALSE (_vls && vls_mt_wrk_supported () &&                 \
+                     ((vcl_locked_session_t *)_vls)->worker_index !=    \
+                     vcl_get_worker_index ()))                          \
+    vls_session_migrate (_vls);                                         \
   if (PREDICT_FALSE (vlsl->vls_mt_n_threads > 1))                       \
     vls_mt_acq_locks (_vls, _op, &_locks_acq);                          \
 
@@ -893,6 +993,7 @@ vls_attr (vls_handle_t vlsh, uint32_t op, void *buffer, uint32_t * buflen)
 
   if (!(vls = vls_get_w_dlock (vlsh)))
     return VPPCOM_EBADFD;
+  vls_session_migrate (vls);
   rv = vppcom_session_attr (vls_to_sh_tu (vls), op, buffer, buflen);
   vls_get_and_unlock (vlsh);
   return rv;
@@ -948,6 +1049,9 @@ vls_mp_checks (vcl_locked_session_t * vls, int is_add)
   vcl_session_t *s;
   u32 owner_wrk;
 
+  if (vls_mt_wrk_supported ())
+    return;
+
   s = vcl_session_get (wrk, vls->session_index);
   switch (s->session_state)
     {
@@ -1024,6 +1128,27 @@ vls_create (uint8_t proto, uint8_t is_nonblocking)
   return vlsh;
 }
 
+static void
+vls_migrate_session_close (vcl_locked_session_t * vls)
+{
+  u32 session_index, wrk_index;
+  vcl_worker_t *wrk = vcl_worker_get_current ();
+
+  if (!vls_mt_wrk_supported ())
+    return;
+
+  /* *INDENT-OFF* */
+  hash_foreach (wrk_index, session_index, vls->vcl_wrk_index_to_session_index,
+    ({
+      if (vcl_get_worker_index () != wrk_index)
+        {
+          vls_send_session_cleanup_rpc (wrk, wrk_index, session_index);
+        }
+    }));
+  /* *INDENT-ON* */
+  hash_free (vls->vcl_wrk_index_to_session_index);
+}
+
 int
 vls_close (vls_handle_t vlsh)
 {
@@ -1039,13 +1164,15 @@ vls_close (vls_handle_t vlsh)
       return VPPCOM_EBADFD;
     }
 
-  vls_mt_guard (0, VLS_MT_OP_SPOOL);
+  vls_mt_guard (vls, VLS_MT_OP_SPOOL);
 
   if (vls_is_shared (vls))
     rv = vls_unshare_session (vls, vcl_worker_get_current ());
   else
     rv = vppcom_session_close (vls_to_sh (vls));
 
+  vls_migrate_session_close (vls);
+
   vls_free (vls);
   vls_mt_unguard ();
 
@@ -1101,6 +1228,7 @@ vls_epoll_ctl (vls_handle_t ep_vlsh, int op, vls_handle_t vlsh,
   vls_table_rlock ();
   ep_vls = vls_get_and_lock (ep_vlsh);
   vls = vls_get_and_lock (vlsh);
+  vls_session_migrate (ep_vls);
   ep_sh = vls_to_sh (ep_vls);
   sh = vls_to_sh (vls);
 
@@ -1388,21 +1516,42 @@ vls_clone_and_share_rpc_handler (void *args)
   vls_clone_and_share_msg_t *msg = (vls_clone_and_share_msg_t *) args;
   vls_worker_t *wrk = vls_worker_get_current (), *dst_wrk;
   vcl_locked_session_t *vls, *dst_vls;
-  vcl_worker_t *dst_vcl_wrk;
+  vcl_worker_t *vcl_wrk = vcl_worker_get_current (), *dst_vcl_wrk;
   vcl_session_t *s, *dst_s;
 
   vls = vls_session_get (wrk, msg->vls_index);
-  vls_init_share_session (wrk, vls);
-  s = vcl_session_get (vcl_worker_get_current (), vls->session_index);
+
+  if (!vls_mt_wrk_supported ())
+    vls_init_share_session (wrk, vls);
+
+  s = vcl_session_get (vcl_wrk, msg->session_index);
 
   dst_wrk = vls_worker_get (msg->origin_vls_wrk);
-  dst_vcl_wrk = vcl_worker_get (msg->origin_vls_wrk);
+  dst_vcl_wrk = vcl_worker_get (msg->origin_vcl_wrk);
   dst_vls = vls_session_get (dst_wrk, msg->origin_vls_index);
   dst_vls->shared_data_index = vls->shared_data_index;
-  dst_s = vcl_session_get (dst_vcl_wrk, dst_vls->session_index);
+  dst_s = vcl_session_get (dst_vcl_wrk, msg->origin_session_index);
   clib_memcpy (dst_s, s, sizeof (*s));
+
+  dst_vcl_wrk->rpc_done = 1;
+
+  VDBG (1, "process session clone of worker (session): %u (%u) -> %u (%u)",
+        vcl_wrk->wrk_index, msg->session_index, dst_vcl_wrk->wrk_index,
+        msg->origin_session_index);
+}
+
+static void
+vls_session_cleanup_rpc_handler (void *args)
+{
+  vls_sess_cleanup_msg_t *msg = (vls_sess_cleanup_msg_t *) args;
+  vcl_worker_t *wrk = vcl_worker_get_current ();
+  vcl_worker_t *dst_wrk = vcl_worker_get (msg->origin_vcl_wrk);
+
+  vppcom_session_close (vcl_session_handle_from_index (msg->session_index));
+
+  dst_wrk->rpc_done = 1;
+
+  VDBG (1, "process session cleanup of worker (session): %u (%u) from %u ()",
+        wrk->wrk_index, msg->session_index, dst_wrk->wrk_index);
 }
 
 static void
@@ -1414,29 +1563,68 @@ vls_rpc_handler (void *args)
     case VLS_RPC_CLONE_AND_SHARE:
       vls_clone_and_share_rpc_handler (msg->data);
       break;
+    case VLS_RPC_SESS_CLEANUP:
+      vls_session_cleanup_rpc_handler (msg->data);
+      break;
     default:
       break;
     }
 }
 
 void
-vls_send_clone_and_share_rpc (vls_worker_t * wrk, vcl_locked_session_t * vls,
-                              u32 dst_wrk_index, u32 dst_vls_index)
+vls_send_clone_and_share_rpc (vcl_worker_t * wrk,
+                              vcl_locked_session_t * vls, u32 session_index,
+                              u32 vls_wrk_index, u32 dst_wrk_index,
+                              u32 dst_vls_index, u32 dst_session_index)
 {
   u8 data[sizeof (u8) + sizeof (vls_clone_and_share_msg_t)];
   vls_clone_and_share_msg_t *msg;
   vls_rpc_msg_t *rpc;
+  int ret;
 
   rpc = (vls_rpc_msg_t *) & data;
   rpc->type = VLS_RPC_CLONE_AND_SHARE;
   msg = (vls_clone_and_share_msg_t *) & rpc->data;
-  msg->origin_vls_wrk = wrk->wrk_index;
+  msg->origin_vls_wrk = vls_wrk_index;
   msg->origin_vls_index = vls->vls_index;
+  msg->origin_vcl_wrk = wrk->wrk_index;
+  msg->origin_session_index = session_index;
   msg->vls_index = dst_vls_index;
+  msg->session_index = dst_session_index;
+
+  wrk->rpc_done = 0;
+  ret = vcl_send_worker_rpc (dst_wrk_index, rpc, sizeof (data));
+
+  VDBG (1,
+        "send session clone of worker (session): %u (%u) -> %u (%u), ret=%d",
+        dst_wrk_index, msg->session_index, msg->origin_vcl_wrk,
+        msg->origin_session_index, ret);
+
+  while (!ret && !wrk->rpc_done)
+    ;
+}
+
+void
+vls_send_session_cleanup_rpc (vcl_worker_t * wrk,
+                              u32 dst_wrk_index, u32 dst_session_index)
+{
+  u8 data[sizeof (u8) + sizeof (vls_sess_cleanup_msg_t)];
+  vls_sess_cleanup_msg_t *msg;
+  vls_rpc_msg_t *rpc;
+  int ret;
+
+  rpc = (vls_rpc_msg_t *) & data;
+  rpc->type = VLS_RPC_SESS_CLEANUP;
+  msg = (vls_sess_cleanup_msg_t *) & rpc->data;
+  msg->origin_vcl_wrk = wrk->wrk_index;
+  msg->session_index = dst_session_index;
 
   wrk->rpc_done = 0;
-  vcl_send_worker_rpc (dst_wrk_index, rpc, sizeof (data));
-  while (!wrk->rpc_done)
+  ret = vcl_send_worker_rpc (dst_wrk_index, rpc, sizeof (data));
+
+  VDBG (1,
+        "send session cleanup of worker (session): %u (%u) from %u (), ret=%d",
+        dst_wrk_index, msg->session_index, msg->origin_vcl_wrk, ret);
+
+  while (!ret && !wrk->rpc_done)
     ;
 }
 
@@ -1470,6 +1658,31 @@ vls_use_eventfd (void)
   return vcm->cfg.use_mq_eventfd;
 }
 
+unsigned char
+vls_mt_wrk_supported (void)
+{
+  return vcm->cfg.mt_supported;
+}
+
+int
+vls_use_real_epoll (void)
+{
+  if (vcl_get_worker_index () == ~0)
+    return 0;
+
+  return vcl_worker_get_current ()->vcl_needs_real_epoll;
+}
+
+void
+vls_register_vcl_worker (void)
+{
+  if (vppcom_worker_register () != VPPCOM_OK)
+    {
+      VERR ("failed to register worker");
+      return;
+    }
+}
+
 /*
  * fd.io coding-style-patch-verification: ON
  *
diff --git a/src/vcl/vcl_locked.h b/src/vcl/vcl_locked.h
index 18b341b11ca..7cfb3bdc522 100644
--- a/src/vcl/vcl_locked.h
+++ b/src/vcl/vcl_locked.h
@@ -51,6 +51,9 @@ vcl_session_handle_t vlsh_to_session_index (vls_handle_t vlsh);
 vls_handle_t vls_session_index_to_vlsh (uint32_t session_index);
 int vls_app_create (char *app_name);
 unsigned char vls_use_eventfd (void);
+unsigned char vls_mt_wrk_supported (void);
+int vls_use_real_epoll (void);
+void vls_register_vcl_worker (void);
 
 #endif /* SRC_VCL_VCL_LOCKED_H_ */
diff --git a/src/vcl/vcl_private.c b/src/vcl/vcl_private.c
index 300b82c4f21..1dadb628fb8 100644
--- a/src/vcl/vcl_private.c
+++ b/src/vcl/vcl_private.c
@@ -217,7 +217,9 @@ vcl_worker_alloc_and_init ()
   wrk->mqs_epfd = -1;
   if (vcm->cfg.use_mq_eventfd)
     {
+      wrk->vcl_needs_real_epoll = 1;
       wrk->mqs_epfd = epoll_create (1);
+      wrk->vcl_needs_real_epoll = 0;
       if (wrk->mqs_epfd < 0)
        {
          clib_unix_warning ("epoll_create() returned");
diff --git a/src/vcl/vcl_private.h b/src/vcl/vcl_private.h
index 4a25632379c..b873ad994a6 100644
--- a/src/vcl/vcl_private.h
+++ b/src/vcl/vcl_private.h
@@ -218,6 +218,7 @@ typedef struct vppcom_cfg_t_
   u8 *vpp_api_socket_name;
   u8 *vpp_api_chroot;
   u32 tls_engine;
+  u8 mt_supported;
 } vppcom_cfg_t;
 
 void vppcom_cfg (vppcom_cfg_t * vcl_cfg);
@@ -307,6 +308,10 @@ typedef struct vcl_worker_
   socket_client_main_t bapi_sock_ctx;
   memory_client_main_t bapi_shm_ctx;
   api_main_t bapi_api_ctx;
+
+  /** vcl needs next epoll_create to go to libc_epoll */
+  u8 vcl_needs_real_epoll;
+  volatile int rpc_done;
 } vcl_worker_t;
 
 typedef void (vcl_rpc_fn_t) (void *args);
@@ -659,7 +664,7 @@ vcl_session_vpp_evt_q (vcl_worker_t * wrk, vcl_session_t * s)
 void vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s,
                                      u32 wrk_index);
-void vcl_send_worker_rpc (u32 dst_wrk_index, void *data, u32 data_len);
+int vcl_send_worker_rpc (u32 dst_wrk_index, void *data, u32 data_len);
 
 /*
  * VCL Binary API
diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c
index 41d2f3170aa..d73c73be383 100644
--- a/src/vcl/vppcom.c
+++ b/src/vcl/vppcom.c
@@ -351,13 +351,14 @@ vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s,
   app_send_ctrl_evt_to_vpp (mq, app_evt);
 }
 
-void
+int
 vcl_send_worker_rpc (u32 dst_wrk_index, void *data, u32 data_len)
 {
   app_session_evt_t _app_evt, *app_evt = &_app_evt;
   session_app_wrk_rpc_msg_t *mp;
   vcl_worker_t *dst_wrk, *wrk;
   svm_msg_q_t *mq;
+  int ret = -1;
 
   if (data_len > sizeof (mp->data))
     goto done;
@@ -376,9 +377,11 @@ vcl_send_worker_rpc (u32 dst_wrk_index, void *data, u32 data_len)
   mp->wrk_index = dst_wrk->vpp_wrk_index;
   clib_memcpy (mp->data, data, data_len);
   app_send_ctrl_evt_to_vpp (mq, app_evt);
+  ret = 0;
 
 done:
   clib_spinlock_unlock (&vcm->workers_lock);
+  return ret;
 }
 
 static u32
@@ -902,7 +905,7 @@ vcl_worker_rpc_handler (vcl_worker_t * wrk, void *data)
   if (!vcm->wrk_rpc_fn)
     return;
 
-  (vcm->wrk_rpc_fn) (data);
+  (vcm->wrk_rpc_fn) (((session_app_wrk_rpc_msg_t *) data)->data);
 }
 
 static int
@@ -962,7 +965,7 @@ vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
       vcl_session_app_del_segment_handler (wrk, e->data);
       break;
-    case SESSION_CTRL_EVT_RPC:
+    case SESSION_CTRL_EVT_APP_WRK_RPC:
       vcl_worker_rpc_handler (wrk, e->data);
       break;
     default:
@@ -2301,7 +2304,7 @@ vcl_select_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
       vcl_session_app_del_segment_handler (wrk, e->data);
       break;
-    case SESSION_CTRL_EVT_RPC:
+    case SESSION_CTRL_EVT_APP_WRK_RPC:
       vcl_worker_rpc_handler (wrk, e->data);
       break;
     default:
@@ -2920,7 +2923,7 @@ vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
      vcl_session_app_del_segment_handler (wrk, e->data);
       break;
-    case SESSION_CTRL_EVT_RPC:
+    case SESSION_CTRL_EVT_APP_WRK_RPC:
       vcl_worker_rpc_handler (wrk, e->data);
       break;
     default:
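
For reference, the "multi-thread" keyword parsed in the vcl_cfg.c hunk above
is read from the VCL configuration file; a minimal sketch of a stanza that
enables it (only the multi-thread line is introduced by this patch, any other
options in a real config are unaffected):

vcl {
  multi-thread
}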