diff options
Diffstat (limited to 'stacks/rsocket')
-rw-r--r-- | stacks/rsocket/CMakeLists.txt | 61 | ||||
-rw-r--r-- | stacks/rsocket/config/module_config.json | 30 | ||||
-rw-r--r-- | stacks/rsocket/config/rd_config.json | 10 | ||||
-rw-r--r-- | stacks/rsocket/rsocket.patch | 751 | ||||
-rw-r--r-- | stacks/rsocket/src/rsocket_adpt.c | 218 | ||||
-rw-r--r-- | stacks/rsocket/src/rsocket_adpt.h | 55 | ||||
-rw-r--r-- | stacks/rsocket/src/rsocket_in.h | 33 | ||||
-rw-r--r-- | stacks/rsocket/src/rsocket_rdma.h | 97 | ||||
-rw-r--r-- | stacks/rsocket/src/rsocket_rs.c | 354 | ||||
-rw-r--r-- | stacks/rsocket/src/rsocket_sapi.h | 38 |
10 files changed, 1647 insertions, 0 deletions
diff --git a/stacks/rsocket/CMakeLists.txt b/stacks/rsocket/CMakeLists.txt new file mode 100644 index 0000000..c1b2f18 --- /dev/null +++ b/stacks/rsocket/CMakeLists.txt @@ -0,0 +1,61 @@ +######################################################################### +# +# Copyright (c) 2018 Huawei Technologies Co.,Ltd. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +######################################################################### + + +SET(rdmacm_dir librdmacm-1.1.0) +SET(dmm_inc_dir ${DMM_REL_INC_DIR}) +SET(RSOCKET_DEBUG 0) + +######################## + +SET(rdmacm_url https://github.com/ofiwg/librdmacm/archive/v1.1.0.tar.gz) + +INCLUDE(ExternalProject) +ExternalProject_Add( + rdmacm + URL ${rdmacm_url} + SOURCE_DIR ${CMAKE_CURRENT_LIST_DIR}/${rdmacm_dir} + DOWNLOAD_DIR ${CMAKE_CURRENT_LIST_DIR} + PATCH_COMMAND patch -p1 -i ../rsocket.patch + CONFIGURE_COMMAND ./autogen.sh && ./configure dmm_inc_dir=${DMM_REL_INC_DIR} RSOCKET_DEBUG=${RSOCKET_DEBUG} + BUILD_IN_SOURCE 1 + BUILD_COMMAND make + INSTALL_COMMAND cp -f libdmm_rdmacm.a ${LIB_PATH_STATIC}/ + DEPENDS DPDK +) +set_target_properties(rdmacm PROPERTIES EXCLUDE_FROM_ALL TRUE) + +######################## + +SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O2 -g -fPIC -m64 -pthread") + +ADD_LIBRARY(dmm_rsocket SHARED src/rsocket_adpt.c) + +ADD_DEFINITIONS(-D_GNU_SOURCE -DRSOCKET_DEBUG=${RSOCKET_DEBUG}) + +INCLUDE_DIRECTORIES(${DMM_REL_INC_DIR}) +INCLUDE_DIRECTORIES(./src ${rdmacm_dir} ${rdmacm_dir}/include ${rdmacm_dir}/src) + 
+TARGET_LINK_LIBRARIES(dmm_rsocket + -Wl,--whole-archive + ${LIB_PATH_STATIC}/libdmm_rdmacm.a + -Wl,--no-whole-archive + ibverbs pthread dl rt +) + +ADD_DEPENDENCIES(dmm_rsocket rdmacm DPDK) + +set_target_properties(dmm_rsocket PROPERTIES EXCLUDE_FROM_ALL TRUE) diff --git a/stacks/rsocket/config/module_config.json b/stacks/rsocket/config/module_config.json new file mode 100644 index 0000000..2df82cd --- /dev/null +++ b/stacks/rsocket/config/module_config.json @@ -0,0 +1,30 @@ +{ + "default_stack_name": "kernel", /*when rd can't be found, the default one may be chosen*/ + "module_list": [ + { + "stack_name": "kernel", /*stack name*/ + "function_name": "kernel_stack_register", /*function name*/ + "libname": "./", /*library name, if loadtype is static, this may be + null, else must give a library name*/ + "loadtype": "static", /*library load type: static or dynamic*/ + "deploytype": "1", /*deploy model type:model type1, model type2, + model type3. Indicating single or multi process + deployment. 
Used during shared memory initialization.*/ + "maxfd": "1024", /*the max fd supported*/ + "minfd": "0", /*the min fd supported*/ + "priorty": "1", /*priorty when executing, reserv*/ + "stackid": "0", /*stack id, this must be ordered and not be repeated*/ + }, + { + "stack_name": "rsocket", + "function_name": "rsocket_stack_register", + "libname": "libdmm_rsocket.so", + "loadtype": "dynmic", + "deploytype": "1", + "maxfd": "1024", + "minfd": "0", + "priorty": "1", + "stackid": "1", + }, + ] +} diff --git a/stacks/rsocket/config/rd_config.json b/stacks/rsocket/config/rd_config.json new file mode 100644 index 0000000..ea1fc7b --- /dev/null +++ b/stacks/rsocket/config/rd_config.json @@ -0,0 +1,10 @@ +{ + "ip_route": [ + { + "subnet": "192.168.1.1/24", + "type": "nstack-rsocket", + }, + ], + "prot_route": [ + ], +} diff --git a/stacks/rsocket/rsocket.patch b/stacks/rsocket/rsocket.patch new file mode 100644 index 0000000..430f178 --- /dev/null +++ b/stacks/rsocket/rsocket.patch @@ -0,0 +1,751 @@ +diff --git a/Makefile.am b/Makefile.am +index bf72134..cb02f56 100644 +--- a/Makefile.am ++++ b/Makefile.am +@@ -22,6 +22,12 @@ src_librdmacm_la_LDFLAGS = -version-info 1 -export-dynamic \ + $(librdmacm_version_script) + src_librdmacm_la_DEPENDENCIES = $(srcdir)/src/librdmacm.map + ++noinst_LIBRARIES = libdmm_rdmacm.a ++libdmm_rdmacm_a_CFLAGS = $(src_librdmacm_la_CFLAGS) -O2 -fPIC -m64 \ ++ -I@dmm_inc_dir@ -I$(srcdir)/../src -DDMM_RSOCKET=1 -DRSOCKET_DEBUG=@RSOCKET_DEBUG@ ++libdmm_rdmacm_a_LDFLAGS = ++libdmm_rdmacm_a_SOURCES = $(src_librdmacm_la_SOURCES) ++ + src_librspreload_la_SOURCES = src/preload.c src/indexer.c + src_librspreload_la_LDFLAGS = -version-info 1 -export-dynamic + src_librspreload_la_LIBADD = $(top_builddir)/src/librdmacm.la +diff --git a/configure.ac b/configure.ac +index 4a43995..98601ea 100644 +--- a/configure.ac ++++ b/configure.ac +@@ -104,5 +104,12 @@ if test "x$rdmadir" = "x"; then + AC_SUBST(rdmadir, rdma) + fi + ++AC_ARG_VAR(dmm_inc_dir, [Directory for 
dmm include files]) ++if test "x$dmm_inc_dir" = "x"; then ++ AC_MSG_ERROR([must set dmm_inc_dir]) ++fi ++ ++AC_ARG_VAR(RSOCKET_DEBUG, [rsocket debug flag 0 or 1]) ++ + AC_CONFIG_FILES([Makefile librdmacm.spec]) + AC_OUTPUT +diff --git a/src/cma.c b/src/cma.c +index a89e663..071222d 100644 +--- a/src/cma.c ++++ b/src/cma.c +@@ -60,6 +60,12 @@ + #include <rdma/rdma_verbs.h> + #include <infiniband/ib.h> + ++#ifdef DMM_RSOCKET ++#include "rsocket_rdma.h" ++#else ++#define GSAPI(name) name ++#endif ++ + #define CMA_INIT_CMD(req, req_size, op) \ + do { \ + memset(req, 0, req_size); \ +@@ -564,7 +570,7 @@ static int rdma_create_id2(struct rdma_event_channel *channel, + cmd.ps = ps; + cmd.qp_type = qp_type; + +- ret = write(id_priv->id.channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id_priv->id.channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + goto err; + +@@ -599,7 +605,7 @@ static int ucma_destroy_kern_id(int fd, uint32_t handle) + CMA_INIT_CMD_RESP(&cmd, sizeof cmd, DESTROY_ID, &resp, sizeof resp); + cmd.id = handle; + +- ret = write(fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -659,7 +665,7 @@ static int ucma_query_addr(struct rdma_cm_id *id) + cmd.id = id_priv->handle; + cmd.option = UCMA_QUERY_ADDR; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -692,7 +698,7 @@ static int ucma_query_gid(struct rdma_cm_id *id) + cmd.id = id_priv->handle; + cmd.option = UCMA_QUERY_GID; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? 
ERR(ENODATA) : -1; + +@@ -753,7 +759,7 @@ static int ucma_query_path(struct rdma_cm_id *id) + cmd.id = id_priv->handle; + cmd.option = UCMA_QUERY_PATH; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -784,7 +790,7 @@ static int ucma_query_route(struct rdma_cm_id *id) + id_priv = container_of(id, struct cma_id_private, id); + cmd.id = id_priv->handle; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -835,7 +841,7 @@ static int rdma_bind_addr2(struct rdma_cm_id *id, struct sockaddr *addr, + cmd.addr_size = addrlen; + memcpy(&cmd.addr, addr, addrlen); + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -863,7 +869,7 @@ int rdma_bind_addr(struct rdma_cm_id *id, struct sockaddr *addr) + cmd.id = id_priv->handle; + memcpy(&cmd.addr, addr, addrlen); + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -916,7 +922,7 @@ static int rdma_resolve_addr2(struct rdma_cm_id *id, struct sockaddr *src_addr, + cmd.dst_size = dst_len; + cmd.timeout_ms = timeout_ms; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -951,7 +957,7 @@ int rdma_resolve_addr(struct rdma_cm_id *id, struct sockaddr *src_addr, + memcpy(&cmd.dst_addr, dst_addr, dst_len); + cmd.timeout_ms = timeout_ms; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? 
ERR(ENODATA) : -1; + +@@ -1003,7 +1009,7 @@ int rdma_resolve_route(struct rdma_cm_id *id, int timeout_ms) + cmd.id = id_priv->handle; + cmd.timeout_ms = timeout_ms; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -1029,7 +1035,7 @@ static int rdma_init_qp_attr(struct rdma_cm_id *id, struct ibv_qp_attr *qp_attr, + cmd.id = id_priv->handle; + cmd.qp_state = qp_attr->qp_state; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -1530,7 +1536,7 @@ int rdma_connect(struct rdma_cm_id *id, struct rdma_conn_param *conn_param) + conn_param, 0, 0); + } + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -1553,7 +1559,7 @@ int rdma_listen(struct rdma_cm_id *id, int backlog) + cmd.id = id_priv->handle; + cmd.backlog = backlog; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -1656,7 +1662,7 @@ int rdma_accept(struct rdma_cm_id *id, struct rdma_conn_param *conn_param) + conn_param, conn_param->qp_num, + conn_param->srq); + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) { + ucma_modify_qp_err(id); + return (ret >= 0) ? ERR(ENODATA) : -1; +@@ -1684,7 +1690,7 @@ int rdma_reject(struct rdma_cm_id *id, const void *private_data, + cmd.private_data_len = private_data_len; + } + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? 
ERR(ENODATA) : -1; + +@@ -1702,7 +1708,7 @@ int rdma_notify(struct rdma_cm_id *id, enum ibv_event_type event) + id_priv = container_of(id, struct cma_id_private, id); + cmd.id = id_priv->handle; + cmd.event = event; +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -1735,7 +1741,7 @@ int rdma_disconnect(struct rdma_cm_id *id) + id_priv = container_of(id, struct cma_id_private, id); + cmd.id = id_priv->handle; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -1778,7 +1784,7 @@ static int rdma_join_multicast2(struct rdma_cm_id *id, struct sockaddr *addr, + cmd.uid = (uintptr_t) mc; + cmd.reserved = 0; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) { + ret = (ret >= 0) ? ERR(ENODATA) : -1; + goto err2; +@@ -1791,7 +1797,7 @@ static int rdma_join_multicast2(struct rdma_cm_id *id, struct sockaddr *addr, + memcpy(&cmd.addr, addr, addrlen); + cmd.uid = (uintptr_t) mc; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) { + ret = (ret >= 0) ? ERR(ENODATA) : -1; + goto err2; +@@ -1857,7 +1863,7 @@ int rdma_leave_multicast(struct rdma_cm_id *id, struct sockaddr *addr) + CMA_INIT_CMD_RESP(&cmd, sizeof cmd, LEAVE_MCAST, &resp, sizeof resp); + cmd.id = mc->handle; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) { + ret = (ret >= 0) ? 
ERR(ENODATA) : -1; + goto free; +@@ -2019,7 +2025,7 @@ static int ucma_process_conn_resp(struct cma_id_private *id_priv) + CMA_INIT_CMD(&cmd, sizeof cmd, ACCEPT); + cmd.id = id_priv->handle; + +- ret = write(id_priv->id.channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id_priv->id.channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) { + ret = (ret >= 0) ? ERR(ENODATA) : -1; + goto err; +@@ -2103,7 +2109,7 @@ int rdma_get_cm_event(struct rdma_event_channel *channel, + retry: + memset(evt, 0, sizeof(*evt)); + CMA_INIT_CMD_RESP(&cmd, sizeof cmd, GET_EVENT, &resp, sizeof resp); +- ret = write(channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) { + free(evt); + return (ret >= 0) ? ERR(ENODATA) : -1; +@@ -2274,7 +2280,7 @@ int rdma_set_option(struct rdma_cm_id *id, int level, int optname, + cmd.optname = optname; + cmd.optlen = optlen; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? 
ERR(ENODATA) : -1; + +@@ -2302,7 +2308,7 @@ int rdma_migrate_id(struct rdma_cm_id *id, struct rdma_event_channel *channel) + cmd.id = id_priv->handle; + cmd.fd = id->channel->fd; + +- ret = write(channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) { + if (sync) + rdma_destroy_event_channel(channel); +diff --git a/src/rsocket.c b/src/rsocket.c +index c4f1b57..faa3d3a 100644 +--- a/src/rsocket.c ++++ b/src/rsocket.c +@@ -382,6 +382,11 @@ struct rsocket { + dlist_entry iomap_queue; + int iomap_pending; + int unack_cqe; ++#ifdef DMM_RSOCKET ++ int rr_epoll_ref; ++ int rr_epoll_fd; ++ void *rr_epoll_pdata; ++#endif + }; + + #define DS_UDP_TAG 0x55555555 +@@ -404,6 +409,12 @@ struct ds_udp_header { + + #define ds_next_qp(qp) container_of((qp)->list.next, struct ds_qp, list) + ++#ifdef DMM_RSOCKET ++#include "rsocket_in.h" ++#else ++#define GSAPI(name) name ++#endif ++ + static void ds_insert_qp(struct rsocket *rs, struct ds_qp *qp) + { + if (!rs->qp_list) +@@ -444,16 +455,16 @@ static int rs_notify_svc(struct rs_svc *svc, struct rsocket *rs, int cmd) + msg.cmd = cmd; + msg.status = EINVAL; + msg.rs = rs; +- write(svc->sock[0], &msg, sizeof msg); +- read(svc->sock[0], &msg, sizeof msg); ++ GSAPI(write)(svc->sock[0], &msg, sizeof msg); ++ GSAPI(read)(svc->sock[0], &msg, sizeof msg); + ret = rdma_seterrno(msg.status); + if (svc->cnt) + goto unlock; + + pthread_join(svc->id, NULL); + closepair: +- close(svc->sock[0]); +- close(svc->sock[1]); ++ GSAPI(close)(svc->sock[0]); ++ GSAPI(close)(svc->sock[1]); + unlock: + pthread_mutex_unlock(&mut); + return ret; +@@ -607,6 +618,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type) + fastlock_init(&rs->map_lock); + dlist_init(&rs->iomap_list); + dlist_init(&rs->iomap_queue); ++#ifdef DMM_RSOCKET ++ rr_rs_init(rs); ++#endif + return rs; + } + +@@ -617,16 +631,16 @@ static int rs_set_nonblocking(struct rsocket *rs, int arg) + + if (rs->type == 
SOCK_STREAM) { + if (rs->cm_id->recv_cq_channel) +- ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg); ++ ret = GSAPI(fcntl)(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg); + + if (!ret && rs->state < rs_connected) +- ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg); ++ ret = GSAPI(fcntl)(rs->cm_id->channel->fd, F_SETFL, arg); + } else { +- ret = fcntl(rs->epfd, F_SETFL, arg); ++ ret = GSAPI(fcntl)(rs->epfd, F_SETFL, arg); + if (!ret && rs->qp_list) { + qp = rs->qp_list; + do { +- ret = fcntl(qp->cm_id->recv_cq_channel->fd, ++ ret = GSAPI(fcntl)(qp->cm_id->recv_cq_channel->fd, + F_SETFL, arg); + qp = ds_next_qp(qp); + } while (qp != rs->qp_list && !ret); +@@ -767,7 +781,7 @@ static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id) + goto err1; + + if (rs->fd_flags & O_NONBLOCK) { +- if (fcntl(cm_id->recv_cq_channel->fd, F_SETFL, O_NONBLOCK)) ++ if (GSAPI(fcntl)(cm_id->recv_cq_channel->fd, F_SETFL, O_NONBLOCK)) + goto err2; + } + +@@ -918,7 +932,7 @@ static void ds_free_qp(struct ds_qp *qp) + if (qp->cm_id) { + if (qp->cm_id->qp) { + tdelete(&qp->dest.addr, &qp->rs->dest_map, ds_compare_addr); +- epoll_ctl(qp->rs->epfd, EPOLL_CTL_DEL, ++ GSAPI(epoll_ctl)(qp->rs->epfd, EPOLL_CTL_DEL, + qp->cm_id->recv_cq_channel->fd, NULL); + rdma_destroy_qp(qp->cm_id); + } +@@ -932,8 +946,12 @@ static void ds_free(struct rsocket *rs) + { + struct ds_qp *qp; + ++#ifdef DMM_RSOCKET ++ rr_rs_dest(rs); ++#endif ++ + if (rs->udp_sock >= 0) +- close(rs->udp_sock); ++ GSAPI(close)(rs->udp_sock); + + if (rs->index >= 0) + rs_remove(rs); +@@ -947,7 +965,7 @@ static void ds_free(struct rsocket *rs) + } + + if (rs->epfd >= 0) +- close(rs->epfd); ++ GSAPI(close)(rs->epfd); + + if (rs->sbuf) + free(rs->sbuf); +@@ -968,6 +986,10 @@ static void rs_free(struct rsocket *rs) + return; + } + ++#ifdef DMM_RSOCKET ++ rr_rs_dest(rs); ++#endif ++ + if (rs->rmsg) + free(rs->rmsg); + +@@ -1059,11 +1081,11 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn) + 
+ static int ds_init(struct rsocket *rs, int domain) + { +- rs->udp_sock = socket(domain, SOCK_DGRAM, 0); ++ rs->udp_sock = GSAPI(socket)(domain, SOCK_DGRAM, 0); + if (rs->udp_sock < 0) + return rs->udp_sock; + +- rs->epfd = epoll_create(2); ++ rs->epfd = GSAPI(epoll_create)(2); + if (rs->epfd < 0) + return rs->epfd; + +@@ -1164,7 +1186,7 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen) + if (ret) + return ret; + } +- ret = bind(rs->udp_sock, addr, addrlen); ++ ret = GSAPI(bind)(rs->udp_sock, addr, addrlen); + } + return ret; + } +@@ -1229,8 +1251,11 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen) + goto err; + } + +- if (rs->fd_flags & O_NONBLOCK) +- fcntl(new_rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK); ++ if (rs->fd_flags & O_NONBLOCK) { ++ ret = GSAPI(fcntl)(new_rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK); ++ if (0 == ret) ++ new_rs->fd_flags |= O_NONBLOCK; ++ } + + ret = rs_create_ep(new_rs); + if (ret) +@@ -1354,7 +1379,7 @@ connected: + break; + case rs_accepting: + if (!(rs->fd_flags & O_NONBLOCK)) +- fcntl(rs->cm_id->channel->fd, F_SETFL, 0); ++ GSAPI(fcntl)(rs->cm_id->channel->fd, F_SETFL, 0); + + ret = ucma_complete(rs->cm_id); + if (ret) +@@ -1375,6 +1400,10 @@ connected: + rs->err = errno; + } + } ++#ifdef DMM_RSOCKET ++ rr_rs_connected(rs); ++#endif ++ + return ret; + } + +@@ -1397,24 +1426,24 @@ static int ds_get_src_addr(struct rsocket *rs, + uint16_t port; + + *src_len = sizeof(*src_addr); +- ret = getsockname(rs->udp_sock, &src_addr->sa, src_len); ++ ret = GSAPI(getsockname)(rs->udp_sock, &src_addr->sa, src_len); + if (ret || !rs_any_addr(src_addr)) + return ret; + + port = src_addr->sin.sin_port; +- sock = socket(dest_addr->sa_family, SOCK_DGRAM, 0); ++ sock = GSAPI(socket)(dest_addr->sa_family, SOCK_DGRAM, 0); + if (sock < 0) + return sock; + +- ret = connect(sock, dest_addr, dest_len); ++ ret = GSAPI(connect)(sock, dest_addr, dest_len); + if (ret) + goto out; + + *src_len = sizeof(*src_addr); +- 
ret = getsockname(sock, &src_addr->sa, src_len); ++ ret = GSAPI(getsockname)(sock, &src_addr->sa, src_len); + src_addr->sin.sin_port = port; + out: +- close(sock); ++ GSAPI(close)(sock); + return ret; + } + +@@ -1512,7 +1541,7 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr, + + event.events = EPOLLIN; + event.data.ptr = qp; +- ret = epoll_ctl(rs->epfd, EPOLL_CTL_ADD, ++ ret = GSAPI(epoll_ctl)(rs->epfd, EPOLL_CTL_ADD, + qp->cm_id->recv_cq_channel->fd, &event); + if (ret) + goto err; +@@ -1609,7 +1638,7 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) + } + + fastlock_acquire(&rs->slock); +- ret = connect(rs->udp_sock, addr, addrlen); ++ ret = GSAPI(connect)(rs->udp_sock, addr, addrlen); + if (!ret) + ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest); + fastlock_release(&rs->slock); +@@ -1903,12 +1932,18 @@ static int rs_poll_cq(struct rsocket *rs) + case RS_OP_CTRL: + if (rs_msg_data(msg) == RS_CTRL_DISCONNECT) { + rs->state = rs_disconnected; ++#ifdef DMM_RSOCKET ++ rr_rs_notify_tcp(rs); ++#endif + return 0; + } else if (rs_msg_data(msg) == RS_CTRL_SHUTDOWN) { + if (rs->state & rs_writable) { + rs->state &= ~rs_readable; + } else { + rs->state = rs_disconnected; ++#ifdef DMM_RSOCKET ++ rr_rs_notify_tcp(rs); ++#endif + return 0; + } + } +@@ -1959,6 +1994,9 @@ static int rs_poll_cq(struct rsocket *rs) + rs->err = errno; + } + } ++#ifdef DMM_RSOCKET ++ rr_rs_notify_tcp(rs); ++#endif + return ret; + } + +@@ -1977,7 +2015,11 @@ static int rs_get_cq_event(struct rsocket *rs) + ibv_ack_cq_events(rs->cm_id->recv_cq, rs->unack_cqe); + rs->unack_cqe = 0; + } ++#ifdef DMM_RSOCKET ++ __sync_fetch_and_sub(&rs->cq_armed, 1); ++#else + rs->cq_armed = 0; ++#endif + } else if (!(errno == EAGAIN || errno == EINTR)) { + rs->state = rs_error; + } +@@ -2015,8 +2057,13 @@ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rs + } else if (nonblock) { + ret = ERR(EWOULDBLOCK); + } else if (!rs->cq_armed) 
{ ++#ifdef DMM_RSOCKET ++ if (0 == ibv_req_notify_cq(rs->cm_id->recv_cq, 0)) ++ __sync_fetch_and_add(&rs->cq_armed, 1); ++#else + ibv_req_notify_cq(rs->cm_id->recv_cq, 0); + rs->cq_armed = 1; ++#endif + } else { + rs_update_credits(rs); + fastlock_acquire(&rs->cq_wait_lock); +@@ -2116,11 +2163,19 @@ static void ds_poll_cqs(struct rsocket *rs) + qp = ds_next_qp(qp); + if (!rs->rqe_avail && rs->sqe_avail) { + rs->qp_list = qp; ++#ifdef DMM_RSOCKET ++ goto END; ++#else + return; ++#endif + } + cnt++; + } while (qp != rs->qp_list); + } while (cnt); ++#ifdef DMM_RSOCKET ++END: ++ rr_rs_notify_udp(rs); ++#endif + } + + static void ds_req_notify_cqs(struct rsocket *rs) +@@ -2150,7 +2205,7 @@ static int ds_get_cq_event(struct rsocket *rs) + if (!rs->cq_armed) + return 0; + +- ret = epoll_wait(rs->epfd, &event, 1, -1); ++ ret = GSAPI(epoll_wait)(rs->epfd, &event, 1, -1); + if (ret <= 0) + return ret; + +@@ -2607,7 +2662,7 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov, + msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa); + msg.msg_iov = miov; + msg.msg_iovlen = iovcnt + 1; +- ret = sendmsg(rs->udp_sock, &msg, flags); ++ ret = GSAPI(sendmsg)(rs->udp_sock, &msg, flags); + return ret > 0 ? 
ret - hdr.length : ret; + } + +@@ -2958,7 +3013,7 @@ check_cq: + fds.fd = rs->cm_id->channel->fd; + fds.events = events; + fds.revents = 0; +- poll(&fds, 1, 0); ++ GSAPI(poll)(&fds, 1, 0); + return fds.revents; + } + +@@ -2994,7 +3049,7 @@ static int rs_poll_check(struct pollfd *fds, nfds_t nfds) + if (rs) + fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all); + else +- poll(&fds[i], 1, 0); ++ GSAPI(poll)(&fds[i], 1, 0); + + if (fds[i].revents) + cnt++; +@@ -3094,7 +3149,7 @@ int rpoll(struct pollfd *fds, nfds_t nfds, int timeout) + if (ret) + break; + +- ret = poll(rfds, nfds, timeout); ++ ret = GSAPI(poll)(rfds, nfds, timeout); + if (ret <= 0) + break; + +@@ -3314,7 +3369,7 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen) + rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen); + return 0; + } else { +- return getpeername(rs->udp_sock, addr, addrlen); ++ return GSAPI(getpeername)(rs->udp_sock, addr, addrlen); + } + } + +@@ -3329,7 +3384,7 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen) + rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen); + return 0; + } else { +- return getsockname(rs->udp_sock, addr, addrlen); ++ return GSAPI(getsockname)(rs->udp_sock, addr, addrlen); + } + } + +@@ -3371,7 +3426,7 @@ int rsetsockopt(int socket, int level, int optname, + if (!rs) + return ERR(EBADF); + if (rs->type == SOCK_DGRAM && level != SOL_RDMA) { +- ret = setsockopt(rs->udp_sock, level, optname, optval, optlen); ++ ret = GSAPI(setsockopt)(rs->udp_sock, level, optname, optval, optlen); + if (ret) + return ret; + } +@@ -3985,7 +4040,7 @@ static void udp_svc_process_sock(struct rs_svc *svc) + { + struct rs_svc_msg msg; + +- read(svc->sock[1], &msg, sizeof msg); ++ GSAPI(read)(svc->sock[1], &msg, sizeof msg); + switch (msg.cmd) { + case RS_SVC_ADD_DGRAM: + msg.status = rs_svc_add_rs(svc, msg.rs); +@@ -4009,7 +4064,7 @@ static void udp_svc_process_sock(struct rs_svc *svc) + break; + } + +- 
write(svc->sock[1], &msg, sizeof msg); ++ GSAPI(write)(svc->sock[1], &msg, sizeof msg); + } + + static uint8_t udp_svc_sgid_index(struct ds_dest *dest, union ibv_gid *sgid) +@@ -4137,7 +4192,7 @@ static void udp_svc_process_rs(struct rsocket *rs) + socklen_t addrlen = sizeof addr; + int len, ret; + +- ret = recvfrom(rs->udp_sock, buf, sizeof buf, 0, &addr.sa, &addrlen); ++ ret = GSAPI(recvfrom)(rs->udp_sock, buf, sizeof buf, 0, &addr.sa, &addrlen); + if (ret < DS_UDP_IPV4_HDR_LEN) + return; + +@@ -4184,7 +4239,7 @@ static void *udp_svc_run(void *arg) + ret = rs_svc_grow_sets(svc, 4); + if (ret) { + msg.status = ret; +- write(svc->sock[1], &msg, sizeof msg); ++ GSAPI(write)(svc->sock[1], &msg, sizeof msg); + return (void *) (uintptr_t) ret; + } + +@@ -4195,7 +4250,7 @@ static void *udp_svc_run(void *arg) + for (i = 0; i <= svc->cnt; i++) + udp_svc_fds[i].revents = 0; + +- poll(udp_svc_fds, svc->cnt + 1, -1); ++ GSAPI(poll)(udp_svc_fds, svc->cnt + 1, -1); + if (udp_svc_fds[0].revents) + udp_svc_process_sock(svc); + +@@ -4222,7 +4277,7 @@ static void tcp_svc_process_sock(struct rs_svc *svc) + struct rs_svc_msg msg; + int i; + +- read(svc->sock[1], &msg, sizeof msg); ++ GSAPI(read)(svc->sock[1], &msg, sizeof msg); + switch (msg.cmd) { + case RS_SVC_ADD_KEEPALIVE: + msg.status = rs_svc_add_rs(svc, msg.rs); +@@ -4253,7 +4308,7 @@ static void tcp_svc_process_sock(struct rs_svc *svc) + default: + break; + } +- write(svc->sock[1], &msg, sizeof msg); ++ GSAPI(write)(svc->sock[1], &msg, sizeof msg); + } + + /* +@@ -4282,7 +4337,7 @@ static void *tcp_svc_run(void *arg) + ret = rs_svc_grow_sets(svc, 16); + if (ret) { + msg.status = ret; +- write(svc->sock[1], &msg, sizeof msg); ++ GSAPI(write)(svc->sock[1], &msg, sizeof msg); + return (void *) (uintptr_t) ret; + } + +@@ -4291,7 +4346,7 @@ static void *tcp_svc_run(void *arg) + fds.events = POLLIN; + timeout = -1; + do { +- poll(&fds, 1, timeout * 1000); ++ GSAPI(poll)(&fds, 1, timeout * 1000); + if (fds.revents) + 
tcp_svc_process_sock(svc); + +@@ -4311,3 +4366,8 @@ static void *tcp_svc_run(void *arg) + + return NULL; + } ++ ++#ifdef DMM_RSOCKET ++#include "rsocket_rs.c" ++#endif ++ diff --git a/stacks/rsocket/src/rsocket_adpt.c b/stacks/rsocket/src/rsocket_adpt.c new file mode 100644 index 0000000..eda7fd7 --- /dev/null +++ b/stacks/rsocket/src/rsocket_adpt.c @@ -0,0 +1,218 @@ +/* +* +* Copyright (c) 2018 Huawei Technologies Co.,Ltd. +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at: +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include <stdio.h> +#include <stdlib.h> +#include <errno.h> +#include <dlfcn.h> + +#include "nstack_dmm_api.h" + +#include "rsocket_adpt.h" +#include "rdma/rsocket.h" + + +#define RR_EVFD(u64) ((int)((u64) >> 32)) +#define RR_RSFD(u64) ((int)((u64) & 0xFFFFFFFF)) +#define RR_DATA(evfd, rsfd) ((((uint64_t)(evfd)) << 32) | (uint64_t)(uint32_t)(rsfd)) + +#define RR_EV_NUM 64 + +rsocket_var_t g_rr_var = {0}; + +rr_sapi_t g_sapi = {0}; + +int g_rr_log_level = -1; + + +int rr_notify_event(void *pdata, int events) +{ + int ret; + + ret = g_rr_var.event_cb(pdata, events); + + RR_DBG("event_cb(%p, 0x%x)=%d,%d\n", pdata, events, ret, errno); + + return ret; +} + +int rr_epoll_ctl(int op, int evfd, uint32_t events, int rsfd) +{ + int ret; + struct epoll_event event; + event.events = events; + event.data.u64 = RR_DATA(evfd, rsfd); + ret = GSAPI(epoll_ctl)(g_rr_var.epfd, op, evfd, &event); + return ret; +} + +static void *rr_epoll_thread(void *arg) +{ + int i, ret, e; + struct epoll_event events[RR_EV_NUM]; + 
+ while (1) { + ret = GSAPI(epoll_wait)(g_rr_var.epfd, events, RR_EV_NUM, 100); + e = errno; + + for (i = 0; i < ret; ++i) { + if (rr_rs_handle(RR_RSFD(events[i].data.u64), events[i].events)) { + (void)rr_ep_del(RR_EVFD(events[i].data.u64)); + } + } + + if (ret < 0) { + RR_STAT_INC(RR_STAT_EPW_ERR); + if (e == EINTR) { + RR_STAT_INC(RR_STAT_EPW_EINTR); + } else if ( e == ETIMEDOUT) { + RR_STAT_INC(RR_STAT_EPW_ETIMEOUT); + } else { + RR_ERR("epoll_wait()=%d:%d\n", ret, errno); + } + } + } + + return NULL; +} + + +static int rr_init_sapi() +{ + void *handle = dlopen("libc.so.6", RTLD_NOW | RTLD_GLOBAL); + if (!handle) { + RR_ERR("dlopen(libc.so.6):NULL\n"); + return -1; + } + +#define RR_SAPI(name) \ + GSAPI(name) = dlsym(handle, #name); \ + if (!GSAPI(name)) \ + RR_ERR("dlsym(" #name "):NULL\n"); +#include "rsocket_sapi.h" +#undef RR_SAPI + + return 0; +} + +static void rr_init_log() +{ + int level; + char *log; + + if (g_rr_log_level >= 0) + return; + + log = getenv("RSOCKET_LOG"); + if (!log || !log[0]) { + g_rr_log_level = RR_LOG_OFF; + return; + } + + level = atoi(log); + if (level < 0 || level > 99999) { + g_rr_log_level = RR_LOG_OFF; + return; + } + + g_rr_log_level = level; +} + +static int rsocket_init() +{ + int ret; + + rr_init_log(); + + if (rr_init_sapi()) { + return -1; + } + + g_rr_var.epfd = GSAPI(epoll_create)(1); + if (g_rr_var.epfd < 0) + return g_rr_var.epfd; + + ret = pthread_create(&g_rr_var.epoll_threadid, NULL, rr_epoll_thread, NULL); + if (ret) { + GSAPI(close)(g_rr_var.epfd); + g_rr_var.epfd = -1; + return ret; + } + (void)pthread_setname_np(g_rr_var.epoll_threadid, "rsocket_epoll"); + + return 0; +} + +int rsocket_exit() +{ + if (g_rr_var.epfd >= 0) { + (void)GSAPI(close)(g_rr_var.epfd); + g_rr_var.epfd = -1; + } + + return 0; +} + +unsigned int rsocket_ep_ctl(int epFD, int proFD, int ctl_ops, struct epoll_event * event, void *pdata) +{ + int ret; + struct eventpoll *ep; + unsigned int revents = 0; + + RR_DBG("(%d, %d, %d, 0x%x, %p)\n", 
epFD, proFD, ctl_ops, event->events, pdata); + + switch (ctl_ops) { + case nstack_ep_triggle_add: + ret = rr_rs_ep_add(proFD, pdata, &revents); + if (ret) + return -1; + return revents; + + case nstack_ep_triggle_mod: + ret = rr_rs_ep_mod(proFD, pdata, &revents); + if (ret) + return -1; + return revents; + case nstack_ep_triggle_del: + return rr_rs_ep_del(proFD); + } + + return _err(EPERM); +} + +int rsocket_stack_register(nstack_proc_cb *proc_fun, nstack_event_cb *event_ops) +{ + rr_init_log(); + +#define NSTACK_MK_DECL(ret, fn, args) \ + do { \ + proc_fun->socket_ops.pf##fn = dlsym(event_ops->handle, "r"#fn); \ + if (!proc_fun->socket_ops.pf##fn) \ + RR_LOG("socket API '" #fn "' not found\n"); \ + } while (0) +#include "declare_syscalls.h" +#undef NSTACK_MK_DECL + + proc_fun->extern_ops.module_init = rsocket_init; + proc_fun->extern_ops.ep_ctl = rsocket_ep_ctl; + proc_fun->extern_ops.ep_getevt = NULL; + + g_rr_var.type = event_ops->type; + g_rr_var.event_cb = event_ops->event_cb; + + return 0; +} + diff --git a/stacks/rsocket/src/rsocket_adpt.h b/stacks/rsocket/src/rsocket_adpt.h new file mode 100644 index 0000000..43ec6cf --- /dev/null +++ b/stacks/rsocket/src/rsocket_adpt.h @@ -0,0 +1,55 @@ +/* +* +* Copyright (c) 2018 Huawei Technologies Co.,Ltd. +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at: +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/
+
+#ifndef _RSOCKET_ADPT_H_
+#define _RSOCKET_ADPT_H_
+
+#include "indexer.h"
+#include "rsocket_rdma.h"
+/* Indices into rsocket_var.stat[]; RR_STAT_NUM is the array length.
+ * The EPW_* counters are incremented in rsocket_adpt.c after a failed
+ * epoll_wait() call. */
+enum {
+    RR_STAT_EPW_ERR,        /* every epoll_wait() failure */
+    RR_STAT_EPW_EINTR,      /* failures with errno == EINTR */
+    RR_STAT_EPW_ETIMEOUT,   /* failures with errno == ETIMEDOUT */
+
+    RR_STAT_NUM
+};
+/* Atomic counter updates built on the __sync builtins; id is one of the
+ * RR_STAT_* indices above. */
+#define RR_STAT_ADD(id, num) __sync_add_and_fetch(&g_rr_var.stat[(id)], num)
+#define RR_STAT_SUB(id, num) __sync_sub_and_fetch(&g_rr_var.stat[(id)], num)
+#define RR_STAT_INC(id) RR_STAT_ADD((id), 1)
+#define RR_STAT_DEC(id) RR_STAT_SUB((id), 1)
+
+#define RSRDMA_EXIT 1 /* NOTE(review): no user is visible in this part of the patch */
+/* Process-wide state of the rsocket adapter (single instance: g_rr_var). */
+typedef struct rsocket_var {
+    pthread_t epoll_threadid;   /* thread created by rsocket_init() */
+
+    int epfd;                   /* epoll instance created by rsocket_init(); -1 once closed */
+    int type;                   /* event_ops->type saved by rsocket_stack_register() */
+    int (*event_cb) (void *pdata, int events); /* event_ops->event_cb saved by rsocket_stack_register() */
+
+    uint64_t stat[RR_STAT_NUM]; /* counters indexed by RR_STAT_* */
+} rsocket_var_t;
+
+extern rsocket_var_t g_rr_var;
+
+/* Process the events reported for rsocket descriptor fd (defined in
+ * rsocket_rs.c). Returns 0, or -1 with errno = EBADF when fd does not map
+ * to an rsocket. */
+int rr_rs_handle(int fd, uint32_t events);
+
+
+#endif/* #ifndef _RSOCKET_ADPT_H_ */
+
diff --git a/stacks/rsocket/src/rsocket_in.h b/stacks/rsocket/src/rsocket_in.h
new file mode 100644
index 0000000..b4e1ae6
--- /dev/null
+++ b/stacks/rsocket/src/rsocket_in.h
@@ -0,0 +1,33 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef _RSOCKET_IN_H_
+#define _RSOCKET_IN_H_
+
+#include "rsocket_rdma.h"
+
+/*
+ * Forward declarations of the static helpers defined in rsocket_rs.c.
+ * NOTE(review): presumably this header and rsocket_rs.c are both included
+ * into the patched rsocket.c, which supplies struct rsocket - confirm
+ * against rsocket.patch.
+ */
+
+/* Reset / tear down the per-socket epoll bookkeeping fields. */
+inline static void rr_rs_init(struct rsocket *rs);
+inline static void rr_rs_dest(struct rsocket *rs);
+
+/* Report the currently pending events of a registered socket. */
+static inline void rr_rs_notify_tcp(struct rsocket *rs);
+static inline void rr_rs_notify_udp(struct rsocket *rs);
+
+/* Completion processing, event-fd selection and connect-time hook. */
+inline static void rr_rs_handle_tcp(struct rsocket *rs);
+inline static int rr_rs_evfd(struct rsocket *rs);
+inline static void rr_rs_connected(struct rsocket *rs);
+
+#endif/* #ifndef _RSOCKET_IN_H_ */
+
diff --git a/stacks/rsocket/src/rsocket_rdma.h b/stacks/rsocket/src/rsocket_rdma.h
new file mode 100644
index 0000000..076b6c9
--- /dev/null
+++ b/stacks/rsocket/src/rsocket_rdma.h
@@ -0,0 +1,97 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef _RSOCKET_RDMA_H_
+#define _RSOCKET_RDMA_H_
+
+#include <errno.h>      /* errno, assigned by _err() */
+#include <stdio.h>      /* printf(), used by RR_OUT() */
+#include <stdint.h>
+#include <unistd.h>
+#include <stddef.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/epoll.h>
+#include <netinet/in.h>
+#include <pthread.h>
+#include <poll.h>
+#include <fcntl.h>
+#include <time.h>
+
+/* Log levels: a message is printed when g_rr_log_level >= its level. */
+enum {
+    RR_LOG_OFF = 0x00,
+    RR_LOG_ERR = 0x01,
+    RR_LOG_WRN = 0x02,
+    RR_LOG_LOG = 0x03,
+    RR_LOG_DBG = 0x04,
+};
+
+/*
+ * Print one log line to stdout, prefixed with the level name, the calling
+ * function and the line number. fmt must be a string literal because it is
+ * concatenated with the prefix.
+ */
+#define RR_OUT(level, name, fmt, arg...) do { \
+    extern int g_rr_log_level; \
+    if (g_rr_log_level >= (level)) { \
+        (void)printf("#RSOCKET:%s# %s:%d> " fmt "", \
+            name, __func__, __LINE__, ##arg); \
+    } \
+} while (0)
+
+#define RR_ERR(fmt, arg...) RR_OUT(RR_LOG_ERR, "ERR", fmt, ##arg)
+#define RR_WRN(fmt, arg...) RR_OUT(RR_LOG_WRN, "WRN", fmt, ##arg)
+#define RR_LOG(fmt, arg...) RR_OUT(RR_LOG_LOG, "LOG", fmt, ##arg)
+/* Debug traces are compiled out unless RSOCKET_DEBUG is defined > 0; their
+ * arguments are then not evaluated. */
+#if defined(RSOCKET_DEBUG) && RSOCKET_DEBUG > 0
+#define RR_DBG(fmt, arg...) RR_OUT(RR_LOG_DBG, "DBG", fmt, ##arg)
+#else
+#define RR_DBG(fmt, arg...) ((void)0)
+#endif
+
+
+/* Set errno to err_no and evaluate to -1, e.g. "return _err(EBADF);". */
+#define _err(err_no) ((errno = (err_no)), -1)
+
+
+/* Epoll registration of an rsocket descriptor (defined in rsocket_rs.c). */
+int rr_rs_ep_add(int fd, void *pdata, uint32_t *revent);
+int rr_rs_ep_mod(int fd, void *pdata, uint32_t *revent);
+int rr_rs_ep_del(int fd);
+
+/* Current event mask of an rsocket descriptor (defined in rsocket_rs.c). */
+uint32_t rr_rs_poll(int fd, uint32_t revents);
+
+/* Deliver events for a registration; pdata is the pointer given to add/mod. */
+int rr_notify_event(void *pdata, int events);
+
+
+/* Table of native libc entry points: one pointer member n_<name> for each
+ * RR_SAPI(name) entry of rsocket_sapi.h. */
+typedef struct rr_socket_api {
+    #define RR_SAPI(name) typeof(name) *n_##name;/* native api */
+    #include "rsocket_sapi.h"
+    #undef RR_SAPI
+} rr_sapi_t;
+
+extern rr_sapi_t g_sapi;
+
+/* Access the native implementation of <name>, e.g. GSAPI(close)(fd). */
+#define GSAPI(name) g_sapi.n_##name
+
+
+int rr_epoll_ctl(int op, int evfd, uint32_t events, int rsfd);
+
+/* Register evfd edge-triggered for input and output on behalf of rsfd. */
+inline static int rr_ep_add(int evfd, int rsfd)
+{
+    return rr_epoll_ctl(EPOLL_CTL_ADD, evfd, EPOLLET | EPOLLIN | EPOLLOUT, rsfd);
+}
+
+/* Unregister evfd; a negative evfd is treated as "nothing to do". */
+inline static int rr_ep_del(int evfd)
+{
+    if (evfd < 0)
+        return 0;
+    return rr_epoll_ctl(EPOLL_CTL_DEL, evfd, 0, 0);
+}
+
+
+#endif/* #ifndef _RSOCKET_RDMA_H_ */
+
diff --git a/stacks/rsocket/src/rsocket_rs.c b/stacks/rsocket/src/rsocket_rs.c
new file mode 100644
index 0000000..ca92c3d
--- /dev/null
+++ b/stacks/rsocket/src/rsocket_rs.c
@@ -0,0 +1,354 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef _RSOCKET_RS_C_
+#define _RSOCKET_RS_C_
+
+
+/* Put the epoll bookkeeping of rs into the "not registered" state. */
+inline static void rr_rs_init(struct rsocket *rs)
+{
+    RR_DBG("(rs:%p{index:%d})\n", rs, rs->index);
+    rs->rr_epoll_ref = 0;
+    rs->rr_epoll_fd = -1;
+    rs->rr_epoll_pdata = NULL;
+}
+
+/* Drop any remaining epoll registration of rs and reset its bookkeeping. */
+inline static void rr_rs_dest(struct rsocket *rs)
+{
+    RR_DBG("(rs:%p{index:%d})\n", rs, rs->index);
+
+    if (rs->rr_epoll_ref) {
+        (void)rr_ep_del(rs->rr_epoll_fd);
+        rs->rr_epoll_ref = 0;
+        rs->rr_epoll_fd = -1;
+        rs->rr_epoll_pdata = NULL;
+    }
+}
+
+#ifndef POLL__RSOCKET_RS_H_
+#define POLL__RSOCKET_RS_H_
+
+/*
+ * Event mask of a stream socket: EPOLLIN/EPOLLOUT only while connected,
+ * EPOLLERR for the error states and EPOLLHUP once disconnected.
+ */
+static inline uint32_t rr_rs_poll_tcp(struct rsocket *rs)
+{
+    uint32_t events = 0;
+    if (rs->state & rs_connected) {
+        if (rs_have_rdata(rs))
+            events |= EPOLLIN;
+        if (rs_can_send(rs))
+            events |= EPOLLOUT;
+    }
+    if (rs->state & (rs_error | rs_connect_error))
+        events |= EPOLLERR;
+    if (rs->state & rs_disconnected)
+        events |= EPOLLHUP;
+    return events;
+}
+
+/* Event mask of a datagram socket (no connection state is consulted). */
+static inline uint32_t rr_rs_poll_udp(struct rsocket *rs)
+{
+    uint32_t events = 0;
+    if (rs_have_rdata(rs))
+        events |= EPOLLIN;
+    if (ds_can_send(rs))
+        events |= EPOLLOUT;
+    if (rs->state & rs_error)
+        events |= EPOLLERR;
+    return events;
+}
+
+/* Dispatch on the socket type; any other type reports no events. */
+static inline uint32_t rr_rs_poll_both(struct rsocket *rs)
+{
+    if (rs->type == SOCK_STREAM)
+        return rr_rs_poll_tcp(rs);
+
+    if (rs->type == SOCK_DGRAM)
+        return rr_rs_poll_udp(rs);
+
+    return 0;
+}
+
+/*
+ * Current event mask of rsocket descriptor fd.
+ *
+ * Returns 0 for an unknown fd. A listening socket passes the caller's
+ * revents through unchanged; every other socket is evaluated from its state.
+ */
+uint32_t rr_rs_poll(int fd, uint32_t revents)
+{
+    struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
+
+    if (!rs)
+        return 0;
+
+    if (rs->state == rs_listening)
+        return revents;
+
+    return rr_rs_poll_both(rs);
+}
+
+#endif/* #ifndef POLL__RSOCKET_RS_H_ */
+
+
+/* Deliver the pending stream events if rs is registered and has any. */
+static inline void rr_rs_notify_tcp(struct rsocket *rs)
+{
+    if (rs->rr_epoll_ref) {
+        uint32_t events = rr_rs_poll_tcp(rs);
+        if (events)
+            (void)rr_notify_event(rs->rr_epoll_pdata, events);
+    }
+}
+
+/* Deliver the pending datagram events if rs is registered and has any. */
+static inline void rr_rs_notify_udp(struct rsocket *rs)
+{
+    if (rs->rr_epoll_ref) {
+        uint32_t events = rr_rs_poll_udp(rs);
+        if (events)
+            (void)rr_notify_event(rs->rr_epoll_pdata, events);
+    }
+}
+
+#ifndef HANDLE__RSOCKET_RS_H_
+#define HANDLE__RSOCKET_RS_H_
+
+/*
+ * Completion processing for a stream socket; a no-op unless the socket is
+ * connected or opening.
+ *
+ * Order: consume the CQ event (under cq_wait_lock), then under cq_lock
+ * poll the CQ, request a new CQ notification when the socket is registered
+ * and the CQ is not armed, and poll once more.
+ * NOTE(review): the second poll presumably picks up completions that
+ * arrived before the notification request took effect - confirm.
+ */
+inline static void rr_rs_handle_tcp(struct rsocket *rs)
+{
+    int ret;
+
+    RR_DBG("(%d)@ state:0x%x\n", rs->index, rs->state);
+
+    if (!(rs->state & (rs_connected | rs_opening)))
+        return;
+
+    fastlock_acquire(&rs->cq_wait_lock);
+    ret = rs_get_cq_event(rs);
+    RR_DBG("rs_get_cq_event({%d})=%d,%d\n", rs->index, ret, errno);
+    fastlock_release(&rs->cq_wait_lock);
+
+    fastlock_acquire(&rs->cq_lock);
+
+    if (rs->state & rs_connected) {
+        rs_update_credits(rs);
+        ret = rs_poll_cq(rs);
+        RR_DBG("rs_poll_cq({%d})=%d,%d {ref:%d, armed:%d}\n",
+            rs->index, ret, errno, rs->rr_epoll_ref, rs->cq_armed);
+    }
+
+    if (rs->rr_epoll_ref && rs->cq_armed < 1) {
+        ret = ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
+        RR_DBG("ibv_req_notify_cq({%d})=%d,%d\n", rs->index, ret, errno);
+        if (0 == ret)
+            __sync_fetch_and_add(&rs->cq_armed, 1);
+    }
+
+    if (rs->state & rs_connected) {
+        ret = rs_poll_cq(rs);
+        RR_DBG("rs_poll_cq({%d})=%d,%d\n", rs->index, ret, errno);
+        rs_update_credits(rs);
+    }
+
+    fastlock_release(&rs->cq_lock);
+
+    RR_DBG("(%d)=\n", rs->index);
+}
+
+/*
+ * Completion processing for a datagram socket: consume the CQ event, poll
+ * the CQs and request notification again when the socket is registered and
+ * not yet armed.
+ */
+inline static void rr_rs_handle_udp(struct rsocket *rs)
+{
+    fastlock_acquire(&rs->cq_wait_lock);
+    ds_get_cq_event(rs);
+    fastlock_release(&rs->cq_wait_lock);
+
+    fastlock_acquire(&rs->cq_lock);
+    ds_poll_cqs(rs);
+    if (rs->rr_epoll_ref && !rs->cq_armed) {
+        ds_req_notify_cqs(rs);
+        rs->cq_armed = 1;
+    }
+    fastlock_release(&rs->cq_lock);
+}
+
+/*
+ * Advance rs by one step: continue the connect sequence while the socket
+ * is opening, otherwise run the completion handler matching its type.
+ */
+inline static void rr_rs_handle_rs(struct rsocket *rs)
+{
+    if (rs->state & rs_opening) {
+        int ret = rs_do_connect(rs);
+        RR_DBG("rs_do_connect(%p{%d}):%d:%d\n", rs, rs->index, ret, errno);
+        return;
+    }
+
+    if (rs->type == SOCK_STREAM) {
+        rr_rs_handle_tcp(rs);
+    }
+
+    if (rs->type == SOCK_DGRAM) {
+        rr_rs_handle_udp(rs);
+    }
+}
+
+/*
+ * Process the epoll events reported for rsocket descriptor fd.
+ *
+ * A listening socket forwards the events to the registered callback when
+ * EPOLLIN is set; any other socket is advanced with rr_rs_handle_rs().
+ * Returns 0, or -1 with errno = EBADF when fd does not map to an rsocket.
+ */
+int rr_rs_handle(int fd, uint32_t events)
+{
+    struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
+
+    RR_DBG("(fd:%d, events:0x%x):rs:%p\n", fd, events, rs);
+
+    if (!rs)
+        return _err(EBADF);
+
+    if (rs->state == rs_listening) {
+        if (events & EPOLLIN) {
+            (void)rr_notify_event(rs->rr_epoll_pdata, events);
+        }
+        return 0;
+    }
+
+    rr_rs_handle_rs(rs);
+
+    return 0;
+}
+
+
+#endif/* #ifndef HANDLE__RSOCKET_RS_H_ */
+
+#ifndef ADPT__RSOCKET_RS_H_
+#define ADPT__RSOCKET_RS_H_
+
+/*
+ * File descriptor to watch for rs: for a stream socket the receive CQ
+ * channel once the state has reached rs_connected and the CM event channel
+ * before that; for any other socket type its own epfd.
+ */
+inline static int rr_rs_evfd(struct rsocket *rs)
+{
+    if (rs->type != SOCK_STREAM)
+        return rs->epfd;
+
+    if (rs->state >= rs_connected)
+        return rs->cm_id->recv_cq_channel->fd;
+
+    return rs->cm_id->channel->fd;
+}
+
+/*
+ * Register rsocket descriptor fd for event delivery.
+ *
+ * The first registration adds the socket's event fd to the module epoll
+ * set. The socket is then advanced once and its current event mask is
+ * stored in *revent (0 for a listening socket).
+ * Returns 0, or -1 with errno = EBADF for an unknown fd.
+ *
+ * NOTE(review): pdata is stored only after the event fd has been added and
+ * the socket has been handled, so a notification raised in between would
+ * still see the previous pointer - confirm that this order is intended.
+ */
+int rr_rs_ep_add(int fd, void *pdata, uint32_t *revent)
+{
+    int ref;
+    struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
+    RR_DBG("(%d(%p),)\n", fd, rs);
+    if (!rs)
+        return _err(EBADF);
+
+    ref = __sync_add_and_fetch(&rs->rr_epoll_ref, 1);
+    if (1 == ref) {
+        rs->rr_epoll_fd = rr_rs_evfd(rs);
+        (void)rr_ep_add(rs->rr_epoll_fd, rs->index);
+    }
+
+    (void)rr_rs_handle_rs(rs);
+    *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both(rs);
+
+    rs->rr_epoll_pdata = pdata;
+
+    RR_DBG("*revent=0x%x\n", *revent);
+    return 0;
+}
+
+/*
+ * Update the registration of rsocket descriptor fd with a new pdata and
+ * report its current event mask in *revent (0 for a listening socket).
+ * Returns 0, -1 with errno = EBADF for an unknown fd, or -1 with
+ * errno = ENOENT when the socket is not registered.
+ */
+int rr_rs_ep_mod(int fd, void *pdata, uint32_t *revent)
+{
+    struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
+    RR_DBG("(%d(%p),)\n", fd, rs);
+    if (!rs)
+        return _err(EBADF);
+
+    if (rs->rr_epoll_ref <= 0)
+        return _err(ENOENT);
+
+    (void)rr_rs_handle_rs(rs);
+    *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both(rs);
+
+    rs->rr_epoll_pdata = pdata;
+
+    RR_DBG("*revent=0x%x\n", *revent);
+    return 0;
+}
+
+/*
+ * Drop one registration of rsocket descriptor fd; the event fd is removed
+ * from the module epoll set when the last registration goes away.
+ * Returns 0, or -1 with errno = EBADF for an unknown fd.
+ */
+int rr_rs_ep_del(int fd)
+{
+    int ref;
+    struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
+    RR_DBG("(%d(%p))\n", fd, rs);
+
+    if (!rs)
+        return _err(EBADF);
+
+    ref = __sync_sub_and_fetch(&rs->rr_epoll_ref, 1);
+    if (ref < 0) {
+        /* Delete without a matching add: undo the decrement. A negative
+         * count would make the next add see a value other than 1 and skip
+         * registering the event fd. Still return 0 as before. */
+        (void)__sync_add_and_fetch(&rs->rr_epoll_ref, 1);
+        RR_WRN("unbalanced ep del fd:%d\n", fd);
+        return 0;
+    }
+
+    if (0 == ref) {
+        (void)rr_ep_del(rs->rr_epoll_fd);
+        rs->rr_epoll_fd = -1;
+    }
+
+    return 0;
+}
+
+#endif/* #ifndef ADPT__RSOCKET_RS_H_ */
+
+/*
+ * Hook for connection state changes of a stream socket.
+ *
+ * While the socket is not connected only the pending events are reported.
+ * Once connected, a registered socket is moved to its new event fd if that
+ * differs from the registered one, and completions are then processed.
+ */
+inline static void rr_rs_connected(struct rsocket *rs)
+{
+    RR_DBG("rsfd:%d ref:%d evfd:%d->%d state:0x%x\n", rs->index,
+        rs->rr_epoll_ref, rs->rr_epoll_fd, rr_rs_evfd(rs), rs->state);
+
+    if (!(rs->state & rs_connected)) {
+        rr_rs_notify_tcp(rs);
+        return;
+    }
+
+    if (rs->rr_epoll_ref) {
+        int evfd = rr_rs_evfd(rs);
+
+        if (evfd != rs->rr_epoll_fd) {
+            (void)rr_ep_del(rs->rr_epoll_fd);
+            rs->rr_epoll_fd = evfd;
+            (void)rr_ep_add(evfd, rs->index);
+        }
+
+        rr_rs_handle_tcp(rs);
+    }
+}
+
+/*
+ * accept4() counterpart built on raccept().
+ *
+ * SOCK_NONBLOCK switches the accepted socket's CM event channel to
+ * non-blocking mode and records that in rs->fd_flags; SOCK_CLOEXEC is
+ * ignored (logged only).
+ * Returns the new descriptor, the raccept() error result, or -1 when the
+ * accepted descriptor cannot be looked up.
+ */
+int raccept4(int socket, struct sockaddr *addr, socklen_t *addrlen, int flags)
+{
+    int ret, fd;
+    struct rsocket *rs;
+
+    RR_DBG("(%d, %p, %p, %d)@\n", socket, addr, addrlen, flags);
+    fd = raccept(socket, addr, addrlen);
+    RR_DBG("(%d, , , %d):%d:%d\n", socket, flags, fd, errno);
+    if (fd < 0)
+        return fd;
+
+    rs = (struct rsocket*)idm_lookup(&idm, fd);
+    if (!rs) {
+        RR_ERR("panic\n");
+        return -1;
+    }
+
+    if (flags & SOCK_NONBLOCK) {
+        if (0 == (rs->fd_flags & O_NONBLOCK)) {
+            int chfd = rs->cm_id->channel->fd;
+            int cur = GSAPI(fcntl)(chfd, F_GETFL);
+
+            RR_DBG("orig flag:%x\n", cur);
+            /* F_SETFL replaces the whole status-flag set, so OR the bit
+             * into the current flags instead of passing it alone. */
+            ret = (cur < 0) ? cur : GSAPI(fcntl)(chfd, F_SETFL, cur | O_NONBLOCK);
+            if (0 == ret)
+                rs->fd_flags |= O_NONBLOCK;
+        }
+    }
+
+    if (flags & SOCK_CLOEXEC) {
+        RR_LOG("ignore flag:SOCK_CLOEXEC\n");
+    }
+
+    return fd;
+}
+
+
+#endif/* #ifndef _RSOCKET_RS_C_ */
+
diff --git a/stacks/rsocket/src/rsocket_sapi.h b/stacks/rsocket/src/rsocket_sapi.h
new file mode 100644
index 0000000..a9ca932
--- /dev/null
+++ b/stacks/rsocket/src/rsocket_sapi.h
@@ -0,0 +1,38 @@
+/*
+*
+* 
Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+/* X-macro list of the native calls used by the adapter. This file has no
+ * include guard: each includer defines RR_SAPI(name) first and #undefs it
+ * afterwards. rsocket_rdma.h expands the list into the members of
+ * rr_sapi_t; rr_init_sapi() expands it into the dlsym() lookups.
+ * Entries are written without a trailing ';'. */
+RR_SAPI(socket)
+RR_SAPI(close)
+RR_SAPI(bind)
+RR_SAPI(connect)
+RR_SAPI(getpeername)
+RR_SAPI(getsockname)
+
+RR_SAPI(fcntl)
+RR_SAPI(setsockopt)
+RR_SAPI(getsockopt)
+
+RR_SAPI(read)
+RR_SAPI(write)
+
+RR_SAPI(sendmsg)
+RR_SAPI(recvfrom)
+
+RR_SAPI(poll)
+
+RR_SAPI(epoll_create)
+RR_SAPI(epoll_ctl)
+RR_SAPI(epoll_wait)
|