diff options
author | Florin Coras <fcoras@cisco.com> | 2019-01-02 19:31:22 -0800 |
---|---|---|
committer | Dave Barach <openvpp@barachs.net> | 2019-01-05 21:53:16 +0000 |
commit | 30e79c2e388a98160a3660f4f03103890c9b1b7c (patch) | |
tree | 0b108f43d95e28304924cc6e1d43900b3046c8de /src/vcl | |
parent | 3c1cf2c1716f436e5da4a106dd2b9a3df5d3a4a3 (diff) |
vcl/session: add api for changing session app worker
In case of multi-process apps, after forking, the parent may decide to
close part or all of the sessions it shares with the child. Because the
sessions have fifos allocated in the parent's segment manager, they must
be moved to the child's segment manager.
Change-Id: I85b4c8c8545005724023ee14043647719cef61dd
Signed-off-by: Florin Coras <fcoras@cisco.com>
Diffstat (limited to 'src/vcl')
-rw-r--r-- | src/vcl/ldp.c | 264 | ||||
-rw-r--r-- | src/vcl/vcl_bapi.c | 15 | ||||
-rw-r--r-- | src/vcl/vcl_private.c | 22 | ||||
-rw-r--r-- | src/vcl/vcl_private.h | 24 | ||||
-rw-r--r-- | src/vcl/vppcom.c | 206 | ||||
-rw-r--r-- | src/vcl/vppcom.h | 5 |
6 files changed, 368 insertions, 168 deletions
diff --git a/src/vcl/ldp.c b/src/vcl/ldp.c index ee35396bc63..a61acb9a6ab 100644 --- a/src/vcl/ldp.c +++ b/src/vcl/ldp.c @@ -57,9 +57,11 @@ typedef struct ldp_fd_entry_ { u32 session_index; + u32 worker_index; u32 fd; u32 fd_index; u32 flags; + clib_spinlock_t lock; } ldp_fd_entry_t; typedef struct ldp_worker_ctx_ @@ -101,7 +103,7 @@ typedef struct ldp_worker_ctx_t *workers; int init; char app_name[LDP_APP_NAME_MAX]; - u32 sid_bit_val; + u32 sh_bit_val; u32 sid_bit_mask; u32 debug; ldp_fd_entry_t *fd_pool; @@ -119,7 +121,7 @@ typedef struct clib_warning ("ldp<%d>: " _fmt, getpid(), ##_args) static ldp_main_t ldp_main = { - .sid_bit_val = (1 << LDP_SID_BIT_MIN), + .sh_bit_val = (1 << LDP_SID_BIT_MIN), .sid_bit_mask = (1 << LDP_SID_BIT_MIN) - 1, .debug = LDP_DEBUG_INIT, }; @@ -154,44 +156,73 @@ ldp_get_app_name () return ldp->app_name; } +static inline vcl_session_handle_t +ldp_fd_entry_sh (ldp_fd_entry_t * fde) +{ + return vppcom_session_handle (fde->session_index); +} + static int -ldp_fd_alloc (u32 sid) +ldp_fd_alloc (vcl_session_handle_t sh) { ldp_fd_entry_t *fde; clib_rwlock_writer_lock (&ldp->fd_table_lock); - if (pool_elts (ldp->fd_pool) >= (1ULL << 32) - ldp->sid_bit_val) + if (pool_elts (ldp->fd_pool) >= (1ULL << 32) - ldp->sh_bit_val) { clib_rwlock_writer_unlock (&ldp->fd_table_lock); return -1; } pool_get (ldp->fd_pool, fde); - fde->session_index = vppcom_session_index (sid); + fde->session_index = vppcom_session_index (sh); + fde->worker_index = vppcom_session_worker (sh); fde->fd_index = fde - ldp->fd_pool; - fde->fd = fde->fd_index + ldp->sid_bit_val; + fde->fd = fde->fd_index + ldp->sh_bit_val; hash_set (ldp->session_index_to_fd_table, fde->session_index, fde->fd); + clib_spinlock_init (&fde->lock); clib_rwlock_writer_unlock (&ldp->fd_table_lock); return fde->fd; } static ldp_fd_entry_t * -ldp_fd_entry_get_w_lock (u32 fd_index) +ldp_fd_entry_get (u32 fd_index) { - clib_rwlock_reader_lock (&ldp->fd_table_lock); if (pool_is_free_index (ldp->fd_pool, 
fd_index)) return 0; - return pool_elt_at_index (ldp->fd_pool, fd_index); } +static ldp_fd_entry_t * +ldp_fd_entry_lock (u32 fd_index) +{ + ldp_fd_entry_t *fe; + clib_rwlock_reader_lock (&ldp->fd_table_lock); + if (pool_is_free_index (ldp->fd_pool, fd_index)) + { + clib_rwlock_reader_unlock (&ldp->fd_table_lock); + return 0; + } + + fe = pool_elt_at_index (ldp->fd_pool, fd_index); + clib_spinlock_lock (&fe->lock); + return fe; +} + +static void +ldp_fd_entry_unlock (ldp_fd_entry_t * fde) +{ + clib_spinlock_unlock (&fde->lock); + clib_rwlock_reader_unlock (&ldp->fd_table_lock); +} + static inline int -ldp_fd_from_sid (u32 sid) +ldp_fd_from_sh (vcl_session_handle_t sh) { uword *fdp; int fd; clib_rwlock_reader_lock (&ldp->fd_table_lock); - fdp = hash_get (ldp->session_index_to_fd_table, vppcom_session_index (sid)); + fdp = hash_get (ldp->session_index_to_fd_table, vppcom_session_index (sh)); fd = fdp ? *fdp : -EMFILE; clib_rwlock_reader_unlock (&ldp->fd_table_lock); @@ -199,52 +230,63 @@ ldp_fd_from_sid (u32 sid) } static inline int -ldp_fd_is_sid (int fd) +ldp_fd_is_sh (int fd) { - return fd >= ldp->sid_bit_val; + return fd >= ldp->sh_bit_val; } static inline u32 -ldp_sid_from_fd (int fd) +ldp_sh_from_fd (int fd) { u32 fd_index, session_index; ldp_fd_entry_t *fde; - if (!ldp_fd_is_sid (fd)) + if (!ldp_fd_is_sh (fd)) return INVALID_SESSION_ID; - fd_index = fd - ldp->sid_bit_val; - fde = ldp_fd_entry_get_w_lock (fd_index); + fd_index = fd - ldp->sh_bit_val; + fde = ldp_fd_entry_lock (fd_index); if (!fde) { LDBG (0, "unknown fd %d", fd); - clib_rwlock_reader_unlock (&ldp->fd_table_lock); return INVALID_SESSION_ID; } session_index = fde->session_index; - clib_rwlock_reader_unlock (&ldp->fd_table_lock); + ldp_fd_entry_unlock (fde); return vppcom_session_handle (session_index); } +static ldp_fd_entry_t * +ldp_fd_entry_lock_w_fd (int fd) +{ + u32 fd_index; + + if (!ldp_fd_is_sh (fd)) + return 0; + + fd_index = fd - ldp->sh_bit_val; + return ldp_fd_entry_lock (fd_index); +} 
+ static void -ldp_fd_free_w_sid (u32 sid) +ldp_fd_free_w_sh (vcl_session_handle_t sh) { ldp_fd_entry_t *fde; u32 fd_index; int fd; - fd = ldp_fd_from_sid (sid); + fd = ldp_fd_from_sh (sh); if (!fd) return; - fd_index = fd - ldp->sid_bit_val; - fde = ldp_fd_entry_get_w_lock (fd_index); - if (fde) - { - hash_unset (ldp->session_index_to_fd_table, fde->session_index); - pool_put (ldp->fd_pool, fde); - } + fd_index = fd - ldp->sh_bit_val; + clib_rwlock_writer_lock (&ldp->fd_table_lock); + fde = ldp_fd_entry_get (fd_index); + ASSERT (fde != 0); + hash_unset (ldp->session_index_to_fd_table, fde->session_index); + clib_spinlock_free (&fde->lock); + pool_put (ldp->fd_pool, fde); clib_rwlock_writer_unlock (&ldp->fd_table_lock); } @@ -307,38 +349,38 @@ ldp_init (void) clib_warning ("LDP<%d>: WARNING: Invalid LDP sid bit specified in" " the env var " LDP_ENV_SID_BIT " (%s)! sid bit " "value %d (0x%x)", getpid (), env_var_str, - ldp->sid_bit_val, ldp->sid_bit_val); + ldp->sh_bit_val, ldp->sh_bit_val); } else if (sb < LDP_SID_BIT_MIN) { - ldp->sid_bit_val = (1 << LDP_SID_BIT_MIN); - ldp->sid_bit_mask = ldp->sid_bit_val - 1; + ldp->sh_bit_val = (1 << LDP_SID_BIT_MIN); + ldp->sid_bit_mask = ldp->sh_bit_val - 1; clib_warning ("LDP<%d>: WARNING: LDP sid bit (%u) specified in the" " env var " LDP_ENV_SID_BIT " (%s) is too small. " "Using LDP_SID_BIT_MIN (%d)! sid bit value %d (0x%x)", getpid (), sb, env_var_str, LDP_SID_BIT_MIN, - ldp->sid_bit_val, ldp->sid_bit_val); + ldp->sh_bit_val, ldp->sh_bit_val); } else if (sb > LDP_SID_BIT_MAX) { - ldp->sid_bit_val = (1 << LDP_SID_BIT_MAX); - ldp->sid_bit_mask = ldp->sid_bit_val - 1; + ldp->sh_bit_val = (1 << LDP_SID_BIT_MAX); + ldp->sid_bit_mask = ldp->sh_bit_val - 1; clib_warning ("LDP<%d>: WARNING: LDP sid bit (%u) specified in the" " env var " LDP_ENV_SID_BIT " (%s) is too big. Using" " LDP_SID_BIT_MAX (%d)! 
sid bit value %d (0x%x)", getpid (), sb, env_var_str, LDP_SID_BIT_MAX, - ldp->sid_bit_val, ldp->sid_bit_val); + ldp->sh_bit_val, ldp->sh_bit_val); } else { - ldp->sid_bit_val = (1 << sb); - ldp->sid_bit_mask = ldp->sid_bit_val - 1; + ldp->sh_bit_val = (1 << sb); + ldp->sid_bit_mask = ldp->sh_bit_val - 1; LDBG (0, "configured LDP sid bit (%u) from " LDP_ENV_SID_BIT "! sid bit value %d (0x%x)", sb, - ldp->sid_bit_val, ldp->sid_bit_val); + ldp->sh_bit_val, ldp->sh_bit_val); } } @@ -352,17 +394,18 @@ ldp_init (void) int close (int fd) { - int rv, refcnt; - u32 sid = ldp_sid_from_fd (fd); + int rv, refcnt, epfd; + ldp_fd_entry_t *fde; + u32 sh; if ((errno = -ldp_init ())) return -1; - if (sid != INVALID_SESSION_ID) + fde = ldp_fd_entry_lock_w_fd (fd); + if (fde) { - int epfd; - - epfd = vppcom_session_attr (sid, VPPCOM_ATTR_GET_LIBC_EPFD, 0, 0); + sh = ldp_fd_entry_sh (fde); + epfd = vppcom_session_attr (sh, VPPCOM_ATTR_GET_LIBC_EPFD, 0, 0); if (epfd > 0) { LDBG (0, "fd %d (0x%x): calling libc_close: epfd %u (0x%x)", @@ -374,7 +417,7 @@ close (int fd) u32 size = sizeof (epfd); epfd = 0; - (void) vppcom_session_attr (sid, VPPCOM_ATTR_SET_LIBC_EPFD, + (void) vppcom_session_attr (sh, VPPCOM_ATTR_SET_LIBC_EPFD, &epfd, &size); } } @@ -382,21 +425,24 @@ close (int fd) { errno = -epfd; rv = -1; + ldp_fd_entry_unlock (fde); goto done; } LDBG (0, "fd %d (0x%x): calling vppcom_session_close: sid %u (0x%x)", - fd, fd, sid, sid); + fd, fd, sh, sh); - refcnt = vppcom_session_attr (sid, VPPCOM_ATTR_GET_REFCNT, 0, 0); - rv = vppcom_session_close (sid); + refcnt = vppcom_session_attr (sh, VPPCOM_ATTR_GET_REFCNT, 0, 0); + rv = vppcom_session_close (sh); if (rv != VPPCOM_OK) { errno = -rv; rv = -1; } + + ldp_fd_entry_unlock (fde); if (refcnt <= 1) - ldp_fd_free_w_sid (sid); + ldp_fd_free_w_sh (sh); } else { @@ -413,23 +459,27 @@ done: ssize_t read (int fd, void *buf, size_t nbytes) { + vcl_session_handle_t sh; + ldp_fd_entry_t *fde; ssize_t size; - u32 sid = ldp_sid_from_fd (fd); if 
((errno = -ldp_init ())) return -1; - if (sid != INVALID_SESSION_ID) + fde = ldp_fd_entry_lock_w_fd (fd); + if (fde) { + sh = ldp_fd_entry_sh (fde); LDBG (2, "fd %d (0x%x): calling vppcom_session_read(): sid %u (0x%x)," - " buf %p, nbytes %u", fd, fd, sid, sid, buf, nbytes); + " buf %p, nbytes %u", fd, fd, sh, sh, buf, nbytes); - size = vppcom_session_read (sid, buf, nbytes); + size = vppcom_session_read (sh, buf, nbytes); if (size < 0) { errno = -size; size = -1; } + ldp_fd_entry_unlock (fde); } else { @@ -447,7 +497,7 @@ ssize_t readv (int fd, const struct iovec * iov, int iovcnt) { ssize_t size = 0; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); int rv = 0, i, total = 0; if ((errno = -ldp_init ())) @@ -504,23 +554,27 @@ readv (int fd, const struct iovec * iov, int iovcnt) ssize_t write (int fd, const void *buf, size_t nbytes) { + vcl_session_handle_t sh; + ldp_fd_entry_t *fde; ssize_t size = 0; - u32 sid = ldp_sid_from_fd (fd); if ((errno = -ldp_init ())) return -1; - if (sid != INVALID_SESSION_ID) + fde = ldp_fd_entry_lock_w_fd (fd); + if (fde) { + sh = ldp_fd_entry_sh (fde); LDBG (2, "fd %d (0x%x): calling vppcom_session_write(): sid %u (0x%x), " - "buf %p, nbytes %u", fd, fd, sid, sid, buf, nbytes); + "buf %p, nbytes %u", fd, fd, sh, sh, buf, nbytes); - size = vppcom_session_write_msg (sid, (void *) buf, nbytes); + size = vppcom_session_write_msg (sh, (void *) buf, nbytes); if (size < 0) { errno = -size; size = -1; } + ldp_fd_entry_unlock (fde); } else { @@ -538,7 +592,7 @@ ssize_t writev (int fd, const struct iovec * iov, int iovcnt) { ssize_t size = 0, total = 0; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); int i, rv = 0; /* @@ -590,7 +644,7 @@ fcntl (int fd, int cmd, ...) const char *func_str = __func__; int rv = 0; va_list ap; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -681,7 +735,7 @@ ioctl (int fd, unsigned long int cmd, ...) 
const char *func_str; int rv; va_list ap; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -808,7 +862,7 @@ ldp_pselect (int nfds, fd_set * __restrict readfds, time_out = -1; - if (nfds <= ldp->sid_bit_val) + if (nfds <= ldp->sh_bit_val) { func_str = "libc_pselect"; @@ -821,11 +875,11 @@ ldp_pselect (int nfds, fd_set * __restrict readfds, goto done; } - if (PREDICT_FALSE (ldp->sid_bit_val > FD_SETSIZE / 2)) + if (PREDICT_FALSE (ldp->sh_bit_val > FD_SETSIZE / 2)) { clib_warning ("LDP<%d>: ERROR: LDP sid bit value %d (0x%x) > " "FD_SETSIZE/2 %d (0x%x)!", getpid (), - ldp->sid_bit_val, ldp->sid_bit_val, + ldp->sh_bit_val, ldp->sh_bit_val, FD_SETSIZE / 2, FD_SETSIZE / 2); errno = EOVERFLOW; return -1; @@ -845,7 +899,7 @@ ldp_pselect (int nfds, fd_set * __restrict readfds, clib_bitmap_foreach (fd, ldpw->rd_bitmap, ({ if (fd > nfds) break; - sid = ldp_sid_from_fd (fd); + sid = ldp_sh_from_fd (fd); LDBG (3, "readfds: fd %d (0x%x), sid %u (0x%x)", fd, fd, sid, sid); if (sid == INVALID_SESSION_ID) clib_bitmap_set_no_check (ldpw->libc_rd_bitmap, fd, 1); @@ -877,7 +931,7 @@ ldp_pselect (int nfds, fd_set * __restrict readfds, clib_bitmap_foreach (fd, ldpw->wr_bitmap, ({ if (fd > nfds) break; - sid = ldp_sid_from_fd (fd); + sid = ldp_sh_from_fd (fd); LDBG (3, "writefds: fd %d (0x%x), sid %u (0x%x)", fd, fd, sid, sid); if (sid == INVALID_SESSION_ID) clib_bitmap_set_no_check (ldpw->libc_wr_bitmap, fd, 1); @@ -909,7 +963,7 @@ ldp_pselect (int nfds, fd_set * __restrict readfds, clib_bitmap_foreach (fd, ldpw->ex_bitmap, ({ if (fd > nfds) break; - sid = ldp_sid_from_fd (fd); + sid = ldp_sh_from_fd (fd); LDBG (3, "exceptfds: fd %d (0x%x), sid %u (0x%x)", fd, fd, sid, sid); if (sid == INVALID_SESSION_ID) clib_bitmap_set_no_check (ldpw->libc_ex_bitmap, fd, 1); @@ -977,7 +1031,7 @@ ldp_pselect (int nfds, fd_set * __restrict readfds, /* *INDENT-OFF* */ clib_bitmap_foreach (sid, ldpw->rd_bitmap, ({ - fd = ldp_fd_from_sid 
(vppcom_session_handle (sid)); + fd = ldp_fd_from_sh (vppcom_session_handle (sid)); if (PREDICT_FALSE (fd < 0)) { errno = EBADFD; @@ -993,7 +1047,7 @@ ldp_pselect (int nfds, fd_set * __restrict readfds, /* *INDENT-OFF* */ clib_bitmap_foreach (sid, ldpw->wr_bitmap, ({ - fd = ldp_fd_from_sid (vppcom_session_handle (sid)); + fd = ldp_fd_from_sh (vppcom_session_handle (sid)); if (PREDICT_FALSE (fd < 0)) { errno = EBADFD; @@ -1009,7 +1063,7 @@ ldp_pselect (int nfds, fd_set * __restrict readfds, /* *INDENT-OFF* */ clib_bitmap_foreach (sid, ldpw->ex_bitmap, ({ - fd = ldp_fd_from_sid (vppcom_session_handle (sid)); + fd = ldp_fd_from_sh (vppcom_session_handle (sid)); if (PREDICT_FALSE (fd < 0)) { errno = EBADFD; @@ -1237,7 +1291,7 @@ int bind (int fd, __CONST_SOCKADDR_ARG addr, socklen_t len) { int rv; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -1361,7 +1415,7 @@ getsockname (int fd, __SOCKADDR_ARG addr, socklen_t * __restrict len) { int rv; const char *func_str; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -1430,7 +1484,7 @@ int connect (int fd, __CONST_SOCKADDR_ARG addr, socklen_t len) { int rv; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -1516,7 +1570,7 @@ getpeername (int fd, __SOCKADDR_ARG addr, socklen_t * __restrict len) { int rv; const char *func_str; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -1586,7 +1640,7 @@ send (int fd, const void *buf, size_t n, int flags) { ssize_t size; const char *func_str; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -1644,7 +1698,7 @@ sendfile (int out_fd, int in_fd, off_t * offset, size_t len) ldp_worker_ctx_t *ldpw = ldp_worker_get_current (); ssize_t size = 0; const char *func_str; - u32 sid = ldp_sid_from_fd 
(out_fd); + u32 sid = ldp_sh_from_fd (out_fd); if ((errno = -ldp_init ())) return -1; @@ -1888,7 +1942,7 @@ recv (int fd, void *buf, size_t n, int flags) if ((errno = -ldp_init ())) return -1; - sid = ldp_sid_from_fd (fd); + sid = ldp_sh_from_fd (fd); if (sid != INVALID_SESSION_ID) { LDBG (2, "fd %d (0x%x): calling vcl recvfrom: sid %u (0x%x), buf %p," @@ -1915,7 +1969,7 @@ sendto (int fd, const void *buf, size_t n, int flags, { ssize_t size; const char *func_str = __func__; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -2006,7 +2060,7 @@ recvfrom (int fd, void *__restrict buf, size_t n, int flags, { ssize_t size; const char *func_str; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -2076,7 +2130,7 @@ sendmsg (int fd, const struct msghdr * message, int flags) { ssize_t size; const char *func_str; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -2125,7 +2179,7 @@ sendmmsg (int fd, struct mmsghdr *vmessages, unsigned int vlen, int flags) { ssize_t size; const char *func_str; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -2172,7 +2226,7 @@ recvmsg (int fd, struct msghdr * message, int flags) { ssize_t size; const char *func_str; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -2222,7 +2276,7 @@ recvmmsg (int fd, struct mmsghdr *vmessages, { ssize_t size; const char *func_str; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -2271,7 +2325,7 @@ getsockopt (int fd, int level, int optname, { int rv; const char *func_str = __func__; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); u32 buflen = optlen ? 
(u32) * optlen : 0; if ((errno = -ldp_init ())) @@ -2499,7 +2553,7 @@ setsockopt (int fd, int level, int optname, { int rv; const char *func_str = __func__; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -2663,7 +2717,7 @@ int listen (int fd, int n) { int rv; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -2696,13 +2750,14 @@ ldp_accept4 (int listen_fd, __SOCKADDR_ARG addr, socklen_t * __restrict addr_len, int flags) { int rv; - u32 listen_sid = ldp_sid_from_fd (listen_fd); - int accept_sid; + u32 listen_sh; + int accept_sh; if ((errno = -ldp_init ())) return -1; - if (listen_sid != INVALID_SESSION_ID) + listen_sh = ldp_sh_from_fd (listen_fd); + if (listen_sh != INVALID_SESSION_ID) { vppcom_endpt_t ep; u8 src_addr[sizeof (struct sockaddr_in6)]; @@ -2711,12 +2766,12 @@ ldp_accept4 (int listen_fd, __SOCKADDR_ARG addr, LDBG (0, "listen fd %d (0x%x): calling vppcom_session_accept:" " listen sid %u (0x%x), ep %p, flags 0x%x", listen_fd, - listen_fd, listen_sid, listen_sid, ep, flags); + listen_fd, listen_sh, listen_sh, ep, flags); - accept_sid = vppcom_session_accept (listen_sid, &ep, flags); - if (accept_sid < 0) + accept_sh = vppcom_session_accept (listen_sh, &ep, flags); + if (accept_sh < 0) { - errno = -accept_sid; + errno = -accept_sh; rv = -1; } else @@ -2724,16 +2779,16 @@ ldp_accept4 (int listen_fd, __SOCKADDR_ARG addr, rv = ldp_copy_ep_to_sockaddr (addr, addr_len, &ep); if (rv != VPPCOM_OK) { - (void) vppcom_session_close ((u32) accept_sid); + (void) vppcom_session_close ((u32) accept_sh); errno = -rv; rv = -1; } else { - rv = ldp_fd_alloc ((u32) accept_sid); + rv = ldp_fd_alloc ((u32) accept_sh); if (rv < 0) { - (void) vppcom_session_close ((u32) accept_sid); + (void) vppcom_session_close ((u32) accept_sh); errno = -rv; rv = -1; } @@ -2776,15 +2831,14 @@ shutdown (int fd, int how) if ((errno = -ldp_init ())) return -1; - if (ldp_fd_is_sid 
(fd)) + if (ldp_fd_is_sh (fd)) { - u32 fd_index = fd - ldp->sid_bit_val; + u32 fd_index = fd - ldp->sh_bit_val; ldp_fd_entry_t *fde; - fde = ldp_fd_entry_get_w_lock (fd_index); + fde = ldp_fd_entry_lock (fd_index); if (!fde) { - clib_rwlock_reader_unlock (&ldp->fd_table_lock); errno = ENOTCONN; return -1; } @@ -2799,7 +2853,7 @@ shutdown (int fd, int how) if ((fde->flags & LDP_F_SHUT_RD) && (fde->flags & LDP_F_SHUT_WR)) rv = close (fd); - clib_rwlock_reader_unlock (&ldp->fd_table_lock); + ldp_fd_entry_unlock (fde); LDBG (0, "fd %d (0x%x): calling vcl shutdown: how %d", fd, fd, how); } else @@ -2869,7 +2923,7 @@ epoll_create (int size) int epoll_ctl (int epfd, int op, int fd, struct epoll_event *event) { - u32 vep_idx = ldp_sid_from_fd (epfd), sid; + u32 vep_idx = ldp_sh_from_fd (epfd), sid; const char *func_str; int rv; @@ -2892,7 +2946,7 @@ epoll_ctl (int epfd, int op, int fd, struct epoll_event *event) goto done; } - sid = ldp_sid_from_fd (fd); + sid = ldp_sh_from_fd (fd); LDBG (0, "epfd %d (0x%x), vep_idx %d (0x%x), sid %d (0x%x)", epfd, epfd, vep_idx, vep_idx, sid, sid); @@ -2995,7 +3049,7 @@ ldp_epoll_pwait (int epfd, struct epoll_event *events, int maxevents, { ldp_worker_ctx_t *ldpw = ldp_worker_get_current (); double time_to_wait = (double) 0, time_out, now = 0; - u32 vep_idx = ldp_sid_from_fd (epfd); + u32 vep_idx = ldp_sh_from_fd (epfd); int libc_epfd, rv = 0; if ((errno = -ldp_init ())) @@ -3115,7 +3169,7 @@ poll (struct pollfd *fds, nfds_t nfds, int timeout) LDBG (3, "fds[%d] fd %d (0x%0x) events = 0x%x revents = 0x%x", i, fds[i].fd, fds[i].fd, fds[i].events, fds[i].revents); - sid = ldp_sid_from_fd (fds[i].fd); + sid = ldp_sh_from_fd (fds[i].fd); if (sid != INVALID_SESSION_ID) { fds[i].fd = -fds[i].fd; diff --git a/src/vcl/vcl_bapi.c b/src/vcl/vcl_bapi.c index de5e80a55b3..5b9a9d5d3ce 100644 --- a/src/vcl/vcl_bapi.c +++ b/src/vcl/vcl_bapi.c @@ -610,21 +610,6 @@ vppcom_send_unbind_sock (u64 vpp_handle) } void -vppcom_send_accept_session_reply (u64 
handle, u32 context, int retval) -{ - vcl_worker_t *wrk = vcl_worker_get_current (); - vl_api_accept_session_reply_t *rmp; - - rmp = vl_msg_api_alloc (sizeof (*rmp)); - memset (rmp, 0, sizeof (*rmp)); - rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY); - rmp->retval = htonl (retval); - rmp->context = context; - rmp->handle = handle; - vl_msg_api_send_shmem (wrk->vl_input_queue, (u8 *) & rmp); -} - -void vppcom_send_application_tls_cert_add (vcl_session_t * session, char *cert, u32 cert_len) { diff --git a/src/vcl/vcl_private.c b/src/vcl/vcl_private.c index d82a7ff3355..32664312f01 100644 --- a/src/vcl/vcl_private.c +++ b/src/vcl/vcl_private.c @@ -375,21 +375,23 @@ vcl_worker_share_session (vcl_worker_t * parent, vcl_worker_t * wrk, vcl_session_t * new_s) { vcl_shared_session_t *ss; - vcl_session_t *s; + vcl_session_t *old_s; - s = vcl_session_get (parent, new_s->session_index); - if (s->shared_index == ~0) + if (new_s->shared_index == ~0) { ss = vcl_shared_session_alloc (); + ss->session_index = new_s->session_index; vec_add1 (ss->workers, parent->wrk_index); - s->shared_index = ss->ss_index; + vec_add1 (ss->workers, wrk->wrk_index); + new_s->shared_index = ss->ss_index; + old_s = vcl_session_get (parent, new_s->session_index); + old_s->shared_index = ss->ss_index; } else { - ss = vcl_shared_session_get (s->shared_index); + ss = vcl_shared_session_get (new_s->shared_index); + vec_add1 (ss->workers, wrk->wrk_index); } - new_s->shared_index = ss->ss_index; - vec_add1 (ss->workers, wrk->wrk_index); } int @@ -414,6 +416,12 @@ vcl_worker_unshare_session (vcl_worker_t * wrk, vcl_session_t * s) return 1; } + /* If the first removed and not last, start session worker change. 
+ * First request goes to vpp and vpp reflects it back to the right + * worker */ + if (i == 0) + vcl_send_session_worker_update (wrk, s, ss->workers[0]); + return 0; } diff --git a/src/vcl/vcl_private.h b/src/vcl/vcl_private.h index 9dce5182193..2ae4b72f6a4 100644 --- a/src/vcl/vcl_private.h +++ b/src/vcl/vcl_private.h @@ -69,7 +69,8 @@ typedef enum STATE_ACCEPT = 0x08, STATE_VPP_CLOSING = 0x10, STATE_DISCONNECT = 0x20, - STATE_FAILED = 0x40 + STATE_FAILED = 0x40, + STATE_UPDATED = 0x80, } session_state_t; #define SERVER_STATE_OPEN (STATE_ACCEPT|STATE_VPP_CLOSING) @@ -144,6 +145,7 @@ typedef struct vcl_shared_session_ { u32 ss_index; u32 *workers; + u32 session_index; } vcl_shared_session_t; typedef struct @@ -287,6 +289,8 @@ typedef struct vcl_worker_ /** Vector of unhandled events */ session_event_t *unhandled_evts_vector; + u32 *pending_session_wrk_updates; + /** Used also as a thread stop key buffer */ pthread_t thread_id; @@ -517,6 +521,7 @@ int vcl_worker_register_with_vpp (void); int vcl_worker_set_bapi (void); void vcl_worker_share_sessions (vcl_worker_t * parent_wrk); int vcl_worker_unshare_session (vcl_worker_t * wrk, vcl_session_t * s); +vcl_shared_session_t *vcl_shared_session_get (u32 ss_index); int vcl_session_get_refcnt (vcl_session_t * s); void vcl_segment_table_add (u64 segment_handle, u32 svm_segment_index); @@ -543,6 +548,17 @@ vcl_worker_get_current (void) return vcl_worker_get (vcl_get_worker_index ()); } +static inline svm_msg_q_t * +vcl_session_vpp_evt_q (vcl_worker_t * wrk, vcl_session_t * s) +{ + if (vcl_session_is_ct (s)) + return wrk->vpp_event_queues[0]; + else + return wrk->vpp_event_queues[s->vpp_thread_index]; +} + +void vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s, + u32 wrk_index); /* * VCL Binary API */ @@ -556,12 +572,10 @@ void vppcom_send_disconnect_session (u64 vpp_handle); void vppcom_send_bind_sock (vcl_session_t * session); void vppcom_send_unbind_sock (u64 vpp_handle); void vppcom_api_hookup 
(void); -void vppcom_send_accept_session_reply (u64 vpp_handle, u32 context, int rv); void vppcom_send_application_tls_cert_add (vcl_session_t * session, char *cert, u32 cert_len); -void -vppcom_send_application_tls_key_add (vcl_session_t * session, char *key, - u32 key_len); +void vppcom_send_application_tls_key_add (vcl_session_t * session, char *key, + u32 key_len); void vcl_send_app_worker_add_del (u8 is_add); void vcl_send_child_worker_del (vcl_worker_t * wrk); diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c index 70afdce7832..6a1bf1cfcc7 100644 --- a/src/vcl/vppcom.c +++ b/src/vcl/vppcom.c @@ -44,6 +44,22 @@ vcl_wait_for_segment (u64 segment_handle) return 1; } +static inline int +vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq) +{ + svm_msg_q_msg_t *msg; + u32 n_msgs; + int i; + + n_msgs = svm_msg_q_size (mq); + for (i = 0; i < n_msgs; i++) + { + vec_add2 (wrk->mq_msg_vector, msg, 1); + svm_msg_q_sub_w_lock (mq, msg); + } + return n_msgs; +} + const char * vppcom_session_state_str (session_state_t state) { @@ -175,15 +191,6 @@ format_ip46_address (u8 * s, va_list * args) */ -static svm_msg_q_t * -vcl_session_vpp_evt_q (vcl_worker_t * wrk, vcl_session_t * s) -{ - if (vcl_session_is_ct (s)) - return wrk->vpp_event_queues[0]; - else - return wrk->vpp_event_queues[s->vpp_thread_index]; -} - static void vcl_send_session_accepted_reply (svm_msg_q_t * mq, u32 context, session_handle_t handle, int retval) @@ -227,6 +234,24 @@ vcl_send_session_reset_reply (svm_msg_q_t * mq, u32 context, app_send_ctrl_evt_to_vpp (mq, app_evt); } +void +vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s, + u32 wrk_index) +{ + app_session_evt_t _app_evt, *app_evt = &_app_evt; + session_worker_update_msg_t *mp; + svm_msg_q_t *mq; + + mq = vcl_session_vpp_evt_q (wrk, s); + app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_WORKER_UPDATE); + mp = (session_worker_update_msg_t *) app_evt->evt->data; + mp->client_index = wrk->my_client_index; + mp->handle 
= s->vpp_handle; + mp->req_wrk_index = wrk->vpp_wrk_index; + mp->wrk_index = wrk_index; + app_send_ctrl_evt_to_vpp (mq, app_evt); +} + static u32 vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp) { @@ -540,7 +565,6 @@ vcl_session_disconnected_handler (vcl_worker_t * wrk, /* Caught a disconnect before actually accepting the session */ if (session->session_state == STATE_LISTEN) { - if (!vcl_flag_accepted_session (session, msg->handle, VCL_ACCEPTED_F_CLOSED)) VDBG (0, "session was not accepted!"); @@ -551,6 +575,59 @@ vcl_session_disconnected_handler (vcl_worker_t * wrk, return session; } +static void +vcl_session_req_worker_update_handler (vcl_worker_t * wrk, void *data) +{ + session_req_worker_update_msg_t *msg; + vcl_session_t *s; + + msg = (session_req_worker_update_msg_t *) data; + s = vcl_session_get_w_vpp_handle (wrk, msg->session_handle); + if (!s) + return; + + vec_add1 (wrk->pending_session_wrk_updates, s->session_index); +} + +static void +vcl_session_worker_update_reply_handler (vcl_worker_t * wrk, void *data) +{ + session_worker_update_reply_msg_t *msg; + vcl_session_t *s; + + msg = (session_worker_update_reply_msg_t *) data; + s = vcl_session_get_w_vpp_handle (wrk, msg->handle); + if (!s) + { + VDBG (0, "unknown handle 0x%llx", msg->handle); + return; + } + if (vcl_wait_for_segment (msg->segment_handle)) + { + clib_warning ("segment for session %u couldn't be mounted!", + s->session_index); + return; + } + s->rx_fifo = uword_to_pointer (msg->rx_fifo, svm_fifo_t *); + s->tx_fifo = uword_to_pointer (msg->tx_fifo, svm_fifo_t *); + + s->rx_fifo->client_session_index = s->session_index; + s->tx_fifo->client_session_index = s->session_index; + s->rx_fifo->client_thread_index = wrk->wrk_index; + s->tx_fifo->client_thread_index = wrk->wrk_index; + s->session_state = STATE_UPDATED; + + if (s->shared_index != VCL_INVALID_SESSION_INDEX) + { + vcl_shared_session_t *ss; + ss = vcl_shared_session_get (s->shared_index); + if (vec_len 
(ss->workers) > 1) + VDBG (0, "workers need to be updated"); + } + VDBG (0, "session %u[0x%llx] moved to worker %u", s->session_index, + s->vpp_handle, wrk->wrk_index); +} + static int vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e) { @@ -587,13 +664,19 @@ vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e) case SESSION_CTRL_EVT_BOUND: vcl_session_bound_handler (wrk, (session_bound_msg_t *) e->data); break; + case SESSION_CTRL_EVT_REQ_WORKER_UPDATE: + vcl_session_req_worker_update_handler (wrk, e->data); + break; + case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY: + vcl_session_worker_update_reply_handler (wrk, e->data); + break; default: clib_warning ("unhandled %u", e->event_type); } return VPPCOM_OK; } -static inline int +static int vppcom_wait_for_session_state_change (u32 session_index, session_state_t state, f64 wait_for_time) @@ -638,6 +721,52 @@ vppcom_wait_for_session_state_change (u32 session_index, return VPPCOM_ETIMEDOUT; } +static void +vcl_handle_pending_wrk_updates (vcl_worker_t * wrk) +{ + session_state_t state; + vcl_session_t *s; + u32 *sip; + + if (PREDICT_TRUE (vec_len (wrk->pending_session_wrk_updates) == 0)) + return; + + vec_foreach (sip, wrk->pending_session_wrk_updates) + { + s = vcl_session_get (wrk, *sip); + vcl_send_session_worker_update (wrk, s, wrk->wrk_index); + state = s->session_state; + vppcom_wait_for_session_state_change (s->session_index, STATE_UPDATED, 5); + s->session_state = state; + } + vec_reset_length (wrk->pending_session_wrk_updates); +} + +static void +vcl_flush_mq_events (void) +{ + vcl_worker_t *wrk = vcl_worker_get_current (); + svm_msg_q_msg_t *msg; + session_event_t *e; + svm_msg_q_t *mq; + int i; + + mq = wrk->app_event_queue; + svm_msg_q_lock (mq); + vcl_mq_dequeue_batch (wrk, mq); + svm_msg_q_unlock (mq); + + for (i = 0; i < vec_len (wrk->mq_msg_vector); i++) + { + msg = vec_elt_at_index (wrk->mq_msg_vector, i); + e = svm_msg_q_msg_data (mq, msg); + vcl_handle_mq_event (wrk, e); + 
svm_msg_q_free_msg (mq, msg); + } + vec_reset_length (wrk->mq_msg_vector); + vcl_handle_pending_wrk_updates (wrk); +} + static int vppcom_app_session_enable (void) { @@ -845,13 +974,14 @@ static void vcl_app_pre_fork (void) { vcl_incercept_sigchld (); + vcl_flush_mq_events (); } static void vcl_app_fork_child_handler (void) { + vcl_worker_t *parent_wrk, *wrk; int rv, parent_wrk_index; - vcl_worker_t *parent_wrk; u8 *child_name; parent_wrk_index = vcl_get_worker_index (); @@ -884,6 +1014,8 @@ vcl_app_fork_child_handler (void) */ vcl_worker_register_with_vpp (); parent_wrk = vcl_worker_get (parent_wrk_index); + wrk = vcl_worker_get_current (); + wrk->vpp_event_queues = vec_dup (parent_wrk->vpp_event_queues); vcl_worker_share_sessions (parent_wrk); parent_wrk->forked_child = vcl_get_worker_index (); @@ -1097,7 +1229,11 @@ vppcom_session_close (uint32_t session_handle) } if (!do_disconnect) - goto cleanup; + { + VDBG (0, "session handle %u [0x%llx] disconnect skipped", + session_handle, vpp_handle); + goto cleanup; + } if (state & STATE_LISTEN) { @@ -1143,10 +1279,7 @@ cleanup: vcl_ct_registration_unlock (wrk); } - if (vpp_handle != ~0) - { - vcl_session_table_del_vpp_handle (wrk, vpp_handle); - } + vcl_session_table_del_vpp_handle (wrk, vpp_handle); vcl_session_free (wrk, session); VDBG (0, "session handle %u [0x%llx] removed", session_handle, vpp_handle); @@ -1948,22 +2081,6 @@ vppcom_session_write_ready (vcl_session_t * session) return svm_fifo_max_enqueue (session->tx_fifo); } -static inline int -vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq) -{ - svm_msg_q_msg_t *msg; - u32 n_msgs; - int i; - - n_msgs = svm_msg_q_size (mq); - for (i = 0; i < n_msgs; i++) - { - vec_add2 (wrk->mq_msg_vector, msg, 1); - svm_msg_q_sub_w_lock (mq, msg); - } - return n_msgs; -} - #define vcl_fifo_rx_evt_valid_or_break(_fifo) \ if (PREDICT_FALSE (svm_fifo_is_empty (_fifo))) \ { \ @@ -2067,6 +2184,12 @@ vcl_select_handle_mq_event (vcl_worker_t * wrk, session_event_t * e, 
*bits_set += 1; } break; + case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY: + vcl_session_worker_update_reply_handler (wrk, e->data); + break; + case SESSION_CTRL_EVT_REQ_WORKER_UPDATE: + vcl_session_req_worker_update_handler (wrk, e->data); + break; default: clib_warning ("unhandled: %u", e->event_type); break; @@ -2122,6 +2245,7 @@ vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq, svm_msg_q_free_msg (mq, msg); } vec_reset_length (wrk->mq_msg_vector); + vcl_handle_pending_wrk_updates (wrk); return *bits_set; } @@ -2676,6 +2800,12 @@ vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e, session_evt_data = session->vep.ev.data.u64; session_events = session->vep.ev.events; break; + case SESSION_CTRL_EVT_REQ_WORKER_UPDATE: + vcl_session_req_worker_update_handler (wrk, e->data); + break; + case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY: + vcl_session_worker_update_reply_handler (wrk, e->data); + break; default: VDBG (0, "unhandled: %u", e->event_type); break; @@ -2741,7 +2871,7 @@ handle_dequeued: svm_msg_q_free_msg (mq, msg); } vec_reset_length (wrk->mq_msg_vector); - + vcl_handle_pending_wrk_updates (wrk); return *num_ev; } @@ -3580,12 +3710,18 @@ vppcom_mq_epoll_fd (void) } int -vppcom_session_index (uint32_t session_handle) +vppcom_session_index (vcl_session_handle_t session_handle) { return session_handle & 0xFFFFFF; } int +vppcom_session_worker (vcl_session_handle_t session_handle) +{ + return session_handle >> 24; +} + +int vppcom_session_handle (uint32_t session_index) { return (vcl_get_worker_index () << 24) | session_index; diff --git a/src/vcl/vppcom.h b/src/vcl/vppcom.h index 641946be55a..f2fca09b512 100644 --- a/src/vcl/vppcom.h +++ b/src/vcl/vppcom.h @@ -97,6 +97,8 @@ typedef struct vppcom_endpt_t_ uint16_t port; } vppcom_endpt_t; +typedef uint32_t vcl_session_handle_t; + typedef enum { VPPCOM_OK = 0, @@ -277,7 +279,8 @@ extern int vppcom_session_sendto (uint32_t session_handle, void *buffer, extern int vppcom_poll (vcl_poll_t * vp, 
uint32_t n_sids, double wait_for_time); extern int vppcom_mq_epoll_fd (void); -extern int vppcom_session_index (uint32_t session_handle); +extern int vppcom_session_index (vcl_session_handle_t session_handle); +extern int vppcom_session_worker (vcl_session_handle_t session_handle); extern int vppcom_session_handle (uint32_t session_index); extern int vppcom_session_read_segments (uint32_t session_handle, |