diff options
Diffstat (limited to 'stacks/rsocket/rsocket.patch')
-rw-r--r-- | stacks/rsocket/rsocket.patch | 751 |
1 files changed, 751 insertions, 0 deletions
diff --git a/stacks/rsocket/rsocket.patch b/stacks/rsocket/rsocket.patch new file mode 100644 index 0000000..430f178 --- /dev/null +++ b/stacks/rsocket/rsocket.patch @@ -0,0 +1,751 @@ +diff --git a/Makefile.am b/Makefile.am +index bf72134..cb02f56 100644 +--- a/Makefile.am ++++ b/Makefile.am +@@ -22,6 +22,12 @@ src_librdmacm_la_LDFLAGS = -version-info 1 -export-dynamic \ + $(librdmacm_version_script) + src_librdmacm_la_DEPENDENCIES = $(srcdir)/src/librdmacm.map + ++noinst_LIBRARIES = libdmm_rdmacm.a ++libdmm_rdmacm_a_CFLAGS = $(src_librdmacm_la_CFLAGS) -O2 -fPIC -m64 \ ++ -I@dmm_inc_dir@ -I$(srcdir)/../src -DDMM_RSOCKET=1 -DRSOCKET_DEBUG=@RSOCKET_DEBUG@ ++libdmm_rdmacm_a_LDFLAGS = ++libdmm_rdmacm_a_SOURCES = $(src_librdmacm_la_SOURCES) ++ + src_librspreload_la_SOURCES = src/preload.c src/indexer.c + src_librspreload_la_LDFLAGS = -version-info 1 -export-dynamic + src_librspreload_la_LIBADD = $(top_builddir)/src/librdmacm.la +diff --git a/configure.ac b/configure.ac +index 4a43995..98601ea 100644 +--- a/configure.ac ++++ b/configure.ac +@@ -104,5 +104,12 @@ if test "x$rdmadir" = "x"; then + AC_SUBST(rdmadir, rdma) + fi + ++AC_ARG_VAR(dmm_inc_dir, [Directory for dmm include files]) ++if test "x$dmm_inc_dir" = "x"; then ++ AC_MSG_ERROR([must set dmm_inc_dir]) ++fi ++ ++AC_ARG_VAR(RSOCKET_DEBUG, [rsocket debug flag 0 or 1]) ++ + AC_CONFIG_FILES([Makefile librdmacm.spec]) + AC_OUTPUT +diff --git a/src/cma.c b/src/cma.c +index a89e663..071222d 100644 +--- a/src/cma.c ++++ b/src/cma.c +@@ -60,6 +60,12 @@ + #include <rdma/rdma_verbs.h> + #include <infiniband/ib.h> + ++#ifdef DMM_RSOCKET ++#include "rsocket_rdma.h" ++#else ++#define GSAPI(name) name ++#endif ++ + #define CMA_INIT_CMD(req, req_size, op) \ + do { \ + memset(req, 0, req_size); \ +@@ -564,7 +570,7 @@ static int rdma_create_id2(struct rdma_event_channel *channel, + cmd.ps = ps; + cmd.qp_type = qp_type; + +- ret = write(id_priv->id.channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id_priv->id.channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + goto err; + +@@ -599,7 +605,7 @@ static int ucma_destroy_kern_id(int fd, uint32_t handle) + CMA_INIT_CMD_RESP(&cmd, sizeof cmd, DESTROY_ID, &resp, sizeof resp); + cmd.id = handle; + +- ret = write(fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -659,7 +665,7 @@ static int ucma_query_addr(struct rdma_cm_id *id) + cmd.id = id_priv->handle; + cmd.option = UCMA_QUERY_ADDR; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -692,7 +698,7 @@ static int ucma_query_gid(struct rdma_cm_id *id) + cmd.id = id_priv->handle; + cmd.option = UCMA_QUERY_GID; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -753,7 +759,7 @@ static int ucma_query_path(struct rdma_cm_id *id) + cmd.id = id_priv->handle; + cmd.option = UCMA_QUERY_PATH; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -784,7 +790,7 @@ static int ucma_query_route(struct rdma_cm_id *id) + id_priv = container_of(id, struct cma_id_private, id); + cmd.id = id_priv->handle; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -835,7 +841,7 @@ static int rdma_bind_addr2(struct rdma_cm_id *id, struct sockaddr *addr, + cmd.addr_size = addrlen; + memcpy(&cmd.addr, addr, addrlen); + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -863,7 +869,7 @@ int rdma_bind_addr(struct rdma_cm_id *id, struct sockaddr *addr) + cmd.id = id_priv->handle; + memcpy(&cmd.addr, addr, addrlen); + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -916,7 +922,7 @@ static int rdma_resolve_addr2(struct rdma_cm_id *id, struct sockaddr *src_addr, + cmd.dst_size = dst_len; + cmd.timeout_ms = timeout_ms; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -951,7 +957,7 @@ int rdma_resolve_addr(struct rdma_cm_id *id, struct sockaddr *src_addr, + memcpy(&cmd.dst_addr, dst_addr, dst_len); + cmd.timeout_ms = timeout_ms; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -1003,7 +1009,7 @@ int rdma_resolve_route(struct rdma_cm_id *id, int timeout_ms) + cmd.id = id_priv->handle; + cmd.timeout_ms = timeout_ms; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -1029,7 +1035,7 @@ static int rdma_init_qp_attr(struct rdma_cm_id *id, struct ibv_qp_attr *qp_attr, + cmd.id = id_priv->handle; + cmd.qp_state = qp_attr->qp_state; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -1530,7 +1536,7 @@ int rdma_connect(struct rdma_cm_id *id, struct rdma_conn_param *conn_param) + conn_param, 0, 0); + } + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -1553,7 +1559,7 @@ int rdma_listen(struct rdma_cm_id *id, int backlog) + cmd.id = id_priv->handle; + cmd.backlog = backlog; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -1656,7 +1662,7 @@ int rdma_accept(struct rdma_cm_id *id, struct rdma_conn_param *conn_param) + conn_param, conn_param->qp_num, + conn_param->srq); + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) { + ucma_modify_qp_err(id); + return (ret >= 0) ? ERR(ENODATA) : -1; +@@ -1684,7 +1690,7 @@ int rdma_reject(struct rdma_cm_id *id, const void *private_data, + cmd.private_data_len = private_data_len; + } + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -1702,7 +1708,7 @@ int rdma_notify(struct rdma_cm_id *id, enum ibv_event_type event) + id_priv = container_of(id, struct cma_id_private, id); + cmd.id = id_priv->handle; + cmd.event = event; +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -1735,7 +1741,7 @@ int rdma_disconnect(struct rdma_cm_id *id) + id_priv = container_of(id, struct cma_id_private, id); + cmd.id = id_priv->handle; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -1778,7 +1784,7 @@ static int rdma_join_multicast2(struct rdma_cm_id *id, struct sockaddr *addr, + cmd.uid = (uintptr_t) mc; + cmd.reserved = 0; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) { + ret = (ret >= 0) ? ERR(ENODATA) : -1; + goto err2; +@@ -1791,7 +1797,7 @@ static int rdma_join_multicast2(struct rdma_cm_id *id, struct sockaddr *addr, + memcpy(&cmd.addr, addr, addrlen); + cmd.uid = (uintptr_t) mc; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) { + ret = (ret >= 0) ? ERR(ENODATA) : -1; + goto err2; +@@ -1857,7 +1863,7 @@ int rdma_leave_multicast(struct rdma_cm_id *id, struct sockaddr *addr) + CMA_INIT_CMD_RESP(&cmd, sizeof cmd, LEAVE_MCAST, &resp, sizeof resp); + cmd.id = mc->handle; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) { + ret = (ret >= 0) ? ERR(ENODATA) : -1; + goto free; +@@ -2019,7 +2025,7 @@ static int ucma_process_conn_resp(struct cma_id_private *id_priv) + CMA_INIT_CMD(&cmd, sizeof cmd, ACCEPT); + cmd.id = id_priv->handle; + +- ret = write(id_priv->id.channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id_priv->id.channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) { + ret = (ret >= 0) ? ERR(ENODATA) : -1; + goto err; +@@ -2103,7 +2109,7 @@ int rdma_get_cm_event(struct rdma_event_channel *channel, + retry: + memset(evt, 0, sizeof(*evt)); + CMA_INIT_CMD_RESP(&cmd, sizeof cmd, GET_EVENT, &resp, sizeof resp); +- ret = write(channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) { + free(evt); + return (ret >= 0) ? ERR(ENODATA) : -1; +@@ -2274,7 +2280,7 @@ int rdma_set_option(struct rdma_cm_id *id, int level, int optname, + cmd.optname = optname; + cmd.optlen = optlen; + +- ret = write(id->channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) + return (ret >= 0) ? ERR(ENODATA) : -1; + +@@ -2302,7 +2308,7 @@ int rdma_migrate_id(struct rdma_cm_id *id, struct rdma_event_channel *channel) + cmd.id = id_priv->handle; + cmd.fd = id->channel->fd; + +- ret = write(channel->fd, &cmd, sizeof cmd); ++ ret = GSAPI(write)(channel->fd, &cmd, sizeof cmd); + if (ret != sizeof cmd) { + if (sync) + rdma_destroy_event_channel(channel); +diff --git a/src/rsocket.c b/src/rsocket.c +index c4f1b57..faa3d3a 100644 +--- a/src/rsocket.c ++++ b/src/rsocket.c +@@ -382,6 +382,11 @@ struct rsocket { + dlist_entry iomap_queue; + int iomap_pending; + int unack_cqe; ++#ifdef DMM_RSOCKET ++ int rr_epoll_ref; ++ int rr_epoll_fd; ++ void *rr_epoll_pdata; ++#endif + }; + + #define DS_UDP_TAG 0x55555555 +@@ -404,6 +409,12 @@ struct ds_udp_header { + + #define ds_next_qp(qp) container_of((qp)->list.next, struct ds_qp, list) + ++#ifdef DMM_RSOCKET ++#include "rsocket_in.h" ++#else ++#define GSAPI(name) name ++#endif ++ + static void ds_insert_qp(struct rsocket *rs, struct ds_qp *qp) + { + if (!rs->qp_list) +@@ -444,16 +455,16 @@ static int rs_notify_svc(struct rs_svc *svc, struct rsocket *rs, int cmd) + msg.cmd = cmd; + msg.status = EINVAL; + msg.rs = rs; +- write(svc->sock[0], &msg, sizeof msg); +- read(svc->sock[0], &msg, sizeof msg); ++ GSAPI(write)(svc->sock[0], &msg, sizeof msg); ++ GSAPI(read)(svc->sock[0], &msg, sizeof msg); + ret = rdma_seterrno(msg.status); + if (svc->cnt) + goto unlock; + + pthread_join(svc->id, NULL); + closepair: +- close(svc->sock[0]); +- close(svc->sock[1]); ++ GSAPI(close)(svc->sock[0]); ++ GSAPI(close)(svc->sock[1]); + unlock: + pthread_mutex_unlock(&mut); + return ret; +@@ -607,6 +618,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type) + fastlock_init(&rs->map_lock); + dlist_init(&rs->iomap_list); + dlist_init(&rs->iomap_queue); ++#ifdef DMM_RSOCKET ++ rr_rs_init(rs); ++#endif + return rs; + } + +@@ -617,16 +631,16 @@ static int rs_set_nonblocking(struct rsocket *rs, int arg) + + if (rs->type == SOCK_STREAM) { + if (rs->cm_id->recv_cq_channel) +- ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg); ++ ret = GSAPI(fcntl)(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg); + + if (!ret && rs->state < rs_connected) +- ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg); ++ ret = GSAPI(fcntl)(rs->cm_id->channel->fd, F_SETFL, arg); + } else { +- ret = fcntl(rs->epfd, F_SETFL, arg); ++ ret = GSAPI(fcntl)(rs->epfd, F_SETFL, arg); + if (!ret && rs->qp_list) { + qp = rs->qp_list; + do { +- ret = fcntl(qp->cm_id->recv_cq_channel->fd, ++ ret = GSAPI(fcntl)(qp->cm_id->recv_cq_channel->fd, + F_SETFL, arg); + qp = ds_next_qp(qp); + } while (qp != rs->qp_list && !ret); +@@ -767,7 +781,7 @@ static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id) + goto err1; + + if (rs->fd_flags & O_NONBLOCK) { +- if (fcntl(cm_id->recv_cq_channel->fd, F_SETFL, O_NONBLOCK)) ++ if (GSAPI(fcntl)(cm_id->recv_cq_channel->fd, F_SETFL, O_NONBLOCK)) + goto err2; + } + +@@ -918,7 +932,7 @@ static void ds_free_qp(struct ds_qp *qp) + if (qp->cm_id) { + if (qp->cm_id->qp) { + tdelete(&qp->dest.addr, &qp->rs->dest_map, ds_compare_addr); +- epoll_ctl(qp->rs->epfd, EPOLL_CTL_DEL, ++ GSAPI(epoll_ctl)(qp->rs->epfd, EPOLL_CTL_DEL, + qp->cm_id->recv_cq_channel->fd, NULL); + rdma_destroy_qp(qp->cm_id); + } +@@ -932,8 +946,12 @@ static void ds_free(struct rsocket *rs) + { + struct ds_qp *qp; + ++#ifdef DMM_RSOCKET ++ rr_rs_dest(rs); ++#endif ++ + if (rs->udp_sock >= 0) +- close(rs->udp_sock); ++ GSAPI(close)(rs->udp_sock); + + if (rs->index >= 0) + rs_remove(rs); +@@ -947,7 +965,7 @@ static void ds_free(struct rsocket *rs) + } + + if (rs->epfd >= 0) +- close(rs->epfd); ++ GSAPI(close)(rs->epfd); + + if (rs->sbuf) + free(rs->sbuf); +@@ -968,6 +986,10 @@ static void rs_free(struct rsocket *rs) + return; + } + ++#ifdef DMM_RSOCKET ++ rr_rs_dest(rs); ++#endif ++ + if (rs->rmsg) + free(rs->rmsg); + +@@ -1059,11 +1081,11 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn) + + static int ds_init(struct rsocket *rs, int domain) + { +- rs->udp_sock = socket(domain, SOCK_DGRAM, 0); ++ rs->udp_sock = GSAPI(socket)(domain, SOCK_DGRAM, 0); + if (rs->udp_sock < 0) + return rs->udp_sock; + +- rs->epfd = epoll_create(2); ++ rs->epfd = GSAPI(epoll_create)(2); + if (rs->epfd < 0) + return rs->epfd; + +@@ -1164,7 +1186,7 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen) + if (ret) + return ret; + } +- ret = bind(rs->udp_sock, addr, addrlen); ++ ret = GSAPI(bind)(rs->udp_sock, addr, addrlen); + } + return ret; + } +@@ -1229,8 +1251,11 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen) + goto err; + } + +- if (rs->fd_flags & O_NONBLOCK) +- fcntl(new_rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK); ++ if (rs->fd_flags & O_NONBLOCK) { ++ ret = GSAPI(fcntl)(new_rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK); ++ if (0 == ret) ++ new_rs->fd_flags |= O_NONBLOCK; ++ } + + ret = rs_create_ep(new_rs); + if (ret) +@@ -1354,7 +1379,7 @@ connected: + break; + case rs_accepting: + if (!(rs->fd_flags & O_NONBLOCK)) +- fcntl(rs->cm_id->channel->fd, F_SETFL, 0); ++ GSAPI(fcntl)(rs->cm_id->channel->fd, F_SETFL, 0); + + ret = ucma_complete(rs->cm_id); + if (ret) +@@ -1375,6 +1400,10 @@ connected: + rs->err = errno; + } + } ++#ifdef DMM_RSOCKET ++ rr_rs_connected(rs); ++#endif ++ + return ret; + } + +@@ -1397,24 +1426,24 @@ static int ds_get_src_addr(struct rsocket *rs, + uint16_t port; + + *src_len = sizeof(*src_addr); +- ret = getsockname(rs->udp_sock, &src_addr->sa, src_len); ++ ret = GSAPI(getsockname)(rs->udp_sock, &src_addr->sa, src_len); + if (ret || !rs_any_addr(src_addr)) + return ret; + + port = src_addr->sin.sin_port; +- sock = socket(dest_addr->sa_family, SOCK_DGRAM, 0); ++ sock = GSAPI(socket)(dest_addr->sa_family, SOCK_DGRAM, 0); + if (sock < 0) + return sock; + +- ret = connect(sock, dest_addr, dest_len); ++ ret = GSAPI(connect)(sock, dest_addr, dest_len); + if (ret) + goto out; + + *src_len = sizeof(*src_addr); +- ret = getsockname(sock, &src_addr->sa, src_len); ++ ret = GSAPI(getsockname)(sock, &src_addr->sa, src_len); + src_addr->sin.sin_port = port; + out: +- close(sock); ++ GSAPI(close)(sock); + return ret; + } + +@@ -1512,7 +1541,7 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr, + + event.events = EPOLLIN; + event.data.ptr = qp; +- ret = epoll_ctl(rs->epfd, EPOLL_CTL_ADD, ++ ret = GSAPI(epoll_ctl)(rs->epfd, EPOLL_CTL_ADD, + qp->cm_id->recv_cq_channel->fd, &event); + if (ret) + goto err; +@@ -1609,7 +1638,7 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) + } + + fastlock_acquire(&rs->slock); +- ret = connect(rs->udp_sock, addr, addrlen); ++ ret = GSAPI(connect)(rs->udp_sock, addr, addrlen); + if (!ret) + ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest); + fastlock_release(&rs->slock); +@@ -1903,12 +1932,18 @@ static int rs_poll_cq(struct rsocket *rs) + case RS_OP_CTRL: + if (rs_msg_data(msg) == RS_CTRL_DISCONNECT) { + rs->state = rs_disconnected; ++#ifdef DMM_RSOCKET ++ rr_rs_notify_tcp(rs); ++#endif + return 0; + } else if (rs_msg_data(msg) == RS_CTRL_SHUTDOWN) { + if (rs->state & rs_writable) { + rs->state &= ~rs_readable; + } else { + rs->state = rs_disconnected; ++#ifdef DMM_RSOCKET ++ rr_rs_notify_tcp(rs); ++#endif + return 0; + } + } +@@ -1959,6 +1994,9 @@ static int rs_poll_cq(struct rsocket *rs) + rs->err = errno; + } + } ++#ifdef DMM_RSOCKET ++ rr_rs_notify_tcp(rs); ++#endif + return ret; + } + +@@ -1977,7 +2015,11 @@ static int rs_get_cq_event(struct rsocket *rs) + ibv_ack_cq_events(rs->cm_id->recv_cq, rs->unack_cqe); + rs->unack_cqe = 0; + } ++#ifdef DMM_RSOCKET ++ __sync_fetch_and_sub(&rs->cq_armed, 1); ++#else + rs->cq_armed = 0; ++#endif + } else if (!(errno == EAGAIN || errno == EINTR)) { + rs->state = rs_error; + } +@@ -2015,8 +2057,13 @@ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rs + } else if (nonblock) { + ret = ERR(EWOULDBLOCK); + } else if (!rs->cq_armed) { ++#ifdef DMM_RSOCKET ++ if (0 == ibv_req_notify_cq(rs->cm_id->recv_cq, 0)) ++ __sync_fetch_and_add(&rs->cq_armed, 1); ++#else + ibv_req_notify_cq(rs->cm_id->recv_cq, 0); + rs->cq_armed = 1; ++#endif + } else { + rs_update_credits(rs); + fastlock_acquire(&rs->cq_wait_lock); +@@ -2116,11 +2163,19 @@ static void ds_poll_cqs(struct rsocket *rs) + qp = ds_next_qp(qp); + if (!rs->rqe_avail && rs->sqe_avail) { + rs->qp_list = qp; ++#ifdef DMM_RSOCKET ++ goto END; ++#else + return; ++#endif + } + cnt++; + } while (qp != rs->qp_list); + } while (cnt); ++#ifdef DMM_RSOCKET ++END: ++ rr_rs_notify_udp(rs); ++#endif + } + + static void ds_req_notify_cqs(struct rsocket *rs) +@@ -2150,7 +2205,7 @@ static int ds_get_cq_event(struct rsocket *rs) + if (!rs->cq_armed) + return 0; + +- ret = epoll_wait(rs->epfd, &event, 1, -1); ++ ret = GSAPI(epoll_wait)(rs->epfd, &event, 1, -1); + if (ret <= 0) + return ret; + +@@ -2607,7 +2662,7 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov, + msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa); + msg.msg_iov = miov; + msg.msg_iovlen = iovcnt + 1; +- ret = sendmsg(rs->udp_sock, &msg, flags); ++ ret = GSAPI(sendmsg)(rs->udp_sock, &msg, flags); + return ret > 0 ? ret - hdr.length : ret; + } + +@@ -2958,7 +3013,7 @@ check_cq: + fds.fd = rs->cm_id->channel->fd; + fds.events = events; + fds.revents = 0; +- poll(&fds, 1, 0); ++ GSAPI(poll)(&fds, 1, 0); + return fds.revents; + } + +@@ -2994,7 +3049,7 @@ static int rs_poll_check(struct pollfd *fds, nfds_t nfds) + if (rs) + fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all); + else +- poll(&fds[i], 1, 0); ++ GSAPI(poll)(&fds[i], 1, 0); + + if (fds[i].revents) + cnt++; +@@ -3094,7 +3149,7 @@ int rpoll(struct pollfd *fds, nfds_t nfds, int timeout) + if (ret) + break; + +- ret = poll(rfds, nfds, timeout); ++ ret = GSAPI(poll)(rfds, nfds, timeout); + if (ret <= 0) + break; + +@@ -3314,7 +3369,7 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen) + rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen); + return 0; + } else { +- return getpeername(rs->udp_sock, addr, addrlen); ++ return GSAPI(getpeername)(rs->udp_sock, addr, addrlen); + } + } + +@@ -3329,7 +3384,7 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen) + rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen); + return 0; + } else { +- return getsockname(rs->udp_sock, addr, addrlen); ++ return GSAPI(getsockname)(rs->udp_sock, addr, addrlen); + } + } + +@@ -3371,7 +3426,7 @@ int rsetsockopt(int socket, int level, int optname, + if (!rs) + return ERR(EBADF); + if (rs->type == SOCK_DGRAM && level != SOL_RDMA) { +- ret = setsockopt(rs->udp_sock, level, optname, optval, optlen); ++ ret = GSAPI(setsockopt)(rs->udp_sock, level, optname, optval, optlen); + if (ret) + return ret; + } +@@ -3985,7 +4040,7 @@ static void udp_svc_process_sock(struct rs_svc *svc) + { + struct rs_svc_msg msg; + +- read(svc->sock[1], &msg, sizeof msg); ++ GSAPI(read)(svc->sock[1], &msg, sizeof msg); + switch (msg.cmd) { + case RS_SVC_ADD_DGRAM: + msg.status = rs_svc_add_rs(svc, msg.rs); +@@ -4009,7 +4064,7 @@ static void udp_svc_process_sock(struct rs_svc *svc) + break; + } + +- write(svc->sock[1], &msg, sizeof msg); ++ GSAPI(write)(svc->sock[1], &msg, sizeof msg); + } + + static uint8_t udp_svc_sgid_index(struct ds_dest *dest, union ibv_gid *sgid) +@@ -4137,7 +4192,7 @@ static void udp_svc_process_rs(struct rsocket *rs) + socklen_t addrlen = sizeof addr; + int len, ret; + +- ret = recvfrom(rs->udp_sock, buf, sizeof buf, 0, &addr.sa, &addrlen); ++ ret = GSAPI(recvfrom)(rs->udp_sock, buf, sizeof buf, 0, &addr.sa, &addrlen); + if (ret < DS_UDP_IPV4_HDR_LEN) + return; + +@@ -4184,7 +4239,7 @@ static void *udp_svc_run(void *arg) + ret = rs_svc_grow_sets(svc, 4); + if (ret) { + msg.status = ret; +- write(svc->sock[1], &msg, sizeof msg); ++ GSAPI(write)(svc->sock[1], &msg, sizeof msg); + return (void *) (uintptr_t) ret; + } + +@@ -4195,7 +4250,7 @@ static void *udp_svc_run(void *arg) + for (i = 0; i <= svc->cnt; i++) + udp_svc_fds[i].revents = 0; + +- poll(udp_svc_fds, svc->cnt + 1, -1); ++ GSAPI(poll)(udp_svc_fds, svc->cnt + 1, -1); + if (udp_svc_fds[0].revents) + udp_svc_process_sock(svc); + +@@ -4222,7 +4277,7 @@ static void tcp_svc_process_sock(struct rs_svc *svc) + struct rs_svc_msg msg; + int i; + +- read(svc->sock[1], &msg, sizeof msg); ++ GSAPI(read)(svc->sock[1], &msg, sizeof msg); + switch (msg.cmd) { + case RS_SVC_ADD_KEEPALIVE: + msg.status = rs_svc_add_rs(svc, msg.rs); +@@ -4253,7 +4308,7 @@ static void tcp_svc_process_sock(struct rs_svc *svc) + default: + break; + } +- write(svc->sock[1], &msg, sizeof msg); ++ GSAPI(write)(svc->sock[1], &msg, sizeof msg); + } + + /* +@@ -4282,7 +4337,7 @@ static void *tcp_svc_run(void *arg) + ret = rs_svc_grow_sets(svc, 16); + if (ret) { + msg.status = ret; +- write(svc->sock[1], &msg, sizeof msg); ++ GSAPI(write)(svc->sock[1], &msg, sizeof msg); + return (void *) (uintptr_t) ret; + } + +@@ -4291,7 +4346,7 @@ static void *tcp_svc_run(void *arg) + fds.events = POLLIN; + timeout = -1; + do { +- poll(&fds, 1, timeout * 1000); ++ GSAPI(poll)(&fds, 1, timeout * 1000); + if (fds.revents) + tcp_svc_process_sock(svc); + +@@ -4311,3 +4366,8 @@ static void *tcp_svc_run(void *arg) + + return NULL; + } ++ ++#ifdef DMM_RSOCKET ++#include "rsocket_rs.c" ++#endif ++ |