diff options
Diffstat (limited to 'src/uri')
-rw-r--r-- | src/uri/sock_test_client.c | 2 | ||||
-rw-r--r-- | src/uri/sock_test_server.c | 138 | ||||
-rw-r--r-- | src/uri/vppcom.c | 750 | ||||
-rw-r--r-- | src/uri/vppcom.h | 8 |
4 files changed, 813 insertions, 85 deletions
diff --git a/src/uri/sock_test_client.c b/src/uri/sock_test_client.c index 151c90b2960..b53f8f2230f 100644 --- a/src/uri/sock_test_client.c +++ b/src/uri/sock_test_client.c @@ -505,7 +505,7 @@ sock_test_connect_test_sockets (uint32_t num_test_sockets) #ifdef VCL_TEST tsock->fd = vppcom_session_create (VPPCOM_VRF_DEFAULT, VPPCOM_PROTO_TCP, - 0 /* is_nonblocking */ ); + 1 /* is_nonblocking */ ); if (tsock->fd < 0) { errno = -tsock->fd; diff --git a/src/uri/sock_test_server.c b/src/uri/sock_test_server.c index 35046aa061b..e5b5a56e8d6 100644 --- a/src/uri/sock_test_server.c +++ b/src/uri/sock_test_server.c @@ -23,6 +23,12 @@ #include <ctype.h> #include <uri/sock_test.h> +#define SOCK_SERVER_USE_EPOLL 1 + +#if SOCK_SERVER_USE_EPOLL +#include <sys/epoll.h> +#endif + typedef struct { uint8_t is_alloc; @@ -38,9 +44,15 @@ typedef struct } sock_server_conn_t; #define SOCK_SERVER_MAX_TEST_CONN 10 +#define SOCK_SERVER_MAX_EPOLL_EVENTS 10 typedef struct { int listen_fd; +#if SOCK_SERVER_USE_EPOLL + int epfd; + struct epoll_event listen_ev; + struct epoll_event wait_events[SOCK_SERVER_MAX_EPOLL_EVENTS]; +#endif size_t num_conn; size_t conn_pool_size; sock_server_conn_t *conn_pool; @@ -52,6 +64,7 @@ typedef struct sock_server_main_t sock_server_main; +#if ! SOCK_SERVER_USE_EPOLL static inline int get_nfds (void) { @@ -83,6 +96,7 @@ conn_fdset_clr (sock_server_conn_t * conn, fd_set * fdset) FD_CLR (conn->fd, fdset); ssm->nfds = get_nfds (); } +#endif static inline void conn_pool_expand (size_t expand_size) @@ -141,10 +155,12 @@ conn_pool_alloc (void) static inline void conn_pool_free (sock_server_conn_t * conn) { +#if ! SOCK_SERVER_USE_EPOLL sock_server_main_t *ssm = &sock_server_main; conn_fdset_clr (conn, &ssm->rd_fdset); conn_fdset_clr (conn, &ssm->wr_fdset); +#endif conn->fd = 0; conn->is_alloc = 0; } @@ -274,6 +290,8 @@ new_client (void) #ifdef VCL_TEST client_fd = vppcom_session_accept (ssm->listen_fd, &conn->endpt, -1.0 /* wait forever */ ); + if (client_fd < 0) + errno = -client_fd; #else client_fd = accept (ssm->listen_fd, (struct sockaddr *) NULL, NULL); #endif @@ -281,7 +299,7 @@ new_client (void) { int errno_val; errno_val = errno; - perror ("ERROR in main()"); + perror ("ERROR in new_client()"); fprintf (stderr, "ERROR: accept failed (errno = %d)!\n", errno_val); } @@ -289,7 +307,36 @@ new_client (void) client_fd, client_fd); conn->fd = client_fd; + +#if ! SOCK_SERVER_USE_EPOLL conn_fdset_set (conn, &ssm->rd_fdset); + ssm->nfds++; +#else + { + struct epoll_event ev; + int rv; + + ev.events = EPOLLIN; + ev.data.u64 = conn - ssm->conn_pool; +#ifdef VCL_TEST + rv = vppcom_epoll_ctl (ssm->epfd, EPOLL_CTL_ADD, client_fd, &ev); + if (rv) + errno = -rv; +#else + rv = epoll_ctl (ssm->epfd, EPOLL_CTL_ADD, client_fd, &ev); +#endif + if (rv < 0) + { + int errno_val; + errno_val = errno; + perror ("ERROR in new_client()"); + fprintf (stderr, "ERROR: epoll_ctl failed (errno = %d)!\n", + errno_val); + } + else + ssm->nfds++; + } +#endif } int @@ -306,12 +353,16 @@ main (int argc, char **argv) int errno_val; int v, i; uint16_t port = SOCK_TEST_SERVER_PORT; +#if ! SOCK_SERVER_USE_EPOLL fd_set _rfdset, *rfdset = &_rfdset; +#endif #ifdef VCL_TEST vppcom_endpt_t endpt; #else +#if ! SOCK_SERVER_USE_EPOLL fd_set _wfdset, *wfdset = &_wfdset; #endif +#endif if ((argc == 2) && (sscanf (argv[1], "%d", &v) == 1)) port = (uint16_t) v; @@ -390,16 +441,55 @@ main (int argc, char **argv) return rv; } + printf ("\nSERVER: Waiting for a client to connect on port %d...\n", port); + +#if ! SOCK_SERVER_USE_EPOLL + FD_ZERO (&ssm->wr_fdset); FD_ZERO (&ssm->rd_fdset); FD_SET (ssm->listen_fd, &ssm->rd_fdset); ssm->nfds = ssm->listen_fd + 1; - printf ("\nSERVER: Waiting for a client to connect on port %d...\n", port); +#else +#ifdef VCL_TEST + ssm->epfd = vppcom_epoll_create (); + if (ssm->epfd < 0) + errno = -ssm->epfd; +#else + ssm->epfd = epoll_create (1); +#endif + if (ssm->epfd < 0) + { + errno_val = errno; + perror ("ERROR in main()"); + fprintf (stderr, "ERROR: epoll_create failed (errno = %d)!\n", + errno_val); + return ssm->epfd; + } + + ssm->listen_ev.events = EPOLLIN; + ssm->listen_ev.data.u32 = ~0; +#ifdef VCL_TEST + rv = vppcom_epoll_ctl (ssm->epfd, EPOLL_CTL_ADD, ssm->listen_fd, + &ssm->listen_ev); + if (rv < 0) + errno = -rv; +#else + rv = epoll_ctl (ssm->epfd, EPOLL_CTL_ADD, ssm->listen_fd, &ssm->listen_ev); +#endif + if (rv < 0) + { + errno_val = errno; + perror ("ERROR in main()"); + fprintf (stderr, "ERROR: epoll_ctl failed (errno = %d)!\n", errno_val); + return rv; + } +#endif while (1) { +#if ! SOCK_SERVER_USE_EPOLL _rfdset = ssm->rd_fdset; #ifdef VCL_TEST @@ -431,9 +521,45 @@ main (int argc, char **argv) continue; conn = &ssm->conn_pool[i]; +#else + int num_ev; +#ifdef VCL_TEST + num_ev = vppcom_epoll_wait (ssm->epfd, ssm->wait_events, + SOCK_SERVER_MAX_EPOLL_EVENTS, 60.0); + if (rv < 0) + errno = -rv; +#else + num_ev = epoll_wait (ssm->epfd, ssm->wait_events, + SOCK_SERVER_MAX_EPOLL_EVENTS, 60000); +#endif + if (num_ev < 0) + { + perror ("epoll_wait()"); + fprintf (stderr, "\nERROR: epoll_wait() failed -- aborting!\n"); + main_rv = -1; + goto done; + } + if (num_ev == 0) + { + fprintf (stderr, "\nepoll_wait() timeout!\n"); + continue; + } + for (i = 0; i < num_ev; i++) + { + if (ssm->wait_events[i].data.u32 == ~0) + { + new_client (); + continue; + } + conn = &ssm->conn_pool[ssm->wait_events[i].data.u32]; +#endif client_fd = conn->fd; +#if ! SOCK_SERVER_USE_EPOLL if (FD_ISSET (client_fd, rfdset)) +#else + if (EPOLLIN & ssm->wait_events[i].events) +#endif { rx_bytes = sock_test_read (client_fd, conn->buf, conn->buf_size, &conn->stats); @@ -489,8 +615,12 @@ main (int argc, char **argv) close (client_fd); #endif conn_pool_free (conn); - +#if ! SOCK_SERVER_USE_EPOLL if (ssm->nfds == (ssm->listen_fd + 1)) +#else + ssm->nfds--; + if (!ssm->nfds) +#endif { printf ("SERVER: All client connections " "closed.\n\nSERVER: " @@ -520,7 +650,7 @@ main (int argc, char **argv) ((char *) conn->buf)[rx_bytes < conn->buf_size ? rx_bytes : conn->buf_size - 1] = 0; - printf ("\nSERVER (fd %d): RX (%d bytes) - '%s'\n", + printf ("SERVER (fd %d): RX (%d bytes) - '%s'\n", conn->fd, rx_bytes, conn->buf); } } diff --git a/src/uri/vppcom.c b/src/uri/vppcom.c index 8aeb9dafa6d..127d511820a 100644 --- a/src/uri/vppcom.c +++ b/src/uri/vppcom.c @@ -68,6 +68,18 @@ typedef enum STATE_FAILED } session_state_t; +typedef struct epoll_event vppcom_epoll_event_t; + +typedef struct +{ + u32 next_sid; + u32 prev_sid; + u32 vep_idx; + vppcom_epoll_event_t ev; +#define VEP_DEFAULT_ET_MASK (EPOLLIN|EPOLLOUT) + u32 et_mask; +} vppcom_epoll_t; + typedef struct { volatile session_state_t state; @@ -79,10 +91,15 @@ typedef struct unix_shared_memory_queue_t *vpp_event_queue; /* Socket configuration state */ + /* TBD: covert 'is_*' vars to bit in u8 flags; */ u8 is_server; u8 is_listen; u8 is_cut_thru; u8 is_nonblocking; + u8 is_vep; + u8 is_vep_session; + u32 wait_cont_idx; + vppcom_epoll_t vep; u32 vrf; u8 is_ip4; u8 ip[16]; @@ -238,7 +255,7 @@ vppcom_session_at_index (u32 session_index, session_t * volatile *sess) if (PREDICT_FALSE ((session_index == ~0) || pool_is_free_index (vcm->sessions, session_index))) { - clib_warning ("[%d] invalid session, sid (%d) has been closed!", + clib_warning ("[%d] invalid session, sid (%u) has been closed!", vcm->my_pid, session_index); return VPPCOM_EBADFD; } @@ -554,7 +571,7 @@ vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t * if (PREDICT_FALSE (rv)) { if (VPPCOM_DEBUG > 1) - clib_warning ("[%d] invalid session, sid (%d) has been closed!", + clib_warning ("[%d] invalid session, sid (%u) has been closed!", vcm->my_pid, p[0]); } hash_unset (vcm->session_index_by_vpp_handles, mp->handle); @@ -616,7 +633,7 @@ vl_api_disconnect_session_t_handler (vl_api_disconnect_session_t * mp) if (PREDICT_FALSE (rval)) { if (VPPCOM_DEBUG > 1) - clib_warning ("[%d] invalid session, sid (%d) has been closed!", + clib_warning ("[%d] invalid session, sid (%u) has been closed!", vcm->my_pid, p[0]); } else @@ -658,7 +675,7 @@ vl_api_reset_session_t_handler (vl_api_reset_session_t * mp) if (PREDICT_FALSE (rval)) { if (VPPCOM_DEBUG > 1) - clib_warning ("[%d] invalid session, sid (%d) has been closed!", + clib_warning ("[%d] invalid session, sid (%u) has been closed!", vcm->my_pid, p[0]); } else @@ -729,6 +746,7 @@ vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp) vec_reset_length (a->new_segment_indices); if (PREDICT_FALSE (rv)) { + clib_spinlock_unlock (&vcm->sessions_lockp); clib_warning ("[%d] sm_fifo_segment_attach ('%s') failed", vcm->my_pid, a->segment_name); return; @@ -802,7 +820,7 @@ vppcom_send_disconnect (u32 session_index) { clib_spinlock_unlock (&vcm->sessions_lockp); if (VPPCOM_DEBUG > 1) - clib_warning ("[%d] invalid session, sid (%d) has been closed!", + clib_warning ("[%d] invalid session, sid (%u) has been closed!", vcm->my_pid, session_index); return rv; } @@ -993,12 +1011,12 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp) session->port = ntohs (mp->port); session->is_ip4 = mp->is_ip4; clib_memcpy (session->ip, mp->ip, sizeof (session->ip)); - clib_spinlock_unlock (&vcm->sessions_lockp); /* Add it to lookup table */ hash_set (vcm->session_index_by_vpp_handles, mp->handle, session_index); clib_fifo_add1 (vcm->client_session_index_fifo, session_index); + clib_spinlock_unlock (&vcm->sessions_lockp); /* * Send accept reply to vpp @@ -1031,11 +1049,13 @@ vl_api_connect_sock_t_handler (vl_api_connect_sock_t * mp) svm_fifo_t *tx_fifo; unix_shared_memory_queue_t *event_q = 0; + clib_spinlock_lock (&vcm->sessions_lockp); if (!clib_fifo_free_elts (vcm->client_session_index_fifo)) { if (VPPCOM_DEBUG > 1) clib_warning ("[%d] client session queue is full!", vcm->my_pid); rv = VNET_API_ERROR_QUEUE_FULL; + clib_spinlock_unlock (&vcm->sessions_lockp); goto send_reply; } @@ -1063,7 +1083,6 @@ vl_api_connect_sock_t_handler (vl_api_connect_sock_t * mp) if (VPPCOM_DEBUG > 1) clib_warning ("[%d] created segment '%s'", vcm->my_pid, a->segment_name); - clib_spinlock_lock (&vcm->sessions_lockp); pool_get (vcm->sessions, session); memset (session, 0, sizeof (*session)); session_index = session - vcm->sessions; @@ -1127,8 +1146,8 @@ vl_api_connect_sock_t_handler (vl_api_connect_sock_t * mp) if (VPPCOM_DEBUG > 1) clib_warning ("[%d] Connected cut-thru to client: sid %d", vcm->my_pid, session_index); - clib_spinlock_unlock (&vcm->sessions_lockp); clib_fifo_add1 (vcm->client_session_index_fifo, session_index); + clib_spinlock_unlock (&vcm->sessions_lockp); send_reply: rmp = vl_msg_api_alloc (sizeof (*rmp)); @@ -1191,7 +1210,7 @@ vppcom_send_unbind_sock (u32 session_index) { clib_spinlock_unlock (&vcm->sessions_lockp); if (VPPCOM_DEBUG > 0) - clib_warning ("[%d] invalid session, sid (%d) has been closed!", + clib_warning ("[%d] invalid session, sid (%u) has been closed!", vcm->my_pid, session_index); return; } @@ -1234,7 +1253,7 @@ vppcom_session_unbind (u32 session_index) { clib_spinlock_unlock (&vcm->sessions_lockp); if (VPPCOM_DEBUG > 1) - clib_warning ("[%d] invalid session, sid (%d) has been closed!", + clib_warning ("[%d] invalid session, sid (%u) has been closed!", vcm->my_pid, session_index); return VPPCOM_EBADFD; } @@ -1807,6 +1826,7 @@ vppcom_session_create (u32 vrf, u8 proto, u8 is_nonblocking) clib_spinlock_lock (&vcm->sessions_lockp); pool_get (vcm->sessions, session); + memset (session, 0, sizeof (*session)); session_index = session - vcm->sessions; session->vrf = vrf; @@ -1832,49 +1852,101 @@ vppcom_session_close (uint32_t session_index) rv = vppcom_session_at_index (session_index, &session); if (PREDICT_FALSE (rv)) { - clib_spinlock_unlock (&vcm->sessions_lockp); if (VPPCOM_DEBUG > 0) - clib_warning ("[%d] invalid session, sid (%d) has been closed!", + clib_warning ("[%d] invalid session, sid (%u) has been closed!", vcm->my_pid, session_index); - return rv; + clib_spinlock_unlock (&vcm->sessions_lockp); + goto done; } clib_spinlock_unlock (&vcm->sessions_lockp); if (VPPCOM_DEBUG > 0) clib_warning ("[%d] sid %d", vcm->my_pid, session_index); - if (session->is_cut_thru) + if (session->is_vep) { - if (session->is_server) + u32 next_sid; + for (next_sid = session->vep.next_sid; next_sid != ~0; + next_sid = session->vep.next_sid) { - rv = vppcom_session_unbind_cut_thru (session); + rv = vppcom_epoll_ctl (session_index, EPOLL_CTL_DEL, next_sid, 0); if ((VPPCOM_DEBUG > 0) && (rv < 0)) - clib_warning ("[%d] unbind cut-thru (session %d) failed, " - "rv = %s (%d)", + clib_warning ("[%d] EPOLL_CTL_DEL vep_idx %u, sid %u failed, " + "rv = %s (%d)", session_index, next_sid, vcm->my_pid, session_index, vppcom_retval_str (rv), rv); + + clib_spinlock_lock (&vcm->sessions_lockp); + rv = vppcom_session_at_index (session_index, &session); + if (PREDICT_FALSE (rv)) + { + if (VPPCOM_DEBUG > 0) + clib_warning + ("[%d] invalid session, sid (%u) has been closed!", + vcm->my_pid, session_index); + clib_spinlock_unlock (&vcm->sessions_lockp); + goto done; + } + clib_spinlock_unlock (&vcm->sessions_lockp); } } - else if (session->is_server) - { - rv = vppcom_session_unbind (session_index); - if ((VPPCOM_DEBUG > 0) && (rv < 0)) - clib_warning ("[%d] unbind (session %d) failed, rv = %s (%d)", - vcm->my_pid, session_index, vppcom_retval_str (rv), rv); - } else { - rv = vppcom_session_disconnect (session_index); - if ((VPPCOM_DEBUG > 0) && (rv < 0)) - clib_warning ("[%d] disconnect (session %d) failed, rv = %s (%d)", - vcm->my_pid, session_index, vppcom_retval_str (rv), rv); - } - if (rv < 0) - return rv; + if (session->is_vep_session) + { + u32 vep_idx = session->vep.vep_idx; + rv = vppcom_epoll_ctl (vep_idx, EPOLL_CTL_DEL, session_index, 0); + if ((VPPCOM_DEBUG > 0) && (rv < 0)) + clib_warning ("[%d] EPOLL_CTL_DEL vep_idx %u, sid %u failed, " + "rv = %s (%d)", vep_idx, session_index, + vcm->my_pid, session_index, + vppcom_retval_str (rv), rv); - clib_spinlock_lock (&vcm->sessions_lockp); + clib_spinlock_lock (&vcm->sessions_lockp); + rv = vppcom_session_at_index (session_index, &session); + if (PREDICT_FALSE (rv)) + { + if (VPPCOM_DEBUG > 0) + clib_warning + ("[%d] invalid session, sid (%u) has been closed!", + vcm->my_pid, session_index); + clib_spinlock_unlock (&vcm->sessions_lockp); + goto done; + } + clib_spinlock_unlock (&vcm->sessions_lockp); + } + + if (session->is_cut_thru) + { + if (session->is_server) + { + rv = vppcom_session_unbind_cut_thru (session); + if ((VPPCOM_DEBUG > 0) && (rv < 0)) + clib_warning ("[%d] unbind cut-thru (session %d) failed, " + "rv = %s (%d)", + vcm->my_pid, session_index, + vppcom_retval_str (rv), rv); + } + } + else if (session->is_server) + { + rv = vppcom_session_unbind (session_index); + if ((VPPCOM_DEBUG > 0) && (rv < 0)) + clib_warning ("[%d] unbind (session %d) failed, rv = %s (%d)", + vcm->my_pid, session_index, + vppcom_retval_str (rv), rv); + } + else + { + rv = vppcom_session_disconnect (session_index); + if ((VPPCOM_DEBUG > 0) && (rv < 0)) + clib_warning ("[%d] disconnect (session %d) failed, rv = %s (%d)", + vcm->my_pid, session_index, + vppcom_retval_str (rv), rv); + } + } pool_put_index (vcm->sessions, session_index); - clib_spinlock_unlock (&vcm->sessions_lockp); +done: return rv; } @@ -1895,7 +1967,7 @@ vppcom_session_bind (uint32_t session_index, vppcom_endpt_t * ep) { clib_spinlock_unlock (&vcm->sessions_lockp); if (VPPCOM_DEBUG > 0) - clib_warning ("[%d] invalid session, sid (%d) has been closed!", + clib_warning ("[%d] invalid session, sid (%u) has been closed!", vcm->my_pid, session_index); return rv; } @@ -1927,7 +1999,7 @@ vppcom_session_listen (uint32_t listen_session_index, uint32_t q_len) { clib_spinlock_unlock (&vcm->sessions_lockp); if (VPPCOM_DEBUG > 0) - clib_warning ("[%d] invalid session, sid (%d) has been closed!", + clib_warning ("[%d] invalid session, sid (%u) has been closed!", vcm->my_pid, listen_session_index); return rv; } @@ -1957,13 +2029,13 @@ vppcom_session_listen (uint32_t listen_session_index, uint32_t q_len) { clib_spinlock_unlock (&vcm->sessions_lockp); if (VPPCOM_DEBUG > 0) - clib_warning ("[%d] invalid session, sid (%d) has been closed!", + clib_warning ("[%d] invalid session, sid (%u) has been closed!", vcm->my_pid, listen_session_index); return rv; } listen_session->is_listen = 1; - clib_spinlock_unlock (&vcm->sessions_lockp); clib_fifo_validate (vcm->client_session_index_fifo, q_len); + clib_spinlock_unlock (&vcm->sessions_lockp); return VPPCOM_OK; } @@ -1985,7 +2057,7 @@ vppcom_session_accept (uint32_t listen_session_index, vppcom_endpt_t * ep, { clib_spinlock_unlock (&vcm->sessions_lockp); if (VPPCOM_DEBUG > 0) - clib_warning ("[%d] invalid session, sid (%d) has been closed!", + clib_warning ("[%d] invalid session, sid (%u) has been closed!", vcm->my_pid, listen_session_index); return rv; } @@ -2025,9 +2097,8 @@ vppcom_session_accept (uint32_t listen_session_index, vppcom_endpt_t * ep, break; } - clib_fifo_sub1 (vcm->client_session_index_fifo, client_session_index); - clib_spinlock_lock (&vcm->sessions_lockp); + clib_fifo_sub1 (vcm->client_session_index_fifo, client_session_index); rv = vppcom_session_at_index (client_session_index, &client_session); ASSERT (rv == VPPCOM_OK); ASSERT (client_session->is_ip4 == listen_session->is_ip4); @@ -2062,7 +2133,7 @@ vppcom_session_connect (uint32_t session_index, vppcom_endpt_t * server_ep) { clib_spinlock_unlock (&vcm->sessions_lockp); if (VPPCOM_DEBUG > 0) - clib_warning ("[%d] invalid session, sid (%d) has been closed!", + clib_warning ("[%d] invalid session, sid (%u) has been closed!", vcm->my_pid, session_index); return rv; } @@ -2071,7 +2142,7 @@ vppcom_session_connect (uint32_t session_index, vppcom_endpt_t * server_ep) { clib_spinlock_unlock (&vcm->sessions_lockp); if (VPPCOM_DEBUG > 0) - clib_warning ("[%d] session, sid (%d) already connected!", + clib_warning ("[%d] session, sid (%u) already connected!", vcm->my_pid, session_index); return VPPCOM_OK; } @@ -2115,7 +2186,6 @@ vppcom_session_read (uint32_t session_index, void *buf, int n) svm_fifo_t *rx_fifo; int n_read = 0; int rv; - int max_dequeue; char *fifo_str; ASSERT (buf); @@ -2126,7 +2196,7 @@ vppcom_session_read (uint32_t session_index, void *buf, int n) { clib_spinlock_unlock (&vcm->sessions_lockp); if (VPPCOM_DEBUG > 0) - clib_warning ("[%d] invalid session, sid (%d) has been closed!", + clib_warning ("[%d] invalid session, sid (%u) has been closed!", vcm->my_pid, session_index); return rv; } @@ -2135,7 +2205,7 @@ vppcom_session_read (uint32_t session_index, void *buf, int n) { clib_spinlock_unlock (&vcm->sessions_lockp); if (VPPCOM_DEBUG > 0) - clib_warning ("[%d] sid (%d) has been closed by remote peer!", + clib_warning ("[%d] sid (%u) has been closed by remote peer!", vcm->my_pid, session_index); return VPPCOM_ECONNRESET; } @@ -2144,12 +2214,19 @@ vppcom_session_read (uint32_t session_index, void *buf, int n) session->server_rx_fifo : session->server_tx_fifo); fifo_str = ((!session->is_cut_thru || session->is_server) ? "server_rx_fifo" : "server_tx_fifo"); - clib_spinlock_unlock (&vcm->sessions_lockp); - max_dequeue = (int) svm_fifo_max_dequeue (rx_fifo); - n_read = svm_fifo_dequeue_nowait (rx_fifo, clib_min (n, max_dequeue), buf); + do + { + n_read = svm_fifo_dequeue_nowait (rx_fifo, n, buf); + } + while (!session->is_nonblocking && (n_read <= 0)); - if (VPPCOM_DEBUG > 2) + if (n_read <= 0) + session->vep.et_mask |= EPOLLIN; + + clib_spinlock_unlock (&vcm->sessions_lockp); + + if ((VPPCOM_DEBUG > 2) && (n_read > 0)) clib_warning ("[%d] sid %d, read %d bytes from %s (%p)", vcm->my_pid, session_index, n_read, fifo_str, rx_fifo); @@ -2167,7 +2244,7 @@ vppcom_session_read_ready (session_t * session, u32 session_index) if (session->state == STATE_DISCONNECT) { if (VPPCOM_DEBUG > 0) - clib_warning ("[%d] sid (%d) has been closed by remote peer!", + clib_warning ("[%d] sid (%u) has been closed by remote peer!", vcm->my_pid, session_index); return VPPCOM_ECONNRESET; } @@ -2187,6 +2264,9 @@ vppcom_session_read_ready (session_t * session, u32 session_index) session_index, session->is_server ? "server_rx_fifo" : "server_tx_fifo", rx_fifo, ready); + if (ready == 0) + session->vep.et_mask |= EPOLLIN; + return ready; } @@ -2198,9 +2278,8 @@ vppcom_session_write (uint32_t session_index, void *buf, int n) svm_fifo_t *tx_fifo; unix_shared_memory_queue_t *q; session_fifo_event_t evt; - int rv; + int rv, n_write; char *fifo_str; - u8 is_nonblocking; ASSERT (buf); @@ -2210,7 +2289,7 @@ vppcom_session_write (uint32_t session_index, void *buf, int n) { clib_spinlock_unlock (&vcm->sessions_lockp); if (VPPCOM_DEBUG > 0) - clib_warning ("[%d] invalid session, sid (%d) has been closed!", + clib_warning ("[%d] invalid session, sid (%u) has been closed!", vcm->my_pid, session_index); return rv; } @@ -2219,7 +2298,7 @@ vppcom_session_write (uint32_t session_index, void *buf, int n) { clib_spinlock_unlock (&vcm->sessions_lockp); if (VPPCOM_DEBUG > 0) - clib_warning ("[%d] sid (%d) has been closed by remote peer!", + clib_warning ("[%d] sid (%u) has been closed by remote peer!", vcm->my_pid, session_index); return VPPCOM_ECONNRESET; } @@ -2228,18 +2307,14 @@ vppcom_session_write (uint32_t session_index, void *buf, int n) session->server_tx_fifo : session->server_rx_fifo); fifo_str = ((!session->is_cut_thru || session->is_server) ? "server_tx_fifo" : "server_rx_fifo"); - - is_nonblocking = session->is_nonblocking; - clib_spinlock_unlock (&vcm->sessions_lockp); - do { - rv = svm_fifo_enqueue_nowait (tx_fifo, n, buf); + n_write = svm_fifo_enqueue_nowait (tx_fifo, n, buf); } - while (!is_nonblocking && (rv <= 0)); + while (!session->is_nonblocking && (n_write <= 0)); /* If event wasn't set, add one */ - if (!session->is_cut_thru && (rv > 0) && svm_fifo_set_event (tx_fifo)) + if (!session->is_cut_thru && (n_write > 0) && svm_fifo_set_event (tx_fifo)) { int rval; @@ -2248,28 +2323,30 @@ vppcom_session_write (uint32_t session_index, void *buf, int n) evt.event_type = FIFO_EVENT_APP_TX; evt.event_id = vcm->tx_event_id++; - clib_spinlock_lock (&vcm->sessions_lockp); rval = vppcom_session_at_index (session_index, &session); if (PREDICT_FALSE (rval)) { clib_spinlock_unlock (&vcm->sessions_lockp); if (VPPCOM_DEBUG > 1) - clib_warning ("[%d] invalid session, sid (%d) has been closed!", + clib_warning ("[%d] invalid session, sid (%u) has been closed!", vcm->my_pid, session_index); return rval; } q = session->vpp_event_queue; - clib_spinlock_unlock (&vcm->sessions_lockp); ASSERT (q); unix_shared_memory_queue_add (q, (u8 *) & evt, 0 /* do wait for mutex */ ); } + if (n_write <= 0) + session->vep.et_mask |= EPOLLOUT; + + clib_spinlock_unlock (&vcm->sessions_lockp); + if (VPPCOM_DEBUG > 2) clib_warning ("[%d] sid %d, wrote %d bytes to %s (%p)", vcm->my_pid, - session_index, rv, fifo_str, tx_fifo); - - return rv; + session_index, n_write, fifo_str, tx_fifo); + return n_write; } static inline int @@ -2278,13 +2355,13 @@ vppcom_session_write_ready (session_t * session, u32 session_index) vppcom_main_t *vcm = &vppcom_main; svm_fifo_t *tx_fifo; char *fifo_str; - int rv; + int ready; /* Assumes caller has acquired spinlock: vcm->sessions_lockp */ if (session->state == STATE_DISCONNECT) { if (VPPCOM_DEBUG > 0) - clib_warning ("[%d] sid (%d) has been closed by remote peer!", + clib_warning ("[%d] sid (%u) has been closed by remote peer!", vcm->my_pid, session_index); return VPPCOM_ECONNRESET; } @@ -2294,12 +2371,15 @@ vppcom_session_write_ready (session_t * session, u32 session_index) fifo_str = ((!session->is_cut_thru || session->is_server) ? "server_tx_fifo" : "server_rx_fifo"); - rv = svm_fifo_max_enqueue (tx_fifo); + ready = svm_fifo_max_enqueue (tx_fifo); if (VPPCOM_DEBUG > 3) clib_warning ("[%d] sid %d, peek %s (%p), ready = %d", vcm->my_pid, - session_index, fifo_str, tx_fifo, rv); - return rv; + session_index, fifo_str, tx_fifo, ready); + if (ready == 0) + session->vep.et_mask |= EPOLLOUT; + + return ready; } int @@ -2431,10 +2511,520 @@ select_done: return (bits_set); } -/* - * fd.io coding-style-patch-verification: ON - * - * Local Variables: - * eval: (c-set-style "gnu") - * End: - */ +static inline void +vep_verify_epoll_chain (u32 vep_idx) +{ + session_t *session; + vppcom_epoll_t *vep; + int rv; + u32 sid; + + if (VPPCOM_DEBUG < 1) + return; + + /* Assumes caller has acquired spinlock: vcm->sessions_lockp */ + rv = vppcom_session_at_index (vep_idx, &session); + if (PREDICT_FALSE (rv)) + { + clib_warning ("ERROR: Invalid vep_idx (%u)!", vep_idx); + goto done; + } + if (PREDICT_FALSE (!session->is_vep)) + { + clib_warning ("ERROR: vep_idx (%u) is not a vep!", vep_idx); + goto done; + } + if (VPPCOM_DEBUG > 1) + clib_warning ("vep_idx (%u): Dumping epoll chain\n" + "{\n" + " is_vep = %u\n" + " is_vep_session = %u\n" + " wait_cont_idx = 0x%x (%u)\n" + "}\n", + vep_idx, session->is_vep, session->is_vep_session, + session->wait_cont_idx, session->wait_cont_idx); + do + { + vep = &session->vep; + if (session->is_vep_session) + { + if (VPPCOM_DEBUG > 1) + clib_warning ("vep_idx[%u]: sid 0x%x (%u)\n" + "{\n" + " next_sid = 0x%x (%u)\n" + " prev_sid = 0x%x (%u)\n" + " vep_idx = 0x%x (%u)\n" + " ev.events = 0x%x\n" + " ev.data.u64 = 0x%llx\n" + " et_mask = 0x%x\n" + "}\n", + vep_idx, sid, sid, + vep->next_sid, vep->next_sid, + vep->prev_sid, vep->prev_sid, + vep->vep_idx, vep->vep_idx, + vep->ev.events, vep->ev.data.u64, vep->et_mask); + } + sid = vep->next_sid; + if (sid != ~0) + { + rv = vppcom_session_at_index (sid, &session); + if (PREDICT_FALSE (rv)) + { + clib_warning ("ERROR: Invalid sid (%u)!", sid); + goto done; + } + if (PREDICT_FALSE (session->is_vep)) + clib_warning ("ERROR: sid (%u) is a vep!", vep_idx); + else if (PREDICT_FALSE (!session->is_vep_session)) + { + clib_warning ("ERROR: session (%u) is not a vep session!", sid); + goto done; + } + if (PREDICT_FALSE (session->vep.vep_idx != vep_idx)) + clib_warning ("ERROR: session (%u) vep_idx (%u) != " + "vep_idx (%u)!", + sid, session->vep.vep_idx, vep_idx); + } + } + while (sid != ~0); + +done: + if (VPPCOM_DEBUG > 1) + clib_warning ("vep_idx (%u): Dump complete!", vep_idx); +} + +int +vppcom_epoll_create (void) +{ + vppcom_main_t *vcm = &vppcom_main; + session_t *vep_session; + u32 vep_idx; + + clib_spinlock_lock (&vcm->sessions_lockp); + pool_get (vcm->sessions, vep_session); + memset (vep_session, 0, sizeof (*vep_session)); + vep_idx = vep_session - vcm->sessions; + + vep_session->is_vep = 1; + vep_session->vep.vep_idx = ~0; + vep_session->vep.next_sid = ~0; + vep_session->vep.prev_sid = ~0; + vep_session->wait_cont_idx = ~0; + clib_spinlock_unlock (&vcm->sessions_lockp); + + if (VPPCOM_DEBUG > 0) + clib_warning ("Created vep_idx %u!", vep_idx); + + return (vep_idx); +} + +int +vppcom_epoll_ctl (uint32_t vep_idx, int op, uint32_t session_index, + struct epoll_event *event) +{ + vppcom_main_t *vcm = &vppcom_main; + session_t *vep_session; + session_t *session; + int rv; + + if (vep_idx == session_index) + { + if (VPPCOM_DEBUG > 0) + clib_warning ("ERROR: vep_idx == session_index (%u)!", vep_idx); + return VPPCOM_EINVAL; + } + + clib_spinlock_lock (&vcm->sessions_lockp); + rv = vppcom_session_at_index (vep_idx, &vep_session); + if (PREDICT_FALSE (rv)) + { + if (VPPCOM_DEBUG > 0) + clib_warning ("ERROR: Invalid vep_idx (%u)!", vep_idx); + goto done; + } + if (PREDICT_FALSE (!vep_session->is_vep)) + { + if (VPPCOM_DEBUG > 0) + clib_warning ("ERROR: vep_idx (%u) is not a vep!", vep_idx); + rv = VPPCOM_EINVAL; + goto done; + } + + ASSERT (vep_session->vep.vep_idx == ~0); + ASSERT (vep_session->vep.prev_sid == ~0); + + rv = vppcom_session_at_index (session_index, &session); + if (PREDICT_FALSE (rv)) + { + if (VPPCOM_DEBUG > 0) + clib_warning ("ERROR: Invalid session_index (%u)!", session_index); + goto done; + } + if (PREDICT_FALSE (session->is_vep)) + { + if (VPPCOM_DEBUG > 0) + clib_warning ("ERROR: session_index (%u) is a vep!", vep_idx); + rv = VPPCOM_EINVAL; + goto done; + } + + switch (op) + { + case EPOLL_CTL_ADD: + if (PREDICT_FALSE (!event)) + { + clib_warning ("NULL pointer to epoll_event structure!"); + rv = VPPCOM_EINVAL; + goto done; + } + if (vep_session->vep.next_sid != ~0) + { + session_t *next_session; + rv = vppcom_session_at_index (vep_session->vep.next_sid, + &next_session); + if (PREDICT_FALSE (rv)) + { + if (VPPCOM_DEBUG > 0) + clib_warning ("EPOLL_CTL_ADD: Invalid vep.next_sid (%u) on" + " vep_idx (%u)!", vep_session->vep.next_sid, + vep_idx); + goto done; + } + ASSERT (next_session->vep.prev_sid == vep_idx); + next_session->vep.prev_sid = session_index; + } + session->vep.next_sid = vep_session->vep.next_sid; + session->vep.prev_sid = vep_idx; + session->vep.vep_idx = vep_idx; + session->vep.et_mask = VEP_DEFAULT_ET_MASK; + session->vep.ev = *event; + session->is_vep_session = 1; + vep_session->vep.next_sid = session_index; + if (VPPCOM_DEBUG > 1) + clib_warning ("EPOLL_CTL_ADD: vep_idx %u, sid %u, events 0x%x," + " data 0x%llx!", vep_idx, session_index, + event->events, event->data.u64); + break; + + case EPOLL_CTL_MOD: + if (PREDICT_FALSE (!event)) + { + clib_warning ("NULL pointer to epoll_event structure!"); + rv = VPPCOM_EINVAL; + goto done; + } + if (PREDICT_FALSE (!session->is_vep_session && + (session->vep.vep_idx != vep_idx))) + { + if (VPPCOM_DEBUG > 0) + { + if (!session->is_vep_session) + clib_warning ("EPOLL_CTL_MOD: session (%u) is not " + "a vep session!", session_index); + else + clib_warning ("EPOLL_CTL_MOD: session (%u) vep_idx (%u) != " + "vep_idx (%u)!", session_index, + session->vep.vep_idx, vep_idx); + } + rv = VPPCOM_EINVAL; + goto done; + } + session->vep.et_mask = VEP_DEFAULT_ET_MASK; + session->vep.ev = *event; + if (VPPCOM_DEBUG > 1) + clib_warning ("EPOLL_CTL_MOD: vep_idx %u, sid %u, events 0x%x," + " data 0x%llx!", vep_idx, session_index, + event->events, event->data.u64); + break; + + case EPOLL_CTL_DEL: + if (PREDICT_FALSE (!session->is_vep_session && + (session->vep.vep_idx != vep_idx))) + { + if (VPPCOM_DEBUG > 0) + { + if (!session->is_vep_session) + clib_warning ("EPOLL_CTL_DEL: session (%u) is not " + "a vep session!", session_index); + else + clib_warning ("EPOLL_CTL_DEL: session (%u) vep_idx (%u) != " + "vep_idx (%u)!", session_index, + session->vep.vep_idx, vep_idx); + } + rv = VPPCOM_EINVAL; + goto done; + } + + vep_session->wait_cont_idx = + (vep_session->wait_cont_idx == session_index) ? + session->vep.next_sid : vep_session->wait_cont_idx; + + if (session->vep.prev_sid == vep_idx) + vep_session->vep.next_sid = session->vep.next_sid; + else + { + session_t *prev_session; + rv = vppcom_session_at_index (session->vep.prev_sid, &prev_session); + if (PREDICT_FALSE (rv)) + { + if (VPPCOM_DEBUG > 0) + clib_warning ("EPOLL_CTL_DEL: Invalid vep.prev_sid (%u) on" + " sid (%u)!", session->vep.prev_sid, + session_index); + goto done; + } + ASSERT (prev_session->vep.next_sid == session_index); + prev_session->vep.next_sid = session->vep.next_sid; + } + if (session->vep.next_sid != ~0) + { + session_t *next_session; + rv = vppcom_session_at_index (session->vep.next_sid, &next_session); + if (PREDICT_FALSE (rv)) + { + if (VPPCOM_DEBUG > 0) + clib_warning ("EPOLL_CTL_DEL: Invalid vep.next_sid (%u) on" + " sid (%u)!", session->vep.next_sid, + session_index); + goto done; + } + ASSERT (next_session->vep.prev_sid == session_index); + next_session->vep.prev_sid = session->vep.prev_sid; + } + + memset (&session->vep, 0, sizeof (session->vep)); + session->vep.next_sid = ~0; + session->vep.prev_sid = ~0; + session->vep.vep_idx = ~0; + session->is_vep_session = 0; + if (VPPCOM_DEBUG > 1) + clib_warning ("EPOLL_CTL_DEL: vep_idx %u, sid %u!", vep_idx, + session_index); + break; + + default: + clib_warning ("Invalid operation (%d)!", op); + rv = VPPCOM_EINVAL; + } + + vep_verify_epoll_chain (vep_idx); + +done: + clib_spinlock_unlock (&vcm->sessions_lockp); + return rv; +} + +#define VCL_LOCK_AND_GET_SESSION(I, S) \ +do { \ + vppcom_main_t *vcm = &vppcom_main; \ + \ + clib_spinlock_lock (&vcm->sessions_lockp); \ + rv = vppcom_session_at_index (I, S); \ + if (PREDICT_FALSE (rv)) \ + { \ + clib_spinlock_unlock (&vcm->sessions_lockp); \ + \ + if (VPPCOM_DEBUG > 0) \ + clib_warning ("ERROR: Invalid ##I (%u)!", I); \ + \ + goto done; \ + } \ +} while (0) + +int +vppcom_epoll_wait (uint32_t vep_idx, struct epoll_event *events, + int maxevents, double wait_for_time) +{ + vppcom_main_t *vcm = &vppcom_main; + session_t *vep_session; + int rv; + f64 timeout = clib_time_now (&vcm->clib_time) + wait_for_time; + int num_ev = 0; + u32 vep_next_sid, wait_cont_idx; + u8 is_vep; + + if (PREDICT_FALSE (maxevents <= 0)) + { + if (VPPCOM_DEBUG > 0) + clib_warning ("ERROR: Invalid maxevents (%d)!", maxevents); + return VPPCOM_EINVAL; + } + if (PREDICT_FALSE (wait_for_time < 0)) + { + if (VPPCOM_DEBUG > 0) + clib_warning ("ERROR: Invalid wait_for_time (%f)!", wait_for_time); + return VPPCOM_EINVAL; + } + memset (events, 0, sizeof (*events) * maxevents); + + VCL_LOCK_AND_GET_SESSION (vep_idx, &vep_session); + vep_next_sid = vep_session->vep.next_sid; + is_vep = vep_session->is_vep; + wait_cont_idx = vep_session->wait_cont_idx; + clib_spinlock_unlock (&vcm->sessions_lockp); + + if (PREDICT_FALSE (!is_vep)) + { + if (VPPCOM_DEBUG > 0) + clib_warning ("ERROR: vep_idx (%u) is not a vep!", vep_idx); + rv = VPPCOM_EINVAL; + goto done; + } + if ((VPPCOM_DEBUG > 0) && (PREDICT_FALSE (vep_next_sid == ~0))) + { + clib_warning ("WARNING: vep_idx (%u) is empty!", vep_idx); + goto done; + } + + do + { + u32 sid; + u32 next_sid = ~0; + session_t *session; + + for (sid = (wait_cont_idx == ~0) ? vep_next_sid : wait_cont_idx; + sid != ~0; sid = next_sid) + { + u32 session_events, et_mask, clear_et_mask, session_vep_idx; + u8 add_event, is_vep_session; + int ready; + u64 session_ev_data; + + VCL_LOCK_AND_GET_SESSION (sid, &session); + next_sid = session->vep.next_sid; + session_events = session->vep.ev.events; + et_mask = session->vep.et_mask; + is_vep = session->is_vep; + is_vep_session = session->is_vep_session; + session_vep_idx = session->vep.vep_idx; + session_ev_data = session->vep.ev.data.u64; + clib_spinlock_unlock (&vcm->sessions_lockp); + + if (PREDICT_FALSE (is_vep)) + { + if (VPPCOM_DEBUG > 0) + clib_warning ("ERROR: sid (%u) is a vep!", vep_idx); + rv = VPPCOM_EINVAL; + goto done; + } + if (PREDICT_FALSE (!is_vep_session)) + { + if (VPPCOM_DEBUG > 0) + clib_warning ("EPOLL_CTL_MOD: session (%u) is not " + "a vep session!", sid); + rv = VPPCOM_EINVAL; + goto done; + } + if (PREDICT_FALSE (session_vep_idx != vep_idx)) + { + clib_warning ("EPOLL_CTL_MOD: session (%u) " + "vep_idx (%u) != vep_idx (%u)!", + sid, session->vep.vep_idx, vep_idx); + rv = VPPCOM_EINVAL; + goto done; + } + + add_event = clear_et_mask = 0; + + if ((EPOLLIN & session_events) && (EPOLLIN & et_mask)) + { + VCL_LOCK_AND_GET_SESSION (sid, &session); + ready = vppcom_session_read_ready (session, sid); + clib_spinlock_unlock (&vcm->sessions_lockp); + if (ready > 0) + { + add_event = 1; + events[num_ev].events |= EPOLLIN; + if (EPOLLET & session_events) + clear_et_mask |= EPOLLIN; + } + else if (ready < 0) + { + add_event = 1; + switch (ready) + { + case VPPCOM_ECONNRESET: + events[num_ev].events |= EPOLLHUP | EPOLLRDHUP; + break; + + default: + events[num_ev].events |= EPOLLERR; + break; + } + } + } + + if ((EPOLLOUT & session_events) && (EPOLLOUT & et_mask)) + { + VCL_LOCK_AND_GET_SESSION (sid, &session); + ready = vppcom_session_write_ready (session, sid); + clib_spinlock_unlock (&vcm->sessions_lockp); + if (ready > 0) + { + add_event = 1; + events[num_ev].events |= EPOLLOUT; + if (EPOLLET & session_events) + clear_et_mask |= EPOLLOUT; + } + else if (ready < 0) + { + add_event = 1; + switch (ready) + { + case VPPCOM_ECONNRESET: + events[num_ev].events |= EPOLLHUP; + break; + + default: + events[num_ev].events |= EPOLLERR; + break; + } + } + } + + if (add_event) + { + events[num_ev].data.u64 = session_ev_data; + if (EPOLLONESHOT & session_events) + { + VCL_LOCK_AND_GET_SESSION (sid, &session); + session->vep.ev.events = 0; + clib_spinlock_unlock (&vcm->sessions_lockp); + } + num_ev++; + if (num_ev == maxevents) + { + VCL_LOCK_AND_GET_SESSION (vep_idx, &vep_session); + vep_session->wait_cont_idx = next_sid; + clib_spinlock_unlock (&vcm->sessions_lockp); + goto done; + } + } + if (wait_cont_idx != ~0) + { + if (next_sid == ~0) + next_sid = vep_next_sid; + else if (next_sid == wait_cont_idx) + next_sid = ~0; + } + } + } + while ((num_ev == 0) && (clib_time_now (&vcm->clib_time) <= timeout)); + + if (wait_cont_idx != ~0) + { + VCL_LOCK_AND_GET_SESSION (vep_idx, &vep_session); + vep_session->wait_cont_idx = ~0; + clib_spinlock_unlock (&vcm->sessions_lockp); + } +done: + return (rv != VPPCOM_OK) ? rv : num_ev; +} + + /* + * fd.io coding-style-patch-verification: ON + * + * Local Variables: + * eval: (c-set-style "gnu") + * End: + */ diff --git a/src/uri/vppcom.h b/src/uri/vppcom.h index 4b048e039ec..b7c802897d9 100644 --- a/src/uri/vppcom.h +++ b/src/uri/vppcom.h @@ -18,6 +18,7 @@ #include <netdb.h> #include <errno.h> +#include <sys/epoll.h> /* * VPPCOM Public API Definitions, Enums, and Data Structures @@ -141,6 +142,13 @@ extern int vppcom_select (unsigned long n_bits, unsigned long *write_map, unsigned long *except_map, double wait_for_time); +extern int vppcom_epoll_create (void); +extern int vppcom_epoll_ctl (uint32_t vep_idx, int op, + uint32_t session_index, + struct epoll_event *event); +extern int vppcom_epoll_wait (uint32_t vep_idx, struct epoll_event *events, + int maxevents, double wait_for_time); + #endif /* included_vppcom_h */ /* |