diff --git a/Makefile.am b/Makefile.am index bf72134..cb02f56 100644 --- a/Makefile.am +++ b/Makefile.am @@ -22,6 +22,12 @@ src_librdmacm_la_LDFLAGS = -version-info 1 -export-dynamic \ $(librdmacm_version_script) src_librdmacm_la_DEPENDENCIES = $(srcdir)/src/librdmacm.map +noinst_LIBRARIES = libdmm_rdmacm.a +libdmm_rdmacm_a_CFLAGS = $(src_librdmacm_la_CFLAGS) -O2 -fPIC -m64 \ + -I@dmm_inc_dir@ -I$(srcdir)/../src -DDMM_RSOCKET=1 -DRSOCKET_DEBUG=@RSOCKET_DEBUG@ +libdmm_rdmacm_a_LDFLAGS = +libdmm_rdmacm_a_SOURCES = $(src_librdmacm_la_SOURCES) + src_librspreload_la_SOURCES = src/preload.c src/indexer.c src_librspreload_la_LDFLAGS = -version-info 1 -export-dynamic src_librspreload_la_LIBADD = $(top_builddir)/src/librdmacm.la diff --git a/configure.ac b/configure.ac index 4a43995..98601ea 100644 --- a/configure.ac +++ b/configure.ac @@ -104,5 +104,12 @@ if test "x$rdmadir" = "x"; then AC_SUBST(rdmadir, rdma) fi +AC_ARG_VAR(dmm_inc_dir, [Directory for dmm include files]) +if test "x$dmm_inc_dir" = "x"; then + AC_MSG_ERROR([must set dmm_inc_dir]) +fi + +AC_ARG_VAR(RSOCKET_DEBUG, [rsocket debug flag 0 or 1]) + AC_CONFIG_FILES([Makefile librdmacm.spec]) AC_OUTPUT diff --git a/src/cma.c b/src/cma.c index a89e663..071222d 100644 --- a/src/cma.c +++ b/src/cma.c @@ -60,6 +60,12 @@ #include #include +#ifdef DMM_RSOCKET +#include "rsocket_rdma.h" +#else +#define GSAPI(name) name +#endif + #define CMA_INIT_CMD(req, req_size, op) \ do { \ memset(req, 0, req_size); \ @@ -564,7 +570,7 @@ static int rdma_create_id2(struct rdma_event_channel *channel, cmd.ps = ps; cmd.qp_type = qp_type; - ret = write(id_priv->id.channel->fd, &cmd, sizeof cmd); + ret = GSAPI(write)(id_priv->id.channel->fd, &cmd, sizeof cmd); if (ret != sizeof cmd) goto err; @@ -599,7 +605,7 @@ static int ucma_destroy_kern_id(int fd, uint32_t handle) CMA_INIT_CMD_RESP(&cmd, sizeof cmd, DESTROY_ID, &resp, sizeof resp); cmd.id = handle; - ret = write(fd, &cmd, sizeof cmd); + ret = GSAPI(write)(fd, &cmd, sizeof cmd); if (ret != sizeof cmd) return (ret >= 0) ? ERR(ENODATA) : -1; @@ -659,7 +665,7 @@ static int ucma_query_addr(struct rdma_cm_id *id) cmd.id = id_priv->handle; cmd.option = UCMA_QUERY_ADDR; - ret = write(id->channel->fd, &cmd, sizeof cmd); + ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); if (ret != sizeof cmd) return (ret >= 0) ? ERR(ENODATA) : -1; @@ -692,7 +698,7 @@ static int ucma_query_gid(struct rdma_cm_id *id) cmd.id = id_priv->handle; cmd.option = UCMA_QUERY_GID; - ret = write(id->channel->fd, &cmd, sizeof cmd); + ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); if (ret != sizeof cmd) return (ret >= 0) ? ERR(ENODATA) : -1; @@ -753,7 +759,7 @@ static int ucma_query_path(struct rdma_cm_id *id) cmd.id = id_priv->handle; cmd.option = UCMA_QUERY_PATH; - ret = write(id->channel->fd, &cmd, sizeof cmd); + ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); if (ret != sizeof cmd) return (ret >= 0) ? ERR(ENODATA) : -1; @@ -784,7 +790,7 @@ static int ucma_query_route(struct rdma_cm_id *id) id_priv = container_of(id, struct cma_id_private, id); cmd.id = id_priv->handle; - ret = write(id->channel->fd, &cmd, sizeof cmd); + ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); if (ret != sizeof cmd) return (ret >= 0) ? ERR(ENODATA) : -1; @@ -835,7 +841,7 @@ static int rdma_bind_addr2(struct rdma_cm_id *id, struct sockaddr *addr, cmd.addr_size = addrlen; memcpy(&cmd.addr, addr, addrlen); - ret = write(id->channel->fd, &cmd, sizeof cmd); + ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); if (ret != sizeof cmd) return (ret >= 0) ? ERR(ENODATA) : -1; @@ -863,7 +869,7 @@ int rdma_bind_addr(struct rdma_cm_id *id, struct sockaddr *addr) cmd.id = id_priv->handle; memcpy(&cmd.addr, addr, addrlen); - ret = write(id->channel->fd, &cmd, sizeof cmd); + ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); if (ret != sizeof cmd) return (ret >= 0) ? ERR(ENODATA) : -1; @@ -916,7 +922,7 @@ static int rdma_resolve_addr2(struct rdma_cm_id *id, struct sockaddr *src_addr, cmd.dst_size = dst_len; cmd.timeout_ms = timeout_ms; - ret = write(id->channel->fd, &cmd, sizeof cmd); + ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); if (ret != sizeof cmd) return (ret >= 0) ? ERR(ENODATA) : -1; @@ -951,7 +957,7 @@ int rdma_resolve_addr(struct rdma_cm_id *id, struct sockaddr *src_addr, memcpy(&cmd.dst_addr, dst_addr, dst_len); cmd.timeout_ms = timeout_ms; - ret = write(id->channel->fd, &cmd, sizeof cmd); + ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); if (ret != sizeof cmd) return (ret >= 0) ? ERR(ENODATA) : -1; @@ -1003,7 +1009,7 @@ int rdma_resolve_route(struct rdma_cm_id *id, int timeout_ms) cmd.id = id_priv->handle; cmd.timeout_ms = timeout_ms; - ret = write(id->channel->fd, &cmd, sizeof cmd); + ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); if (ret != sizeof cmd) return (ret >= 0) ? ERR(ENODATA) : -1; @@ -1029,7 +1035,7 @@ static int rdma_init_qp_attr(struct rdma_cm_id *id, struct ibv_qp_attr *qp_attr, cmd.id = id_priv->handle; cmd.qp_state = qp_attr->qp_state; - ret = write(id->channel->fd, &cmd, sizeof cmd); + ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); if (ret != sizeof cmd) return (ret >= 0) ? ERR(ENODATA) : -1; @@ -1530,7 +1536,7 @@ int rdma_connect(struct rdma_cm_id *id, struct rdma_conn_param *conn_param) conn_param, 0, 0); } - ret = write(id->channel->fd, &cmd, sizeof cmd); + ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); if (ret != sizeof cmd) return (ret >= 0) ? ERR(ENODATA) : -1; @@ -1553,7 +1559,7 @@ int rdma_listen(struct rdma_cm_id *id, int backlog) cmd.id = id_priv->handle; cmd.backlog = backlog; - ret = write(id->channel->fd, &cmd, sizeof cmd); + ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); if (ret != sizeof cmd) return (ret >= 0) ? ERR(ENODATA) : -1; @@ -1656,7 +1662,7 @@ int rdma_accept(struct rdma_cm_id *id, struct rdma_conn_param *conn_param) conn_param, conn_param->qp_num, conn_param->srq); - ret = write(id->channel->fd, &cmd, sizeof cmd); + ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); if (ret != sizeof cmd) { ucma_modify_qp_err(id); return (ret >= 0) ? ERR(ENODATA) : -1; @@ -1684,7 +1690,7 @@ int rdma_reject(struct rdma_cm_id *id, const void *private_data, cmd.private_data_len = private_data_len; } - ret = write(id->channel->fd, &cmd, sizeof cmd); + ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); if (ret != sizeof cmd) return (ret >= 0) ? ERR(ENODATA) : -1; @@ -1702,7 +1708,7 @@ int rdma_notify(struct rdma_cm_id *id, enum ibv_event_type event) id_priv = container_of(id, struct cma_id_private, id); cmd.id = id_priv->handle; cmd.event = event; - ret = write(id->channel->fd, &cmd, sizeof cmd); + ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); if (ret != sizeof cmd) return (ret >= 0) ? ERR(ENODATA) : -1; @@ -1735,7 +1741,7 @@ int rdma_disconnect(struct rdma_cm_id *id) id_priv = container_of(id, struct cma_id_private, id); cmd.id = id_priv->handle; - ret = write(id->channel->fd, &cmd, sizeof cmd); + ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); if (ret != sizeof cmd) return (ret >= 0) ? ERR(ENODATA) : -1; @@ -1778,7 +1784,7 @@ static int rdma_join_multicast2(struct rdma_cm_id *id, struct sockaddr *addr, cmd.uid = (uintptr_t) mc; cmd.reserved = 0; - ret = write(id->channel->fd, &cmd, sizeof cmd); + ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); if (ret != sizeof cmd) { ret = (ret >= 0) ? ERR(ENODATA) : -1; goto err2; @@ -1791,7 +1797,7 @@ static int rdma_join_multicast2(struct rdma_cm_id *id, struct sockaddr *addr, memcpy(&cmd.addr, addr, addrlen); cmd.uid = (uintptr_t) mc; - ret = write(id->channel->fd, &cmd, sizeof cmd); + ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); if (ret != sizeof cmd) { ret = (ret >= 0) ? ERR(ENODATA) : -1; goto err2; @@ -1857,7 +1863,7 @@ int rdma_leave_multicast(struct rdma_cm_id *id, struct sockaddr *addr) CMA_INIT_CMD_RESP(&cmd, sizeof cmd, LEAVE_MCAST, &resp, sizeof resp); cmd.id = mc->handle; - ret = write(id->channel->fd, &cmd, sizeof cmd); + ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); if (ret != sizeof cmd) { ret = (ret >= 0) ? ERR(ENODATA) : -1; goto free; @@ -2019,7 +2025,7 @@ static int ucma_process_conn_resp(struct cma_id_private *id_priv) CMA_INIT_CMD(&cmd, sizeof cmd, ACCEPT); cmd.id = id_priv->handle; - ret = write(id_priv->id.channel->fd, &cmd, sizeof cmd); + ret = GSAPI(write)(id_priv->id.channel->fd, &cmd, sizeof cmd); if (ret != sizeof cmd) { ret = (ret >= 0) ? ERR(ENODATA) : -1; goto err; @@ -2103,7 +2109,7 @@ int rdma_get_cm_event(struct rdma_event_channel *channel, retry: memset(evt, 0, sizeof(*evt)); CMA_INIT_CMD_RESP(&cmd, sizeof cmd, GET_EVENT, &resp, sizeof resp); - ret = write(channel->fd, &cmd, sizeof cmd); + ret = GSAPI(write)(channel->fd, &cmd, sizeof cmd); if (ret != sizeof cmd) { free(evt); return (ret >= 0) ? ERR(ENODATA) : -1; @@ -2274,7 +2280,7 @@ int rdma_set_option(struct rdma_cm_id *id, int level, int optname, cmd.optname = optname; cmd.optlen = optlen; - ret = write(id->channel->fd, &cmd, sizeof cmd); + ret = GSAPI(write)(id->channel->fd, &cmd, sizeof cmd); if (ret != sizeof cmd) return (ret >= 0) ? ERR(ENODATA) : -1; @@ -2302,7 +2308,7 @@ int rdma_migrate_id(struct rdma_cm_id *id, struct rdma_event_channel *channel) cmd.id = id_priv->handle; cmd.fd = id->channel->fd; - ret = write(channel->fd, &cmd, sizeof cmd); + ret = GSAPI(write)(channel->fd, &cmd, sizeof cmd); if (ret != sizeof cmd) { if (sync) rdma_destroy_event_channel(channel); diff --git a/src/rsocket.c b/src/rsocket.c index c4f1b57..faa3d3a 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -382,6 +382,11 @@ struct rsocket { dlist_entry iomap_queue; int iomap_pending; int unack_cqe; +#ifdef DMM_RSOCKET + int rr_epoll_ref; + int rr_epoll_fd; + void *rr_epoll_pdata; +#endif }; #define DS_UDP_TAG 0x55555555 @@ -404,6 +409,12 @@ struct ds_udp_header { #define ds_next_qp(qp) container_of((qp)->list.next, struct ds_qp, list) +#ifdef DMM_RSOCKET +#include "rsocket_in.h" +#else +#define GSAPI(name) name +#endif + static void ds_insert_qp(struct rsocket *rs, struct ds_qp *qp) { if (!rs->qp_list) @@ -444,16 +455,16 @@ static int rs_notify_svc(struct rs_svc *svc, struct rsocket *rs, int cmd) msg.cmd = cmd; msg.status = EINVAL; msg.rs = rs; - write(svc->sock[0], &msg, sizeof msg); - read(svc->sock[0], &msg, sizeof msg); + GSAPI(write)(svc->sock[0], &msg, sizeof msg); + GSAPI(read)(svc->sock[0], &msg, sizeof msg); ret = rdma_seterrno(msg.status); if (svc->cnt) goto unlock; pthread_join(svc->id, NULL); closepair: - close(svc->sock[0]); - close(svc->sock[1]); + GSAPI(close)(svc->sock[0]); + GSAPI(close)(svc->sock[1]); unlock: pthread_mutex_unlock(&mut); return ret; @@ -607,6 +618,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type) fastlock_init(&rs->map_lock); dlist_init(&rs->iomap_list); dlist_init(&rs->iomap_queue); +#ifdef DMM_RSOCKET + rr_rs_init(rs); +#endif return rs; } @@ -617,16 +631,16 @@ static int rs_set_nonblocking(struct rsocket *rs, int arg) if (rs->type == SOCK_STREAM) { if (rs->cm_id->recv_cq_channel) - ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg); + ret = GSAPI(fcntl)(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg); if (!ret && rs->state < rs_connected) - ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg); + ret = GSAPI(fcntl)(rs->cm_id->channel->fd, F_SETFL, arg); } else { - ret = fcntl(rs->epfd, F_SETFL, arg); + ret = GSAPI(fcntl)(rs->epfd, F_SETFL, arg); if (!ret && rs->qp_list) { qp = rs->qp_list; do { - ret = fcntl(qp->cm_id->recv_cq_channel->fd, + ret = GSAPI(fcntl)(qp->cm_id->recv_cq_channel->fd, F_SETFL, arg); qp = ds_next_qp(qp); } while (qp != rs->qp_list && !ret); @@ -767,7 +781,7 @@ static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id) goto err1; if (rs->fd_flags & O_NONBLOCK) { - if (fcntl(cm_id->recv_cq_channel->fd, F_SETFL, O_NONBLOCK)) + if (GSAPI(fcntl)(cm_id->recv_cq_channel->fd, F_SETFL, O_NONBLOCK)) goto err2; } @@ -918,7 +932,7 @@ static void ds_free_qp(struct ds_qp *qp) if (qp->cm_id) { if (qp->cm_id->qp) { tdelete(&qp->dest.addr, &qp->rs->dest_map, ds_compare_addr); - epoll_ctl(qp->rs->epfd, EPOLL_CTL_DEL, + GSAPI(epoll_ctl)(qp->rs->epfd, EPOLL_CTL_DEL, qp->cm_id->recv_cq_channel->fd, NULL); rdma_destroy_qp(qp->cm_id); } @@ -932,8 +946,12 @@ static void ds_free(struct rsocket *rs) { struct ds_qp *qp; +#ifdef DMM_RSOCKET + rr_rs_dest(rs); +#endif + if (rs->udp_sock >= 0) - close(rs->udp_sock); + GSAPI(close)(rs->udp_sock); if (rs->index >= 0) rs_remove(rs); @@ -947,7 +965,7 @@ static void ds_free(struct rsocket *rs) } if (rs->epfd >= 0) - close(rs->epfd); + GSAPI(close)(rs->epfd); if (rs->sbuf) free(rs->sbuf); @@ -968,6 +986,10 @@ static void rs_free(struct rsocket *rs) return; } +#ifdef DMM_RSOCKET + rr_rs_dest(rs); +#endif + if (rs->rmsg) free(rs->rmsg); @@ -1059,11 +1081,11 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn) static int ds_init(struct rsocket *rs, int domain) { - rs->udp_sock = socket(domain, SOCK_DGRAM, 0); + rs->udp_sock = GSAPI(socket)(domain, SOCK_DGRAM, 0); if (rs->udp_sock < 0) return rs->udp_sock; - rs->epfd = epoll_create(2); + rs->epfd = GSAPI(epoll_create)(2); if (rs->epfd < 0) return rs->epfd; @@ -1164,7 +1186,7 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen) if (ret) return ret; } - ret = bind(rs->udp_sock, addr, addrlen); + ret = GSAPI(bind)(rs->udp_sock, addr, addrlen); } return ret; } @@ -1229,8 +1251,11 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen) goto err; } - if (rs->fd_flags & O_NONBLOCK) - fcntl(new_rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK); + if (rs->fd_flags & O_NONBLOCK) { + ret = GSAPI(fcntl)(new_rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK); + if (0 == ret) + new_rs->fd_flags |= O_NONBLOCK; + } ret = rs_create_ep(new_rs); if (ret) @@ -1354,7 +1379,7 @@ connected: break; case rs_accepting: if (!(rs->fd_flags & O_NONBLOCK)) - fcntl(rs->cm_id->channel->fd, F_SETFL, 0); + GSAPI(fcntl)(rs->cm_id->channel->fd, F_SETFL, 0); ret = ucma_complete(rs->cm_id); if (ret) @@ -1375,6 +1400,10 @@ connected: rs->err = errno; } } +#ifdef DMM_RSOCKET + rr_rs_connected(rs); +#endif + return ret; } @@ -1397,24 +1426,24 @@ static int ds_get_src_addr(struct rsocket *rs, uint16_t port; *src_len = sizeof(*src_addr); - ret = getsockname(rs->udp_sock, &src_addr->sa, src_len); + ret = GSAPI(getsockname)(rs->udp_sock, &src_addr->sa, src_len); if (ret || !rs_any_addr(src_addr)) return ret; port = src_addr->sin.sin_port; - sock = socket(dest_addr->sa_family, SOCK_DGRAM, 0); + sock = GSAPI(socket)(dest_addr->sa_family, SOCK_DGRAM, 0); if (sock < 0) return sock; - ret = connect(sock, dest_addr, dest_len); + ret = GSAPI(connect)(sock, dest_addr, dest_len); if (ret) goto out; *src_len = sizeof(*src_addr); - ret = getsockname(sock, &src_addr->sa, src_len); + ret = GSAPI(getsockname)(sock, &src_addr->sa, src_len); src_addr->sin.sin_port = port; out: - close(sock); + GSAPI(close)(sock); return ret; } @@ -1512,7 +1541,7 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr, event.events = EPOLLIN; event.data.ptr = qp; - ret = epoll_ctl(rs->epfd, EPOLL_CTL_ADD, + ret = GSAPI(epoll_ctl)(rs->epfd, EPOLL_CTL_ADD, qp->cm_id->recv_cq_channel->fd, &event); if (ret) goto err; @@ -1609,7 +1638,7 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) } fastlock_acquire(&rs->slock); - ret = connect(rs->udp_sock, addr, addrlen); + ret = GSAPI(connect)(rs->udp_sock, addr, addrlen); if (!ret) ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest); fastlock_release(&rs->slock); @@ -1903,12 +1932,18 @@ static int rs_poll_cq(struct rsocket *rs) case RS_OP_CTRL: if (rs_msg_data(msg) == RS_CTRL_DISCONNECT) { rs->state = rs_disconnected; +#ifdef DMM_RSOCKET + rr_rs_notify_tcp(rs); +#endif return 0; } else if (rs_msg_data(msg) == RS_CTRL_SHUTDOWN) { if (rs->state & rs_writable) { rs->state &= ~rs_readable; } else { rs->state = rs_disconnected; +#ifdef DMM_RSOCKET + rr_rs_notify_tcp(rs); +#endif return 0; } } @@ -1959,6 +1994,9 @@ static int rs_poll_cq(struct rsocket *rs) rs->err = errno; } } +#ifdef DMM_RSOCKET + rr_rs_notify_tcp(rs); +#endif return ret; } @@ -1977,7 +2015,11 @@ static int rs_get_cq_event(struct rsocket *rs) ibv_ack_cq_events(rs->cm_id->recv_cq, rs->unack_cqe); rs->unack_cqe = 0; } +#ifdef DMM_RSOCKET + __sync_fetch_and_sub(&rs->cq_armed, 1); +#else rs->cq_armed = 0; +#endif } else if (!(errno == EAGAIN || errno == EINTR)) { rs->state = rs_error; } @@ -2015,8 +2057,13 @@ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rs } else if (nonblock) { ret = ERR(EWOULDBLOCK); } else if (!rs->cq_armed) { +#ifdef DMM_RSOCKET + if (0 == ibv_req_notify_cq(rs->cm_id->recv_cq, 0)) + __sync_fetch_and_add(&rs->cq_armed, 1); +#else ibv_req_notify_cq(rs->cm_id->recv_cq, 0); rs->cq_armed = 1; +#endif } else { rs_update_credits(rs); fastlock_acquire(&rs->cq_wait_lock); @@ -2116,11 +2163,19 @@ static void ds_poll_cqs(struct rsocket *rs) qp = ds_next_qp(qp); if (!rs->rqe_avail && rs->sqe_avail) { rs->qp_list = qp; +#ifdef DMM_RSOCKET + goto END; +#else return; +#endif } cnt++; } while (qp != rs->qp_list); } while (cnt); +#ifdef DMM_RSOCKET +END: + rr_rs_notify_udp(rs); +#endif } static void ds_req_notify_cqs(struct rsocket *rs) @@ -2150,7 +2205,7 @@ static int ds_get_cq_event(struct rsocket *rs) if (!rs->cq_armed) return 0; - ret = epoll_wait(rs->epfd, &event, 1, -1); + ret = GSAPI(epoll_wait)(rs->epfd, &event, 1, -1); if (ret <= 0) return ret; @@ -2607,7 +2662,7 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov, msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa); msg.msg_iov = miov; msg.msg_iovlen = iovcnt + 1; - ret = sendmsg(rs->udp_sock, &msg, flags); + ret = GSAPI(sendmsg)(rs->udp_sock, &msg, flags); return ret > 0 ? ret - hdr.length : ret; } @@ -2958,7 +3013,7 @@ check_cq: fds.fd = rs->cm_id->channel->fd; fds.events = events; fds.revents = 0; - poll(&fds, 1, 0); + GSAPI(poll)(&fds, 1, 0); return fds.revents; } @@ -2994,7 +3049,7 @@ static int rs_poll_check(struct pollfd *fds, nfds_t nfds) if (rs) fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all); else - poll(&fds[i], 1, 0); + GSAPI(poll)(&fds[i], 1, 0); if (fds[i].revents) cnt++; @@ -3094,7 +3149,7 @@ int rpoll(struct pollfd *fds, nfds_t nfds, int timeout) if (ret) break; - ret = poll(rfds, nfds, timeout); + ret = GSAPI(poll)(rfds, nfds, timeout); if (ret <= 0) break; @@ -3314,7 +3369,7 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen) rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen); return 0; } else { - return getpeername(rs->udp_sock, addr, addrlen); + return GSAPI(getpeername)(rs->udp_sock, addr, addrlen); } } @@ -3329,7 +3384,7 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen) rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen); return 0; } else { - return getsockname(rs->udp_sock, addr, addrlen); + return GSAPI(getsockname)(rs->udp_sock, addr, addrlen); } } @@ -3371,7 +3426,7 @@ int rsetsockopt(int socket, int level, int optname, if (!rs) return ERR(EBADF); if (rs->type == SOCK_DGRAM && level != SOL_RDMA) { - ret = setsockopt(rs->udp_sock, level, optname, optval, optlen); + ret = GSAPI(setsockopt)(rs->udp_sock, level, optname, optval, optlen); if (ret) return ret; } @@ -3985,7 +4040,7 @@ static void udp_svc_process_sock(struct rs_svc *svc) { struct rs_svc_msg msg; - read(svc->sock[1], &msg, sizeof msg); + GSAPI(read)(svc->sock[1], &msg, sizeof msg); switch (msg.cmd) { case RS_SVC_ADD_DGRAM: msg.status = rs_svc_add_rs(svc, msg.rs); @@ -4009,7 +4064,7 @@ static void udp_svc_process_sock(struct rs_svc *svc) break; } - write(svc->sock[1], &msg, sizeof msg); + GSAPI(write)(svc->sock[1], &msg, sizeof msg); } static uint8_t udp_svc_sgid_index(struct ds_dest *dest, union ibv_gid *sgid) @@ -4137,7 +4192,7 @@ static void udp_svc_process_rs(struct rsocket *rs) socklen_t addrlen = sizeof addr; int len, ret; - ret = recvfrom(rs->udp_sock, buf, sizeof buf, 0, &addr.sa, &addrlen); + ret = GSAPI(recvfrom)(rs->udp_sock, buf, sizeof buf, 0, &addr.sa, &addrlen); if (ret < DS_UDP_IPV4_HDR_LEN) return; @@ -4184,7 +4239,7 @@ static void *udp_svc_run(void *arg) ret = rs_svc_grow_sets(svc, 4); if (ret) { msg.status = ret; - write(svc->sock[1], &msg, sizeof msg); + GSAPI(write)(svc->sock[1], &msg, sizeof msg); return (void *) (uintptr_t) ret; } @@ -4195,7 +4250,7 @@ static void *udp_svc_run(void *arg) for (i = 0; i <= svc->cnt; i++) udp_svc_fds[i].revents = 0; - poll(udp_svc_fds, svc->cnt + 1, -1); + GSAPI(poll)(udp_svc_fds, svc->cnt + 1, -1); if (udp_svc_fds[0].revents) udp_svc_process_sock(svc); @@ -4222,7 +4277,7 @@ static void tcp_svc_process_sock(struct rs_svc *svc) struct rs_svc_msg msg; int i; - read(svc->sock[1], &msg, sizeof msg); + GSAPI(read)(svc->sock[1], &msg, sizeof msg); switch (msg.cmd) { case RS_SVC_ADD_KEEPALIVE: msg.status = rs_svc_add_rs(svc, msg.rs); @@ -4253,7 +4308,7 @@ static void tcp_svc_process_sock(struct rs_svc *svc) default: break; } - write(svc->sock[1], &msg, sizeof msg); + GSAPI(write)(svc->sock[1], &msg, sizeof msg); } /* @@ -4282,7 +4337,7 @@ static void *tcp_svc_run(void *arg) ret = rs_svc_grow_sets(svc, 16); if (ret) { msg.status = ret; - write(svc->sock[1], &msg, sizeof msg); + GSAPI(write)(svc->sock[1], &msg, sizeof msg); return (void *) (uintptr_t) ret; } @@ -4291,7 +4346,7 @@ static void *tcp_svc_run(void *arg) fds.events = POLLIN; timeout = -1; do { - poll(&fds, 1, timeout * 1000); + GSAPI(poll)(&fds, 1, timeout * 1000); if (fds.revents) tcp_svc_process_sock(svc); @@ -4311,3 +4366,8 @@ static void *tcp_svc_run(void *arg) return NULL; } + +#ifdef DMM_RSOCKET +#include "rsocket_rs.c" +#endif +