diff options
-rw-r--r-- | src/vcl/ldp.c | 264 | ||||
-rw-r--r-- | src/vcl/vcl_bapi.c | 15 | ||||
-rw-r--r-- | src/vcl/vcl_private.c | 22 | ||||
-rw-r--r-- | src/vcl/vcl_private.h | 24 | ||||
-rw-r--r-- | src/vcl/vppcom.c | 206 | ||||
-rw-r--r-- | src/vcl/vppcom.h | 5 | ||||
-rw-r--r-- | src/vnet/session/application.c | 50 | ||||
-rw-r--r-- | src/vnet/session/application.h | 3 | ||||
-rw-r--r-- | src/vnet/session/application_interface.h | 48 | ||||
-rw-r--r-- | src/vnet/session/session.h | 5 | ||||
-rw-r--r-- | src/vnet/session/session_node.c | 85 |
11 files changed, 544 insertions, 183 deletions
diff --git a/src/vcl/ldp.c b/src/vcl/ldp.c index ee35396bc63..a61acb9a6ab 100644 --- a/src/vcl/ldp.c +++ b/src/vcl/ldp.c @@ -57,9 +57,11 @@ typedef struct ldp_fd_entry_ { u32 session_index; + u32 worker_index; u32 fd; u32 fd_index; u32 flags; + clib_spinlock_t lock; } ldp_fd_entry_t; typedef struct ldp_worker_ctx_ @@ -101,7 +103,7 @@ typedef struct ldp_worker_ctx_t *workers; int init; char app_name[LDP_APP_NAME_MAX]; - u32 sid_bit_val; + u32 sh_bit_val; u32 sid_bit_mask; u32 debug; ldp_fd_entry_t *fd_pool; @@ -119,7 +121,7 @@ typedef struct clib_warning ("ldp<%d>: " _fmt, getpid(), ##_args) static ldp_main_t ldp_main = { - .sid_bit_val = (1 << LDP_SID_BIT_MIN), + .sh_bit_val = (1 << LDP_SID_BIT_MIN), .sid_bit_mask = (1 << LDP_SID_BIT_MIN) - 1, .debug = LDP_DEBUG_INIT, }; @@ -154,44 +156,73 @@ ldp_get_app_name () return ldp->app_name; } +static inline vcl_session_handle_t +ldp_fd_entry_sh (ldp_fd_entry_t * fde) +{ + return vppcom_session_handle (fde->session_index); +} + static int -ldp_fd_alloc (u32 sid) +ldp_fd_alloc (vcl_session_handle_t sh) { ldp_fd_entry_t *fde; clib_rwlock_writer_lock (&ldp->fd_table_lock); - if (pool_elts (ldp->fd_pool) >= (1ULL << 32) - ldp->sid_bit_val) + if (pool_elts (ldp->fd_pool) >= (1ULL << 32) - ldp->sh_bit_val) { clib_rwlock_writer_unlock (&ldp->fd_table_lock); return -1; } pool_get (ldp->fd_pool, fde); - fde->session_index = vppcom_session_index (sid); + fde->session_index = vppcom_session_index (sh); + fde->worker_index = vppcom_session_worker (sh); fde->fd_index = fde - ldp->fd_pool; - fde->fd = fde->fd_index + ldp->sid_bit_val; + fde->fd = fde->fd_index + ldp->sh_bit_val; hash_set (ldp->session_index_to_fd_table, fde->session_index, fde->fd); + clib_spinlock_init (&fde->lock); clib_rwlock_writer_unlock (&ldp->fd_table_lock); return fde->fd; } static ldp_fd_entry_t * -ldp_fd_entry_get_w_lock (u32 fd_index) +ldp_fd_entry_get (u32 fd_index) { - clib_rwlock_reader_lock (&ldp->fd_table_lock); if (pool_is_free_index (ldp->fd_pool, fd_index)) return 0; - return pool_elt_at_index (ldp->fd_pool, fd_index); } +static ldp_fd_entry_t * +ldp_fd_entry_lock (u32 fd_index) +{ + ldp_fd_entry_t *fe; + clib_rwlock_reader_lock (&ldp->fd_table_lock); + if (pool_is_free_index (ldp->fd_pool, fd_index)) + { + clib_rwlock_reader_unlock (&ldp->fd_table_lock); + return 0; + } + + fe = pool_elt_at_index (ldp->fd_pool, fd_index); + clib_spinlock_lock (&fe->lock); + return fe; +} + +static void +ldp_fd_entry_unlock (ldp_fd_entry_t * fde) +{ + clib_spinlock_unlock (&fde->lock); + clib_rwlock_reader_unlock (&ldp->fd_table_lock); +} + static inline int -ldp_fd_from_sid (u32 sid) +ldp_fd_from_sh (vcl_session_handle_t sh) { uword *fdp; int fd; clib_rwlock_reader_lock (&ldp->fd_table_lock); - fdp = hash_get (ldp->session_index_to_fd_table, vppcom_session_index (sid)); + fdp = hash_get (ldp->session_index_to_fd_table, vppcom_session_index (sh)); fd = fdp ? *fdp : -EMFILE; clib_rwlock_reader_unlock (&ldp->fd_table_lock); @@ -199,52 +230,63 @@ ldp_fd_from_sid (u32 sid) } static inline int -ldp_fd_is_sid (int fd) +ldp_fd_is_sh (int fd) { - return fd >= ldp->sid_bit_val; + return fd >= ldp->sh_bit_val; } static inline u32 -ldp_sid_from_fd (int fd) +ldp_sh_from_fd (int fd) { u32 fd_index, session_index; ldp_fd_entry_t *fde; - if (!ldp_fd_is_sid (fd)) + if (!ldp_fd_is_sh (fd)) return INVALID_SESSION_ID; - fd_index = fd - ldp->sid_bit_val; - fde = ldp_fd_entry_get_w_lock (fd_index); + fd_index = fd - ldp->sh_bit_val; + fde = ldp_fd_entry_lock (fd_index); if (!fde) { LDBG (0, "unknown fd %d", fd); - clib_rwlock_reader_unlock (&ldp->fd_table_lock); return INVALID_SESSION_ID; } session_index = fde->session_index; - clib_rwlock_reader_unlock (&ldp->fd_table_lock); + ldp_fd_entry_unlock (fde); return vppcom_session_handle (session_index); } +static ldp_fd_entry_t * +ldp_fd_entry_lock_w_fd (int fd) +{ + u32 fd_index; + + if (!ldp_fd_is_sh (fd)) + return 0; + + fd_index = fd - ldp->sh_bit_val; + return ldp_fd_entry_lock (fd_index); +} + static void -ldp_fd_free_w_sid (u32 sid) +ldp_fd_free_w_sh (vcl_session_handle_t sh) { ldp_fd_entry_t *fde; u32 fd_index; int fd; - fd = ldp_fd_from_sid (sid); + fd = ldp_fd_from_sh (sh); if (!fd) return; - fd_index = fd - ldp->sid_bit_val; - fde = ldp_fd_entry_get_w_lock (fd_index); - if (fde) - { - hash_unset (ldp->session_index_to_fd_table, fde->session_index); - pool_put (ldp->fd_pool, fde); - } + fd_index = fd - ldp->sh_bit_val; + clib_rwlock_writer_lock (&ldp->fd_table_lock); + fde = ldp_fd_entry_get (fd_index); + ASSERT (fde != 0); + hash_unset (ldp->session_index_to_fd_table, fde->session_index); + clib_spinlock_free (&fde->lock); + pool_put (ldp->fd_pool, fde); clib_rwlock_writer_unlock (&ldp->fd_table_lock); } @@ -307,38 +349,38 @@ ldp_init (void) clib_warning ("LDP<%d>: WARNING: Invalid LDP sid bit specified in" " the env var " LDP_ENV_SID_BIT " (%s)! sid bit " "value %d (0x%x)", getpid (), env_var_str, - ldp->sid_bit_val, ldp->sid_bit_val); + ldp->sh_bit_val, ldp->sh_bit_val); } else if (sb < LDP_SID_BIT_MIN) { - ldp->sid_bit_val = (1 << LDP_SID_BIT_MIN); - ldp->sid_bit_mask = ldp->sid_bit_val - 1; + ldp->sh_bit_val = (1 << LDP_SID_BIT_MIN); + ldp->sid_bit_mask = ldp->sh_bit_val - 1; clib_warning ("LDP<%d>: WARNING: LDP sid bit (%u) specified in the" " env var " LDP_ENV_SID_BIT " (%s) is too small. " "Using LDP_SID_BIT_MIN (%d)! sid bit value %d (0x%x)", getpid (), sb, env_var_str, LDP_SID_BIT_MIN, - ldp->sid_bit_val, ldp->sid_bit_val); + ldp->sh_bit_val, ldp->sh_bit_val); } else if (sb > LDP_SID_BIT_MAX) { - ldp->sid_bit_val = (1 << LDP_SID_BIT_MAX); - ldp->sid_bit_mask = ldp->sid_bit_val - 1; + ldp->sh_bit_val = (1 << LDP_SID_BIT_MAX); + ldp->sid_bit_mask = ldp->sh_bit_val - 1; clib_warning ("LDP<%d>: WARNING: LDP sid bit (%u) specified in the" " env var " LDP_ENV_SID_BIT " (%s) is too big. Using" " LDP_SID_BIT_MAX (%d)! sid bit value %d (0x%x)", getpid (), sb, env_var_str, LDP_SID_BIT_MAX, - ldp->sid_bit_val, ldp->sid_bit_val); + ldp->sh_bit_val, ldp->sh_bit_val); } else { - ldp->sid_bit_val = (1 << sb); - ldp->sid_bit_mask = ldp->sid_bit_val - 1; + ldp->sh_bit_val = (1 << sb); + ldp->sid_bit_mask = ldp->sh_bit_val - 1; LDBG (0, "configured LDP sid bit (%u) from " LDP_ENV_SID_BIT "! sid bit value %d (0x%x)", sb, - ldp->sid_bit_val, ldp->sid_bit_val); + ldp->sh_bit_val, ldp->sh_bit_val); } } @@ -352,17 +394,18 @@ ldp_init (void) int close (int fd) { - int rv, refcnt; - u32 sid = ldp_sid_from_fd (fd); + int rv, refcnt, epfd; + ldp_fd_entry_t *fde; + u32 sh; if ((errno = -ldp_init ())) return -1; - if (sid != INVALID_SESSION_ID) + fde = ldp_fd_entry_lock_w_fd (fd); + if (fde) { - int epfd; - - epfd = vppcom_session_attr (sid, VPPCOM_ATTR_GET_LIBC_EPFD, 0, 0); + sh = ldp_fd_entry_sh (fde); + epfd = vppcom_session_attr (sh, VPPCOM_ATTR_GET_LIBC_EPFD, 0, 0); if (epfd > 0) { LDBG (0, "fd %d (0x%x): calling libc_close: epfd %u (0x%x)", @@ -374,7 +417,7 @@ close (int fd) u32 size = sizeof (epfd); epfd = 0; - (void) vppcom_session_attr (sid, VPPCOM_ATTR_SET_LIBC_EPFD, + (void) vppcom_session_attr (sh, VPPCOM_ATTR_SET_LIBC_EPFD, &epfd, &size); } } @@ -382,21 +425,24 @@ close (int fd) { errno = -epfd; rv = -1; + ldp_fd_entry_unlock (fde); goto done; } LDBG (0, "fd %d (0x%x): calling vppcom_session_close: sid %u (0x%x)", - fd, fd, sid, sid); + fd, fd, sh, sh); - refcnt = vppcom_session_attr (sid, VPPCOM_ATTR_GET_REFCNT, 0, 0); - rv = vppcom_session_close (sid); + refcnt = vppcom_session_attr (sh, VPPCOM_ATTR_GET_REFCNT, 0, 0); + rv = vppcom_session_close (sh); if (rv != VPPCOM_OK) { errno = -rv; rv = -1; } + + ldp_fd_entry_unlock (fde); if (refcnt <= 1) - ldp_fd_free_w_sid (sid); + ldp_fd_free_w_sh (sh); } else { @@ -413,23 +459,27 @@ done: ssize_t read (int fd, void *buf, size_t nbytes) { + vcl_session_handle_t sh; + ldp_fd_entry_t *fde; ssize_t size; - u32 sid = ldp_sid_from_fd (fd); if ((errno = -ldp_init ())) return -1; - if (sid != INVALID_SESSION_ID) + fde = ldp_fd_entry_lock_w_fd (fd); + if (fde) { + sh = ldp_fd_entry_sh (fde); LDBG (2, "fd %d (0x%x): calling vppcom_session_read(): sid %u (0x%x)," - " buf %p, nbytes %u", fd, fd, sid, sid, buf, nbytes); + " buf %p, nbytes %u", fd, fd, sh, sh, buf, nbytes); - size = vppcom_session_read (sid, buf, nbytes); + size = vppcom_session_read (sh, buf, nbytes); if (size < 0) { errno = -size; size = -1; } + ldp_fd_entry_unlock (fde); } else { @@ -447,7 +497,7 @@ ssize_t readv (int fd, const struct iovec * iov, int iovcnt) { ssize_t size = 0; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); int rv = 0, i, total = 0; if ((errno = -ldp_init ())) @@ -504,23 +554,27 @@ readv (int fd, const struct iovec * iov, int iovcnt) ssize_t write (int fd, const void *buf, size_t nbytes) { + vcl_session_handle_t sh; + ldp_fd_entry_t *fde; ssize_t size = 0; - u32 sid = ldp_sid_from_fd (fd); if ((errno = -ldp_init ())) return -1; - if (sid != INVALID_SESSION_ID) + fde = ldp_fd_entry_lock_w_fd (fd); + if (fde) { + sh = ldp_fd_entry_sh (fde); LDBG (2, "fd %d (0x%x): calling vppcom_session_write(): sid %u (0x%x), " - "buf %p, nbytes %u", fd, fd, sid, sid, buf, nbytes); + "buf %p, nbytes %u", fd, fd, sh, sh, buf, nbytes); - size = vppcom_session_write_msg (sid, (void *) buf, nbytes); + size = vppcom_session_write_msg (sh, (void *) buf, nbytes); if (size < 0) { errno = -size; size = -1; } + ldp_fd_entry_unlock (fde); } else { @@ -538,7 +592,7 @@ ssize_t writev (int fd, const struct iovec * iov, int iovcnt) { ssize_t size = 0, total = 0; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); int i, rv = 0; /* @@ -590,7 +644,7 @@ fcntl (int fd, int cmd, ...) const char *func_str = __func__; int rv = 0; va_list ap; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -681,7 +735,7 @@ ioctl (int fd, unsigned long int cmd, ...) const char *func_str; int rv; va_list ap; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -808,7 +862,7 @@ ldp_pselect (int nfds, fd_set * __restrict readfds, time_out = -1; - if (nfds <= ldp->sid_bit_val) + if (nfds <= ldp->sh_bit_val) { func_str = "libc_pselect"; @@ -821,11 +875,11 @@ ldp_pselect (int nfds, fd_set * __restrict readfds, goto done; } - if (PREDICT_FALSE (ldp->sid_bit_val > FD_SETSIZE / 2)) + if (PREDICT_FALSE (ldp->sh_bit_val > FD_SETSIZE / 2)) { clib_warning ("LDP<%d>: ERROR: LDP sid bit value %d (0x%x) > " "FD_SETSIZE/2 %d (0x%x)!", getpid (), - ldp->sid_bit_val, ldp->sid_bit_val, + ldp->sh_bit_val, ldp->sh_bit_val, FD_SETSIZE / 2, FD_SETSIZE / 2); errno = EOVERFLOW; return -1; @@ -845,7 +899,7 @@ ldp_pselect (int nfds, fd_set * __restrict readfds, clib_bitmap_foreach (fd, ldpw->rd_bitmap, ({ if (fd > nfds) break; - sid = ldp_sid_from_fd (fd); + sid = ldp_sh_from_fd (fd); LDBG (3, "readfds: fd %d (0x%x), sid %u (0x%x)", fd, fd, sid, sid); if (sid == INVALID_SESSION_ID) clib_bitmap_set_no_check (ldpw->libc_rd_bitmap, fd, 1); @@ -877,7 +931,7 @@ ldp_pselect (int nfds, fd_set * __restrict readfds, clib_bitmap_foreach (fd, ldpw->wr_bitmap, ({ if (fd > nfds) break; - sid = ldp_sid_from_fd (fd); + sid = ldp_sh_from_fd (fd); LDBG (3, "writefds: fd %d (0x%x), sid %u (0x%x)", fd, fd, sid, sid); if (sid == INVALID_SESSION_ID) clib_bitmap_set_no_check (ldpw->libc_wr_bitmap, fd, 1); @@ -909,7 +963,7 @@ ldp_pselect (int nfds, fd_set * __restrict readfds, clib_bitmap_foreach (fd, ldpw->ex_bitmap, ({ if (fd > nfds) break; - sid = ldp_sid_from_fd (fd); + sid = ldp_sh_from_fd (fd); LDBG (3, "exceptfds: fd %d (0x%x), sid %u (0x%x)", fd, fd, sid, sid); if (sid == INVALID_SESSION_ID) clib_bitmap_set_no_check (ldpw->libc_ex_bitmap, fd, 1); @@ -977,7 +1031,7 @@ ldp_pselect (int nfds, fd_set * __restrict readfds, /* *INDENT-OFF* */ clib_bitmap_foreach (sid, ldpw->rd_bitmap, ({ - fd = ldp_fd_from_sid (vppcom_session_handle (sid)); + fd = ldp_fd_from_sh (vppcom_session_handle (sid)); if (PREDICT_FALSE (fd < 0)) { errno = EBADFD; @@ -993,7 +1047,7 @@ ldp_pselect (int nfds, fd_set * __restrict readfds, /* *INDENT-OFF* */ clib_bitmap_foreach (sid, ldpw->wr_bitmap, ({ - fd = ldp_fd_from_sid (vppcom_session_handle (sid)); + fd = ldp_fd_from_sh (vppcom_session_handle (sid)); if (PREDICT_FALSE (fd < 0)) { errno = EBADFD; @@ -1009,7 +1063,7 @@ ldp_pselect (int nfds, fd_set * __restrict readfds, /* *INDENT-OFF* */ clib_bitmap_foreach (sid, ldpw->ex_bitmap, ({ - fd = ldp_fd_from_sid (vppcom_session_handle (sid)); + fd = ldp_fd_from_sh (vppcom_session_handle (sid)); if (PREDICT_FALSE (fd < 0)) { errno = EBADFD; @@ -1237,7 +1291,7 @@ int bind (int fd, __CONST_SOCKADDR_ARG addr, socklen_t len) { int rv; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -1361,7 +1415,7 @@ getsockname (int fd, __SOCKADDR_ARG addr, socklen_t * __restrict len) { int rv; const char *func_str; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -1430,7 +1484,7 @@ int connect (int fd, __CONST_SOCKADDR_ARG addr, socklen_t len) { int rv; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -1516,7 +1570,7 @@ getpeername (int fd, __SOCKADDR_ARG addr, socklen_t * __restrict len) { int rv; const char *func_str; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -1586,7 +1640,7 @@ send (int fd, const void *buf, size_t n, int flags) { ssize_t size; const char *func_str; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -1644,7 +1698,7 @@ sendfile (int out_fd, int in_fd, off_t * offset, size_t len) ldp_worker_ctx_t *ldpw = ldp_worker_get_current (); ssize_t size = 0; const char *func_str; - u32 sid = ldp_sid_from_fd (out_fd); + u32 sid = ldp_sh_from_fd (out_fd); if ((errno = -ldp_init ())) return -1; @@ -1888,7 +1942,7 @@ recv (int fd, void *buf, size_t n, int flags) if ((errno = -ldp_init ())) return -1; - sid = ldp_sid_from_fd (fd); + sid = ldp_sh_from_fd (fd); if (sid != INVALID_SESSION_ID) { LDBG (2, "fd %d (0x%x): calling vcl recvfrom: sid %u (0x%x), buf %p," @@ -1915,7 +1969,7 @@ sendto (int fd, const void *buf, size_t n, int flags, { ssize_t size; const char *func_str = __func__; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -2006,7 +2060,7 @@ recvfrom (int fd, void *__restrict buf, size_t n, int flags, { ssize_t size; const char *func_str; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -2076,7 +2130,7 @@ sendmsg (int fd, const struct msghdr * message, int flags) { ssize_t size; const char *func_str; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -2125,7 +2179,7 @@ sendmmsg (int fd, struct mmsghdr *vmessages, unsigned int vlen, int flags) { ssize_t size; const char *func_str; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -2172,7 +2226,7 @@ recvmsg (int fd, struct msghdr * message, int flags) { ssize_t size; const char *func_str; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -2222,7 +2276,7 @@ recvmmsg (int fd, struct mmsghdr *vmessages, { ssize_t size; const char *func_str; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -2271,7 +2325,7 @@ getsockopt (int fd, int level, int optname, { int rv; const char *func_str = __func__; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); u32 buflen = optlen ? (u32) * optlen : 0; if ((errno = -ldp_init ())) @@ -2499,7 +2553,7 @@ setsockopt (int fd, int level, int optname, { int rv; const char *func_str = __func__; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -2663,7 +2717,7 @@ int listen (int fd, int n) { int rv; - u32 sid = ldp_sid_from_fd (fd); + u32 sid = ldp_sh_from_fd (fd); if ((errno = -ldp_init ())) return -1; @@ -2696,13 +2750,14 @@ ldp_accept4 (int listen_fd, __SOCKADDR_ARG addr, socklen_t * __restrict addr_len, int flags) { int rv; - u32 listen_sid = ldp_sid_from_fd (listen_fd); - int accept_sid; + u32 listen_sh; + int accept_sh; if ((errno = -ldp_init ())) return -1; - if (listen_sid != INVALID_SESSION_ID) + listen_sh = ldp_sh_from_fd (listen_fd); + if (listen_sh != INVALID_SESSION_ID) { vppcom_endpt_t ep; u8 src_addr[sizeof (struct sockaddr_in6)]; @@ -2711,12 +2766,12 @@ ldp_accept4 (int listen_fd, __SOCKADDR_ARG addr, LDBG (0, "listen fd %d (0x%x): calling vppcom_session_accept:" " listen sid %u (0x%x), ep %p, flags 0x%x", listen_fd, - listen_fd, listen_sid, listen_sid, ep, flags); + listen_fd, listen_sh, listen_sh, ep, flags); - accept_sid = vppcom_session_accept (listen_sid, &ep, flags); - if (accept_sid < 0) + accept_sh = vppcom_session_accept (listen_sh, &ep, flags); + if (accept_sh < 0) { - errno = -accept_sid; + errno = -accept_sh; rv = -1; } else @@ -2724,16 +2779,16 @@ ldp_accept4 (int listen_fd, __SOCKADDR_ARG addr, rv = ldp_copy_ep_to_sockaddr (addr, addr_len, &ep); if (rv != VPPCOM_OK) { - (void) vppcom_session_close ((u32) accept_sid); + (void) vppcom_session_close ((u32) accept_sh); errno = -rv; rv = -1; } else { - rv = ldp_fd_alloc ((u32) accept_sid); + rv = ldp_fd_alloc ((u32) accept_sh); if (rv < 0) { - (void) vppcom_session_close ((u32) accept_sid); + (void) vppcom_session_close ((u32) accept_sh); errno = -rv; rv = -1; } @@ -2776,15 +2831,14 @@ shutdown (int fd, int how) if ((errno = -ldp_init ())) return -1; - if (ldp_fd_is_sid (fd)) + if (ldp_fd_is_sh (fd)) { - u32 fd_index = fd - ldp->sid_bit_val; + u32 fd_index = fd - ldp->sh_bit_val; ldp_fd_entry_t *fde; - fde = ldp_fd_entry_get_w_lock (fd_index); + fde = ldp_fd_entry_lock (fd_index); if (!fde) { - clib_rwlock_reader_unlock (&ldp->fd_table_lock); errno = ENOTCONN; return -1; } @@ -2799,7 +2853,7 @@ shutdown (int fd, int how) if ((fde->flags & LDP_F_SHUT_RD) && (fde->flags & LDP_F_SHUT_WR)) rv = close (fd); - clib_rwlock_reader_unlock (&ldp->fd_table_lock); + ldp_fd_entry_unlock (fde); LDBG (0, "fd %d (0x%x): calling vcl shutdown: how %d", fd, fd, how); } else @@ -2869,7 +2923,7 @@ epoll_create (int size) int epoll_ctl (int epfd, int op, int fd, struct epoll_event *event) { - u32 vep_idx = ldp_sid_from_fd (epfd), sid; + u32 vep_idx = ldp_sh_from_fd (epfd), sid; const char *func_str; int rv; @@ -2892,7 +2946,7 @@ epoll_ctl (int epfd, int op, int fd, struct epoll_event *event) goto done; } - sid = ldp_sid_from_fd (fd); + sid = ldp_sh_from_fd (fd); LDBG (0, "epfd %d (0x%x), vep_idx %d (0x%x), sid %d (0x%x)", epfd, epfd, vep_idx, vep_idx, sid, sid); @@ -2995,7 +3049,7 @@ ldp_epoll_pwait (int epfd, struct epoll_event *events, int maxevents, { ldp_worker_ctx_t *ldpw = ldp_worker_get_current (); double time_to_wait = (double) 0, time_out, now = 0; - u32 vep_idx = ldp_sid_from_fd (epfd); + u32 vep_idx = ldp_sh_from_fd (epfd); int libc_epfd, rv = 0; if ((errno = -ldp_init ())) @@ -3115,7 +3169,7 @@ poll (struct pollfd *fds, nfds_t nfds, int timeout) LDBG (3, "fds[%d] fd %d (0x%0x) events = 0x%x revents = 0x%x", i, fds[i].fd, fds[i].fd, fds[i].events, fds[i].revents); - sid = ldp_sid_from_fd (fds[i].fd); + sid = ldp_sh_from_fd (fds[i].fd); if (sid != INVALID_SESSION_ID) { fds[i].fd = -fds[i].fd; diff --git a/src/vcl/vcl_bapi.c b/src/vcl/vcl_bapi.c index de5e80a55b3..5b9a9d5d3ce 100644 --- a/src/vcl/vcl_bapi.c +++ b/src/vcl/vcl_bapi.c @@ -610,21 +610,6 @@ vppcom_send_unbind_sock (u64 vpp_handle) } void -vppcom_send_accept_session_reply (u64 handle, u32 context, int retval) -{ - vcl_worker_t *wrk = vcl_worker_get_current (); - vl_api_accept_session_reply_t *rmp; - - rmp = vl_msg_api_alloc (sizeof (*rmp)); - memset (rmp, 0, sizeof (*rmp)); - rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY); - rmp->retval = htonl (retval); - rmp->context = context; - rmp->handle = handle; - vl_msg_api_send_shmem (wrk->vl_input_queue, (u8 *) & rmp); -} - -void vppcom_send_application_tls_cert_add (vcl_session_t * session, char *cert, u32 cert_len) { diff --git a/src/vcl/vcl_private.c b/src/vcl/vcl_private.c index d82a7ff3355..32664312f01 100644 --- a/src/vcl/vcl_private.c +++ b/src/vcl/vcl_private.c @@ -375,21 +375,23 @@ vcl_worker_share_session (vcl_worker_t * parent, vcl_worker_t * wrk, vcl_session_t * new_s) { vcl_shared_session_t *ss; - vcl_session_t *s; + vcl_session_t *old_s; - s = vcl_session_get (parent, new_s->session_index); - if (s->shared_index == ~0) + if (new_s->shared_index == ~0) { ss = vcl_shared_session_alloc (); + ss->session_index = new_s->session_index; vec_add1 (ss->workers, parent->wrk_index); - s->shared_index = ss->ss_index; + vec_add1 (ss->workers, wrk->wrk_index); + new_s->shared_index = ss->ss_index; + old_s = vcl_session_get (parent, new_s->session_index); + old_s->shared_index = ss->ss_index; } else { - ss = vcl_shared_session_get (s->shared_index); + ss = vcl_shared_session_get (new_s->shared_index); + vec_add1 (ss->workers, wrk->wrk_index); } - new_s->shared_index = ss->ss_index; - vec_add1 (ss->workers, wrk->wrk_index); } int @@ -414,6 +416,12 @@ vcl_worker_unshare_session (vcl_worker_t * wrk, vcl_session_t * s) return 1; } + /* If the first removed and not last, start session worker change. + * First request goes to vpp and vpp reflects it back to the right + * worker */ + if (i == 0) + vcl_send_session_worker_update (wrk, s, ss->workers[0]); + return 0; } diff --git a/src/vcl/vcl_private.h b/src/vcl/vcl_private.h index 9dce5182193..2ae4b72f6a4 100644 --- a/src/vcl/vcl_private.h +++ b/src/vcl/vcl_private.h @@ -69,7 +69,8 @@ typedef enum STATE_ACCEPT = 0x08, STATE_VPP_CLOSING = 0x10, STATE_DISCONNECT = 0x20, - STATE_FAILED = 0x40 + STATE_FAILED = 0x40, + STATE_UPDATED = 0x80, } session_state_t; #define SERVER_STATE_OPEN (STATE_ACCEPT|STATE_VPP_CLOSING) @@ -144,6 +145,7 @@ typedef struct vcl_shared_session_ { u32 ss_index; u32 *workers; + u32 session_index; } vcl_shared_session_t; typedef struct @@ -287,6 +289,8 @@ typedef struct vcl_worker_ /** Vector of unhandled events */ session_event_t *unhandled_evts_vector; + u32 *pending_session_wrk_updates; + /** Used also as a thread stop key buffer */ pthread_t thread_id; @@ -517,6 +521,7 @@ int vcl_worker_register_with_vpp (void); int vcl_worker_set_bapi (void); void vcl_worker_share_sessions (vcl_worker_t * parent_wrk); int vcl_worker_unshare_session (vcl_worker_t * wrk, vcl_session_t * s); +vcl_shared_session_t *vcl_shared_session_get (u32 ss_index); int vcl_session_get_refcnt (vcl_session_t * s); void vcl_segment_table_add (u64 segment_handle, u32 svm_segment_index); @@ -543,6 +548,17 @@ vcl_worker_get_current (void) return vcl_worker_get (vcl_get_worker_index ()); } +static inline svm_msg_q_t * +vcl_session_vpp_evt_q (vcl_worker_t * wrk, vcl_session_t * s) +{ + if (vcl_session_is_ct (s)) + return wrk->vpp_event_queues[0]; + else + return wrk->vpp_event_queues[s->vpp_thread_index]; +} + +void vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s, + u32 wrk_index); /* * VCL Binary API */ @@ -556,12 +572,10 @@ void vppcom_send_disconnect_session (u64 vpp_handle); void vppcom_send_bind_sock (vcl_session_t * session); void vppcom_send_unbind_sock (u64 vpp_handle); void vppcom_api_hookup (void); -void vppcom_send_accept_session_reply (u64 vpp_handle, u32 context, int rv); void vppcom_send_application_tls_cert_add (vcl_session_t * session, char *cert, u32 cert_len); -void -vppcom_send_application_tls_key_add (vcl_session_t * session, char *key, - u32 key_len); +void vppcom_send_application_tls_key_add (vcl_session_t * session, char *key, + u32 key_len); void vcl_send_app_worker_add_del (u8 is_add); void vcl_send_child_worker_del (vcl_worker_t * wrk); diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c index 70afdce7832..6a1bf1cfcc7 100644 --- a/src/vcl/vppcom.c +++ b/src/vcl/vppcom.c @@ -44,6 +44,22 @@ vcl_wait_for_segment (u64 segment_handle) return 1; } +static inline int +vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq) +{ + svm_msg_q_msg_t *msg; + u32 n_msgs; + int i; + + n_msgs = svm_msg_q_size (mq); + for (i = 0; i < n_msgs; i++) + { + vec_add2 (wrk->mq_msg_vector, msg, 1); + svm_msg_q_sub_w_lock (mq, msg); + } + return n_msgs; +} + const char * vppcom_session_state_str (session_state_t state) { @@ -175,15 +191,6 @@ format_ip46_address (u8 * s, va_list * args) */ -static svm_msg_q_t * -vcl_session_vpp_evt_q (vcl_worker_t * wrk, vcl_session_t * s) -{ - if (vcl_session_is_ct (s)) - return wrk->vpp_event_queues[0]; - else - return wrk->vpp_event_queues[s->vpp_thread_index]; -} - static void vcl_send_session_accepted_reply (svm_msg_q_t * mq, u32 context, session_handle_t handle, int retval) @@ -227,6 +234,24 @@ vcl_send_session_reset_reply (svm_msg_q_t * mq, u32 context, app_send_ctrl_evt_to_vpp (mq, app_evt); } +void +vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s, + u32 wrk_index) +{ + app_session_evt_t _app_evt, *app_evt = &_app_evt; + session_worker_update_msg_t *mp; + svm_msg_q_t *mq; + + mq = vcl_session_vpp_evt_q (wrk, s); + app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_WORKER_UPDATE); + mp = (session_worker_update_msg_t *) app_evt->evt->data; + mp->client_index = wrk->my_client_index; + mp->handle = s->vpp_handle; + mp->req_wrk_index = wrk->vpp_wrk_index; + mp->wrk_index = wrk_index; + app_send_ctrl_evt_to_vpp (mq, app_evt); +} + static u32 vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp) { @@ -540,7 +565,6 @@ vcl_session_disconnected_handler (vcl_worker_t * wrk, /* Caught a disconnect before actually accepting the session */ if (session->session_state == STATE_LISTEN) { - if (!vcl_flag_accepted_session (session, msg->handle, VCL_ACCEPTED_F_CLOSED)) VDBG (0, "session was not accepted!"); @@ -551,6 +575,59 @@ vcl_session_disconnected_handler (vcl_worker_t * wrk, return session; } +static void +vcl_session_req_worker_update_handler (vcl_worker_t * wrk, void *data) +{ + session_req_worker_update_msg_t *msg; + vcl_session_t *s; + + msg = (session_req_worker_update_msg_t *) data; + s = vcl_session_get_w_vpp_handle (wrk, msg->session_handle); + if (!s) + return; + + vec_add1 (wrk->pending_session_wrk_updates, s->session_index); +} + +static void +vcl_session_worker_update_reply_handler (vcl_worker_t * wrk, void *data) +{ + session_worker_update_reply_msg_t *msg; + vcl_session_t *s; + + msg = (session_worker_update_reply_msg_t *) data; + s = vcl_session_get_w_vpp_handle (wrk, msg->handle); + if (!s) + { + VDBG (0, "unknown handle 0x%llx", msg->handle); + return; + } + if (vcl_wait_for_segment (msg->segment_handle)) + { + clib_warning ("segment for session %u couldn't be mounted!", + s->session_index); + return; + } + s->rx_fifo = uword_to_pointer (msg->rx_fifo, svm_fifo_t *); + s->tx_fifo = uword_to_pointer (msg->tx_fifo, svm_fifo_t *); + + s->rx_fifo->client_session_index = s->session_index; + s->tx_fifo->client_session_index = s->session_index; + s->rx_fifo->client_thread_index = wrk->wrk_index; + s->tx_fifo->client_thread_index = wrk->wrk_index; + s->session_state = STATE_UPDATED; + + if (s->shared_index != VCL_INVALID_SESSION_INDEX) + { + vcl_shared_session_t *ss; + ss = vcl_shared_session_get (s->shared_index); + if (vec_len (ss->workers) > 1) + VDBG (0, "workers need to be updated"); + } + VDBG (0, "session %u[0x%llx] moved to worker %u", s->session_index, + s->vpp_handle, wrk->wrk_index); +} + static int vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e) { @@ -587,13 +664,19 @@ vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e) case SESSION_CTRL_EVT_BOUND: vcl_session_bound_handler (wrk, (session_bound_msg_t *) e->data); break; + case SESSION_CTRL_EVT_REQ_WORKER_UPDATE: + vcl_session_req_worker_update_handler (wrk, e->data); + break; + case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY: + vcl_session_worker_update_reply_handler (wrk, e->data); + break; default: clib_warning ("unhandled %u", e->event_type); } return VPPCOM_OK; } -static inline int +static int vppcom_wait_for_session_state_change (u32 session_index, session_state_t state, f64 wait_for_time) @@ -638,6 +721,52 @@ vppcom_wait_for_session_state_change (u32 session_index, return VPPCOM_ETIMEDOUT; } +static void +vcl_handle_pending_wrk_updates (vcl_worker_t * wrk) +{ + session_state_t state; + vcl_session_t *s; + u32 *sip; + + if (PREDICT_TRUE (vec_len (wrk->pending_session_wrk_updates) == 0)) + return; + + vec_foreach (sip, wrk->pending_session_wrk_updates) + { + s = vcl_session_get (wrk, *sip); + vcl_send_session_worker_update (wrk, s, wrk->wrk_index); + state = s->session_state; + vppcom_wait_for_session_state_change (s->session_index, STATE_UPDATED, 5); + s->session_state = state; + } + vec_reset_length (wrk->pending_session_wrk_updates); +} + +static void +vcl_flush_mq_events (void) +{ + vcl_worker_t *wrk = vcl_worker_get_current (); + svm_msg_q_msg_t *msg; + session_event_t *e; + svm_msg_q_t *mq; + int i; + + mq = wrk->app_event_queue; + svm_msg_q_lock (mq); + vcl_mq_dequeue_batch (wrk, mq); + svm_msg_q_unlock (mq); + + for (i = 0; i < vec_len (wrk->mq_msg_vector); i++) + { + msg = vec_elt_at_index (wrk->mq_msg_vector, i); + e = svm_msg_q_msg_data (mq, msg); + vcl_handle_mq_event (wrk, e); + svm_msg_q_free_msg (mq, msg); + } + vec_reset_length (wrk->mq_msg_vector); + vcl_handle_pending_wrk_updates (wrk); +} + static int vppcom_app_session_enable (void) { @@ -845,13 +974,14 @@ static void vcl_app_pre_fork (void) { vcl_incercept_sigchld (); + vcl_flush_mq_events (); } static void vcl_app_fork_child_handler (void) { + vcl_worker_t *parent_wrk, *wrk; int rv, parent_wrk_index; - vcl_worker_t *parent_wrk; u8 *child_name; parent_wrk_index = vcl_get_worker_index (); @@ -884,6 +1014,8 @@ vcl_app_fork_child_handler (void) */ vcl_worker_register_with_vpp (); parent_wrk = vcl_worker_get (parent_wrk_index); + wrk = vcl_worker_get_current (); + wrk->vpp_event_queues = vec_dup (parent_wrk->vpp_event_queues); vcl_worker_share_sessions (parent_wrk); parent_wrk->forked_child = vcl_get_worker_index (); @@ -1097,7 +1229,11 @@ vppcom_session_close (uint32_t session_handle) } if (!do_disconnect) - goto cleanup; + { + VDBG (0, "session handle %u [0x%llx] disconnect skipped", + session_handle, vpp_handle); + goto cleanup; + } if (state & STATE_LISTEN) { @@ -1143,10 +1279,7 @@ cleanup: vcl_ct_registration_unlock (wrk); } - if (vpp_handle != ~0) - { - vcl_session_table_del_vpp_handle (wrk, vpp_handle); - } + vcl_session_table_del_vpp_handle (wrk, vpp_handle); vcl_session_free (wrk, session); VDBG (0, "session handle %u [0x%llx] removed", session_handle, vpp_handle); @@ -1948,22 +2081,6 @@ vppcom_session_write_ready (vcl_session_t * session) return svm_fifo_max_enqueue (session->tx_fifo); } -static inline int -vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq) -{ - svm_msg_q_msg_t *msg; - u32 n_msgs; - int i; - - n_msgs = svm_msg_q_size (mq); - for (i = 0; i < n_msgs; i++) - { - vec_add2 (wrk->mq_msg_vector, msg, 1); - svm_msg_q_sub_w_lock (mq, msg); - } - return n_msgs; -} - #define vcl_fifo_rx_evt_valid_or_break(_fifo) \ if (PREDICT_FALSE (svm_fifo_is_empty (_fifo))) \ { \ @@ -2067,6 +2184,12 @@ vcl_select_handle_mq_event (vcl_worker_t * wrk, session_event_t * e, *bits_set += 1; } break; + case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY: + vcl_session_worker_update_reply_handler (wrk, e->data); + break; + case SESSION_CTRL_EVT_REQ_WORKER_UPDATE: + vcl_session_req_worker_update_handler (wrk, e->data); + break; default: clib_warning ("unhandled: %u", e->event_type); break; @@ -2122,6 +2245,7 @@ vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq, svm_msg_q_free_msg (mq, msg); } vec_reset_length (wrk->mq_msg_vector); + vcl_handle_pending_wrk_updates (wrk); return *bits_set; } @@ -2676,6 +2800,12 @@ vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e, session_evt_data = session->vep.ev.data.u64; session_events = session->vep.ev.events; break; + case SESSION_CTRL_EVT_REQ_WORKER_UPDATE: + vcl_session_req_worker_update_handler (wrk, e->data); + break; + case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY: + vcl_session_worker_update_reply_handler (wrk, e->data); + break; default: VDBG (0, "unhandled: %u", e->event_type); break; @@ -2741,7 +2871,7 @@ handle_dequeued: svm_msg_q_free_msg (mq, msg); } vec_reset_length (wrk->mq_msg_vector); - + vcl_handle_pending_wrk_updates (wrk); return *num_ev; } @@ -3580,12 +3710,18 @@ vppcom_mq_epoll_fd (void) } int -vppcom_session_index (uint32_t session_handle) +vppcom_session_index (vcl_session_handle_t session_handle) { return session_handle & 0xFFFFFF; } int +vppcom_session_worker (vcl_session_handle_t session_handle) +{ + return session_handle >> 24; +} + +int vppcom_session_handle (uint32_t session_index) { return (vcl_get_worker_index () << 24) | session_index; diff --git a/src/vcl/vppcom.h b/src/vcl/vppcom.h index 641946be55a..f2fca09b512 100644 --- a/src/vcl/vppcom.h +++ b/src/vcl/vppcom.h @@ -97,6 +97,8 @@ typedef struct vppcom_endpt_t_ uint16_t port; } vppcom_endpt_t; +typedef uint32_t vcl_session_handle_t; + typedef enum { VPPCOM_OK = 0, @@ -277,7 +279,8 @@ extern int vppcom_session_sendto (uint32_t session_handle, void *buffer, extern int vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time); extern int vppcom_mq_epoll_fd (void); -extern int vppcom_session_index (uint32_t session_handle); +extern int vppcom_session_index (vcl_session_handle_t session_handle); +extern int vppcom_session_worker (vcl_session_handle_t session_handle); extern int vppcom_session_handle (uint32_t session_index); extern int vppcom_session_read_segments (uint32_t session_handle, diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c index 19c8fa2f2e0..85b5f939427 100644 --- a/src/vnet/session/application.c +++ b/src/vnet/session/application.c @@ -724,6 +724,48 @@ app_worker_stop_listen (app_worker_t * app_wrk, session_handle_t handle) return 0; } +int +app_worker_own_session (app_worker_t * app_wrk, stream_session_t * s) +{ + segment_manager_t *sm; + svm_fifo_t *rxf, *txf; + + s->app_wrk_index = app_wrk->wrk_index; + + rxf = s->server_rx_fifo; + txf = s->server_tx_fifo; + + if (!rxf || !txf) + return 0; + + s->server_rx_fifo = 0; + s->server_tx_fifo = 0; + + sm = app_worker_get_or_alloc_connect_segment_manager (app_wrk); + if (session_alloc_fifos (sm, s)) + return -1; + + if (!svm_fifo_is_empty (rxf)) + { + clib_memcpy_fast (s->server_rx_fifo->data, rxf->data, rxf->nitems); + s->server_rx_fifo->head = rxf->head; + s->server_rx_fifo->tail = rxf->tail; + s->server_rx_fifo->cursize = rxf->cursize; + } + + if (!svm_fifo_is_empty (txf)) + { + clib_memcpy_fast (s->server_tx_fifo->data, txf->data, txf->nitems); + s->server_tx_fifo->head = txf->head; + s->server_tx_fifo->tail = txf->tail; + s->server_tx_fifo->cursize = txf->cursize; + } + + segment_manager_dealloc_fifos (rxf->segment_index, rxf, txf); + + return 0; +} + /** * Start listening local transport endpoint for requested transport. * @@ -890,6 +932,14 @@ app_worker_get_connect_segment_manager (app_worker_t * app) } segment_manager_t * +app_worker_get_or_alloc_connect_segment_manager (app_worker_t * app_wrk) +{ + if (app_wrk->connects_seg_manager == (u32) ~ 0) + app_worker_alloc_connects_segment_manager (app_wrk); + return segment_manager_get (app_wrk->connects_seg_manager); +} + +segment_manager_t * app_worker_get_listen_segment_manager (app_worker_t * app, stream_session_t * listener) { diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h index e33f2ff797e..1d2064df62e 100644 --- a/src/vnet/session/application.h +++ b/src/vnet/session/application.h @@ -225,12 +225,15 @@ int app_worker_alloc_and_init (application_t * app, app_worker_t ** wrk); app_worker_t *app_worker_get (u32 wrk_index); app_worker_t *app_worker_get_if_valid (u32 wrk_index); application_t *app_worker_get_app (u32 wrk_index); +int app_worker_own_session (app_worker_t * app_wrk, stream_session_t * s); void app_worker_free (app_worker_t * app_wrk); int app_worker_open_session (app_worker_t * app, session_endpoint_t * tep, u32 api_context); segment_manager_t *app_worker_get_listen_segment_manager (app_worker_t *, stream_session_t *); segment_manager_t *app_worker_get_connect_segment_manager (app_worker_t *); +segment_manager_t + * app_worker_get_or_alloc_connect_segment_manager (app_worker_t *); int app_worker_alloc_connects_segment_manager (app_worker_t * app); int app_worker_add_segment_notify (u32 app_or_wrk, u64 segment_handle); u32 app_worker_n_listeners (app_worker_t * app); diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h index a156c82a745..9c48faa8abc 100644 --- a/src/vnet/session/application_interface.h +++ b/src/vnet/session/application_interface.h @@ -220,9 +220,9 @@ typedef struct session_bound_msg_ u8 lcl_is_ip4; u8 lcl_ip[16]; u16 lcl_port; - u64 rx_fifo; - u64 tx_fifo; - u64 vpp_evt_q; + uword rx_fifo; + uword tx_fifo; + uword vpp_evt_q; u32 segment_size; u8 segment_name_length; u8 segment_name[128]; @@ -233,12 +233,12 @@ typedef struct session_accepted_msg_ u32 context; u64 listener_handle; u64 handle; - u64 server_rx_fifo; - u64 server_tx_fifo; + uword server_rx_fifo; + uword server_tx_fifo; u64 segment_handle; - u64 vpp_event_queue_address; - u64 server_event_queue_address; - u64 client_event_queue_address; + uword vpp_event_queue_address; + uword server_event_queue_address; + uword client_event_queue_address; u16 port; u8 is_ip4; u8 ip[16]; @@ -260,12 +260,12 @@ typedef struct session_connected_msg_ u32 context; i32 retval; u64 handle; - u64 server_rx_fifo; - u64 server_tx_fifo; + uword server_rx_fifo; + uword server_tx_fifo; u64 segment_handle; - u64 vpp_event_queue_address; - u64 client_event_queue_address; - u64 server_event_queue_address; + uword vpp_event_queue_address; + uword client_event_queue_address; + uword server_event_queue_address; u32 segment_size; u8 segment_name_length; u8 segment_name[64]; @@ -302,6 +302,28 @@ typedef struct session_reset_reply_msg_ u64 handle; } __clib_packed session_reset_reply_msg_t; +typedef struct session_req_worker_update_msg_ +{ + u64 session_handle; +} __clib_packed session_req_worker_update_msg_t; + +/* NOTE: using u16 for wrk indices because message needs to fit in 18B */ +typedef struct session_worker_update_msg_ +{ + u32 client_index; + u16 wrk_index; + u16 req_wrk_index; + u64 handle; +} __clib_packed session_worker_update_msg_t; + +typedef struct session_worker_update_reply_msg_ +{ + u64 handle; + uword rx_fifo; + uword tx_fifo; + u64 segment_handle; +} __clib_packed session_worker_update_reply_msg_t; + typedef struct app_session_event_ { svm_msg_q_msg_t msg; diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index e3c73000edf..cf1b3e99f4d 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -49,7 +49,10 @@ typedef enum SESSION_CTRL_EVT_DISCONNECTED, SESSION_CTRL_EVT_DISCONNECTED_REPLY, SESSION_CTRL_EVT_RESET, - SESSION_CTRL_EVT_RESET_REPLY + SESSION_CTRL_EVT_RESET_REPLY, + SESSION_CTRL_EVT_REQ_WORKER_UPDATE, + SESSION_CTRL_EVT_WORKER_UPDATE, + SESSION_CTRL_EVT_WORKER_UPDATE_REPLY, } session_evt_type_t; static inline const char * diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index 98965f334af..880f16388b8 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -173,7 +173,7 @@ session_mq_disconnected_handler (void *data) svm_msg_q_unlock (app_wrk->event_queue); evt = svm_msg_q_msg_data (app_wrk->event_queue, msg); clib_memset (evt, 0, sizeof (*evt)); - evt->event_type = SESSION_CTRL_EVT_DISCONNECTED; + evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY; rmp = (session_disconnected_reply_msg_t *) evt->data; rmp->handle = mp->handle; rmp->context = mp->context; @@ -207,6 +207,86 @@ session_mq_disconnected_reply_handler (void *data) } } +static void +session_mq_worker_update_handler (void *data) +{ + session_worker_update_msg_t *mp = (session_worker_update_msg_t *) data; + session_worker_update_reply_msg_t *rmp; + svm_msg_q_msg_t _msg, *msg = &_msg; + app_worker_t *app_wrk; + u32 owner_app_wrk_map; + session_event_t *evt; + stream_session_t *s; + application_t *app; + + app = application_lookup (mp->client_index); + if (!app) + return; + if (!(s = session_get_from_handle_if_valid (mp->handle))) + { + clib_warning ("invalid handle %llu", mp->handle); + return; + } + app_wrk = app_worker_get (s->app_wrk_index); + if (app_wrk->app_index != app->app_index) + { + clib_warning ("app %u does not own session %llu", app->app_index, + mp->handle); + return; + } + owner_app_wrk_map = app_wrk->wrk_map_index; + app_wrk = application_get_worker (app, mp->wrk_index); + + /* This needs to come from the new owner */ + if (mp->req_wrk_index == owner_app_wrk_map) + { + session_req_worker_update_msg_t *wump; + + svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue, + SESSION_MQ_CTRL_EVT_RING, + SVM_Q_WAIT, msg); + svm_msg_q_unlock (app_wrk->event_queue); + evt = svm_msg_q_msg_data (app_wrk->event_queue, msg); + clib_memset (evt, 0, sizeof (*evt)); + evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE; + wump = (session_req_worker_update_msg_t *) evt->data; + wump->session_handle = mp->handle; + svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT); + return; + } + + app_worker_own_session (app_wrk, s); + + /* + * Send reply + */ + svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue, + SESSION_MQ_CTRL_EVT_RING, + SVM_Q_WAIT, msg); + svm_msg_q_unlock (app_wrk->event_queue); + evt = svm_msg_q_msg_data (app_wrk->event_queue, msg); + clib_memset (evt, 0, sizeof (*evt)); + evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY; + rmp = (session_worker_update_reply_msg_t *) evt->data; + rmp->handle = mp->handle; + rmp->rx_fifo = pointer_to_uword (s->server_rx_fifo); + rmp->tx_fifo = pointer_to_uword (s->server_tx_fifo); + rmp->segment_handle = session_segment_handle (s); + svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT); + + /* + * Retransmit messages that may have been lost + */ + if (!svm_fifo_is_empty (s->server_tx_fifo)) + session_send_io_evt_to_thread (s->server_tx_fifo, FIFO_EVENT_APP_TX); + + if (!svm_fifo_is_empty (s->server_rx_fifo)) + app_worker_lock_and_send_event (app_wrk, s, FIFO_EVENT_APP_RX); + + if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING) + app->cb_fns.session_disconnect_callback (s); +} + vlib_node_registration_t session_queue_node; typedef struct @@ -936,6 +1016,9 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, case SESSION_CTRL_EVT_RESET_REPLY: session_mq_reset_reply_handler (e->data); break; + case SESSION_CTRL_EVT_WORKER_UPDATE: + session_mq_worker_update_handler (e->data); + break; default: clib_warning ("unhandled event type %d", e->event_type); } |