diff options
Diffstat (limited to 'src/vcl/vcl_locked.c')
-rw-r--r-- | src/vcl/vcl_locked.c | 245 |
1 files changed, 229 insertions, 16 deletions
diff --git a/src/vcl/vcl_locked.c b/src/vcl/vcl_locked.c index fc48618429b..02da8cd2cf5 100644 --- a/src/vcl/vcl_locked.c +++ b/src/vcl/vcl_locked.c @@ -31,6 +31,9 @@ typedef struct vcl_locked_session_ u32 worker_index; u32 vls_index; u32 shared_data_index; + /** VCL session owned by different workers because of migration */ + u32 owner_vcl_wrk_index; + uword *vcl_wrk_index_to_session_index; } vcl_locked_session_t; typedef struct vls_worker_ @@ -38,7 +41,6 @@ typedef struct vls_worker_ vcl_locked_session_t *vls_pool; uword *session_index_to_vlsh_table; u32 wrk_index; - volatile int rpc_done; } vls_worker_t; typedef struct vls_local_ @@ -68,6 +70,7 @@ vls_main_t *vlsm; typedef enum vls_rpc_msg_type_ { VLS_RPC_CLONE_AND_SHARE, + VLS_RPC_SESS_CLEANUP, } vls_rpc_msg_type_e; typedef struct vls_rpc_msg_ @@ -79,14 +82,35 @@ typedef struct vls_rpc_msg_ typedef struct vls_clone_and_share_msg_ { u32 vls_index; /**< vls to be shared */ - u32 origin_vls_wrk; /**< worker that initiated the rpc */ + u32 session_index; /**< vcl session to be shared */ + u32 origin_vls_wrk; /**< vls worker that initiated the rpc */ u32 origin_vls_index; /**< vls session of the originator */ + u32 origin_vcl_wrk; /**< vcl worker that initiated the rpc */ + u32 origin_session_index; /**< vcl session of the originator */ } vls_clone_and_share_msg_t; +typedef struct vls_sess_cleanup_msg_ +{ + u32 session_index; /**< vcl session to be cleaned */ + u32 origin_vcl_wrk; /**< worker that initiated the rpc */ +} vls_sess_cleanup_msg_t; + +void vls_send_session_cleanup_rpc (vcl_worker_t * wrk, + u32 dst_wrk_index, u32 dst_session_index); +void vls_send_clone_and_share_rpc (vcl_worker_t * wrk, + vcl_locked_session_t * vls, + u32 session_index, u32 vls_wrk_index, + u32 dst_wrk_index, u32 dst_vls_index, + u32 dst_session_index); + + static inline u32 vls_get_worker_index (void) { - return vcl_get_worker_index (); + if (vls_mt_wrk_supported ()) + return vlsl->vls_wrk_index; + else + return vcl_get_worker_index (); } static u32 @@ -190,7 +214,10 @@ static void vls_mt_add (void) { vlsl->vls_mt_n_threads += 1; - vcl_set_worker_index (vlsl->vls_wrk_index); + if (vls_mt_wrk_supported ()) + vls_register_vcl_worker (); + else + vcl_set_worker_index (vlsl->vls_wrk_index); } static inline void @@ -305,6 +332,12 @@ vls_alloc (vcl_session_handle_t sh) vls->shared_data_index = ~0; hash_set (wrk->session_index_to_vlsh_table, vls->session_index, vls->vls_index); + if (vls_mt_wrk_supported ()) + { + hash_set (vls->vcl_wrk_index_to_session_index, vls->worker_index, + vls->session_index); + vls->owner_vcl_wrk_index = vls->worker_index; + } clib_spinlock_init (&vls->lock); vls_table_wunlock (); @@ -556,7 +589,8 @@ vls_unshare_session (vcl_locked_session_t * vls, vcl_worker_t * wrk) u32 n_subscribers; vcl_session_t *s; - ASSERT (vls->shared_data_index != ~0); + if (vls->shared_data_index == ~0) + return 0; s = vcl_session_get (wrk, vls->session_index); if (s->session_state == STATE_LISTEN) @@ -792,10 +826,76 @@ vls_mt_rel_locks (int locks_acq) vls_mt_create_unlock (); } +static void +vls_session_migrate (vcl_locked_session_t * vls) +{ + u32 wrk_index = vcl_get_worker_index (); + vcl_worker_t *wrk; + u32 src_sid, sid; + vcl_session_t *session; + uword *p; + + if (!vls_mt_wrk_supported ()) + return; + + if (PREDICT_TRUE (vls->worker_index == wrk_index)) + return; + if ((p = hash_get (vls->vcl_wrk_index_to_session_index, wrk_index))) + { + vls->worker_index = wrk_index; + vls->session_index = (u32) p[0]; + return; + } + + /* migrate from orignal vls */ + if (!(p = hash_get (vls->vcl_wrk_index_to_session_index, + vls->owner_vcl_wrk_index))) + { + VERR ("session in owner worker(%u) is free", vls->owner_vcl_wrk_index); + ASSERT (0); + return; + } + + src_sid = (u32) p[0]; + wrk = vcl_worker_get_current (); + session = vcl_session_alloc (wrk); + sid = session->session_index; + vls_send_clone_and_share_rpc (wrk, vls, sid, vls_get_worker_index (), + vls->owner_vcl_wrk_index, vls->vls_index, + src_sid); + session->session_index = sid; + vls->worker_index = wrk_index; + vls->session_index = sid; + hash_set (vls->vcl_wrk_index_to_session_index, wrk_index, sid); + VDBG (1, "migrate session of worker (session): %u (%u) -> %u (%u)", + vls->owner_vcl_wrk_index, src_sid, wrk_index, sid); + + if (PREDICT_FALSE (session->is_vep && session->vep.next_sh != ~0)) + { + /* TODO: rollback? */ + VERR ("can't migrate nonempty epoll session"); + ASSERT (0); + return; + } + else if (PREDICT_FALSE (!session->is_vep && + session->session_state != STATE_CLOSED)) + { + /* TODO: rollback? */ + VERR ("migrate NOT supported, session_status (%u)", + session->session_state); + ASSERT (0); + return; + } +} + #define vls_mt_guard(_vls, _op) \ int _locks_acq = 0; \ if (PREDICT_FALSE (vcl_get_worker_index () == ~0)) \ vls_mt_add (); \ + if (PREDICT_FALSE (_vls && vls_mt_wrk_supported () && \ + ((vcl_locked_session_t *)_vls)->worker_index != \ + vcl_get_worker_index ())) \ + vls_session_migrate (_vls); \ if (PREDICT_FALSE (vlsl->vls_mt_n_threads > 1)) \ vls_mt_acq_locks (_vls, _op, &_locks_acq); \ @@ -893,6 +993,7 @@ vls_attr (vls_handle_t vlsh, uint32_t op, void *buffer, uint32_t * buflen) if (!(vls = vls_get_w_dlock (vlsh))) return VPPCOM_EBADFD; + vls_session_migrate (vls); rv = vppcom_session_attr (vls_to_sh_tu (vls), op, buffer, buflen); vls_get_and_unlock (vlsh); return rv; @@ -948,6 +1049,9 @@ vls_mp_checks (vcl_locked_session_t * vls, int is_add) vcl_session_t *s; u32 owner_wrk; + if (vls_mt_wrk_supported ()) + return; + s = vcl_session_get (wrk, vls->session_index); switch (s->session_state) { @@ -1024,6 +1128,27 @@ vls_create (uint8_t proto, uint8_t is_nonblocking) return vlsh; } +static void +vls_migrate_session_close (vcl_locked_session_t * vls) +{ + u32 session_index, wrk_index; + vcl_worker_t *wrk = vcl_worker_get_current (); + + if (!vls_mt_wrk_supported ()) + return; + + /* *INDENT-OFF* */ + hash_foreach (wrk_index, session_index, vls->vcl_wrk_index_to_session_index, + ({ + if (vcl_get_worker_index () != wrk_index) + { + vls_send_session_cleanup_rpc (wrk, wrk_index, session_index); + } + })); + /* *INDENT-ON* */ + hash_free (vls->vcl_wrk_index_to_session_index); +} + int vls_close (vls_handle_t vlsh) { @@ -1039,13 +1164,15 @@ vls_close (vls_handle_t vlsh) return VPPCOM_EBADFD; } - vls_mt_guard (0, VLS_MT_OP_SPOOL); + vls_mt_guard (vls, VLS_MT_OP_SPOOL); if (vls_is_shared (vls)) rv = vls_unshare_session (vls, vcl_worker_get_current ()); else rv = vppcom_session_close (vls_to_sh (vls)); + vls_migrate_session_close (vls); + vls_free (vls); vls_mt_unguard (); @@ -1101,6 +1228,7 @@ vls_epoll_ctl (vls_handle_t ep_vlsh, int op, vls_handle_t vlsh, vls_table_rlock (); ep_vls = vls_get_and_lock (ep_vlsh); vls = vls_get_and_lock (vlsh); + vls_session_migrate (ep_vls); ep_sh = vls_to_sh (ep_vls); sh = vls_to_sh (vls); @@ -1388,21 +1516,42 @@ vls_clone_and_share_rpc_handler (void *args) vls_clone_and_share_msg_t *msg = (vls_clone_and_share_msg_t *) args; vls_worker_t *wrk = vls_worker_get_current (), *dst_wrk; vcl_locked_session_t *vls, *dst_vls; - vcl_worker_t *dst_vcl_wrk; + vcl_worker_t *vcl_wrk = vcl_worker_get_current (), *dst_vcl_wrk; vcl_session_t *s, *dst_s; vls = vls_session_get (wrk, msg->vls_index); - vls_init_share_session (wrk, vls); - s = vcl_session_get (vcl_worker_get_current (), vls->session_index); + if (!vls_mt_wrk_supported ()) + vls_init_share_session (wrk, vls); + + s = vcl_session_get (vcl_wrk, msg->session_index); dst_wrk = vls_worker_get (msg->origin_vls_wrk); - dst_vcl_wrk = vcl_worker_get (msg->origin_vls_wrk); + dst_vcl_wrk = vcl_worker_get (msg->origin_vcl_wrk); dst_vls = vls_session_get (dst_wrk, msg->origin_vls_index); dst_vls->shared_data_index = vls->shared_data_index; - dst_s = vcl_session_get (dst_vcl_wrk, dst_vls->session_index); + dst_s = vcl_session_get (dst_vcl_wrk, msg->origin_session_index); clib_memcpy (dst_s, s, sizeof (*s)); + dst_vcl_wrk->rpc_done = 1; + + VDBG (1, "proces session clone of worker (session): %u (%u) -> %u (%u)", + vcl_wrk->wrk_index, msg->session_index, dst_vcl_wrk->wrk_index, + msg->origin_session_index); +} + +static void +vls_session_cleanup_rpc_handler (void *args) +{ + vls_sess_cleanup_msg_t *msg = (vls_sess_cleanup_msg_t *) args; + vcl_worker_t *wrk = vcl_worker_get_current (); + vcl_worker_t *dst_wrk = vcl_worker_get (msg->origin_vcl_wrk); + + vppcom_session_close (vcl_session_handle_from_index (msg->session_index)); + dst_wrk->rpc_done = 1; + + VDBG (1, "proces session cleanup of worker (session): %u (%u) from %u ()", + wrk->wrk_index, msg->session_index, dst_wrk->wrk_index); } static void @@ -1414,29 +1563,68 @@ vls_rpc_handler (void *args) case VLS_RPC_CLONE_AND_SHARE: vls_clone_and_share_rpc_handler (msg->data); break; + case VLS_RPC_SESS_CLEANUP: + vls_session_cleanup_rpc_handler (msg->data); + break; default: break; } } void -vls_send_clone_and_share_rpc (vls_worker_t * wrk, vcl_locked_session_t * vls, - u32 dst_wrk_index, u32 dst_vls_index) +vls_send_clone_and_share_rpc (vcl_worker_t * wrk, + vcl_locked_session_t * vls, u32 session_index, + u32 vls_wrk_index, u32 dst_wrk_index, + u32 dst_vls_index, u32 dst_session_index) { u8 data[sizeof (u8) + sizeof (vls_clone_and_share_msg_t)]; vls_clone_and_share_msg_t *msg; vls_rpc_msg_t *rpc; + int ret; rpc = (vls_rpc_msg_t *) & data; rpc->type = VLS_RPC_CLONE_AND_SHARE; msg = (vls_clone_and_share_msg_t *) & rpc->data; - msg->origin_vls_wrk = wrk->wrk_index; + msg->origin_vls_wrk = vls_wrk_index; msg->origin_vls_index = vls->vls_index; + msg->origin_vcl_wrk = wrk->wrk_index; + msg->origin_session_index = session_index; msg->vls_index = dst_vls_index; + msg->session_index = dst_session_index; + + wrk->rpc_done = 0; + ret = vcl_send_worker_rpc (dst_wrk_index, rpc, sizeof (data)); + + VDBG (1, + "send session clone of worker (session): %u (%u) -> %u (%u), ret=%d", + dst_wrk_index, msg->session_index, msg->origin_vcl_wrk, + msg->origin_session_index, ret); + while (!ret && !wrk->rpc_done) + ; +} + +void +vls_send_session_cleanup_rpc (vcl_worker_t * wrk, + u32 dst_wrk_index, u32 dst_session_index) +{ + u8 data[sizeof (u8) + sizeof (vls_sess_cleanup_msg_t)]; + vls_sess_cleanup_msg_t *msg; + vls_rpc_msg_t *rpc; + int ret; + + rpc = (vls_rpc_msg_t *) & data; + rpc->type = VLS_RPC_SESS_CLEANUP; + msg = (vls_sess_cleanup_msg_t *) & rpc->data; + msg->origin_vcl_wrk = wrk->wrk_index; + msg->session_index = dst_session_index; wrk->rpc_done = 0; - vcl_send_worker_rpc (dst_wrk_index, rpc, sizeof (data)); - while (!wrk->rpc_done) + ret = vcl_send_worker_rpc (dst_wrk_index, rpc, sizeof (data)); + + VDBG (1, + "send session cleanup of worker (session): %u (%u) from %u (), ret=%d", + dst_wrk_index, msg->session_index, msg->origin_vcl_wrk, ret); + while (!ret && !wrk->rpc_done) ; } @@ -1470,6 +1658,31 @@ vls_use_eventfd (void) return vcm->cfg.use_mq_eventfd; } +unsigned char +vls_mt_wrk_supported (void) +{ + return vcm->cfg.mt_supported; +} + +int +vls_use_real_epoll (void) +{ + if (vcl_get_worker_index () == ~0) + return 0; + + return vcl_worker_get_current ()->vcl_needs_real_epoll; +} + +void +vls_register_vcl_worker (void) +{ + if (vppcom_worker_register () != VPPCOM_OK) + { + VERR ("failed to register worker"); + return; + } +} + /* * fd.io coding-style-patch-verification: ON * |