diff options
-rw-r--r-- | src/vcl/ldp.c | 11 | ||||
-rw-r--r-- | src/vcl/vcl_cfg.c | 5 | ||||
-rw-r--r-- | src/vcl/vcl_locked.c | 245 | ||||
-rw-r--r-- | src/vcl/vcl_locked.h | 3 | ||||
-rw-r--r-- | src/vcl/vcl_private.c | 2 | ||||
-rw-r--r-- | src/vcl/vcl_private.h | 7 | ||||
-rw-r--r-- | src/vcl/vppcom.c | 13 | ||||
-rw-r--r-- | src/vnet/session/application_interface.h | 2 | ||||
-rw-r--r-- | src/vnet/session/session_types.h | 1 |
9 files changed, 263 insertions, 26 deletions
diff --git a/src/vcl/ldp.c b/src/vcl/ldp.c index cda4425e574..fddf45cd502 100644 --- a/src/vcl/ldp.c +++ b/src/vcl/ldp.c @@ -2223,7 +2223,7 @@ epoll_create1 (int flags) if ((errno = -ldp_init ())) return -1; - if (ldp->vcl_needs_real_epoll) + if (ldp->vcl_needs_real_epoll || vls_use_real_epoll ()) { /* Make sure workers have been allocated */ if (!ldp->workers) @@ -2425,7 +2425,7 @@ static inline int ldp_epoll_pwait_eventfd (int epfd, struct epoll_event *events, int maxevents, int timeout, const sigset_t * sigmask) { - ldp_worker_ctx_t *ldpw = ldp_worker_get_current (); + ldp_worker_ctx_t *ldpw; int libc_epfd, rv = 0, num_ev; vls_handle_t ep_vlsh; @@ -2438,6 +2438,11 @@ ldp_epoll_pwait_eventfd (int epfd, struct epoll_event *events, return -1; } + if (vls_mt_wrk_supported ()) + if (PREDICT_FALSE (vppcom_worker_index () == ~0)) + vls_register_vcl_worker (); + + ldpw = ldp_worker_get_current (); if (epfd == ldpw->vcl_mq_epfd) return libc_epoll_pwait (epfd, events, maxevents, timeout, sigmask); @@ -2497,7 +2502,7 @@ ldp_epoll_pwait_eventfd (int epfd, struct epoll_event *events, rv = vls_epoll_wait (ep_vlsh, events, maxevents, 0); if (rv > 0) goto done; - else if (rv < 0) + else if (PREDICT_FALSE (rv < 0)) { errno = -rv; rv = -1; diff --git a/src/vcl/vcl_cfg.c b/src/vcl/vcl_cfg.c index 2ae940170b7..a94c874532f 100644 --- a/src/vcl/vcl_cfg.c +++ b/src/vcl/vcl_cfg.c @@ -498,6 +498,11 @@ vppcom_cfg_read_file (char *conf_fname) VCFG_DBG (0, "VCL<%d>: configured tls-engine %u (0x%x)", getpid (), vcl_cfg->tls_engine, vcl_cfg->tls_engine); } + else if (unformat (line_input, "multi-thread")) + { + vcl_cfg->mt_supported = 1; + VCFG_DBG (0, "VCL<%d>: configured with multithread", getpid ()); + } else if (unformat (line_input, "}")) { vc_cfg_input = 0; diff --git a/src/vcl/vcl_locked.c b/src/vcl/vcl_locked.c index fc48618429b..02da8cd2cf5 100644 --- a/src/vcl/vcl_locked.c +++ b/src/vcl/vcl_locked.c @@ -31,6 +31,9 @@ typedef struct vcl_locked_session_ u32 worker_index; u32 vls_index; u32 shared_data_index; + /** VCL session owned by different workers because of migration */ + u32 owner_vcl_wrk_index; + uword *vcl_wrk_index_to_session_index; } vcl_locked_session_t; typedef struct vls_worker_ @@ -38,7 +41,6 @@ typedef struct vls_worker_ vcl_locked_session_t *vls_pool; uword *session_index_to_vlsh_table; u32 wrk_index; - volatile int rpc_done; } vls_worker_t; typedef struct vls_local_ @@ -68,6 +70,7 @@ vls_main_t *vlsm; typedef enum vls_rpc_msg_type_ { VLS_RPC_CLONE_AND_SHARE, + VLS_RPC_SESS_CLEANUP, } vls_rpc_msg_type_e; typedef struct vls_rpc_msg_ @@ -79,14 +82,35 @@ typedef struct vls_rpc_msg_ typedef struct vls_clone_and_share_msg_ { u32 vls_index; /**< vls to be shared */ - u32 origin_vls_wrk; /**< worker that initiated the rpc */ + u32 session_index; /**< vcl session to be shared */ + u32 origin_vls_wrk; /**< vls worker that initiated the rpc */ u32 origin_vls_index; /**< vls session of the originator */ + u32 origin_vcl_wrk; /**< vcl worker that initiated the rpc */ + u32 origin_session_index; /**< vcl session of the originator */ } vls_clone_and_share_msg_t; +typedef struct vls_sess_cleanup_msg_ +{ + u32 session_index; /**< vcl session to be cleaned */ + u32 origin_vcl_wrk; /**< worker that initiated the rpc */ +} vls_sess_cleanup_msg_t; + +void vls_send_session_cleanup_rpc (vcl_worker_t * wrk, + u32 dst_wrk_index, u32 dst_session_index); +void vls_send_clone_and_share_rpc (vcl_worker_t * wrk, + vcl_locked_session_t * vls, + u32 session_index, u32 vls_wrk_index, + u32 dst_wrk_index, u32 dst_vls_index, + u32 dst_session_index); + + static inline u32 vls_get_worker_index (void) { - return vcl_get_worker_index (); + if (vls_mt_wrk_supported ()) + return vlsl->vls_wrk_index; + else + return vcl_get_worker_index (); } static u32 @@ -190,7 +214,10 @@ static void vls_mt_add (void) { vlsl->vls_mt_n_threads += 1; - vcl_set_worker_index (vlsl->vls_wrk_index); + if (vls_mt_wrk_supported ()) + vls_register_vcl_worker (); + else + vcl_set_worker_index (vlsl->vls_wrk_index); } static inline void @@ -305,6 +332,12 @@ vls_alloc (vcl_session_handle_t sh) vls->shared_data_index = ~0; hash_set (wrk->session_index_to_vlsh_table, vls->session_index, vls->vls_index); + if (vls_mt_wrk_supported ()) + { + hash_set (vls->vcl_wrk_index_to_session_index, vls->worker_index, + vls->session_index); + vls->owner_vcl_wrk_index = vls->worker_index; + } clib_spinlock_init (&vls->lock); vls_table_wunlock (); @@ -556,7 +589,8 @@ vls_unshare_session (vcl_locked_session_t * vls, vcl_worker_t * wrk) u32 n_subscribers; vcl_session_t *s; - ASSERT (vls->shared_data_index != ~0); + if (vls->shared_data_index == ~0) + return 0; s = vcl_session_get (wrk, vls->session_index); if (s->session_state == STATE_LISTEN) @@ -792,10 +826,76 @@ vls_mt_rel_locks (int locks_acq) vls_mt_create_unlock (); } +static void +vls_session_migrate (vcl_locked_session_t * vls) +{ + u32 wrk_index = vcl_get_worker_index (); + vcl_worker_t *wrk; + u32 src_sid, sid; + vcl_session_t *session; + uword *p; + + if (!vls_mt_wrk_supported ()) + return; + + if (PREDICT_TRUE (vls->worker_index == wrk_index)) + return; + if ((p = hash_get (vls->vcl_wrk_index_to_session_index, wrk_index))) + { + vls->worker_index = wrk_index; + vls->session_index = (u32) p[0]; + return; + } + + /* migrate from orignal vls */ + if (!(p = hash_get (vls->vcl_wrk_index_to_session_index, + vls->owner_vcl_wrk_index))) + { + VERR ("session in owner worker(%u) is free", vls->owner_vcl_wrk_index); + ASSERT (0); + return; + } + + src_sid = (u32) p[0]; + wrk = vcl_worker_get_current (); + session = vcl_session_alloc (wrk); + sid = session->session_index; + vls_send_clone_and_share_rpc (wrk, vls, sid, vls_get_worker_index (), + vls->owner_vcl_wrk_index, vls->vls_index, + src_sid); + session->session_index = sid; + vls->worker_index = wrk_index; + vls->session_index = sid; + hash_set (vls->vcl_wrk_index_to_session_index, wrk_index, sid); + VDBG (1, "migrate session of worker (session): %u (%u) -> %u (%u)", + vls->owner_vcl_wrk_index, src_sid, wrk_index, sid); + + if (PREDICT_FALSE (session->is_vep && session->vep.next_sh != ~0)) + { + /* TODO: rollback? */ + VERR ("can't migrate nonempty epoll session"); + ASSERT (0); + return; + } + else if (PREDICT_FALSE (!session->is_vep && + session->session_state != STATE_CLOSED)) + { + /* TODO: rollback? */ + VERR ("migrate NOT supported, session_status (%u)", + session->session_state); + ASSERT (0); + return; + } +} + #define vls_mt_guard(_vls, _op) \ int _locks_acq = 0; \ if (PREDICT_FALSE (vcl_get_worker_index () == ~0)) \ vls_mt_add (); \ + if (PREDICT_FALSE (_vls && vls_mt_wrk_supported () && \ + ((vcl_locked_session_t *)_vls)->worker_index != \ + vcl_get_worker_index ())) \ + vls_session_migrate (_vls); \ if (PREDICT_FALSE (vlsl->vls_mt_n_threads > 1)) \ vls_mt_acq_locks (_vls, _op, &_locks_acq); \ @@ -893,6 +993,7 @@ vls_attr (vls_handle_t vlsh, uint32_t op, void *buffer, uint32_t * buflen) if (!(vls = vls_get_w_dlock (vlsh))) return VPPCOM_EBADFD; + vls_session_migrate (vls); rv = vppcom_session_attr (vls_to_sh_tu (vls), op, buffer, buflen); vls_get_and_unlock (vlsh); return rv; @@ -948,6 +1049,9 @@ vls_mp_checks (vcl_locked_session_t * vls, int is_add) vcl_session_t *s; u32 owner_wrk; + if (vls_mt_wrk_supported ()) + return; + s = vcl_session_get (wrk, vls->session_index); switch (s->session_state) { @@ -1024,6 +1128,27 @@ vls_create (uint8_t proto, uint8_t is_nonblocking) return vlsh; } +static void +vls_migrate_session_close (vcl_locked_session_t * vls) +{ + u32 session_index, wrk_index; + vcl_worker_t *wrk = vcl_worker_get_current (); + + if (!vls_mt_wrk_supported ()) + return; + + /* *INDENT-OFF* */ + hash_foreach (wrk_index, session_index, vls->vcl_wrk_index_to_session_index, + ({ + if (vcl_get_worker_index () != wrk_index) + { + vls_send_session_cleanup_rpc (wrk, wrk_index, session_index); + } + })); + /* *INDENT-ON* */ + hash_free (vls->vcl_wrk_index_to_session_index); +} + int vls_close (vls_handle_t vlsh) { @@ -1039,13 +1164,15 @@ vls_close (vls_handle_t vlsh) return VPPCOM_EBADFD; } - vls_mt_guard (0, VLS_MT_OP_SPOOL); + vls_mt_guard (vls, VLS_MT_OP_SPOOL); if (vls_is_shared (vls)) rv = vls_unshare_session (vls, vcl_worker_get_current ()); else rv = vppcom_session_close (vls_to_sh (vls)); + vls_migrate_session_close (vls); + vls_free (vls); vls_mt_unguard (); @@ -1101,6 +1228,7 @@ vls_epoll_ctl (vls_handle_t ep_vlsh, int op, vls_handle_t vlsh, vls_table_rlock (); ep_vls = vls_get_and_lock (ep_vlsh); vls = vls_get_and_lock (vlsh); + vls_session_migrate (ep_vls); ep_sh = vls_to_sh (ep_vls); sh = vls_to_sh (vls); @@ -1388,21 +1516,42 @@ vls_clone_and_share_rpc_handler (void *args) vls_clone_and_share_msg_t *msg = (vls_clone_and_share_msg_t *) args; vls_worker_t *wrk = vls_worker_get_current (), *dst_wrk; vcl_locked_session_t *vls, *dst_vls; - vcl_worker_t *dst_vcl_wrk; + vcl_worker_t *vcl_wrk = vcl_worker_get_current (), *dst_vcl_wrk; vcl_session_t *s, *dst_s; vls = vls_session_get (wrk, msg->vls_index); - vls_init_share_session (wrk, vls); - s = vcl_session_get (vcl_worker_get_current (), vls->session_index); + if (!vls_mt_wrk_supported ()) + vls_init_share_session (wrk, vls); + + s = vcl_session_get (vcl_wrk, msg->session_index); dst_wrk = vls_worker_get (msg->origin_vls_wrk); - dst_vcl_wrk = vcl_worker_get (msg->origin_vls_wrk); + dst_vcl_wrk = vcl_worker_get (msg->origin_vcl_wrk); dst_vls = vls_session_get (dst_wrk, msg->origin_vls_index); dst_vls->shared_data_index = vls->shared_data_index; - dst_s = vcl_session_get (dst_vcl_wrk, dst_vls->session_index); + dst_s = vcl_session_get (dst_vcl_wrk, msg->origin_session_index); clib_memcpy (dst_s, s, sizeof (*s)); + dst_vcl_wrk->rpc_done = 1; + + VDBG (1, "proces session clone of worker (session): %u (%u) -> %u (%u)", + vcl_wrk->wrk_index, msg->session_index, dst_vcl_wrk->wrk_index, + msg->origin_session_index); +} + +static void +vls_session_cleanup_rpc_handler (void *args) +{ + vls_sess_cleanup_msg_t *msg = (vls_sess_cleanup_msg_t *) args; + vcl_worker_t *wrk = vcl_worker_get_current (); + vcl_worker_t *dst_wrk = vcl_worker_get (msg->origin_vcl_wrk); + + vppcom_session_close (vcl_session_handle_from_index (msg->session_index)); + dst_wrk->rpc_done = 1; + + VDBG (1, "proces session cleanup of worker (session): %u (%u) from %u ()", + wrk->wrk_index, msg->session_index, dst_wrk->wrk_index); } static void @@ -1414,29 +1563,68 @@ vls_rpc_handler (void *args) case VLS_RPC_CLONE_AND_SHARE: vls_clone_and_share_rpc_handler (msg->data); break; + case VLS_RPC_SESS_CLEANUP: + vls_session_cleanup_rpc_handler (msg->data); + break; default: break; } } void -vls_send_clone_and_share_rpc (vls_worker_t * wrk, vcl_locked_session_t * vls, - u32 dst_wrk_index, u32 dst_vls_index) +vls_send_clone_and_share_rpc (vcl_worker_t * wrk, + vcl_locked_session_t * vls, u32 session_index, + u32 vls_wrk_index, u32 dst_wrk_index, + u32 dst_vls_index, u32 dst_session_index) { u8 data[sizeof (u8) + sizeof (vls_clone_and_share_msg_t)]; vls_clone_and_share_msg_t *msg; vls_rpc_msg_t *rpc; + int ret; rpc = (vls_rpc_msg_t *) & data; rpc->type = VLS_RPC_CLONE_AND_SHARE; msg = (vls_clone_and_share_msg_t *) & rpc->data; - msg->origin_vls_wrk = wrk->wrk_index; + msg->origin_vls_wrk = vls_wrk_index; msg->origin_vls_index = vls->vls_index; + msg->origin_vcl_wrk = wrk->wrk_index; + msg->origin_session_index = session_index; msg->vls_index = dst_vls_index; + msg->session_index = dst_session_index; + + wrk->rpc_done = 0; + ret = vcl_send_worker_rpc (dst_wrk_index, rpc, sizeof (data)); + + VDBG (1, + "send session clone of worker (session): %u (%u) -> %u (%u), ret=%d", + dst_wrk_index, msg->session_index, msg->origin_vcl_wrk, + msg->origin_session_index, ret); + while (!ret && !wrk->rpc_done) + ; +} + +void +vls_send_session_cleanup_rpc (vcl_worker_t * wrk, + u32 dst_wrk_index, u32 dst_session_index) +{ + u8 data[sizeof (u8) + sizeof (vls_sess_cleanup_msg_t)]; + vls_sess_cleanup_msg_t *msg; + vls_rpc_msg_t *rpc; + int ret; + + rpc = (vls_rpc_msg_t *) & data; + rpc->type = VLS_RPC_SESS_CLEANUP; + msg = (vls_sess_cleanup_msg_t *) & rpc->data; + msg->origin_vcl_wrk = wrk->wrk_index; + msg->session_index = dst_session_index; wrk->rpc_done = 0; - vcl_send_worker_rpc (dst_wrk_index, rpc, sizeof (data)); - while (!wrk->rpc_done) + ret = vcl_send_worker_rpc (dst_wrk_index, rpc, sizeof (data)); + + VDBG (1, + "send session cleanup of worker (session): %u (%u) from %u (), ret=%d", + dst_wrk_index, msg->session_index, msg->origin_vcl_wrk, ret); + while (!ret && !wrk->rpc_done) ; } @@ -1470,6 +1658,31 @@ vls_use_eventfd (void) return vcm->cfg.use_mq_eventfd; } +unsigned char +vls_mt_wrk_supported (void) +{ + return vcm->cfg.mt_supported; +} + +int +vls_use_real_epoll (void) +{ + if (vcl_get_worker_index () == ~0) + return 0; + + return vcl_worker_get_current ()->vcl_needs_real_epoll; +} + +void +vls_register_vcl_worker (void) +{ + if (vppcom_worker_register () != VPPCOM_OK) + { + VERR ("failed to register worker"); + return; + } +} + /* * fd.io coding-style-patch-verification: ON * diff --git a/src/vcl/vcl_locked.h b/src/vcl/vcl_locked.h index 18b341b11ca..7cfb3bdc522 100644 --- a/src/vcl/vcl_locked.h +++ b/src/vcl/vcl_locked.h @@ -51,6 +51,9 @@ vcl_session_handle_t vlsh_to_session_index (vls_handle_t vlsh); vls_handle_t vls_session_index_to_vlsh (uint32_t session_index); int vls_app_create (char *app_name); unsigned char vls_use_eventfd (void); +unsigned char vls_mt_wrk_supported (void); +int vls_use_real_epoll (void); +void vls_register_vcl_worker (void); #endif /* SRC_VCL_VCL_LOCKED_H_ */ diff --git a/src/vcl/vcl_private.c b/src/vcl/vcl_private.c index 300b82c4f21..1dadb628fb8 100644 --- a/src/vcl/vcl_private.c +++ b/src/vcl/vcl_private.c @@ -217,7 +217,9 @@ vcl_worker_alloc_and_init () wrk->mqs_epfd = -1; if (vcm->cfg.use_mq_eventfd) { + wrk->vcl_needs_real_epoll = 1; wrk->mqs_epfd = epoll_create (1); + wrk->vcl_needs_real_epoll = 0; if (wrk->mqs_epfd < 0) { clib_unix_warning ("epoll_create() returned"); diff --git a/src/vcl/vcl_private.h b/src/vcl/vcl_private.h index 4a25632379c..b873ad994a6 100644 --- a/src/vcl/vcl_private.h +++ b/src/vcl/vcl_private.h @@ -218,6 +218,7 @@ typedef struct vppcom_cfg_t_ u8 *vpp_api_socket_name; u8 *vpp_api_chroot; u32 tls_engine; + u8 mt_supported; } vppcom_cfg_t; void vppcom_cfg (vppcom_cfg_t * vcl_cfg); @@ -307,6 +308,10 @@ typedef struct vcl_worker_ socket_client_main_t bapi_sock_ctx; memory_client_main_t bapi_shm_ctx; api_main_t bapi_api_ctx; + + /** vcl needs next epoll_create to go to libc_epoll */ + u8 vcl_needs_real_epoll; + volatile int rpc_done; } vcl_worker_t; typedef void (vcl_rpc_fn_t) (void *args); @@ -659,7 +664,7 @@ vcl_session_vpp_evt_q (vcl_worker_t * wrk, vcl_session_t * s) void vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s, u32 wrk_index); -void vcl_send_worker_rpc (u32 dst_wrk_index, void *data, u32 data_len); +int vcl_send_worker_rpc (u32 dst_wrk_index, void *data, u32 data_len); /* * VCL Binary API diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c index 41d2f3170aa..d73c73be383 100644 --- a/src/vcl/vppcom.c +++ b/src/vcl/vppcom.c @@ -351,13 +351,14 @@ vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s, app_send_ctrl_evt_to_vpp (mq, app_evt); } -void +int vcl_send_worker_rpc (u32 dst_wrk_index, void *data, u32 data_len) { app_session_evt_t _app_evt, *app_evt = &_app_evt; session_app_wrk_rpc_msg_t *mp; vcl_worker_t *dst_wrk, *wrk; svm_msg_q_t *mq; + int ret = -1; if (data_len > sizeof (mp->data)) goto done; @@ -376,9 +377,11 @@ vcl_send_worker_rpc (u32 dst_wrk_index, void *data, u32 data_len) mp->wrk_index = dst_wrk->vpp_wrk_index; clib_memcpy (mp->data, data, data_len); app_send_ctrl_evt_to_vpp (mq, app_evt); + ret = 0; done: clib_spinlock_unlock (&vcm->workers_lock); + return ret; } static u32 @@ -902,7 +905,7 @@ vcl_worker_rpc_handler (vcl_worker_t * wrk, void *data) if (!vcm->wrk_rpc_fn) return; - (vcm->wrk_rpc_fn) (data); + (vcm->wrk_rpc_fn) (((session_app_wrk_rpc_msg_t *) data)->data); } static int @@ -962,7 +965,7 @@ vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e) case SESSION_CTRL_EVT_APP_DEL_SEGMENT: vcl_session_app_del_segment_handler (wrk, e->data); break; - case SESSION_CTRL_EVT_RPC: + case SESSION_CTRL_EVT_APP_WRK_RPC: vcl_worker_rpc_handler (wrk, e->data); break; default: @@ -2301,7 +2304,7 @@ vcl_select_handle_mq_event (vcl_worker_t * wrk, session_event_t * e, case SESSION_CTRL_EVT_APP_DEL_SEGMENT: vcl_session_app_del_segment_handler (wrk, e->data); break; - case SESSION_CTRL_EVT_RPC: + case SESSION_CTRL_EVT_APP_WRK_RPC: vcl_worker_rpc_handler (wrk, e->data); break; default: @@ -2920,7 +2923,7 @@ vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e, case SESSION_CTRL_EVT_APP_DEL_SEGMENT: vcl_session_app_del_segment_handler (wrk, e->data); break; - case SESSION_CTRL_EVT_RPC: + case SESSION_CTRL_EVT_APP_WRK_RPC: vcl_worker_rpc_handler (wrk, e->data); break; default: diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h index d1897934d61..cddc031ec0d 100644 --- a/src/vnet/session/application_interface.h +++ b/src/vnet/session/application_interface.h @@ -531,7 +531,7 @@ typedef struct session_app_wrk_rpc_msg_ { u32 client_index; /**< app client index */ u32 wrk_index; /**< dst worker index */ - u8 data[252]; /**< rpc data */ + u8 data[64]; /**< rpc data */ } __clib_packed session_app_wrk_rpc_msg_t; typedef struct app_session_event_ diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h index b3a7eb2bb60..8cea29a8642 100644 --- a/src/vnet/session/session_types.h +++ b/src/vnet/session/session_types.h @@ -384,6 +384,7 @@ typedef enum _(APP_DEL_SEGMENT, app_del_segment) \ _(MIGRATED, migrated) \ _(CLEANUP, cleanup) \ + _(APP_WRK_RPC, app_wrk_rpc) \ /* Deprecated and will be removed. Use types above */ #define FIFO_EVENT_APP_RX SESSION_IO_EVT_RX |