From 40c07ce7a78af69f7354222d4663a65cd5572049 Mon Sep 17 00:00:00 2001
From: Florin Coras
Date: Thu, 16 Jul 2020 20:46:17 -0700
Subject: vcl: support inter worker rpc

Type: feature

Signed-off-by: Florin Coras
Change-Id: I664cd14c84fc5cf2ffe61efce99c95219b44fad7
---
 src/vcl/vcl_locked.c                     | 143 ++++++++++++++++++++++++-------
 src/vcl/vcl_private.h                    |   6 ++
 src/vcl/vppcom.c                         |  48 +++++++++++
 src/vnet/session/application_interface.h |   7 ++
 src/vnet/session/session_node.c          |  30 +++++++
 src/vnet/session/session_types.h         |   1 +
 6 files changed, 205 insertions(+), 30 deletions(-)

diff --git a/src/vcl/vcl_locked.c b/src/vcl/vcl_locked.c
index 19522c05d61..fc48618429b 100644
--- a/src/vcl/vcl_locked.c
+++ b/src/vcl/vcl_locked.c
@@ -38,6 +38,7 @@ typedef struct vls_worker_
   vcl_locked_session_t *vls_pool;
   uword *session_index_to_vlsh_table;
   u32 wrk_index;
+  volatile int rpc_done;
 } vls_worker_t;
 
 typedef struct vls_local_
@@ -64,6 +65,24 @@ typedef struct vls_main_
 
 vls_main_t *vlsm;
 
+typedef enum vls_rpc_msg_type_
+{
+  VLS_RPC_CLONE_AND_SHARE,
+} vls_rpc_msg_type_e;
+
+typedef struct vls_rpc_msg_
+{
+  u8 type;
+  u8 data[0];
+} vls_rpc_msg_t;
+
+typedef struct vls_clone_and_share_msg_
+{
+  u32 vls_index;		/**< vls to be shared */
+  u32 origin_vls_wrk;		/**< worker that initiated the rpc */
+  u32 origin_vls_index;		/**< vls session of the originator */
+} vls_clone_and_share_msg_t;
+
 static inline u32
 vls_get_worker_index (void)
 {
@@ -614,51 +633,50 @@ done:
 }
 
 void
-vls_share_session (vcl_locked_session_t * vls, vls_worker_t * vls_wrk,
-		   vls_worker_t * vls_parent_wrk, vcl_worker_t * vcl_wrk)
+vls_init_share_session (vls_worker_t * vls_wrk, vcl_locked_session_t * vls)
 {
-  vcl_locked_session_t *parent_vls;
+  vls_shared_data_t *vls_shd;
+
+  u32 vls_shd_index = vls_shared_data_alloc ();
+
+  vls_shared_data_pool_rlock ();
+
+  vls_shd = vls_shared_data_get (vls_shd_index);
+  vls_shd->owner_wrk_index = vls_wrk->wrk_index;
+  vls->shared_data_index = vls_shd_index;
+  vec_add1 (vls_shd->workers_subscribed, vls_wrk->wrk_index);
+
+  vls_shared_data_pool_runlock ();
+}
+
+void
+vls_share_session (vls_worker_t * vls_wrk, vcl_locked_session_t * vls)
+{
+  vcl_worker_t *vcl_wrk = vcl_worker_get (vls_wrk->wrk_index);
   vls_shared_data_t *vls_shd;
   vcl_session_t *s;
 
   s = vcl_session_get (vcl_wrk, vls->session_index);
   if (!s)
     {
-      clib_warning ("wrk %u parent %u session %u vls %u NOT AVAILABLE",
-		    vcl_wrk->wrk_index, vls_parent_wrk->wrk_index,
-		    vls->session_index, vls->vls_index);
+      clib_warning ("wrk %u session %u vls %u NOT AVAILABLE",
+		    vcl_wrk->wrk_index, vls->session_index, vls->vls_index);
       return;
     }
 
+  ASSERT (vls->shared_data_index != ~0);
+
   /* Reinit session lock */
   clib_spinlock_init (&vls->lock);
 
-  if (vls->shared_data_index != ~0)
-    {
-      vls_shared_data_pool_rlock ();
-      vls_shd = vls_shared_data_get (vls->shared_data_index);
-    }
-  else
-    {
-      u32 vls_shd_index = vls_shared_data_alloc ();
-
-      vls_shared_data_pool_rlock ();
-
-      vls_shd = vls_shared_data_get (vls_shd_index);
-      vls_shd->owner_wrk_index = vls_parent_wrk->wrk_index;
-      vls->shared_data_index = vls_shd_index;
+  vls_shared_data_pool_rlock ();
 
-      /* Update parent shared data */
-      parent_vls = vls_session_get (vls_parent_wrk, vls->vls_index);
-      parent_vls->shared_data_index = vls_shd_index;
-      vec_add1 (vls_shd->workers_subscribed, vls_parent_wrk->wrk_index);
-    }
+  vls_shd = vls_shared_data_get (vls->shared_data_index);
 
   clib_spinlock_lock (&vls_shd->lock);
-
   vec_add1 (vls_shd->workers_subscribed, vls_wrk->wrk_index);
-
   clib_spinlock_unlock (&vls_shd->lock);
+
   vls_shared_data_pool_runlock ();
 
   if (s->rx_fifo)
@@ -675,12 +693,18 @@ vls_share_session (vcl_locked_session_t * vls, vls_worker_t * vls_wrk,
 static void
 vls_share_sessions (vls_worker_t * vls_parent_wrk, vls_worker_t * vls_wrk)
 {
-  vcl_worker_t *vcl_wrk = vcl_worker_get (vls_wrk->wrk_index);
-  vcl_locked_session_t *vls;
+  vcl_locked_session_t *vls, *parent_vls;
 
   /* *INDENT-OFF* */
   pool_foreach (vls, vls_wrk->vls_pool, ({
-    vls_share_session (vls, vls_wrk, vls_parent_wrk, vcl_wrk);
+    /* Initialize sharing on parent session */
+    if (vls->shared_data_index == ~0)
+      {
+	parent_vls = vls_session_get (vls_parent_wrk, vls->vls_index);
+	vls_init_share_session (vls_parent_wrk, parent_vls);
+	vls->shared_data_index = parent_vls->shared_data_index;
+      }
+    vls_share_session (vls_wrk, vls);
   }));
   /* *INDENT-ON* */
 }
@@ -1358,6 +1382,64 @@ vls_app_exit (void)
   vls_worker_free (wrk);
 }
 
+static void
+vls_clone_and_share_rpc_handler (void *args)
+{
+  vls_clone_and_share_msg_t *msg = (vls_clone_and_share_msg_t *) args;
+  vls_worker_t *wrk = vls_worker_get_current (), *dst_wrk;
+  vcl_locked_session_t *vls, *dst_vls;
+  vcl_worker_t *dst_vcl_wrk;
+  vcl_session_t *s, *dst_s;
+
+  vls = vls_session_get (wrk, msg->vls_index);
+  vls_init_share_session (wrk, vls);
+
+  s = vcl_session_get (vcl_worker_get_current (), vls->session_index);
+  dst_wrk = vls_worker_get (msg->origin_vls_wrk);
+  dst_vcl_wrk = vcl_worker_get (msg->origin_vls_wrk);
+  dst_vls = vls_session_get (dst_wrk, msg->origin_vls_index);
+  dst_vls->shared_data_index = vls->shared_data_index;
+  dst_s = vcl_session_get (dst_vcl_wrk, dst_vls->session_index);
+  clib_memcpy (dst_s, s, sizeof (*s));
+
+  dst_wrk->rpc_done = 1;
+}
+
+static void
+vls_rpc_handler (void *args)
+{
+  vls_rpc_msg_t *msg = (vls_rpc_msg_t *) args;
+  switch (msg->type)
+    {
+    case VLS_RPC_CLONE_AND_SHARE:
+      vls_clone_and_share_rpc_handler (msg->data);
+      break;
+    default:
+      break;
+    }
+}
+
+void
+vls_send_clone_and_share_rpc (vls_worker_t * wrk, vcl_locked_session_t * vls,
+			      u32 dst_wrk_index, u32 dst_vls_index)
+{
+  u8 data[sizeof (u8) + sizeof (vls_clone_and_share_msg_t)];
+  vls_clone_and_share_msg_t *msg;
+  vls_rpc_msg_t *rpc;
+
+  rpc = (vls_rpc_msg_t *) & data;
+  rpc->type = VLS_RPC_CLONE_AND_SHARE;
+  msg = (vls_clone_and_share_msg_t *) & rpc->data;
+  msg->origin_vls_wrk = wrk->wrk_index;
+  msg->origin_vls_index = vls->vls_index;
+  msg->vls_index = dst_vls_index;
+
+  wrk->rpc_done = 0;
+  vcl_send_worker_rpc (dst_wrk_index, rpc, sizeof (data));
+  while (!wrk->rpc_done)
+    ;
+}
+
 int
 vls_app_create (char *app_name)
 {
@@ -1378,6 +1460,7 @@ vls_app_create (char *app_name)
   vls_worker_alloc ();
   vlsl->vls_wrk_index = vcl_get_worker_index ();
   vls_mt_locks_init ();
+  vcm->wrk_rpc_fn = vls_rpc_handler;
 
   return VPPCOM_OK;
 }
diff --git a/src/vcl/vcl_private.h b/src/vcl/vcl_private.h
index 4a739e64aff..4a25632379c 100644
--- a/src/vcl/vcl_private.h
+++ b/src/vcl/vcl_private.h
@@ -309,6 +309,8 @@ typedef struct vcl_worker_
   api_main_t bapi_api_ctx;
 } vcl_worker_t;
 
+typedef void (vcl_rpc_fn_t) (void *args);
+
 typedef struct vppcom_main_t_
 {
   u8 is_init;
@@ -357,6 +359,8 @@ typedef struct vppcom_main_t_
   /* VNET_API_ERROR_FOO -> "Foo" hash table */
   uword *error_string_by_error_number;
 
+  vcl_rpc_fn_t *wrk_rpc_fn;
+
 } vppcom_main_t;
 
 extern vppcom_main_t *vcm;
@@ -655,6 +659,8 @@ vcl_session_vpp_evt_q (vcl_worker_t * wrk, vcl_session_t * s)
 void vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s,
 				     u32 wrk_index);
 
+void vcl_send_worker_rpc (u32 dst_wrk_index, void *data, u32 data_len);
+
 /*
  * VCL Binary API
  */
diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c
index f1478f890c9..41d2f3170aa 100644
--- a/src/vcl/vppcom.c
+++ b/src/vcl/vppcom.c
@@ -351,6 +351,36 @@ vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s,
   app_send_ctrl_evt_to_vpp (mq, app_evt);
 }
 
+void
+vcl_send_worker_rpc (u32 dst_wrk_index, void *data, u32 data_len)
+{
+  app_session_evt_t _app_evt, *app_evt = &_app_evt;
+  session_app_wrk_rpc_msg_t *mp;
+  vcl_worker_t *dst_wrk, *wrk;
+  svm_msg_q_t *mq;
+
+  if (data_len > sizeof (mp->data))
+    goto done;
+
+  clib_spinlock_lock (&vcm->workers_lock);
+
+  dst_wrk = vcl_worker_get_if_valid (dst_wrk_index);
+  if (!dst_wrk)
+    goto done;
+
+  wrk = vcl_worker_get_current ();
+  mq = vcl_worker_ctrl_mq (wrk);
+  app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_APP_WRK_RPC);
+  mp = (session_app_wrk_rpc_msg_t *) app_evt->evt->data;
+  mp->client_index = wrk->my_client_index;
+  mp->wrk_index = dst_wrk->vpp_wrk_index;
+  clib_memcpy (mp->data, data, data_len);
+  app_send_ctrl_evt_to_vpp (mq, app_evt);
+
+done:
+  clib_spinlock_unlock (&vcm->workers_lock);
+}
+
 static u32
 vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp,
 			      u32 ls_index)
 {
@@ -866,6 +896,15 @@ vcl_session_app_del_segment_handler (vcl_worker_t * wrk, void *data)
   VDBG (1, "Unmapped segment: %d", msg->segment_handle);
 }
 
+static void
+vcl_worker_rpc_handler (vcl_worker_t * wrk, void *data)
+{
+  if (!vcm->wrk_rpc_fn)
+    return;
+
+  (vcm->wrk_rpc_fn) (data);
+}
+
 static int
 vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
 {
@@ -923,6 +962,9 @@ vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
       vcl_session_app_del_segment_handler (wrk, e->data);
       break;
+    case SESSION_CTRL_EVT_APP_WRK_RPC:
+      vcl_worker_rpc_handler (wrk, e->data);
+      break;
     default:
       clib_warning ("unhandled %u", e->event_type);
     }
@@ -2259,6 +2301,9 @@ vcl_select_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
       vcl_session_app_del_segment_handler (wrk, e->data);
       break;
+    case SESSION_CTRL_EVT_APP_WRK_RPC:
+      vcl_worker_rpc_handler (wrk, e->data);
+      break;
     default:
       clib_warning ("unhandled: %u", e->event_type);
       break;
@@ -2875,6 +2920,9 @@ vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
       vcl_session_app_del_segment_handler (wrk, e->data);
       break;
+    case SESSION_CTRL_EVT_APP_WRK_RPC:
+      vcl_worker_rpc_handler (wrk, e->data);
+      break;
     default:
       VDBG (0, "unhandled: %u", e->event_type);
       break;
diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h
index 7a3eeeb34c4..d1897934d61 100644
--- a/src/vnet/session/application_interface.h
+++ b/src/vnet/session/application_interface.h
@@ -527,6 +527,13 @@ typedef struct session_cleanup_msg_
   u8 type;
 } __clib_packed session_cleanup_msg_t;
 
+typedef struct session_app_wrk_rpc_msg_
+{
+  u32 client_index;		/**< app client index */
+  u32 wrk_index;		/**< dst worker index */
+  u8 data[64];			/**< rpc data */
+} __clib_packed session_app_wrk_rpc_msg_t;
+
 typedef struct app_session_event_
 {
   svm_msg_q_msg_t msg;
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index 738f0b90124..763b789eef2 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -490,6 +490,30 @@ session_mq_worker_update_handler (void *data)
   app_worker_close_notify (app_wrk, s);
 }
 
+static void
+session_mq_app_wrk_rpc_handler (void *data)
+{
+  session_app_wrk_rpc_msg_t *mp = (session_app_wrk_rpc_msg_t *) data;
+  svm_msg_q_msg_t _msg, *msg = &_msg;
+  session_app_wrk_rpc_msg_t *rmp;
+  app_worker_t *app_wrk;
+  session_event_t *evt;
+  application_t *app;
+
+  app = application_lookup (mp->client_index);
+  if (!app)
+    return;
+
+  app_wrk = application_get_worker (app, mp->wrk_index);
+
+  svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
+				       SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT,
+				       msg);
+  evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
+  clib_memset (evt, 0, sizeof (*evt));
+  evt->event_type = SESSION_CTRL_EVT_APP_WRK_RPC;
+  rmp = (session_app_wrk_rpc_msg_t *) evt->data;
+  clib_memcpy (rmp->data, mp->data, sizeof (mp->data));
+  svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
+}
+
 vlib_node_registration_t session_queue_node;
 
 typedef struct
@@ -1226,6 +1253,9 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
     case SESSION_CTRL_EVT_APP_DETACH:
       app_mq_detach_handler (session_evt_ctrl_data (wrk, elt));
       break;
+    case SESSION_CTRL_EVT_APP_WRK_RPC:
+      session_mq_app_wrk_rpc_handler (session_evt_ctrl_data (wrk, elt));
+      break;
     default:
       clib_warning ("unhandled event type %d", e->event_type);
     }
diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h
index 784312d9182..b3a7eb2bb60 100644
--- a/src/vnet/session/session_types.h
+++ b/src/vnet/session/session_types.h
@@ -358,6 +358,7 @@ typedef enum
   SESSION_CTRL_EVT_APP_DEL_SEGMENT,
   SESSION_CTRL_EVT_MIGRATED,
   SESSION_CTRL_EVT_CLEANUP,
+  SESSION_CTRL_EVT_APP_WRK_RPC,
 } session_evt_type_t;
 
 #define foreach_session_ctrl_evt \
--
cgit 1.2.3-korg
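
Usage note (not part of the patch): the change adds a generic hook, vcm->wrk_rpc_fn, and a sender, vcl_send_worker_rpc (), so code layered on VCL can pass a small opaque message from one worker to another; the message rides a SESSION_CTRL_EVT_APP_WRK_RPC control event through VPP and is handed to the registered callback on the destination worker's thread when that worker drains its message queue. The sketch below is illustrative only and assumes the interfaces exactly as added above; the my_* names are hypothetical and do not exist in the tree.

/* Minimal sketch of using the inter-worker rpc from code built on VCL.
 * my_rpc_msg_t, my_rpc_done and the my_* helpers are hypothetical;
 * vcl_send_worker_rpc () and vcm->wrk_rpc_fn come from this patch. */
#include <vcl/vcl_private.h>

typedef struct my_rpc_msg_
{
  u32 origin_wrk;		/* vcl worker that sent the rpc */
  u32 opaque;			/* payload interpreted by the peer */
} my_rpc_msg_t;

/* Workers are threads of one process, so a plain flag is visible to both
 * sides, much like vls_worker_t.rpc_done in vcl_locked.c */
static volatile int my_rpc_done;

/* Invoked by vcl_worker_rpc_handler () on the destination worker once the
 * SESSION_CTRL_EVT_APP_WRK_RPC event is drained from its message queue */
static void
my_rpc_handler (void *args)
{
  my_rpc_msg_t *msg = (my_rpc_msg_t *) args;

  clib_warning ("rpc from wrk %u opaque %u", msg->origin_wrk, msg->opaque);
  my_rpc_done = 1;
}

static void
my_rpc_init (void)
{
  /* Same hook vls_app_create () uses to register the vls rpc handler */
  vcm->wrk_rpc_fn = my_rpc_handler;
}

static void
my_rpc_send (u32 dst_wrk_index, u32 opaque)
{
  my_rpc_msg_t msg = { .origin_wrk = vcl_get_worker_index (),
		       .opaque = opaque };

  my_rpc_done = 0;
  /* Silently dropped if the payload exceeds session_app_wrk_rpc_msg_t.data
   * or the destination worker is gone; see vcl_send_worker_rpc () above */
  vcl_send_worker_rpc (dst_wrk_index, &msg, sizeof (msg));

  /* Busy-wait for delivery, mirroring vls_send_clone_and_share_rpc ();
   * a real caller would likely want a bounded wait */
  while (!my_rpc_done)
    ;
}

In the patch itself the same pattern backs session sharing across workers: vls_send_clone_and_share_rpc () packs a vls_clone_and_share_msg_t, the peer worker initializes the shared data in vls_clone_and_share_rpc_handler (), and the originator spins on its rpc_done flag until the session has been copied back.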