From 6b3e017e5d25f15da73f7700f7f2ac553ef1a2e9 Mon Sep 17 00:00:00 2001 From: Christian Ehrhardt Date: Thu, 8 Dec 2016 14:07:29 +0100 Subject: Imported Upstream version 16.11 Change-Id: I1944c65ddc88a9ad70f8c0eb6731552b84fbcb77 Signed-off-by: Christian Ehrhardt --- lib/librte_vhost/socket.c | 615 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 615 insertions(+) create mode 100644 lib/librte_vhost/socket.c (limited to 'lib/librte_vhost/socket.c') diff --git a/lib/librte_vhost/socket.c b/lib/librte_vhost/socket.c new file mode 100644 index 00000000..aaa9c270 --- /dev/null +++ b/lib/librte_vhost/socket.c @@ -0,0 +1,615 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2010-2016 Intel Corporation. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "fd_man.h" +#include "vhost.h" +#include "vhost_user.h" + +/* + * Every time rte_vhost_driver_register() is invoked, an associated + * vhost_user_socket struct will be created. + */ +struct vhost_user_socket { + char *path; + int listenfd; + int connfd; + bool is_server; + bool reconnect; + bool dequeue_zero_copy; +}; + +struct vhost_user_connection { + struct vhost_user_socket *vsocket; + int vid; +}; + +#define MAX_VHOST_SOCKET 1024 +struct vhost_user { + struct vhost_user_socket *vsockets[MAX_VHOST_SOCKET]; + struct fdset fdset; + int vsocket_cnt; + pthread_mutex_t mutex; +}; + +#define MAX_VIRTIO_BACKLOG 128 + +static void vhost_user_server_new_connection(int fd, void *data, int *remove); +static void vhost_user_read_cb(int fd, void *dat, int *remove); +static int vhost_user_create_client(struct vhost_user_socket *vsocket); + +static struct vhost_user vhost_user = { + .fdset = { + .fd = { [0 ... MAX_FDS - 1] = {-1, NULL, NULL, NULL, 0} }, + .fd_mutex = PTHREAD_MUTEX_INITIALIZER, + .num = 0 + }, + .vsocket_cnt = 0, + .mutex = PTHREAD_MUTEX_INITIALIZER, +}; + +/* return bytes# of read on success or negative val on failure. */ +int +read_fd_message(int sockfd, char *buf, int buflen, int *fds, int fd_num) +{ + struct iovec iov; + struct msghdr msgh; + size_t fdsize = fd_num * sizeof(int); + char control[CMSG_SPACE(fdsize)]; + struct cmsghdr *cmsg; + int ret; + + memset(&msgh, 0, sizeof(msgh)); + iov.iov_base = buf; + iov.iov_len = buflen; + + msgh.msg_iov = &iov; + msgh.msg_iovlen = 1; + msgh.msg_control = control; + msgh.msg_controllen = sizeof(control); + + ret = recvmsg(sockfd, &msgh, 0); + if (ret <= 0) { + RTE_LOG(ERR, VHOST_CONFIG, "recvmsg failed\n"); + return ret; + } + + if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) { + RTE_LOG(ERR, VHOST_CONFIG, "truncted msg\n"); + return -1; + } + + for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL; + cmsg = CMSG_NXTHDR(&msgh, cmsg)) { + if ((cmsg->cmsg_level == SOL_SOCKET) && + (cmsg->cmsg_type == SCM_RIGHTS)) { + memcpy(fds, CMSG_DATA(cmsg), fdsize); + break; + } + } + + return ret; +} + +int +send_fd_message(int sockfd, char *buf, int buflen, int *fds, int fd_num) +{ + + struct iovec iov; + struct msghdr msgh; + size_t fdsize = fd_num * sizeof(int); + char control[CMSG_SPACE(fdsize)]; + struct cmsghdr *cmsg; + int ret; + + memset(&msgh, 0, sizeof(msgh)); + iov.iov_base = buf; + iov.iov_len = buflen; + + msgh.msg_iov = &iov; + msgh.msg_iovlen = 1; + + if (fds && fd_num > 0) { + msgh.msg_control = control; + msgh.msg_controllen = sizeof(control); + cmsg = CMSG_FIRSTHDR(&msgh); + cmsg->cmsg_len = CMSG_LEN(fdsize); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + memcpy(CMSG_DATA(cmsg), fds, fdsize); + } else { + msgh.msg_control = NULL; + msgh.msg_controllen = 0; + } + + do { + ret = sendmsg(sockfd, &msgh, 0); + } while (ret < 0 && errno == EINTR); + + if (ret < 0) { + RTE_LOG(ERR, VHOST_CONFIG, "sendmsg error\n"); + return ret; + } + + return ret; +} + +static void +vhost_user_add_connection(int fd, struct vhost_user_socket *vsocket) +{ + int vid; + size_t size; + struct vhost_user_connection *conn; + int ret; + + conn = malloc(sizeof(*conn)); + if (conn == NULL) { + close(fd); + return; + } + + vid = vhost_new_device(); + if (vid == -1) { + close(fd); + free(conn); + return; + } + + size = strnlen(vsocket->path, PATH_MAX); + vhost_set_ifname(vid, vsocket->path, size); + + if (vsocket->dequeue_zero_copy) + vhost_enable_dequeue_zero_copy(vid); + + RTE_LOG(INFO, VHOST_CONFIG, "new device, handle is %d\n", vid); + + vsocket->connfd = fd; + conn->vsocket = vsocket; + conn->vid = vid; + ret = fdset_add(&vhost_user.fdset, fd, vhost_user_read_cb, + NULL, conn); + if (ret < 0) { + vsocket->connfd = -1; + free(conn); + close(fd); + RTE_LOG(ERR, VHOST_CONFIG, + "failed to add fd %d into vhost server fdset\n", + fd); + } +} + +/* call back when there is new vhost-user connection from client */ +static void +vhost_user_server_new_connection(int fd, void *dat, int *remove __rte_unused) +{ + struct vhost_user_socket *vsocket = dat; + + fd = accept(fd, NULL, NULL); + if (fd < 0) + return; + + RTE_LOG(INFO, VHOST_CONFIG, "new vhost user connection is %d\n", fd); + vhost_user_add_connection(fd, vsocket); +} + +static void +vhost_user_read_cb(int connfd, void *dat, int *remove) +{ + struct vhost_user_connection *conn = dat; + struct vhost_user_socket *vsocket = conn->vsocket; + int ret; + + ret = vhost_user_msg_handler(conn->vid, connfd); + if (ret < 0) { + vsocket->connfd = -1; + close(connfd); + *remove = 1; + vhost_destroy_device(conn->vid); + free(conn); + + if (vsocket->reconnect) + vhost_user_create_client(vsocket); + } +} + +static int +create_unix_socket(const char *path, struct sockaddr_un *un, bool is_server) +{ + int fd; + + fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (fd < 0) + return -1; + RTE_LOG(INFO, VHOST_CONFIG, "vhost-user %s: socket created, fd: %d\n", + is_server ? "server" : "client", fd); + + if (!is_server && fcntl(fd, F_SETFL, O_NONBLOCK)) { + RTE_LOG(ERR, VHOST_CONFIG, + "vhost-user: can't set nonblocking mode for socket, fd: " + "%d (%s)\n", fd, strerror(errno)); + close(fd); + return -1; + } + + memset(un, 0, sizeof(*un)); + un->sun_family = AF_UNIX; + strncpy(un->sun_path, path, sizeof(un->sun_path)); + un->sun_path[sizeof(un->sun_path) - 1] = '\0'; + + return fd; +} + +static int +vhost_user_create_server(struct vhost_user_socket *vsocket) +{ + int fd; + int ret; + struct sockaddr_un un; + const char *path = vsocket->path; + + fd = create_unix_socket(path, &un, vsocket->is_server); + if (fd < 0) + return -1; + + ret = bind(fd, (struct sockaddr *)&un, sizeof(un)); + if (ret < 0) { + RTE_LOG(ERR, VHOST_CONFIG, + "failed to bind to %s: %s; remove it and try again\n", + path, strerror(errno)); + goto err; + } + RTE_LOG(INFO, VHOST_CONFIG, "bind to %s\n", path); + + ret = listen(fd, MAX_VIRTIO_BACKLOG); + if (ret < 0) + goto err; + + vsocket->listenfd = fd; + ret = fdset_add(&vhost_user.fdset, fd, vhost_user_server_new_connection, + NULL, vsocket); + if (ret < 0) { + RTE_LOG(ERR, VHOST_CONFIG, + "failed to add listen fd %d to vhost server fdset\n", + fd); + goto err; + } + + return 0; + +err: + close(fd); + return -1; +} + +struct vhost_user_reconnect { + struct sockaddr_un un; + int fd; + struct vhost_user_socket *vsocket; + + TAILQ_ENTRY(vhost_user_reconnect) next; +}; + +TAILQ_HEAD(vhost_user_reconnect_tailq_list, vhost_user_reconnect); +struct vhost_user_reconnect_list { + struct vhost_user_reconnect_tailq_list head; + pthread_mutex_t mutex; +}; + +static struct vhost_user_reconnect_list reconn_list; +static pthread_t reconn_tid; + +static int +vhost_user_connect_nonblock(int fd, struct sockaddr *un, size_t sz) +{ + int ret, flags; + + ret = connect(fd, un, sz); + if (ret < 0 && errno != EISCONN) + return -1; + + flags = fcntl(fd, F_GETFL, 0); + if (flags < 0) { + RTE_LOG(ERR, VHOST_CONFIG, + "can't get flags for connfd %d\n", fd); + return -2; + } + if ((flags & O_NONBLOCK) && fcntl(fd, F_SETFL, flags & ~O_NONBLOCK)) { + RTE_LOG(ERR, VHOST_CONFIG, + "can't disable nonblocking on fd %d\n", fd); + return -2; + } + return 0; +} + +static void * +vhost_user_client_reconnect(void *arg __rte_unused) +{ + int ret; + struct vhost_user_reconnect *reconn, *next; + + while (1) { + pthread_mutex_lock(&reconn_list.mutex); + + /* + * An equal implementation of TAILQ_FOREACH_SAFE, + * which does not exist on all platforms. + */ + for (reconn = TAILQ_FIRST(&reconn_list.head); + reconn != NULL; reconn = next) { + next = TAILQ_NEXT(reconn, next); + + ret = vhost_user_connect_nonblock(reconn->fd, + (struct sockaddr *)&reconn->un, + sizeof(reconn->un)); + if (ret == -2) { + close(reconn->fd); + RTE_LOG(ERR, VHOST_CONFIG, + "reconnection for fd %d failed\n", + reconn->fd); + goto remove_fd; + } + if (ret == -1) + continue; + + RTE_LOG(INFO, VHOST_CONFIG, + "%s: connected\n", reconn->vsocket->path); + vhost_user_add_connection(reconn->fd, reconn->vsocket); +remove_fd: + TAILQ_REMOVE(&reconn_list.head, reconn, next); + free(reconn); + } + + pthread_mutex_unlock(&reconn_list.mutex); + sleep(1); + } + + return NULL; +} + +static int +vhost_user_reconnect_init(void) +{ + int ret; + + pthread_mutex_init(&reconn_list.mutex, NULL); + TAILQ_INIT(&reconn_list.head); + + ret = pthread_create(&reconn_tid, NULL, + vhost_user_client_reconnect, NULL); + if (ret < 0) + RTE_LOG(ERR, VHOST_CONFIG, "failed to create reconnect thread"); + + return ret; +} + +static int +vhost_user_create_client(struct vhost_user_socket *vsocket) +{ + int fd; + int ret; + struct sockaddr_un un; + const char *path = vsocket->path; + struct vhost_user_reconnect *reconn; + + fd = create_unix_socket(path, &un, vsocket->is_server); + if (fd < 0) + return -1; + + ret = vhost_user_connect_nonblock(fd, (struct sockaddr *)&un, + sizeof(un)); + if (ret == 0) { + vhost_user_add_connection(fd, vsocket); + return 0; + } + + RTE_LOG(ERR, VHOST_CONFIG, + "failed to connect to %s: %s\n", + path, strerror(errno)); + + if (ret == -2 || !vsocket->reconnect) { + close(fd); + return -1; + } + + RTE_LOG(ERR, VHOST_CONFIG, "%s: reconnecting...\n", path); + reconn = malloc(sizeof(*reconn)); + if (reconn == NULL) { + RTE_LOG(ERR, VHOST_CONFIG, + "failed to allocate memory for reconnect\n"); + close(fd); + return -1; + } + reconn->un = un; + reconn->fd = fd; + reconn->vsocket = vsocket; + pthread_mutex_lock(&reconn_list.mutex); + TAILQ_INSERT_TAIL(&reconn_list.head, reconn, next); + pthread_mutex_unlock(&reconn_list.mutex); + + return 0; +} + +/* + * Register a new vhost-user socket; here we could act as server + * (the default case), or client (when RTE_VHOST_USER_CLIENT) flag + * is set. + */ +int +rte_vhost_driver_register(const char *path, uint64_t flags) +{ + int ret = -1; + struct vhost_user_socket *vsocket; + + if (!path) + return -1; + + pthread_mutex_lock(&vhost_user.mutex); + + if (vhost_user.vsocket_cnt == MAX_VHOST_SOCKET) { + RTE_LOG(ERR, VHOST_CONFIG, + "error: the number of vhost sockets reaches maximum\n"); + goto out; + } + + vsocket = malloc(sizeof(struct vhost_user_socket)); + if (!vsocket) + goto out; + memset(vsocket, 0, sizeof(struct vhost_user_socket)); + vsocket->path = strdup(path); + vsocket->connfd = -1; + vsocket->dequeue_zero_copy = flags & RTE_VHOST_USER_DEQUEUE_ZERO_COPY; + + if ((flags & RTE_VHOST_USER_CLIENT) != 0) { + vsocket->reconnect = !(flags & RTE_VHOST_USER_NO_RECONNECT); + if (vsocket->reconnect && reconn_tid == 0) { + if (vhost_user_reconnect_init() < 0) { + free(vsocket->path); + free(vsocket); + goto out; + } + } + ret = vhost_user_create_client(vsocket); + } else { + vsocket->is_server = true; + ret = vhost_user_create_server(vsocket); + } + if (ret < 0) { + free(vsocket->path); + free(vsocket); + goto out; + } + + vhost_user.vsockets[vhost_user.vsocket_cnt++] = vsocket; + +out: + pthread_mutex_unlock(&vhost_user.mutex); + + return ret; +} + +static bool +vhost_user_remove_reconnect(struct vhost_user_socket *vsocket) +{ + int found = false; + struct vhost_user_reconnect *reconn, *next; + + pthread_mutex_lock(&reconn_list.mutex); + + for (reconn = TAILQ_FIRST(&reconn_list.head); + reconn != NULL; reconn = next) { + next = TAILQ_NEXT(reconn, next); + + if (reconn->vsocket == vsocket) { + TAILQ_REMOVE(&reconn_list.head, reconn, next); + close(reconn->fd); + free(reconn); + found = true; + break; + } + } + pthread_mutex_unlock(&reconn_list.mutex); + return found; +} + +/** + * Unregister the specified vhost socket + */ +int +rte_vhost_driver_unregister(const char *path) +{ + int i; + int count; + struct vhost_user_connection *conn; + + pthread_mutex_lock(&vhost_user.mutex); + + for (i = 0; i < vhost_user.vsocket_cnt; i++) { + struct vhost_user_socket *vsocket = vhost_user.vsockets[i]; + + if (!strcmp(vsocket->path, path)) { + if (vsocket->is_server) { + fdset_del(&vhost_user.fdset, vsocket->listenfd); + close(vsocket->listenfd); + unlink(path); + } else if (vsocket->reconnect) { + vhost_user_remove_reconnect(vsocket); + } + + conn = fdset_del(&vhost_user.fdset, vsocket->connfd); + if (conn) { + RTE_LOG(INFO, VHOST_CONFIG, + "free connfd = %d for device '%s'\n", + vsocket->connfd, path); + close(vsocket->connfd); + vhost_destroy_device(conn->vid); + free(conn); + } + + free(vsocket->path); + free(vsocket); + + count = --vhost_user.vsocket_cnt; + vhost_user.vsockets[i] = vhost_user.vsockets[count]; + vhost_user.vsockets[count] = NULL; + pthread_mutex_unlock(&vhost_user.mutex); + + return 0; + } + } + pthread_mutex_unlock(&vhost_user.mutex); + + return -1; +} + +int +rte_vhost_driver_session_start(void) +{ + fdset_event_dispatch(&vhost_user.fdset); + return 0; +} -- cgit 1.2.3-korg