aboutsummaryrefslogtreecommitdiffstats
path: root/stacks/rsocket
diff options
context:
space:
mode:
Diffstat (limited to 'stacks/rsocket')
-rw-r--r--stacks/rsocket/CMakeLists.txt61
-rw-r--r--stacks/rsocket/config/module_config.json30
-rw-r--r--stacks/rsocket/config/rd_config.json10
-rw-r--r--stacks/rsocket/rsocket.patch751
-rw-r--r--stacks/rsocket/src/rsocket_adpt.c218
-rw-r--r--stacks/rsocket/src/rsocket_adpt.h55
-rw-r--r--stacks/rsocket/src/rsocket_in.h33
-rw-r--r--stacks/rsocket/src/rsocket_rdma.h97
-rw-r--r--stacks/rsocket/src/rsocket_rs.c354
-rw-r--r--stacks/rsocket/src/rsocket_sapi.h38
10 files changed, 1647 insertions, 0 deletions
diff --git a/stacks/rsocket/CMakeLists.txt b/stacks/rsocket/CMakeLists.txt
new file mode 100644
index 0000000..c1b2f18
--- /dev/null
+++ b/stacks/rsocket/CMakeLists.txt
@@ -0,0 +1,61 @@
+#########################################################################
+#
+# Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#########################################################################
+
+
+SET(rdmacm_dir librdmacm-1.1.0)
+SET(dmm_inc_dir ${DMM_REL_INC_DIR})
+SET(RSOCKET_DEBUG 0)
+
+########################
+
+SET(rdmacm_url https://github.com/ofiwg/librdmacm/archive/v1.1.0.tar.gz)
+
+INCLUDE(ExternalProject)
+ExternalProject_Add(
+ rdmacm
+ URL ${rdmacm_url}
+ SOURCE_DIR ${CMAKE_CURRENT_LIST_DIR}/${rdmacm_dir}
+ DOWNLOAD_DIR ${CMAKE_CURRENT_LIST_DIR}
+ PATCH_COMMAND patch -p1 -i ../rsocket.patch
+ CONFIGURE_COMMAND ./autogen.sh && ./configure dmm_inc_dir=${DMM_REL_INC_DIR} RSOCKET_DEBUG=${RSOCKET_DEBUG}
+ BUILD_IN_SOURCE 1
+ BUILD_COMMAND make
+ INSTALL_COMMAND cp -f libdmm_rdmacm.a ${LIB_PATH_STATIC}/
+ DEPENDS DPDK
+)
+set_target_properties(rdmacm PROPERTIES EXCLUDE_FROM_ALL TRUE)
+
+########################
+
+SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O2 -g -fPIC -m64 -pthread")
+
+ADD_LIBRARY(dmm_rsocket SHARED src/rsocket_adpt.c)
+
+ADD_DEFINITIONS(-D_GNU_SOURCE -DRSOCKET_DEBUG=${RSOCKET_DEBUG})
+
+INCLUDE_DIRECTORIES(${DMM_REL_INC_DIR})
+INCLUDE_DIRECTORIES(./src ${rdmacm_dir} ${rdmacm_dir}/include ${rdmacm_dir}/src)
+
+TARGET_LINK_LIBRARIES(dmm_rsocket
+ -Wl,--whole-archive
+ ${LIB_PATH_STATIC}/libdmm_rdmacm.a
+ -Wl,--no-whole-archive
+ ibverbs pthread dl rt
+)
+
+ADD_DEPENDENCIES(dmm_rsocket rdmacm DPDK)
+
+set_target_properties(dmm_rsocket PROPERTIES EXCLUDE_FROM_ALL TRUE)
diff --git a/stacks/rsocket/config/module_config.json b/stacks/rsocket/config/module_config.json
new file mode 100644
index 0000000..2df82cd
--- /dev/null
+++ b/stacks/rsocket/config/module_config.json
@@ -0,0 +1,30 @@
+{
+ "default_stack_name": "kernel", /*when rd can't be find maybe choose the defualt one*/
+ "module_list": [
+ {
+ "stack_name": "kernel", /*stack name*/
+ "function_name": "kernel_stack_register", /*function name*/
+ "libname": "./", /*library name, if loadtype is static, this maybe
+ null, else must give a library name*/
+ "loadtype": "static", /*library load type: static or dynamic*/
+ "deploytype": "1", /*deploy model type:model type1, model type2,
+ model type3. Indicating single or multi process
+ deployment. Used during shared memory initialization.*/
+ "maxfd": "1024", /*the max fd supported*/
+ "minfd": "0", /*the min fd supported*/
+ "priorty": "1", /*priorty when executing, reserv*/
+ "stackid": "0", /*stack id, this must be ordered and not be repeated*/
+ },
+ {
+ "stack_name": "rsocket",
+ "function_name": "rsocket_stack_register",
+ "libname": "libdmm_rsocket.so",
+ "loadtype": "dynmic",
+ "deploytype": "1",
+ "maxfd": "1024",
+ "minfd": "0",
+ "priorty": "1",
+ "stackid": "1",
+ },
+ ]
+}
diff --git a/stacks/rsocket/config/rd_config.json b/stacks/rsocket/config/rd_config.json
new file mode 100644
index 0000000..ea1fc7b
--- /dev/null
+++ b/stacks/rsocket/config/rd_config.json
@@ -0,0 +1,10 @@
+{
+ "ip_route": [
+ {
+ "subnet": "192.168.1.1/24",
+ "type": "nstack-rsocket",
+ },
+ ],
+ "prot_route": [
+ ],
+}
diff --git a/stacks/rsocket/rsocket.patch b/stacks/rsocket/rsocket.patch
new file mode 100644
index 0000000..430f178
--- /dev/null
+++ b/stacks/rsocket/rsocket.patch
@@ -0,0 +1,751 @@
+diff --git a/Makefile.am b/Makefile.am
+index bf72134..cb02f56 100644
+--- a/Makefile.am
++++ b/Makefile.am
+@@ -22,6 +22,12 @@ src_librdmacm_la_LDFLAGS = -version-info 1 -export-dynamic \
+ $(librdmacm_version_script)
+ src_librdmacm_la_DEPENDENCIES = $(srcdir)/src/librdmacm.map
+
++noinst_LIBRARIES = libdmm_rdmacm.a
++libdmm_rdmacm_a_CFLAGS = $(src_librdmacm_la_CFLAGS) -O2 -fPIC -m64 \
++ -I@dmm_inc_dir@ -I$(srcdir)/../src -DDMM_RSOCKET=1 -DRSOCKET_DEBUG=@RSOCKET_DEBUG@
++libdmm_rdmacm_a_LDFLAGS =
++libdmm_rdmacm_a_SOURCES = $(src_librdmacm_la_SOURCES)
++
+ src_librspreload_la_SOURCES = src/preload.c src/indexer.c
+ src_librspreload_la_LDFLAGS = -version-info 1 -export-dynamic
+ src_librspreload_la_LIBADD = $(top_builddir)/src/librdmacm.la
+diff --git a/configure.ac b/configure.ac
+index 4a43995..98601ea 100644
+--- a/configure.ac
++++ b/configure.ac
+@@ -104,5 +104,12 @@ if test "x$rdmadir" = "x"; then
+ AC_SUBST(rdmadir, rdma)
+ fi
+
++AC_ARG_VAR(dmm_inc_dir, [Directory for dmm include files])
++if test "x$dmm_inc_dir" = "x"; then
++ AC_MSG_ERROR([must set dmm_inc_dir])
++fi
++
++AC_ARG_VAR(RSOCKET_DEBUG, [rsocket debug flag 0 or 1])
++
+ AC_CONFIG_FILES([Makefile librdmacm.spec])
+ AC_OUTPUT
+diff --git a/src/cma.c b/src/cma.c
+index a89e663..071222d 100644
+--- a/src/cma.c
++++ b/src/cma.c
+@@ -60,6 +60,12 @@
+ #include <rdma/rdma_verbs.h>
+ #include <infiniband/ib.h>
+
++#ifdef DMM_RSOCKET
++#include "rsocket_rdma.h"
++#else
++#define GSAPI(name) name
++#endif
++
+ #define CMA_INIT_CMD(req, req_size, op) \
+ do { \
+ memset(req, 0, req_size); \
+@@ -564,7 +570,7 @@ static int rdma_create_id2(struct rdma_event_channel *channel,
+ cmd.ps = ps;
+ cmd.qp_type = qp_type;
+
+- ret = write(id_priv->id.channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id_priv->id.channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ goto err;
+
+@@ -599,7 +605,7 @@ static int ucma_destroy_kern_id(int fd, uint32_t handle)
+ CMA_INIT_CMD_RESP(&cmd, sizeof cmd, DESTROY_ID, &resp, sizeof resp);
+ cmd.id = handle;
+
+- ret = write(fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -659,7 +665,7 @@ static int ucma_query_addr(struct rdma_cm_id *id)
+ cmd.id = id_priv->handle;
+ cmd.option = UCMA_QUERY_ADDR;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -692,7 +698,7 @@ static int ucma_query_gid(struct rdma_cm_id *id)
+ cmd.id = id_priv->handle;
+ cmd.option = UCMA_QUERY_GID;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -753,7 +759,7 @@ static int ucma_query_path(struct rdma_cm_id *id)
+ cmd.id = id_priv->handle;
+ cmd.option = UCMA_QUERY_PATH;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -784,7 +790,7 @@ static int ucma_query_route(struct rdma_cm_id *id)
+ id_priv = container_of(id, struct cma_id_private, id);
+ cmd.id = id_priv->handle;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -835,7 +841,7 @@ static int rdma_bind_addr2(struct rdma_cm_id *id, struct sockaddr *addr,
+ cmd.addr_size = addrlen;
+ memcpy(&cmd.addr, addr, addrlen);
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -863,7 +869,7 @@ int rdma_bind_addr(struct rdma_cm_id *id, struct sockaddr *addr)
+ cmd.id = id_priv->handle;
+ memcpy(&cmd.addr, addr, addrlen);
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -916,7 +922,7 @@ static int rdma_resolve_addr2(struct rdma_cm_id *id, struct sockaddr *src_addr,
+ cmd.dst_size = dst_len;
+ cmd.timeout_ms = timeout_ms;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -951,7 +957,7 @@ int rdma_resolve_addr(struct rdma_cm_id *id, struct sockaddr *src_addr,
+ memcpy(&cmd.dst_addr, dst_addr, dst_len);
+ cmd.timeout_ms = timeout_ms;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -1003,7 +1009,7 @@ int rdma_resolve_route(struct rdma_cm_id *id, int timeout_ms)
+ cmd.id = id_priv->handle;
+ cmd.timeout_ms = timeout_ms;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -1029,7 +1035,7 @@ static int rdma_init_qp_attr(struct rdma_cm_id *id, struct ibv_qp_attr *qp_attr,
+ cmd.id = id_priv->handle;
+ cmd.qp_state = qp_attr->qp_state;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -1530,7 +1536,7 @@ int rdma_connect(struct rdma_cm_id *id, struct rdma_conn_param *conn_param)
+ conn_param, 0, 0);
+ }
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -1553,7 +1559,7 @@ int rdma_listen(struct rdma_cm_id *id, int backlog)
+ cmd.id = id_priv->handle;
+ cmd.backlog = backlog;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -1656,7 +1662,7 @@ int rdma_accept(struct rdma_cm_id *id, struct rdma_conn_param *conn_param)
+ conn_param, conn_param->qp_num,
+ conn_param->srq);
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd) {
+ ucma_modify_qp_err(id);
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+@@ -1684,7 +1690,7 @@ int rdma_reject(struct rdma_cm_id *id, const void *private_data,
+ cmd.private_data_len = private_data_len;
+ }
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -1702,7 +1708,7 @@ int rdma_notify(struct rdma_cm_id *id, enum ibv_event_type event)
+ id_priv = container_of(id, struct cma_id_private, id);
+ cmd.id = id_priv->handle;
+ cmd.event = event;
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -1735,7 +1741,7 @@ int rdma_disconnect(struct rdma_cm_id *id)
+ id_priv = container_of(id, struct cma_id_private, id);
+ cmd.id = id_priv->handle;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -1778,7 +1784,7 @@ static int rdma_join_multicast2(struct rdma_cm_id *id, struct sockaddr *addr,
+ cmd.uid = (uintptr_t) mc;
+ cmd.reserved = 0;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd) {
+ ret = (ret >= 0) ? ERR(ENODATA) : -1;
+ goto err2;
+@@ -1791,7 +1797,7 @@ static int rdma_join_multicast2(struct rdma_cm_id *id, struct sockaddr *addr,
+ memcpy(&cmd.addr, addr, addrlen);
+ cmd.uid = (uintptr_t) mc;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd) {
+ ret = (ret >= 0) ? ERR(ENODATA) : -1;
+ goto err2;
+@@ -1857,7 +1863,7 @@ int rdma_leave_multicast(struct rdma_cm_id *id, struct sockaddr *addr)
+ CMA_INIT_CMD_RESP(&cmd, sizeof cmd, LEAVE_MCAST, &resp, sizeof resp);
+ cmd.id = mc->handle;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd) {
+ ret = (ret >= 0) ? ERR(ENODATA) : -1;
+ goto free;
+@@ -2019,7 +2025,7 @@ static int ucma_process_conn_resp(struct cma_id_private *id_priv)
+ CMA_INIT_CMD(&cmd, sizeof cmd, ACCEPT);
+ cmd.id = id_priv->handle;
+
+- ret = write(id_priv->id.channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id_priv->id.channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd) {
+ ret = (ret >= 0) ? ERR(ENODATA) : -1;
+ goto err;
+@@ -2103,7 +2109,7 @@ int rdma_get_cm_event(struct rdma_event_channel *channel,
+ retry:
+ memset(evt, 0, sizeof(*evt));
+ CMA_INIT_CMD_RESP(&cmd, sizeof cmd, GET_EVENT, &resp, sizeof resp);
+- ret = write(channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd) {
+ free(evt);
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+@@ -2274,7 +2280,7 @@ int rdma_set_option(struct rdma_cm_id *id, int level, int optname,
+ cmd.optname = optname;
+ cmd.optlen = optlen;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -2302,7 +2308,7 @@ int rdma_migrate_id(struct rdma_cm_id *id, struct rdma_event_channel *channel)
+ cmd.id = id_priv->handle;
+ cmd.fd = id->channel->fd;
+
+- ret = write(channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd) {
+ if (sync)
+ rdma_destroy_event_channel(channel);
+diff --git a/src/rsocket.c b/src/rsocket.c
+index c4f1b57..faa3d3a 100644
+--- a/src/rsocket.c
++++ b/src/rsocket.c
+@@ -382,6 +382,11 @@ struct rsocket {
+ dlist_entry iomap_queue;
+ int iomap_pending;
+ int unack_cqe;
++#ifdef DMM_RSOCKET
++ int rr_epoll_ref;
++ int rr_epoll_fd;
++ void *rr_epoll_pdata;
++#endif
+ };
+
+ #define DS_UDP_TAG 0x55555555
+@@ -404,6 +409,12 @@ struct ds_udp_header {
+
+ #define ds_next_qp(qp) container_of((qp)->list.next, struct ds_qp, list)
+
++#ifdef DMM_RSOCKET
++#include "rsocket_in.h"
++#else
++#define GSAPI(name) name
++#endif
++
+ static void ds_insert_qp(struct rsocket *rs, struct ds_qp *qp)
+ {
+ if (!rs->qp_list)
+@@ -444,16 +455,16 @@ static int rs_notify_svc(struct rs_svc *svc, struct rsocket *rs, int cmd)
+ msg.cmd = cmd;
+ msg.status = EINVAL;
+ msg.rs = rs;
+- write(svc->sock[0], &msg, sizeof msg);
+- read(svc->sock[0], &msg, sizeof msg);
++ GSAPI(write)(svc->sock[0], &msg, sizeof msg);
++ GSAPI(read)(svc->sock[0], &msg, sizeof msg);
+ ret = rdma_seterrno(msg.status);
+ if (svc->cnt)
+ goto unlock;
+
+ pthread_join(svc->id, NULL);
+ closepair:
+- close(svc->sock[0]);
+- close(svc->sock[1]);
++ GSAPI(close)(svc->sock[0]);
++ GSAPI(close)(svc->sock[1]);
+ unlock:
+ pthread_mutex_unlock(&mut);
+ return ret;
+@@ -607,6 +618,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type)
+ fastlock_init(&rs->map_lock);
+ dlist_init(&rs->iomap_list);
+ dlist_init(&rs->iomap_queue);
++#ifdef DMM_RSOCKET
++ rr_rs_init(rs);
++#endif
+ return rs;
+ }
+
+@@ -617,16 +631,16 @@ static int rs_set_nonblocking(struct rsocket *rs, int arg)
+
+ if (rs->type == SOCK_STREAM) {
+ if (rs->cm_id->recv_cq_channel)
+- ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg);
++ ret = GSAPI(fcntl)(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg);
+
+ if (!ret && rs->state < rs_connected)
+- ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
++ ret = GSAPI(fcntl)(rs->cm_id->channel->fd, F_SETFL, arg);
+ } else {
+- ret = fcntl(rs->epfd, F_SETFL, arg);
++ ret = GSAPI(fcntl)(rs->epfd, F_SETFL, arg);
+ if (!ret && rs->qp_list) {
+ qp = rs->qp_list;
+ do {
+- ret = fcntl(qp->cm_id->recv_cq_channel->fd,
++ ret = GSAPI(fcntl)(qp->cm_id->recv_cq_channel->fd,
+ F_SETFL, arg);
+ qp = ds_next_qp(qp);
+ } while (qp != rs->qp_list && !ret);
+@@ -767,7 +781,7 @@ static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id)
+ goto err1;
+
+ if (rs->fd_flags & O_NONBLOCK) {
+- if (fcntl(cm_id->recv_cq_channel->fd, F_SETFL, O_NONBLOCK))
++ if (GSAPI(fcntl)(cm_id->recv_cq_channel->fd, F_SETFL, O_NONBLOCK))
+ goto err2;
+ }
+
+@@ -918,7 +932,7 @@ static void ds_free_qp(struct ds_qp *qp)
+ if (qp->cm_id) {
+ if (qp->cm_id->qp) {
+ tdelete(&qp->dest.addr, &qp->rs->dest_map, ds_compare_addr);
+- epoll_ctl(qp->rs->epfd, EPOLL_CTL_DEL,
++ GSAPI(epoll_ctl)(qp->rs->epfd, EPOLL_CTL_DEL,
+ qp->cm_id->recv_cq_channel->fd, NULL);
+ rdma_destroy_qp(qp->cm_id);
+ }
+@@ -932,8 +946,12 @@ static void ds_free(struct rsocket *rs)
+ {
+ struct ds_qp *qp;
+
++#ifdef DMM_RSOCKET
++ rr_rs_dest(rs);
++#endif
++
+ if (rs->udp_sock >= 0)
+- close(rs->udp_sock);
++ GSAPI(close)(rs->udp_sock);
+
+ if (rs->index >= 0)
+ rs_remove(rs);
+@@ -947,7 +965,7 @@ static void ds_free(struct rsocket *rs)
+ }
+
+ if (rs->epfd >= 0)
+- close(rs->epfd);
++ GSAPI(close)(rs->epfd);
+
+ if (rs->sbuf)
+ free(rs->sbuf);
+@@ -968,6 +986,10 @@ static void rs_free(struct rsocket *rs)
+ return;
+ }
+
++#ifdef DMM_RSOCKET
++ rr_rs_dest(rs);
++#endif
++
+ if (rs->rmsg)
+ free(rs->rmsg);
+
+@@ -1059,11 +1081,11 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
+
+ static int ds_init(struct rsocket *rs, int domain)
+ {
+- rs->udp_sock = socket(domain, SOCK_DGRAM, 0);
++ rs->udp_sock = GSAPI(socket)(domain, SOCK_DGRAM, 0);
+ if (rs->udp_sock < 0)
+ return rs->udp_sock;
+
+- rs->epfd = epoll_create(2);
++ rs->epfd = GSAPI(epoll_create)(2);
+ if (rs->epfd < 0)
+ return rs->epfd;
+
+@@ -1164,7 +1186,7 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
+ if (ret)
+ return ret;
+ }
+- ret = bind(rs->udp_sock, addr, addrlen);
++ ret = GSAPI(bind)(rs->udp_sock, addr, addrlen);
+ }
+ return ret;
+ }
+@@ -1229,8 +1251,11 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+ goto err;
+ }
+
+- if (rs->fd_flags & O_NONBLOCK)
+- fcntl(new_rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK);
++ if (rs->fd_flags & O_NONBLOCK) {
++ ret = GSAPI(fcntl)(new_rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK);
++ if (0 == ret)
++ new_rs->fd_flags |= O_NONBLOCK;
++ }
+
+ ret = rs_create_ep(new_rs);
+ if (ret)
+@@ -1354,7 +1379,7 @@ connected:
+ break;
+ case rs_accepting:
+ if (!(rs->fd_flags & O_NONBLOCK))
+- fcntl(rs->cm_id->channel->fd, F_SETFL, 0);
++ GSAPI(fcntl)(rs->cm_id->channel->fd, F_SETFL, 0);
+
+ ret = ucma_complete(rs->cm_id);
+ if (ret)
+@@ -1375,6 +1400,10 @@ connected:
+ rs->err = errno;
+ }
+ }
++#ifdef DMM_RSOCKET
++ rr_rs_connected(rs);
++#endif
++
+ return ret;
+ }
+
+@@ -1397,24 +1426,24 @@ static int ds_get_src_addr(struct rsocket *rs,
+ uint16_t port;
+
+ *src_len = sizeof(*src_addr);
+- ret = getsockname(rs->udp_sock, &src_addr->sa, src_len);
++ ret = GSAPI(getsockname)(rs->udp_sock, &src_addr->sa, src_len);
+ if (ret || !rs_any_addr(src_addr))
+ return ret;
+
+ port = src_addr->sin.sin_port;
+- sock = socket(dest_addr->sa_family, SOCK_DGRAM, 0);
++ sock = GSAPI(socket)(dest_addr->sa_family, SOCK_DGRAM, 0);
+ if (sock < 0)
+ return sock;
+
+- ret = connect(sock, dest_addr, dest_len);
++ ret = GSAPI(connect)(sock, dest_addr, dest_len);
+ if (ret)
+ goto out;
+
+ *src_len = sizeof(*src_addr);
+- ret = getsockname(sock, &src_addr->sa, src_len);
++ ret = GSAPI(getsockname)(sock, &src_addr->sa, src_len);
+ src_addr->sin.sin_port = port;
+ out:
+- close(sock);
++ GSAPI(close)(sock);
+ return ret;
+ }
+
+@@ -1512,7 +1541,7 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
+
+ event.events = EPOLLIN;
+ event.data.ptr = qp;
+- ret = epoll_ctl(rs->epfd, EPOLL_CTL_ADD,
++ ret = GSAPI(epoll_ctl)(rs->epfd, EPOLL_CTL_ADD,
+ qp->cm_id->recv_cq_channel->fd, &event);
+ if (ret)
+ goto err;
+@@ -1609,7 +1638,7 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
+ }
+
+ fastlock_acquire(&rs->slock);
+- ret = connect(rs->udp_sock, addr, addrlen);
++ ret = GSAPI(connect)(rs->udp_sock, addr, addrlen);
+ if (!ret)
+ ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
+ fastlock_release(&rs->slock);
+@@ -1903,12 +1932,18 @@ static int rs_poll_cq(struct rsocket *rs)
+ case RS_OP_CTRL:
+ if (rs_msg_data(msg) == RS_CTRL_DISCONNECT) {
+ rs->state = rs_disconnected;
++#ifdef DMM_RSOCKET
++ rr_rs_notify_tcp(rs);
++#endif
+ return 0;
+ } else if (rs_msg_data(msg) == RS_CTRL_SHUTDOWN) {
+ if (rs->state & rs_writable) {
+ rs->state &= ~rs_readable;
+ } else {
+ rs->state = rs_disconnected;
++#ifdef DMM_RSOCKET
++ rr_rs_notify_tcp(rs);
++#endif
+ return 0;
+ }
+ }
+@@ -1959,6 +1994,9 @@ static int rs_poll_cq(struct rsocket *rs)
+ rs->err = errno;
+ }
+ }
++#ifdef DMM_RSOCKET
++ rr_rs_notify_tcp(rs);
++#endif
+ return ret;
+ }
+
+@@ -1977,7 +2015,11 @@ static int rs_get_cq_event(struct rsocket *rs)
+ ibv_ack_cq_events(rs->cm_id->recv_cq, rs->unack_cqe);
+ rs->unack_cqe = 0;
+ }
++#ifdef DMM_RSOCKET
++ __sync_fetch_and_sub(&rs->cq_armed, 1);
++#else
+ rs->cq_armed = 0;
++#endif
+ } else if (!(errno == EAGAIN || errno == EINTR)) {
+ rs->state = rs_error;
+ }
+@@ -2015,8 +2057,13 @@ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rs
+ } else if (nonblock) {
+ ret = ERR(EWOULDBLOCK);
+ } else if (!rs->cq_armed) {
++#ifdef DMM_RSOCKET
++ if (0 == ibv_req_notify_cq(rs->cm_id->recv_cq, 0))
++ __sync_fetch_and_add(&rs->cq_armed, 1);
++#else
+ ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
+ rs->cq_armed = 1;
++#endif
+ } else {
+ rs_update_credits(rs);
+ fastlock_acquire(&rs->cq_wait_lock);
+@@ -2116,11 +2163,19 @@ static void ds_poll_cqs(struct rsocket *rs)
+ qp = ds_next_qp(qp);
+ if (!rs->rqe_avail && rs->sqe_avail) {
+ rs->qp_list = qp;
++#ifdef DMM_RSOCKET
++ goto END;
++#else
+ return;
++#endif
+ }
+ cnt++;
+ } while (qp != rs->qp_list);
+ } while (cnt);
++#ifdef DMM_RSOCKET
++END:
++ rr_rs_notify_udp(rs);
++#endif
+ }
+
+ static void ds_req_notify_cqs(struct rsocket *rs)
+@@ -2150,7 +2205,7 @@ static int ds_get_cq_event(struct rsocket *rs)
+ if (!rs->cq_armed)
+ return 0;
+
+- ret = epoll_wait(rs->epfd, &event, 1, -1);
++ ret = GSAPI(epoll_wait)(rs->epfd, &event, 1, -1);
+ if (ret <= 0)
+ return ret;
+
+@@ -2607,7 +2662,7 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
+ msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa);
+ msg.msg_iov = miov;
+ msg.msg_iovlen = iovcnt + 1;
+- ret = sendmsg(rs->udp_sock, &msg, flags);
++ ret = GSAPI(sendmsg)(rs->udp_sock, &msg, flags);
+ return ret > 0 ? ret - hdr.length : ret;
+ }
+
+@@ -2958,7 +3013,7 @@ check_cq:
+ fds.fd = rs->cm_id->channel->fd;
+ fds.events = events;
+ fds.revents = 0;
+- poll(&fds, 1, 0);
++ GSAPI(poll)(&fds, 1, 0);
+ return fds.revents;
+ }
+
+@@ -2994,7 +3049,7 @@ static int rs_poll_check(struct pollfd *fds, nfds_t nfds)
+ if (rs)
+ fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
+ else
+- poll(&fds[i], 1, 0);
++ GSAPI(poll)(&fds[i], 1, 0);
+
+ if (fds[i].revents)
+ cnt++;
+@@ -3094,7 +3149,7 @@ int rpoll(struct pollfd *fds, nfds_t nfds, int timeout)
+ if (ret)
+ break;
+
+- ret = poll(rfds, nfds, timeout);
++ ret = GSAPI(poll)(rfds, nfds, timeout);
+ if (ret <= 0)
+ break;
+
+@@ -3314,7 +3369,7 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
+ rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen);
+ return 0;
+ } else {
+- return getpeername(rs->udp_sock, addr, addrlen);
++ return GSAPI(getpeername)(rs->udp_sock, addr, addrlen);
+ }
+ }
+
+@@ -3329,7 +3384,7 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
+ rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen);
+ return 0;
+ } else {
+- return getsockname(rs->udp_sock, addr, addrlen);
++ return GSAPI(getsockname)(rs->udp_sock, addr, addrlen);
+ }
+ }
+
+@@ -3371,7 +3426,7 @@ int rsetsockopt(int socket, int level, int optname,
+ if (!rs)
+ return ERR(EBADF);
+ if (rs->type == SOCK_DGRAM && level != SOL_RDMA) {
+- ret = setsockopt(rs->udp_sock, level, optname, optval, optlen);
++ ret = GSAPI(setsockopt)(rs->udp_sock, level, optname, optval, optlen);
+ if (ret)
+ return ret;
+ }
+@@ -3985,7 +4040,7 @@ static void udp_svc_process_sock(struct rs_svc *svc)
+ {
+ struct rs_svc_msg msg;
+
+- read(svc->sock[1], &msg, sizeof msg);
++ GSAPI(read)(svc->sock[1], &msg, sizeof msg);
+ switch (msg.cmd) {
+ case RS_SVC_ADD_DGRAM:
+ msg.status = rs_svc_add_rs(svc, msg.rs);
+@@ -4009,7 +4064,7 @@ static void udp_svc_process_sock(struct rs_svc *svc)
+ break;
+ }
+
+- write(svc->sock[1], &msg, sizeof msg);
++ GSAPI(write)(svc->sock[1], &msg, sizeof msg);
+ }
+
+ static uint8_t udp_svc_sgid_index(struct ds_dest *dest, union ibv_gid *sgid)
+@@ -4137,7 +4192,7 @@ static void udp_svc_process_rs(struct rsocket *rs)
+ socklen_t addrlen = sizeof addr;
+ int len, ret;
+
+- ret = recvfrom(rs->udp_sock, buf, sizeof buf, 0, &addr.sa, &addrlen);
++ ret = GSAPI(recvfrom)(rs->udp_sock, buf, sizeof buf, 0, &addr.sa, &addrlen);
+ if (ret < DS_UDP_IPV4_HDR_LEN)
+ return;
+
+@@ -4184,7 +4239,7 @@ static void *udp_svc_run(void *arg)
+ ret = rs_svc_grow_sets(svc, 4);
+ if (ret) {
+ msg.status = ret;
+- write(svc->sock[1], &msg, sizeof msg);
++ GSAPI(write)(svc->sock[1], &msg, sizeof msg);
+ return (void *) (uintptr_t) ret;
+ }
+
+@@ -4195,7 +4250,7 @@ static void *udp_svc_run(void *arg)
+ for (i = 0; i <= svc->cnt; i++)
+ udp_svc_fds[i].revents = 0;
+
+- poll(udp_svc_fds, svc->cnt + 1, -1);
++ GSAPI(poll)(udp_svc_fds, svc->cnt + 1, -1);
+ if (udp_svc_fds[0].revents)
+ udp_svc_process_sock(svc);
+
+@@ -4222,7 +4277,7 @@ static void tcp_svc_process_sock(struct rs_svc *svc)
+ struct rs_svc_msg msg;
+ int i;
+
+- read(svc->sock[1], &msg, sizeof msg);
++ GSAPI(read)(svc->sock[1], &msg, sizeof msg);
+ switch (msg.cmd) {
+ case RS_SVC_ADD_KEEPALIVE:
+ msg.status = rs_svc_add_rs(svc, msg.rs);
+@@ -4253,7 +4308,7 @@ static void tcp_svc_process_sock(struct rs_svc *svc)
+ default:
+ break;
+ }
+- write(svc->sock[1], &msg, sizeof msg);
++ GSAPI(write)(svc->sock[1], &msg, sizeof msg);
+ }
+
+ /*
+@@ -4282,7 +4337,7 @@ static void *tcp_svc_run(void *arg)
+ ret = rs_svc_grow_sets(svc, 16);
+ if (ret) {
+ msg.status = ret;
+- write(svc->sock[1], &msg, sizeof msg);
++ GSAPI(write)(svc->sock[1], &msg, sizeof msg);
+ return (void *) (uintptr_t) ret;
+ }
+
+@@ -4291,7 +4346,7 @@ static void *tcp_svc_run(void *arg)
+ fds.events = POLLIN;
+ timeout = -1;
+ do {
+- poll(&fds, 1, timeout * 1000);
++ GSAPI(poll)(&fds, 1, timeout * 1000);
+ if (fds.revents)
+ tcp_svc_process_sock(svc);
+
+@@ -4311,3 +4366,8 @@ static void *tcp_svc_run(void *arg)
+
+ return NULL;
+ }
++
++#ifdef DMM_RSOCKET
++#include "rsocket_rs.c"
++#endif
++
diff --git a/stacks/rsocket/src/rsocket_adpt.c b/stacks/rsocket/src/rsocket_adpt.c
new file mode 100644
index 0000000..eda7fd7
--- /dev/null
+++ b/stacks/rsocket/src/rsocket_adpt.c
@@ -0,0 +1,218 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <dlfcn.h>
+
+#include "nstack_dmm_api.h"
+
+#include "rsocket_adpt.h"
+#include "rdma/rsocket.h"
+
+
+#define RR_EVFD(u64) ((int)((u64) >> 32))
+#define RR_RSFD(u64) ((int)((u64) & 0xFFFFFFFF))
+#define RR_DATA(evfd, rsfd) ((((uint64_t)(evfd)) << 32) | (uint64_t)(uint32_t)(rsfd))
+
+#define RR_EV_NUM 64
+
+rsocket_var_t g_rr_var = {0};
+
+rr_sapi_t g_sapi = {0};
+
+int g_rr_log_level = -1;
+
+
+int rr_notify_event(void *pdata, int events)
+{
+ int ret;
+
+ ret = g_rr_var.event_cb(pdata, events);
+
+ RR_DBG("event_cb(%p, 0x%x)=%d,%d\n", pdata, events, ret, errno);
+
+ return ret;
+}
+
+int rr_epoll_ctl(int op, int evfd, uint32_t events, int rsfd)
+{
+ int ret;
+ struct epoll_event event;
+ event.events = events;
+ event.data.u64 = RR_DATA(evfd, rsfd);
+ ret = GSAPI(epoll_ctl)(g_rr_var.epfd, op, evfd, &event);
+ return ret;
+}
+
+static void *rr_epoll_thread(void *arg)
+{
+ int i, ret, e;
+ struct epoll_event events[RR_EV_NUM];
+
+ while (1) {
+ ret = GSAPI(epoll_wait)(g_rr_var.epfd, events, RR_EV_NUM, 100);
+ e = errno;
+
+ for (i = 0; i < ret; ++i) {
+ if (rr_rs_handle(RR_RSFD(events[i].data.u64), events[i].events)) {
+ (void)rr_ep_del(RR_EVFD(events[i].data.u64));
+ }
+ }
+
+ if (ret < 0) {
+ RR_STAT_INC(RR_STAT_EPW_ERR);
+ if (e == EINTR) {
+ RR_STAT_INC(RR_STAT_EPW_EINTR);
+ } else if ( e == ETIMEDOUT) {
+ RR_STAT_INC(RR_STAT_EPW_ETIMEOUT);
+ } else {
+ RR_ERR("epoll_wait()=%d:%d\n", ret, errno);
+ }
+ }
+ }
+
+ return NULL;
+}
+
+
+static int rr_init_sapi()
+{
+ void *handle = dlopen("libc.so.6", RTLD_NOW | RTLD_GLOBAL);
+ if (!handle) {
+ RR_ERR("dlopen(libc.so.6):NULL\n");
+ return -1;
+ }
+
+#define RR_SAPI(name) \
+ GSAPI(name) = dlsym(handle, #name); \
+ if (!GSAPI(name)) \
+ RR_ERR("dlsym(" #name "):NULL\n");
+#include "rsocket_sapi.h"
+#undef RR_SAPI
+
+ return 0;
+}
+
+static void rr_init_log()
+{
+ int level;
+ char *log;
+
+ if (g_rr_log_level >= 0)
+ return;
+
+ log = getenv("RSOCKET_LOG");
+ if (!log || !log[0]) {
+ g_rr_log_level = RR_LOG_OFF;
+ return;
+ }
+
+ level = atoi(log);
+ if (level < 0 || level > 99999) {
+ g_rr_log_level = RR_LOG_OFF;
+ return;
+ }
+
+ g_rr_log_level = level;
+}
+
+static int rsocket_init()
+{
+ int ret;
+
+ rr_init_log();
+
+ if (rr_init_sapi()) {
+ return -1;
+ }
+
+ g_rr_var.epfd = GSAPI(epoll_create)(1);
+ if (g_rr_var.epfd < 0)
+ return g_rr_var.epfd;
+
+ ret = pthread_create(&g_rr_var.epoll_threadid, NULL, rr_epoll_thread, NULL);
+ if (ret) {
+ GSAPI(close)(g_rr_var.epfd);
+ g_rr_var.epfd = -1;
+ return ret;
+ }
+ (void)pthread_setname_np(g_rr_var.epoll_threadid, "rsocket_epoll");
+
+ return 0;
+}
+
+int rsocket_exit()
+{
+ if (g_rr_var.epfd >= 0) {
+ (void)GSAPI(close)(g_rr_var.epfd);
+ g_rr_var.epfd = -1;
+ }
+
+ return 0;
+}
+
+unsigned int rsocket_ep_ctl(int epFD, int proFD, int ctl_ops, struct epoll_event * event, void *pdata)
+{
+ int ret;
+ struct eventpoll *ep;
+ unsigned int revents = 0;
+
+ RR_DBG("(%d, %d, %d, 0x%x, %p)\n", epFD, proFD, ctl_ops, event->events, pdata);
+
+ switch (ctl_ops) {
+ case nstack_ep_triggle_add:
+ ret = rr_rs_ep_add(proFD, pdata, &revents);
+ if (ret)
+ return -1;
+ return revents;
+
+ case nstack_ep_triggle_mod:
+ ret = rr_rs_ep_mod(proFD, pdata, &revents);
+ if (ret)
+ return -1;
+ return revents;
+ case nstack_ep_triggle_del:
+ return rr_rs_ep_del(proFD);
+ }
+
+ return _err(EPERM);
+}
+
+int rsocket_stack_register(nstack_proc_cb *proc_fun, nstack_event_cb *event_ops)
+{
+ rr_init_log();
+
+#define NSTACK_MK_DECL(ret, fn, args) \
+ do { \
+ proc_fun->socket_ops.pf##fn = dlsym(event_ops->handle, "r"#fn); \
+ if (!proc_fun->socket_ops.pf##fn) \
+ RR_LOG("socket API '" #fn "' not found\n"); \
+ } while (0)
+#include "declare_syscalls.h"
+#undef NSTACK_MK_DECL
+
+ proc_fun->extern_ops.module_init = rsocket_init;
+ proc_fun->extern_ops.ep_ctl = rsocket_ep_ctl;
+ proc_fun->extern_ops.ep_getevt = NULL;
+
+ g_rr_var.type = event_ops->type;
+ g_rr_var.event_cb = event_ops->event_cb;
+
+ return 0;
+}
+
diff --git a/stacks/rsocket/src/rsocket_adpt.h b/stacks/rsocket/src/rsocket_adpt.h
new file mode 100644
index 0000000..43ec6cf
--- /dev/null
+++ b/stacks/rsocket/src/rsocket_adpt.h
@@ -0,0 +1,55 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef _RSOCKET_ADPT_H_
+#define _RSOCKET_ADPT_H_
+
+#include "indexer.h"
+#include "rsocket_rdma.h"
+
+enum {
+ RR_STAT_EPW_ERR,
+ RR_STAT_EPW_EINTR,
+ RR_STAT_EPW_ETIMEOUT,
+
+ RR_STAT_NUM
+};
+
+#define RR_STAT_ADD(id, num) __sync_add_and_fetch(&g_rr_var.stat[(id)], num)
+#define RR_STAT_SUB(id, num) __sync_sub_and_fetch(&g_rr_var.stat[(id)], num)
+#define RR_STAT_INC(id) RR_STAT_ADD((id), 1)
+#define RR_STAT_DEC(id) RR_STAT_SUB((id), 1)
+
+#define RSRDMA_EXIT 1
+
+typedef struct rsocket_var {
+ pthread_t epoll_threadid;
+
+ int epfd;
+ int type;
+ int (*event_cb) (void *pdata, int events);
+
+ uint64_t stat[RR_STAT_NUM];
+} rsocket_var_t;
+
+extern rsocket_var_t g_rr_var;
+
+
+int rr_rs_handle(int fd, uint32_t events);
+
+
+#endif/* #ifndef _RSOCKET_ADPT_H_ */
+
diff --git a/stacks/rsocket/src/rsocket_in.h b/stacks/rsocket/src/rsocket_in.h
new file mode 100644
index 0000000..b4e1ae6
--- /dev/null
+++ b/stacks/rsocket/src/rsocket_in.h
@@ -0,0 +1,33 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef _RSOCKET_IN_H_
+#define _RSOCKET_IN_H_
+
+#include "rsocket_rdma.h"
+
+inline static void rr_rs_init(struct rsocket *rs);
+inline static void rr_rs_dest(struct rsocket *rs);
+
+static inline void rr_rs_notify_tcp(struct rsocket *rs);
+static inline void rr_rs_notify_udp(struct rsocket *rs);
+
+inline static void rr_rs_handle_tcp(struct rsocket *rs);
+inline static int rr_rs_evfd(struct rsocket *rs);
+inline static void rr_rs_connected(struct rsocket *rs);
+
+#endif/* #ifndef _RSOCKET_IN_H_ */
+
diff --git a/stacks/rsocket/src/rsocket_rdma.h b/stacks/rsocket/src/rsocket_rdma.h
new file mode 100644
index 0000000..076b6c9
--- /dev/null
+++ b/stacks/rsocket/src/rsocket_rdma.h
@@ -0,0 +1,97 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef _RSOCKET_RDMA_H_
+#define _RSOCKET_RDMA_H_
+
+#include <stdint.h>
+#include <unistd.h>
+#include <stddef.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/epoll.h>
+#include <netinet/in.h>
+#include <pthread.h>
+#include <poll.h>
+#include <fcntl.h>
+#include <time.h>
+
+enum {
+ RR_LOG_OFF = 0x00,
+ RR_LOG_ERR = 0x01,
+ RR_LOG_WRN = 0x02,
+ RR_LOG_LOG = 0x03,
+ RR_LOG_DBG = 0x04,
+};
+
+#define RR_OUT(level, name, fmt, arg...) do { \
+ extern int g_rr_log_level; \
+ if (g_rr_log_level >= level) { \
+ (void)printf("#RSOCKET:%s# %s:%d> " fmt "", \
+ name, __func__, __LINE__, ##arg); \
+ } \
+} while (0)
+
+#define RR_ERR(fmt, arg...) RR_OUT(RR_LOG_ERR, "ERR", fmt, ##arg)
+#define RR_WRN(fmt, arg...) RR_OUT(RR_LOG_WRN, "WRN", fmt, ##arg)
+#define RR_LOG(fmt, arg...) RR_OUT(RR_LOG_LOG, "LOG", fmt, ##arg)
+#if defined(RSOCKET_DEBUG) && RSOCKET_DEBUG > 0
+#define RR_DBG(fmt, arg...) RR_OUT(RR_LOG_DBG, "DBG", fmt, ##arg)
+#else
+#define RR_DBG(fmt, arg...) ((void)0)
+#endif
+
+
+#define _err(err_no) ((errno = (err_no)), -1)
+
+
+int rr_rs_ep_add(int fd, void *pdata, uint32_t *revent);
+int rr_rs_ep_mod(int fd, void *pdata, uint32_t *revent);
+int rr_rs_ep_del(int fd);
+
+uint32_t rr_rs_poll(int fd, uint32_t revents);
+
+int rr_notify_event(void *pdata, int events);
+
+
+typedef struct rr_socket_api {
+ #define RR_SAPI(name) typeof(name) *n_##name;/* native api */
+ #include "rsocket_sapi.h"
+ #undef RR_SAPI
+} rr_sapi_t;
+
+extern rr_sapi_t g_sapi;
+
+#define GSAPI(name) g_sapi.n_##name
+
+
+int rr_epoll_ctl(int op, int evfd, uint32_t events, int rsfd);
+
+inline static int rr_ep_add(int evfd, int rsfd)
+{
+ return rr_epoll_ctl(EPOLL_CTL_ADD, evfd, EPOLLET | EPOLLIN | EPOLLOUT, rsfd);
+}
+
+inline static int rr_ep_del(int evfd)
+{
+ if (evfd < 0)
+ return 0;
+ return rr_epoll_ctl(EPOLL_CTL_DEL, evfd, 0, 0);
+}
+
+
+#endif/* #ifndef _RSOCKET_RDMA_H_ */
+
diff --git a/stacks/rsocket/src/rsocket_rs.c b/stacks/rsocket/src/rsocket_rs.c
new file mode 100644
index 0000000..ca92c3d
--- /dev/null
+++ b/stacks/rsocket/src/rsocket_rs.c
@@ -0,0 +1,354 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef _RSOCKET_RS_C_
+#define _RSOCKET_RS_C_
+
+
+inline static void rr_rs_init(struct rsocket *rs)
+{
+ RR_DBG("(rs:%p{index:%d})\n", rs, rs->index);
+ rs->rr_epoll_ref = 0;
+ rs->rr_epoll_fd = -1;
+ rs->rr_epoll_pdata = NULL;
+}
+
+inline static void rr_rs_dest(struct rsocket *rs)
+{
+ RR_DBG("(rs:%p{index:%d})\n", rs, rs->index);
+
+ if (rs->rr_epoll_ref) {
+ (void)rr_ep_del(rs->rr_epoll_fd);
+ rs->rr_epoll_ref = 0;
+ rs->rr_epoll_fd = -1;
+ rs->rr_epoll_pdata = NULL;
+ }
+}
+
+#ifndef POLL__RSOCKET_RS_H_
+#define POLL__RSOCKET_RS_H_
+
+static inline uint32_t rr_rs_poll_tcp(struct rsocket *rs)
+{
+ uint32_t events = 0;
+ if (rs->state & rs_connected) {
+ if (rs_have_rdata(rs))
+ events |= EPOLLIN;
+ if (rs_can_send(rs))
+ events |= EPOLLOUT;
+ }
+ if (rs->state & (rs_error | rs_connect_error))
+ events |= EPOLLERR;
+ if (rs->state & rs_disconnected)
+ events |= EPOLLHUP;
+ return events;
+}
+
+static inline uint32_t rr_rs_poll_udp(struct rsocket *rs)
+{
+ uint32_t events = 0;
+ if (rs_have_rdata(rs))
+ events |= EPOLLIN;
+ if (ds_can_send(rs))
+ events |= EPOLLOUT;
+ if (rs->state & rs_error)
+ events |= EPOLLERR;
+ return events;
+}
+
+static inline uint32_t rr_rs_poll_both(struct rsocket *rs)
+{
+ if (rs->type == SOCK_STREAM)
+ return rr_rs_poll_tcp(rs);
+
+ if (rs->type == SOCK_DGRAM)
+ return rr_rs_poll_udp(rs);
+
+ return 0;
+}
+
+uint32_t rr_rs_poll(int fd, uint32_t revents)
+{
+ struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
+
+ if (!rs)
+ return 0;
+
+ if (rs->state == rs_listening)
+ return revents;
+
+ return rr_rs_poll_both(rs);
+}
+
+#endif/* #ifndef POLL__RSOCKET_RS_H_ */
+
+
+static inline void rr_rs_notify_tcp(struct rsocket *rs)
+{
+ if (rs->rr_epoll_ref) {
+ uint32_t events = rr_rs_poll_tcp(rs);
+ if (events)
+ (void)rr_notify_event(rs->rr_epoll_pdata, events);
+ }
+}
+
+static inline void rr_rs_notify_udp(struct rsocket *rs)
+{
+ if (rs->rr_epoll_ref) {
+ uint32_t events = rr_rs_poll_udp(rs);
+ if (events)
+ (void)rr_notify_event(rs->rr_epoll_pdata, events);
+ }
+}
+
+#ifndef HANDLE__RSOCKET_RS_H_
+#define HANDLE__RSOCKET_RS_H_
+
+inline static void rr_rs_handle_tcp(struct rsocket *rs)
+{
+ int ret;
+
+ RR_DBG("(%d)@ state:0x%x\n", rs->index, rs->state);
+
+ if (!(rs->state & (rs_connected | rs_opening)))
+ return;
+
+ fastlock_acquire(&rs->cq_wait_lock);
+ ret = rs_get_cq_event(rs);
+ RR_DBG("rs_get_cq_event({%d})=%d,%d\n", rs->index, ret, errno);
+ fastlock_release(&rs->cq_wait_lock);
+
+ fastlock_acquire(&rs->cq_lock);
+
+ if (rs->state & rs_connected) {
+ rs_update_credits(rs);
+ ret = rs_poll_cq(rs);
+ RR_DBG("rs_poll_cq({%d})=%d,%d {ref:%d, armed:%d}\n",
+ rs->index, ret, errno, rs->rr_epoll_ref, rs->cq_armed);
+ }
+
+ if (rs->rr_epoll_ref && rs->cq_armed < 1) {
+ ret = ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
+ RR_DBG("ibv_req_notify_cq({%d})=%d,%d\n", rs->index, ret, errno);
+ if (0 == ret)
+ __sync_fetch_and_add(&rs->cq_armed, 1);
+ }
+
+ if (rs->state & rs_connected) {
+ ret = rs_poll_cq(rs);
+ RR_DBG("rs_poll_cq({%d})=%d,%d\n", rs->index, ret, errno);
+ rs_update_credits(rs);
+ }
+
+ fastlock_release(&rs->cq_lock);
+
+ RR_DBG("(%d)=\n", rs->index);
+}
+
+inline static void rr_rs_handle_udp(struct rsocket *rs)
+{
+ fastlock_acquire(&rs->cq_wait_lock);
+ ds_get_cq_event(rs);
+ fastlock_release(&rs->cq_wait_lock);
+
+ fastlock_acquire(&rs->cq_lock);
+ ds_poll_cqs(rs);
+ if (rs->rr_epoll_ref && !rs->cq_armed) {
+ ds_req_notify_cqs(rs);
+ rs->cq_armed = 1;
+ }
+ fastlock_release(&rs->cq_lock);
+}
+
+inline static void rr_rs_handle_rs(struct rsocket *rs)
+{
+ if (rs->state & rs_opening) {
+ int ret = rs_do_connect(rs);
+ RR_DBG("rs_do_connect(%p{%d}):%d:%d\n", rs, rs->index, ret, errno);
+ return;
+ }
+
+ if (rs->type == SOCK_STREAM) {
+ rr_rs_handle_tcp(rs);
+ }
+
+ if (rs->type == SOCK_DGRAM) {
+ rr_rs_handle_udp(rs);
+ }
+}
+
+int rr_rs_handle(int fd, uint32_t events)
+{
+ struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
+
+ RR_DBG("(fd:%d, events:0x%x):rs:%p\n", fd, events, rs);
+
+ if (!rs)
+ return _err(EBADF);
+
+ if (rs->state == rs_listening) {
+ if (events & EPOLLIN) {
+ (void)rr_notify_event(rs->rr_epoll_pdata, events);
+ }
+ return 0;
+ }
+
+ rr_rs_handle_rs(rs);
+
+ return 0;
+}
+
+
+#endif/* #ifndef HANDLE__RSOCKET_RS_H_ */
+
+#ifndef ADPT__RSOCKET_RS_H_
+#define ADPT__RSOCKET_RS_H_
+
+inline static int rr_rs_evfd(struct rsocket *rs)
+{
+ if (rs->type == SOCK_STREAM) {
+ if (rs->state >= rs_connected)
+ return rs->cm_id->recv_cq_channel->fd;
+ else
+ return rs->cm_id->channel->fd;
+ } else {
+ return rs->epfd;
+ }
+
+ return -1;
+}
+
+int rr_rs_ep_add(int fd, void *pdata, uint32_t *revent)
+{
+ int ref;
+ struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
+ RR_DBG("(%d(%p),)\n", fd, rs);
+ if (!rs)
+ return _err(EBADF);
+
+ ref = __sync_add_and_fetch(&rs->rr_epoll_ref, 1);
+ if (1 == ref) {
+ rs->rr_epoll_fd = rr_rs_evfd(rs);
+ (void)rr_ep_add(rs->rr_epoll_fd, rs->index);
+ }
+
+ (void)rr_rs_handle_rs(rs);
+ *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both(rs);
+
+ rs->rr_epoll_pdata = pdata;
+
+ RR_DBG("*revent=0x%x\n", *revent);
+ return 0;
+}
+
+int rr_rs_ep_mod(int fd, void *pdata, uint32_t *revent)
+{
+ struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
+ RR_DBG("(%d(%p),)\n", fd, rs);
+ if (!rs)
+ return _err(EBADF);
+
+ if (rs->rr_epoll_ref <= 0)
+ return _err(ENOENT);
+
+ (void)rr_rs_handle_rs(rs);
+ *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both(rs);
+
+ rs->rr_epoll_pdata = pdata;
+
+ RR_DBG("*revent=0x%x\n", *revent);
+ return 0;
+}
+
+int rr_rs_ep_del(int fd)
+{
+ int ref;
+ struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
+ RR_DBG("(%d(%p))\n", fd, rs);
+
+ if (!rs)
+ return _err(EBADF);
+
+ ref = __sync_sub_and_fetch(&rs->rr_epoll_ref, 1);
+ if (0 == ref) {
+ (void)rr_ep_del(rs->rr_epoll_fd);
+ rs->rr_epoll_fd = -1;
+ }
+
+ return 0;
+}
+
+#endif/* #ifndef ADPT__RSOCKET_RS_H_ */
+
+inline static void rr_rs_connected(struct rsocket *rs)
+{
+ RR_DBG("rsfd:%d ref:%d evfd:%d->%d state:0x%x\n", rs->index,
+ rs->rr_epoll_ref, rs->rr_epoll_fd, rr_rs_evfd(rs), rs->state);
+
+ if (!(rs->state & rs_connected)) {
+ rr_rs_notify_tcp(rs);
+ return;
+ }
+
+ if (rs->rr_epoll_ref) {
+ int evfd = rr_rs_evfd(rs);
+
+ if (evfd != rs->rr_epoll_fd) {
+ (void)rr_ep_del(rs->rr_epoll_fd);
+ rs->rr_epoll_fd = evfd;
+ (void)rr_ep_add(evfd, rs->index);
+ }
+
+ rr_rs_handle_tcp(rs);
+ }
+}
+
+int raccept4(int socket, struct sockaddr *addr, socklen_t *addrlen, int flags)
+{
+ int ret, fd;
+ struct rsocket *rs;
+
+ RR_DBG("(%d, %p, %p, %d)@\n", socket, addr, addrlen, flags);
+ fd = raccept(socket, addr, addrlen);
+ RR_DBG("(%d, , , %d):%d:%d\n", socket, flags, fd, errno);
+ if (fd < 0)
+ return fd;
+
+ rs = (struct rsocket*)idm_lookup(&idm, fd);
+ if (!rs) {
+ RR_ERR("panic\n");
+ return -1;
+ }
+
+ if (flags & SOCK_NONBLOCK) {
+ if (0 == (rs->fd_flags & O_NONBLOCK)) {
+ RR_DBG("orig flag:%x\n", GSAPI(fcntl)(rs->cm_id->channel->fd, F_GETFL));
+ ret = GSAPI(fcntl)(rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK);
+ if (0 == ret)
+ rs->fd_flags |= O_NONBLOCK;
+ }
+ }
+
+ if (flags & SOCK_CLOEXEC) {
+ RR_LOG("ignore flag:SOCK_CLOEXEC\n");
+ }
+
+ return fd;
+}
+
+
+#endif/* #ifndef _RSOCKET_RS_C_ */
+
diff --git a/stacks/rsocket/src/rsocket_sapi.h b/stacks/rsocket/src/rsocket_sapi.h
new file mode 100644
index 0000000..a9ca932
--- /dev/null
+++ b/stacks/rsocket/src/rsocket_sapi.h
@@ -0,0 +1,38 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+RR_SAPI(socket)
+RR_SAPI(close)
+RR_SAPI(bind)
+RR_SAPI(connect)
+RR_SAPI(getpeername)
+RR_SAPI(getsockname)
+
+RR_SAPI(fcntl)
+RR_SAPI(setsockopt)
+RR_SAPI(getsockopt)
+
+RR_SAPI(read)
+RR_SAPI(write)
+
+RR_SAPI(sendmsg)
+RR_SAPI(recvfrom)
+
+RR_SAPI(poll)
+
+RR_SAPI(epoll_create)
+RR_SAPI(epoll_ctl)
+RR_SAPI(epoll_wait)