aboutsummaryrefslogtreecommitdiffstats
path: root/stacks/rsocket/rsocket.patch
diff options
context:
space:
mode:
Diffstat (limited to 'stacks/rsocket/rsocket.patch')
-rw-r--r--stacks/rsocket/rsocket.patch751
1 files changed, 751 insertions, 0 deletions
diff --git a/stacks/rsocket/rsocket.patch b/stacks/rsocket/rsocket.patch
new file mode 100644
index 0000000..430f178
--- /dev/null
+++ b/stacks/rsocket/rsocket.patch
@@ -0,0 +1,751 @@
+diff --git a/Makefile.am b/Makefile.am
+index bf72134..cb02f56 100644
+--- a/Makefile.am
++++ b/Makefile.am
+@@ -22,6 +22,12 @@ src_librdmacm_la_LDFLAGS = -version-info 1 -export-dynamic \
+ $(librdmacm_version_script)
+ src_librdmacm_la_DEPENDENCIES = $(srcdir)/src/librdmacm.map
+
++noinst_LIBRARIES = libdmm_rdmacm.a
++libdmm_rdmacm_a_CFLAGS = $(src_librdmacm_la_CFLAGS) -O2 -fPIC -m64 \
++ -I@dmm_inc_dir@ -I$(srcdir)/../src -DDMM_RSOCKET=1 -DRSOCKET_DEBUG=@RSOCKET_DEBUG@
++libdmm_rdmacm_a_LDFLAGS =
++libdmm_rdmacm_a_SOURCES = $(src_librdmacm_la_SOURCES)
++
+ src_librspreload_la_SOURCES = src/preload.c src/indexer.c
+ src_librspreload_la_LDFLAGS = -version-info 1 -export-dynamic
+ src_librspreload_la_LIBADD = $(top_builddir)/src/librdmacm.la
+diff --git a/configure.ac b/configure.ac
+index 4a43995..98601ea 100644
+--- a/configure.ac
++++ b/configure.ac
+@@ -104,5 +104,12 @@ if test "x$rdmadir" = "x"; then
+ AC_SUBST(rdmadir, rdma)
+ fi
+
++AC_ARG_VAR(dmm_inc_dir, [Directory for dmm include files])
++if test "x$dmm_inc_dir" = "x"; then
++ AC_MSG_ERROR([must set dmm_inc_dir])
++fi
++
++AC_ARG_VAR(RSOCKET_DEBUG, [rsocket debug flag 0 or 1])
++
+ AC_CONFIG_FILES([Makefile librdmacm.spec])
+ AC_OUTPUT
+diff --git a/src/cma.c b/src/cma.c
+index a89e663..071222d 100644
+--- a/src/cma.c
++++ b/src/cma.c
+@@ -60,6 +60,12 @@
+ #include <rdma/rdma_verbs.h>
+ #include <infiniband/ib.h>
+
++#ifdef DMM_RSOCKET
++#include "rsocket_rdma.h"
++#else
++#define GSAPI(name) name
++#endif
++
+ #define CMA_INIT_CMD(req, req_size, op) \
+ do { \
+ memset(req, 0, req_size); \
+@@ -564,7 +570,7 @@ static int rdma_create_id2(struct rdma_event_channel *channel,
+ cmd.ps = ps;
+ cmd.qp_type = qp_type;
+
+- ret = write(id_priv->id.channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id_priv->id.channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ goto err;
+
+@@ -599,7 +605,7 @@ static int ucma_destroy_kern_id(int fd, uint32_t handle)
+ CMA_INIT_CMD_RESP(&cmd, sizeof cmd, DESTROY_ID, &resp, sizeof resp);
+ cmd.id = handle;
+
+- ret = write(fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -659,7 +665,7 @@ static int ucma_query_addr(struct rdma_cm_id *id)
+ cmd.id = id_priv->handle;
+ cmd.option = UCMA_QUERY_ADDR;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -692,7 +698,7 @@ static int ucma_query_gid(struct rdma_cm_id *id)
+ cmd.id = id_priv->handle;
+ cmd.option = UCMA_QUERY_GID;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -753,7 +759,7 @@ static int ucma_query_path(struct rdma_cm_id *id)
+ cmd.id = id_priv->handle;
+ cmd.option = UCMA_QUERY_PATH;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -784,7 +790,7 @@ static int ucma_query_route(struct rdma_cm_id *id)
+ id_priv = container_of(id, struct cma_id_private, id);
+ cmd.id = id_priv->handle;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -835,7 +841,7 @@ static int rdma_bind_addr2(struct rdma_cm_id *id, struct sockaddr *addr,
+ cmd.addr_size = addrlen;
+ memcpy(&cmd.addr, addr, addrlen);
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -863,7 +869,7 @@ int rdma_bind_addr(struct rdma_cm_id *id, struct sockaddr *addr)
+ cmd.id = id_priv->handle;
+ memcpy(&cmd.addr, addr, addrlen);
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -916,7 +922,7 @@ static int rdma_resolve_addr2(struct rdma_cm_id *id, struct sockaddr *src_addr,
+ cmd.dst_size = dst_len;
+ cmd.timeout_ms = timeout_ms;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -951,7 +957,7 @@ int rdma_resolve_addr(struct rdma_cm_id *id, struct sockaddr *src_addr,
+ memcpy(&cmd.dst_addr, dst_addr, dst_len);
+ cmd.timeout_ms = timeout_ms;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -1003,7 +1009,7 @@ int rdma_resolve_route(struct rdma_cm_id *id, int timeout_ms)
+ cmd.id = id_priv->handle;
+ cmd.timeout_ms = timeout_ms;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -1029,7 +1035,7 @@ static int rdma_init_qp_attr(struct rdma_cm_id *id, struct ibv_qp_attr *qp_attr,
+ cmd.id = id_priv->handle;
+ cmd.qp_state = qp_attr->qp_state;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -1530,7 +1536,7 @@ int rdma_connect(struct rdma_cm_id *id, struct rdma_conn_param *conn_param)
+ conn_param, 0, 0);
+ }
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -1553,7 +1559,7 @@ int rdma_listen(struct rdma_cm_id *id, int backlog)
+ cmd.id = id_priv->handle;
+ cmd.backlog = backlog;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -1656,7 +1662,7 @@ int rdma_accept(struct rdma_cm_id *id, struct rdma_conn_param *conn_param)
+ conn_param, conn_param->qp_num,
+ conn_param->srq);
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd) {
+ ucma_modify_qp_err(id);
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+@@ -1684,7 +1690,7 @@ int rdma_reject(struct rdma_cm_id *id, const void *private_data,
+ cmd.private_data_len = private_data_len;
+ }
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -1702,7 +1708,7 @@ int rdma_notify(struct rdma_cm_id *id, enum ibv_event_type event)
+ id_priv = container_of(id, struct cma_id_private, id);
+ cmd.id = id_priv->handle;
+ cmd.event = event;
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -1735,7 +1741,7 @@ int rdma_disconnect(struct rdma_cm_id *id)
+ id_priv = container_of(id, struct cma_id_private, id);
+ cmd.id = id_priv->handle;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -1778,7 +1784,7 @@ static int rdma_join_multicast2(struct rdma_cm_id *id, struct sockaddr *addr,
+ cmd.uid = (uintptr_t) mc;
+ cmd.reserved = 0;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd) {
+ ret = (ret >= 0) ? ERR(ENODATA) : -1;
+ goto err2;
+@@ -1791,7 +1797,7 @@ static int rdma_join_multicast2(struct rdma_cm_id *id, struct sockaddr *addr,
+ memcpy(&cmd.addr, addr, addrlen);
+ cmd.uid = (uintptr_t) mc;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd) {
+ ret = (ret >= 0) ? ERR(ENODATA) : -1;
+ goto err2;
+@@ -1857,7 +1863,7 @@ int rdma_leave_multicast(struct rdma_cm_id *id, struct sockaddr *addr)
+ CMA_INIT_CMD_RESP(&cmd, sizeof cmd, LEAVE_MCAST, &resp, sizeof resp);
+ cmd.id = mc->handle;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd) {
+ ret = (ret >= 0) ? ERR(ENODATA) : -1;
+ goto free;
+@@ -2019,7 +2025,7 @@ static int ucma_process_conn_resp(struct cma_id_private *id_priv)
+ CMA_INIT_CMD(&cmd, sizeof cmd, ACCEPT);
+ cmd.id = id_priv->handle;
+
+- ret = write(id_priv->id.channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id_priv->id.channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd) {
+ ret = (ret >= 0) ? ERR(ENODATA) : -1;
+ goto err;
+@@ -2103,7 +2109,7 @@ int rdma_get_cm_event(struct rdma_event_channel *channel,
+ retry:
+ memset(evt, 0, sizeof(*evt));
+ CMA_INIT_CMD_RESP(&cmd, sizeof cmd, GET_EVENT, &resp, sizeof resp);
+- ret = write(channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd) {
+ free(evt);
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+@@ -2274,7 +2280,7 @@ int rdma_set_option(struct rdma_cm_id *id, int level, int optname,
+ cmd.optname = optname;
+ cmd.optlen = optlen;
+
+- ret = write(id->channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd)
+ return (ret >= 0) ? ERR(ENODATA) : -1;
+
+@@ -2302,7 +2308,7 @@ int rdma_migrate_id(struct rdma_cm_id *id, struct rdma_event_channel *channel)
+ cmd.id = id_priv->handle;
+ cmd.fd = id->channel->fd;
+
+- ret = write(channel->fd, &cmd, sizeof cmd);
++ ret = GSAPI(write)(channel->fd, &cmd, sizeof cmd);
+ if (ret != sizeof cmd) {
+ if (sync)
+ rdma_destroy_event_channel(channel);
+diff --git a/src/rsocket.c b/src/rsocket.c
+index c4f1b57..faa3d3a 100644
+--- a/src/rsocket.c
++++ b/src/rsocket.c
+@@ -382,6 +382,11 @@ struct rsocket {
+ dlist_entry iomap_queue;
+ int iomap_pending;
+ int unack_cqe;
++#ifdef DMM_RSOCKET
++ int rr_epoll_ref;
++ int rr_epoll_fd;
++ void *rr_epoll_pdata;
++#endif
+ };
+
+ #define DS_UDP_TAG 0x55555555
+@@ -404,6 +409,12 @@ struct ds_udp_header {
+
+ #define ds_next_qp(qp) container_of((qp)->list.next, struct ds_qp, list)
+
++#ifdef DMM_RSOCKET
++#include "rsocket_in.h"
++#else
++#define GSAPI(name) name
++#endif
++
+ static void ds_insert_qp(struct rsocket *rs, struct ds_qp *qp)
+ {
+ if (!rs->qp_list)
+@@ -444,16 +455,16 @@ static int rs_notify_svc(struct rs_svc *svc, struct rsocket *rs, int cmd)
+ msg.cmd = cmd;
+ msg.status = EINVAL;
+ msg.rs = rs;
+- write(svc->sock[0], &msg, sizeof msg);
+- read(svc->sock[0], &msg, sizeof msg);
++ GSAPI(write)(svc->sock[0], &msg, sizeof msg);
++ GSAPI(read)(svc->sock[0], &msg, sizeof msg);
+ ret = rdma_seterrno(msg.status);
+ if (svc->cnt)
+ goto unlock;
+
+ pthread_join(svc->id, NULL);
+ closepair:
+- close(svc->sock[0]);
+- close(svc->sock[1]);
++ GSAPI(close)(svc->sock[0]);
++ GSAPI(close)(svc->sock[1]);
+ unlock:
+ pthread_mutex_unlock(&mut);
+ return ret;
+@@ -607,6 +618,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type)
+ fastlock_init(&rs->map_lock);
+ dlist_init(&rs->iomap_list);
+ dlist_init(&rs->iomap_queue);
++#ifdef DMM_RSOCKET
++ rr_rs_init(rs);
++#endif
+ return rs;
+ }
+
+@@ -617,16 +631,16 @@ static int rs_set_nonblocking(struct rsocket *rs, int arg)
+
+ if (rs->type == SOCK_STREAM) {
+ if (rs->cm_id->recv_cq_channel)
+- ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg);
++ ret = GSAPI(fcntl)(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg);
+
+ if (!ret && rs->state < rs_connected)
+- ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
++ ret = GSAPI(fcntl)(rs->cm_id->channel->fd, F_SETFL, arg);
+ } else {
+- ret = fcntl(rs->epfd, F_SETFL, arg);
++ ret = GSAPI(fcntl)(rs->epfd, F_SETFL, arg);
+ if (!ret && rs->qp_list) {
+ qp = rs->qp_list;
+ do {
+- ret = fcntl(qp->cm_id->recv_cq_channel->fd,
++ ret = GSAPI(fcntl)(qp->cm_id->recv_cq_channel->fd,
+ F_SETFL, arg);
+ qp = ds_next_qp(qp);
+ } while (qp != rs->qp_list && !ret);
+@@ -767,7 +781,7 @@ static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id)
+ goto err1;
+
+ if (rs->fd_flags & O_NONBLOCK) {
+- if (fcntl(cm_id->recv_cq_channel->fd, F_SETFL, O_NONBLOCK))
++ if (GSAPI(fcntl)(cm_id->recv_cq_channel->fd, F_SETFL, O_NONBLOCK))
+ goto err2;
+ }
+
+@@ -918,7 +932,7 @@ static void ds_free_qp(struct ds_qp *qp)
+ if (qp->cm_id) {
+ if (qp->cm_id->qp) {
+ tdelete(&qp->dest.addr, &qp->rs->dest_map, ds_compare_addr);
+- epoll_ctl(qp->rs->epfd, EPOLL_CTL_DEL,
++ GSAPI(epoll_ctl)(qp->rs->epfd, EPOLL_CTL_DEL,
+ qp->cm_id->recv_cq_channel->fd, NULL);
+ rdma_destroy_qp(qp->cm_id);
+ }
+@@ -932,8 +946,12 @@ static void ds_free(struct rsocket *rs)
+ {
+ struct ds_qp *qp;
+
++#ifdef DMM_RSOCKET
++ rr_rs_dest(rs);
++#endif
++
+ if (rs->udp_sock >= 0)
+- close(rs->udp_sock);
++ GSAPI(close)(rs->udp_sock);
+
+ if (rs->index >= 0)
+ rs_remove(rs);
+@@ -947,7 +965,7 @@ static void ds_free(struct rsocket *rs)
+ }
+
+ if (rs->epfd >= 0)
+- close(rs->epfd);
++ GSAPI(close)(rs->epfd);
+
+ if (rs->sbuf)
+ free(rs->sbuf);
+@@ -968,6 +986,10 @@ static void rs_free(struct rsocket *rs)
+ return;
+ }
+
++#ifdef DMM_RSOCKET
++ rr_rs_dest(rs);
++#endif
++
+ if (rs->rmsg)
+ free(rs->rmsg);
+
+@@ -1059,11 +1081,11 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
+
+ static int ds_init(struct rsocket *rs, int domain)
+ {
+- rs->udp_sock = socket(domain, SOCK_DGRAM, 0);
++ rs->udp_sock = GSAPI(socket)(domain, SOCK_DGRAM, 0);
+ if (rs->udp_sock < 0)
+ return rs->udp_sock;
+
+- rs->epfd = epoll_create(2);
++ rs->epfd = GSAPI(epoll_create)(2);
+ if (rs->epfd < 0)
+ return rs->epfd;
+
+@@ -1164,7 +1186,7 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
+ if (ret)
+ return ret;
+ }
+- ret = bind(rs->udp_sock, addr, addrlen);
++ ret = GSAPI(bind)(rs->udp_sock, addr, addrlen);
+ }
+ return ret;
+ }
+@@ -1229,8 +1251,11 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+ goto err;
+ }
+
+- if (rs->fd_flags & O_NONBLOCK)
+- fcntl(new_rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK);
++ if (rs->fd_flags & O_NONBLOCK) {
++ ret = GSAPI(fcntl)(new_rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK);
++ if (0 == ret)
++ new_rs->fd_flags |= O_NONBLOCK;
++ }
+
+ ret = rs_create_ep(new_rs);
+ if (ret)
+@@ -1354,7 +1379,7 @@ connected:
+ break;
+ case rs_accepting:
+ if (!(rs->fd_flags & O_NONBLOCK))
+- fcntl(rs->cm_id->channel->fd, F_SETFL, 0);
++ GSAPI(fcntl)(rs->cm_id->channel->fd, F_SETFL, 0);
+
+ ret = ucma_complete(rs->cm_id);
+ if (ret)
+@@ -1375,6 +1400,10 @@ connected:
+ rs->err = errno;
+ }
+ }
++#ifdef DMM_RSOCKET
++ rr_rs_connected(rs);
++#endif
++
+ return ret;
+ }
+
+@@ -1397,24 +1426,24 @@ static int ds_get_src_addr(struct rsocket *rs,
+ uint16_t port;
+
+ *src_len = sizeof(*src_addr);
+- ret = getsockname(rs->udp_sock, &src_addr->sa, src_len);
++ ret = GSAPI(getsockname)(rs->udp_sock, &src_addr->sa, src_len);
+ if (ret || !rs_any_addr(src_addr))
+ return ret;
+
+ port = src_addr->sin.sin_port;
+- sock = socket(dest_addr->sa_family, SOCK_DGRAM, 0);
++ sock = GSAPI(socket)(dest_addr->sa_family, SOCK_DGRAM, 0);
+ if (sock < 0)
+ return sock;
+
+- ret = connect(sock, dest_addr, dest_len);
++ ret = GSAPI(connect)(sock, dest_addr, dest_len);
+ if (ret)
+ goto out;
+
+ *src_len = sizeof(*src_addr);
+- ret = getsockname(sock, &src_addr->sa, src_len);
++ ret = GSAPI(getsockname)(sock, &src_addr->sa, src_len);
+ src_addr->sin.sin_port = port;
+ out:
+- close(sock);
++ GSAPI(close)(sock);
+ return ret;
+ }
+
+@@ -1512,7 +1541,7 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
+
+ event.events = EPOLLIN;
+ event.data.ptr = qp;
+- ret = epoll_ctl(rs->epfd, EPOLL_CTL_ADD,
++ ret = GSAPI(epoll_ctl)(rs->epfd, EPOLL_CTL_ADD,
+ qp->cm_id->recv_cq_channel->fd, &event);
+ if (ret)
+ goto err;
+@@ -1609,7 +1638,7 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
+ }
+
+ fastlock_acquire(&rs->slock);
+- ret = connect(rs->udp_sock, addr, addrlen);
++ ret = GSAPI(connect)(rs->udp_sock, addr, addrlen);
+ if (!ret)
+ ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
+ fastlock_release(&rs->slock);
+@@ -1903,12 +1932,18 @@ static int rs_poll_cq(struct rsocket *rs)
+ case RS_OP_CTRL:
+ if (rs_msg_data(msg) == RS_CTRL_DISCONNECT) {
+ rs->state = rs_disconnected;
++#ifdef DMM_RSOCKET
++ rr_rs_notify_tcp(rs);
++#endif
+ return 0;
+ } else if (rs_msg_data(msg) == RS_CTRL_SHUTDOWN) {
+ if (rs->state & rs_writable) {
+ rs->state &= ~rs_readable;
+ } else {
+ rs->state = rs_disconnected;
++#ifdef DMM_RSOCKET
++ rr_rs_notify_tcp(rs);
++#endif
+ return 0;
+ }
+ }
+@@ -1959,6 +1994,9 @@ static int rs_poll_cq(struct rsocket *rs)
+ rs->err = errno;
+ }
+ }
++#ifdef DMM_RSOCKET
++ rr_rs_notify_tcp(rs);
++#endif
+ return ret;
+ }
+
+@@ -1977,7 +2015,11 @@ static int rs_get_cq_event(struct rsocket *rs)
+ ibv_ack_cq_events(rs->cm_id->recv_cq, rs->unack_cqe);
+ rs->unack_cqe = 0;
+ }
++#ifdef DMM_RSOCKET
++ __sync_fetch_and_sub(&rs->cq_armed, 1);
++#else
+ rs->cq_armed = 0;
++#endif
+ } else if (!(errno == EAGAIN || errno == EINTR)) {
+ rs->state = rs_error;
+ }
+@@ -2015,8 +2057,13 @@ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rs
+ } else if (nonblock) {
+ ret = ERR(EWOULDBLOCK);
+ } else if (!rs->cq_armed) {
++#ifdef DMM_RSOCKET
++ if (0 == ibv_req_notify_cq(rs->cm_id->recv_cq, 0))
++ __sync_fetch_and_add(&rs->cq_armed, 1);
++#else
+ ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
+ rs->cq_armed = 1;
++#endif
+ } else {
+ rs_update_credits(rs);
+ fastlock_acquire(&rs->cq_wait_lock);
+@@ -2116,11 +2163,19 @@ static void ds_poll_cqs(struct rsocket *rs)
+ qp = ds_next_qp(qp);
+ if (!rs->rqe_avail && rs->sqe_avail) {
+ rs->qp_list = qp;
++#ifdef DMM_RSOCKET
++ goto END;
++#else
+ return;
++#endif
+ }
+ cnt++;
+ } while (qp != rs->qp_list);
+ } while (cnt);
++#ifdef DMM_RSOCKET
++END:
++ rr_rs_notify_udp(rs);
++#endif
+ }
+
+ static void ds_req_notify_cqs(struct rsocket *rs)
+@@ -2150,7 +2205,7 @@ static int ds_get_cq_event(struct rsocket *rs)
+ if (!rs->cq_armed)
+ return 0;
+
+- ret = epoll_wait(rs->epfd, &event, 1, -1);
++ ret = GSAPI(epoll_wait)(rs->epfd, &event, 1, -1);
+ if (ret <= 0)
+ return ret;
+
+@@ -2607,7 +2662,7 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
+ msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa);
+ msg.msg_iov = miov;
+ msg.msg_iovlen = iovcnt + 1;
+- ret = sendmsg(rs->udp_sock, &msg, flags);
++ ret = GSAPI(sendmsg)(rs->udp_sock, &msg, flags);
+ return ret > 0 ? ret - hdr.length : ret;
+ }
+
+@@ -2958,7 +3013,7 @@ check_cq:
+ fds.fd = rs->cm_id->channel->fd;
+ fds.events = events;
+ fds.revents = 0;
+- poll(&fds, 1, 0);
++ GSAPI(poll)(&fds, 1, 0);
+ return fds.revents;
+ }
+
+@@ -2994,7 +3049,7 @@ static int rs_poll_check(struct pollfd *fds, nfds_t nfds)
+ if (rs)
+ fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
+ else
+- poll(&fds[i], 1, 0);
++ GSAPI(poll)(&fds[i], 1, 0);
+
+ if (fds[i].revents)
+ cnt++;
+@@ -3094,7 +3149,7 @@ int rpoll(struct pollfd *fds, nfds_t nfds, int timeout)
+ if (ret)
+ break;
+
+- ret = poll(rfds, nfds, timeout);
++ ret = GSAPI(poll)(rfds, nfds, timeout);
+ if (ret <= 0)
+ break;
+
+@@ -3314,7 +3369,7 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
+ rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen);
+ return 0;
+ } else {
+- return getpeername(rs->udp_sock, addr, addrlen);
++ return GSAPI(getpeername)(rs->udp_sock, addr, addrlen);
+ }
+ }
+
+@@ -3329,7 +3384,7 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
+ rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen);
+ return 0;
+ } else {
+- return getsockname(rs->udp_sock, addr, addrlen);
++ return GSAPI(getsockname)(rs->udp_sock, addr, addrlen);
+ }
+ }
+
+@@ -3371,7 +3426,7 @@ int rsetsockopt(int socket, int level, int optname,
+ if (!rs)
+ return ERR(EBADF);
+ if (rs->type == SOCK_DGRAM && level != SOL_RDMA) {
+- ret = setsockopt(rs->udp_sock, level, optname, optval, optlen);
++ ret = GSAPI(setsockopt)(rs->udp_sock, level, optname, optval, optlen);
+ if (ret)
+ return ret;
+ }
+@@ -3985,7 +4040,7 @@ static void udp_svc_process_sock(struct rs_svc *svc)
+ {
+ struct rs_svc_msg msg;
+
+- read(svc->sock[1], &msg, sizeof msg);
++ GSAPI(read)(svc->sock[1], &msg, sizeof msg);
+ switch (msg.cmd) {
+ case RS_SVC_ADD_DGRAM:
+ msg.status = rs_svc_add_rs(svc, msg.rs);
+@@ -4009,7 +4064,7 @@ static void udp_svc_process_sock(struct rs_svc *svc)
+ break;
+ }
+
+- write(svc->sock[1], &msg, sizeof msg);
++ GSAPI(write)(svc->sock[1], &msg, sizeof msg);
+ }
+
+ static uint8_t udp_svc_sgid_index(struct ds_dest *dest, union ibv_gid *sgid)
+@@ -4137,7 +4192,7 @@ static void udp_svc_process_rs(struct rsocket *rs)
+ socklen_t addrlen = sizeof addr;
+ int len, ret;
+
+- ret = recvfrom(rs->udp_sock, buf, sizeof buf, 0, &addr.sa, &addrlen);
++ ret = GSAPI(recvfrom)(rs->udp_sock, buf, sizeof buf, 0, &addr.sa, &addrlen);
+ if (ret < DS_UDP_IPV4_HDR_LEN)
+ return;
+
+@@ -4184,7 +4239,7 @@ static void *udp_svc_run(void *arg)
+ ret = rs_svc_grow_sets(svc, 4);
+ if (ret) {
+ msg.status = ret;
+- write(svc->sock[1], &msg, sizeof msg);
++ GSAPI(write)(svc->sock[1], &msg, sizeof msg);
+ return (void *) (uintptr_t) ret;
+ }
+
+@@ -4195,7 +4250,7 @@ static void *udp_svc_run(void *arg)
+ for (i = 0; i <= svc->cnt; i++)
+ udp_svc_fds[i].revents = 0;
+
+- poll(udp_svc_fds, svc->cnt + 1, -1);
++ GSAPI(poll)(udp_svc_fds, svc->cnt + 1, -1);
+ if (udp_svc_fds[0].revents)
+ udp_svc_process_sock(svc);
+
+@@ -4222,7 +4277,7 @@ static void tcp_svc_process_sock(struct rs_svc *svc)
+ struct rs_svc_msg msg;
+ int i;
+
+- read(svc->sock[1], &msg, sizeof msg);
++ GSAPI(read)(svc->sock[1], &msg, sizeof msg);
+ switch (msg.cmd) {
+ case RS_SVC_ADD_KEEPALIVE:
+ msg.status = rs_svc_add_rs(svc, msg.rs);
+@@ -4253,7 +4308,7 @@ static void tcp_svc_process_sock(struct rs_svc *svc)
+ default:
+ break;
+ }
+- write(svc->sock[1], &msg, sizeof msg);
++ GSAPI(write)(svc->sock[1], &msg, sizeof msg);
+ }
+
+ /*
+@@ -4282,7 +4337,7 @@ static void *tcp_svc_run(void *arg)
+ ret = rs_svc_grow_sets(svc, 16);
+ if (ret) {
+ msg.status = ret;
+- write(svc->sock[1], &msg, sizeof msg);
++ GSAPI(write)(svc->sock[1], &msg, sizeof msg);
+ return (void *) (uintptr_t) ret;
+ }
+
+@@ -4291,7 +4346,7 @@ static void *tcp_svc_run(void *arg)
+ fds.events = POLLIN;
+ timeout = -1;
+ do {
+- poll(&fds, 1, timeout * 1000);
++ GSAPI(poll)(&fds, 1, timeout * 1000);
+ if (fds.revents)
+ tcp_svc_process_sock(svc);
+
+@@ -4311,3 +4366,8 @@ static void *tcp_svc_run(void *arg)
+
+ return NULL;
+ }
++
++#ifdef DMM_RSOCKET
++#include "rsocket_rs.c"
++#endif
++