From ab2f6dbf9f7b7164a9810f4c80c8abf8463e42ad Mon Sep 17 00:00:00 2001 From: Florin Coras Date: Fri, 31 Aug 2018 14:31:41 -0700 Subject: session: support multiple worker binds Allows app workers to listen on the same session endpoint. Incoming connects are spread across the workers in a round-robin fashion Change-Id: Ib5f5817230d9abc6127a85cdbdcad70d980c0f7f Signed-off-by: Florin Coras --- src/vcl/vcl_bapi.c | 43 +++++++++++++++++++++++++++++++------------ src/vcl/vcl_debug.h | 2 +- src/vcl/vcl_private.h | 2 +- src/vcl/vcl_test_server.c | 6 +++--- src/vcl/vppcom.c | 39 +++++++++++++++++++++++---------------- 5 files changed, 59 insertions(+), 33 deletions(-) (limited to 'src/vcl') diff --git a/src/vcl/vcl_bapi.c b/src/vcl/vcl_bapi.c index 44e6d9d1d04..0c46d8220fd 100644 --- a/src/vcl/vcl_bapi.c +++ b/src/vcl/vcl_bapi.c @@ -138,9 +138,9 @@ static void vl_api_app_worker_add_del_reply_t_handler (vl_api_app_worker_add_del_reply_t * mp) { + int n_fds = 0, *fds = 0; vcl_worker_t *wrk; - int n_fds = 0; - int *fds = 0; + u32 wrk_index; if (mp->retval) { @@ -148,14 +148,14 @@ vl_api_app_worker_add_del_reply_t_handler (vl_api_app_worker_add_del_reply_t * format_api_error, ntohl (mp->retval)); goto failed; } - ASSERT (mp->context == mp->wrk_index); - if (mp->context != mp->wrk_index) + wrk_index = clib_net_to_host_u32 (mp->wrk_index); + if (mp->context != wrk_index) { clib_warning ("VCL<%d>: wrk numbering doesn't match ours: %u, vpp: %u", - getpid (), mp->context, mp->wrk_index); + getpid (), mp->context, wrk_index); goto failed; } - wrk = vcl_worker_get (mp->context); + wrk = vcl_worker_get (wrk_index); wrk->app_event_queue = uword_to_pointer (mp->app_event_queue_address, svm_msg_q_t *); @@ -301,10 +301,31 @@ vl_api_unbind_sock_reply_t_handler (vl_api_unbind_sock_reply_t * mp) VDBG (1, "VCL<%d>: sid %u: unbind succeeded!", getpid (), mp->context); } +static void +vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t * + mp) +{ + if (mp->retval) + clib_warning ("VCL<%d>: ERROR: sid %u: disconnect failed: %U", + getpid (), mp->context, format_api_error, + ntohl (mp->retval)); +} + +static void +vl_api_connect_sock_reply_t_handler (vl_api_connect_sock_reply_t * mp) +{ + if (mp->retval) + clib_warning ("VCL<%d>: ERROR: sid %u: connect failed: %U", + getpid (), mp->context, format_api_error, + ntohl (mp->retval)); +} + #define foreach_sock_msg \ _(SESSION_ENABLE_DISABLE_REPLY, session_enable_disable_reply) \ _(BIND_SOCK_REPLY, bind_sock_reply) \ _(UNBIND_SOCK_REPLY, unbind_sock_reply) \ +_(CONNECT_SOCK_REPLY, connect_sock_reply) \ +_(DISCONNECT_SESSION_REPLY, disconnect_session_reply) \ _(APPLICATION_ATTACH_REPLY, application_attach_reply) \ _(APPLICATION_DETACH_REPLY, application_detach_reply) \ _(MAP_ANOTHER_SEGMENT, map_another_segment) \ @@ -421,13 +442,12 @@ vppcom_send_connect_sock (vcl_session_t * session) { vl_api_connect_sock_t *cmp; - /* Assumes caller as acquired the spinlock: vcm->sessions_lockp */ cmp = vl_msg_api_alloc (sizeof (*cmp)); memset (cmp, 0, sizeof (*cmp)); cmp->_vl_msg_id = ntohs (VL_API_CONNECT_SOCK); cmp->client_index = vcm->my_client_index; cmp->context = session->session_index; - + cmp->wrk_index = vcl_get_worker_index (); cmp->is_ip4 = session->transport.is_ip4; clib_memcpy (cmp->ip, &session->transport.rmt_ip, sizeof (cmp->ip)); cmp->port = session->transport.rmt_port; @@ -437,13 +457,10 @@ vppcom_send_connect_sock (vcl_session_t * session) } void -vppcom_send_disconnect_session (u64 vpp_handle, u32 session_index) +vppcom_send_disconnect_session (u64 vpp_handle) { vl_api_disconnect_session_t *dmp; - VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect msg", - getpid (), vpp_handle, session_index); - dmp = vl_msg_api_alloc (sizeof (*dmp)); memset (dmp, 0, sizeof (*dmp)); dmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION); @@ -467,6 +484,7 @@ vppcom_send_bind_sock (vcl_session_t * session) bmp->_vl_msg_id = ntohs (VL_API_BIND_SOCK); bmp->client_index = vcm->my_client_index; bmp->context = session->session_index; + bmp->wrk_index = vcl_get_worker_index (); bmp->is_ip4 = session->transport.is_ip4; clib_memcpy (bmp->ip, &session->transport.lcl_ip, sizeof (bmp->ip)); bmp->port = session->transport.lcl_port; @@ -485,6 +503,7 @@ vppcom_send_unbind_sock (u64 vpp_handle) ump->_vl_msg_id = ntohs (VL_API_UNBIND_SOCK); ump->client_index = vcm->my_client_index; + ump->wrk_index = vcl_get_worker_index (); ump->handle = vpp_handle; vl_msg_api_send_shmem (vcm->vl_input_queue, (u8 *) & ump); } diff --git a/src/vcl/vcl_debug.h b/src/vcl/vcl_debug.h index 13e6726c259..3faa45822f8 100644 --- a/src/vcl/vcl_debug.h +++ b/src/vcl/vcl_debug.h @@ -22,7 +22,7 @@ #define VDBG(_lvl, _fmt, _args...) \ if (vcm->debug > _lvl) \ - clib_warning (_fmt, ##_args) + clib_warning ("vcl: " _fmt, __vcl_worker_index, ##_args) #define foreach_vcl_dbg_evt \ _(INIT, "vcl init track") \ diff --git a/src/vcl/vcl_private.h b/src/vcl/vcl_private.h index 1a9bb412fd7..ba9094cd96b 100644 --- a/src/vcl/vcl_private.h +++ b/src/vcl/vcl_private.h @@ -485,7 +485,7 @@ void vppcom_send_session_enable_disable (u8 is_enable); void vppcom_app_send_attach (void); void vppcom_app_send_detach (void); void vppcom_send_connect_sock (vcl_session_t * session); -void vppcom_send_disconnect_session (u64 vpp_handle, u32 session_index); +void vppcom_send_disconnect_session (u64 vpp_handle); void vppcom_send_bind_sock (vcl_session_t * session); void vppcom_send_unbind_sock (u64 vpp_handle); void vppcom_api_hookup (void); diff --git a/src/vcl/vcl_test_server.c b/src/vcl/vcl_test_server.c index 8b8b77fab2c..d1f2db72872 100644 --- a/src/vcl/vcl_test_server.c +++ b/src/vcl/vcl_test_server.c @@ -472,10 +472,9 @@ vcl_test_server_handle_cfg (vcl_test_server_worker_t * wrk, break; case SOCK_TEST_TYPE_EXIT: - vtinf ("Have a great day conn %d!", conn->fd); + vtinf ("Have a great day conn %d (closing)!", conn->fd); vppcom_session_close (conn->fd); conn_pool_free (conn); - vtinf ("Closed client fd %d", conn->fd); wrk->nfds--; break; @@ -644,6 +643,7 @@ main (int argc, char **argv) clib_mem_init_thread_safe (0, 64 << 20); ssm->cfg.port = SOCK_TEST_SERVER_PORT; ssm->cfg.workers = 1; + ssm->active_workers = 1; vcl_test_server_process_opts (ssm, argc, argv); rv = vppcom_app_create ("vcl_test_server"); @@ -661,7 +661,7 @@ main (int argc, char **argv) } vcl_test_server_worker_loop (&ssm->workers[0]); - while (ssm->active_workers) + while (ssm->active_workers > 0) ; vppcom_app_destroy (); diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c index 87f29e35311..60d5eb3539a 100644 --- a/src/vcl/vppcom.c +++ b/src/vcl/vppcom.c @@ -614,13 +614,13 @@ vppcom_app_attach (void) } static int -vppcom_session_unbind (u32 session_index) +vppcom_session_unbind (u32 session_handle) { vcl_worker_t *wrk = vcl_worker_get_current (); vcl_session_t *session = 0; u64 vpp_handle; - session = vcl_session_get (wrk, session_index); + session = vcl_session_get_w_handle (wrk, session_handle); if (!session) return VPPCOM_EBADFD; @@ -630,7 +630,7 @@ vppcom_session_unbind (u32 session_index) session->session_state = STATE_DISCONNECT; VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending unbind msg! new state" - " 0x%x (%s)", getpid (), vpp_handle, session_index, STATE_DISCONNECT, + " 0x%x (%s)", getpid (), vpp_handle, session_handle, STATE_DISCONNECT, vppcom_session_state_str (STATE_DISCONNECT)); vcl_evt (VCL_EVT_UNBIND, session); vppcom_send_unbind_sock (vpp_handle); @@ -639,7 +639,7 @@ vppcom_session_unbind (u32 session_index) } static int -vppcom_session_disconnect (u32 session_index) +vppcom_session_disconnect (u32 session_handle) { vcl_worker_t *wrk = vcl_worker_get_current (); svm_msg_q_t *vpp_evt_q; @@ -647,18 +647,18 @@ vppcom_session_disconnect (u32 session_index) session_state_t state; u64 vpp_handle; - session = vcl_session_get (wrk, session_index); + session = vcl_session_get_w_handle (wrk, session_handle); vpp_handle = session->vpp_handle; state = session->session_state; VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u state 0x%x (%s)", getpid (), - vpp_handle, session_index, state, vppcom_session_state_str (state)); + vpp_handle, session_handle, state, vppcom_session_state_str (state)); if (PREDICT_FALSE (state & STATE_LISTEN)) { clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: " "Cannot disconnect a listen socket!", - getpid (), vpp_handle, session_index); + getpid (), vpp_handle, session_handle); return VPPCOM_EBADFD; } @@ -668,13 +668,13 @@ vppcom_session_disconnect (u32 session_index) vcl_send_session_disconnected_reply (vpp_evt_q, vcm->my_client_index, vpp_handle, 0); VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect " - "REPLY...", getpid (), vpp_handle, session_index); + "REPLY...", getpid (), vpp_handle, session_handle); } else { VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect...", - getpid (), vpp_handle, session_index); - vppcom_send_disconnect_session (vpp_handle, session_index); + getpid (), vpp_handle, session_handle); + vppcom_send_disconnect_session (vpp_handle); } return VPPCOM_OK; @@ -1142,7 +1142,7 @@ handle: vcl_evt (VCL_EVT_ACCEPT, client_session, listen_session, client_session_index); - return client_session_index; + return vcl_session_handle (client_session); } int @@ -1653,6 +1653,12 @@ vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq, } break; case SESSION_IO_EVT_CT_TX: + if (svm_fifo_is_empty (e->fifo)) + { + svm_fifo_unset_event (e->fifo); + if (svm_fifo_is_empty (e->fifo)) + break; + } session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0); sid = session->session_index; if (sid < n_bits && read_map) @@ -1977,7 +1983,7 @@ vppcom_epoll_create (void) VDBG (0, "VCL<%d>: Created vep_idx %u / sid %u!", getpid (), vep_session->session_index, vep_session->session_index); - return (vep_session->session_index); + return vcl_session_handle (vep_session); } int @@ -2037,7 +2043,8 @@ vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle, if (vep_session->vep.next_sh != ~0) { vcl_session_t *next_session; - next_session = vcl_session_get (wrk, vep_session->vep.next_sh); + next_session = vcl_session_get_w_handle (wrk, + vep_session->vep.next_sh); if (PREDICT_FALSE (!next_session)) { clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_ADD: Invalid " @@ -2121,7 +2128,7 @@ vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle, else { vcl_session_t *prev_session; - prev_session = vcl_session_get (wrk, session->vep.prev_sh); + prev_session = vcl_session_get_w_handle (wrk, session->vep.prev_sh); if (PREDICT_FALSE (!prev_session)) { clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_DEL: Invalid " @@ -2135,7 +2142,7 @@ vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle, if (session->vep.next_sh != ~0) { vcl_session_t *next_session; - next_session = vcl_session_get (wrk, session->vep.next_sh); + next_session = vcl_session_get_w_handle (wrk, session->vep.next_sh); if (PREDICT_FALSE (!next_session)) { clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_DEL: Invalid " @@ -2314,7 +2321,7 @@ vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq, session_events = session->vep.ev.events; break; default: - clib_warning ("unhandled: %u", e->event_type); + VDBG (0, "unhandled: %u", e->event_type); svm_msg_q_free_msg (mq, msg); continue; } -- cgit 1.2.3-korg