diff options
Diffstat (limited to 'stacks/rsocket/src/rsocket_rs.c')
-rw-r--r-- | stacks/rsocket/src/rsocket_rs.c | 457 |
1 files changed, 248 insertions, 209 deletions
diff --git a/stacks/rsocket/src/rsocket_rs.c b/stacks/rsocket/src/rsocket_rs.c index ca92c3d..0f4e73f 100644 --- a/stacks/rsocket/src/rsocket_rs.c +++ b/stacks/rsocket/src/rsocket_rs.c @@ -17,338 +17,377 @@ #ifndef _RSOCKET_RS_C_ #define _RSOCKET_RS_C_ - -inline static void rr_rs_init(struct rsocket *rs) +inline static void +rr_rs_init (struct rsocket *rs) { - RR_DBG("(rs:%p{index:%d})\n", rs, rs->index); - rs->rr_epoll_ref = 0; - rs->rr_epoll_fd = -1; - rs->rr_epoll_pdata = NULL; + RR_DBG ("(rs:%p{index:%d})\n", rs, rs->index); + rs->rr_epoll_ref = 0; + rs->rr_epoll_fd = -1; + rs->rr_epoll_pdata = NULL; } -inline static void rr_rs_dest(struct rsocket *rs) +inline static void +rr_rs_dest (struct rsocket *rs) { - RR_DBG("(rs:%p{index:%d})\n", rs, rs->index); - - if (rs->rr_epoll_ref) { - (void)rr_ep_del(rs->rr_epoll_fd); - rs->rr_epoll_ref = 0; - rs->rr_epoll_fd = -1; - rs->rr_epoll_pdata = NULL; + RR_DBG ("(rs:%p{index:%d})\n", rs, rs->index); + + if (rs->rr_epoll_ref) + { + (void) rr_ep_del (rs->rr_epoll_fd); + rs->rr_epoll_ref = 0; + rs->rr_epoll_fd = -1; + rs->rr_epoll_pdata = NULL; } } #ifndef POLL__RSOCKET_RS_H_ #define POLL__RSOCKET_RS_H_ -static inline uint32_t rr_rs_poll_tcp(struct rsocket *rs) +static inline uint32_t +rr_rs_poll_tcp (struct rsocket *rs) { - uint32_t events = 0; - if (rs->state & rs_connected) { - if (rs_have_rdata(rs)) - events |= EPOLLIN; - if (rs_can_send(rs)) - events |= EPOLLOUT; + uint32_t events = 0; + if (rs->state & rs_connected) + { + if (rs_have_rdata (rs)) + events |= EPOLLIN; + if (rs_can_send (rs)) + events |= EPOLLOUT; } - if (rs->state & (rs_error | rs_connect_error)) - events |= EPOLLERR; - if (rs->state & rs_disconnected) - events |= EPOLLHUP; - return events; + if (rs->state & (rs_error | rs_connect_error)) + events |= EPOLLERR; + if (rs->state & rs_disconnected) + events |= EPOLLHUP; + return events; } -static inline uint32_t rr_rs_poll_udp(struct rsocket *rs) +static inline uint32_t +rr_rs_poll_udp (struct rsocket *rs) { - uint32_t events = 0; - if (rs_have_rdata(rs)) - events |= EPOLLIN; - if (ds_can_send(rs)) - events |= EPOLLOUT; - if (rs->state & rs_error) - events |= EPOLLERR; - return events; + uint32_t events = 0; + if (rs_have_rdata (rs)) + events |= EPOLLIN; + if (ds_can_send (rs)) + events |= EPOLLOUT; + if (rs->state & rs_error) + events |= EPOLLERR; + return events; } -static inline uint32_t rr_rs_poll_both(struct rsocket *rs) +static inline uint32_t +rr_rs_poll_both (struct rsocket *rs) { - if (rs->type == SOCK_STREAM) - return rr_rs_poll_tcp(rs); + if (rs->type == SOCK_STREAM) + return rr_rs_poll_tcp (rs); - if (rs->type == SOCK_DGRAM) - return rr_rs_poll_udp(rs); + if (rs->type == SOCK_DGRAM) + return rr_rs_poll_udp (rs); - return 0; + return 0; } -uint32_t rr_rs_poll(int fd, uint32_t revents) +uint32_t +rr_rs_poll (int fd, uint32_t revents) { - struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd); + struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd); - if (!rs) - return 0; + if (!rs) + return 0; - if (rs->state == rs_listening) - return revents; + if (rs->state == rs_listening) + return revents; - return rr_rs_poll_both(rs); + return rr_rs_poll_both (rs); } -#endif/* #ifndef POLL__RSOCKET_RS_H_ */ - +#endif /* #ifndef POLL__RSOCKET_RS_H_ */ -static inline void rr_rs_notify_tcp(struct rsocket *rs) +static inline void +rr_rs_notify_tcp (struct rsocket *rs) { - if (rs->rr_epoll_ref) { - uint32_t events = rr_rs_poll_tcp(rs); - if (events) - (void)rr_notify_event(rs->rr_epoll_pdata, events); + if (rs->rr_epoll_ref) + { + uint32_t events = rr_rs_poll_tcp (rs); + if (events) + (void) rr_notify_event (rs->rr_epoll_pdata, events); } } -static inline void rr_rs_notify_udp(struct rsocket *rs) +static inline void +rr_rs_notify_udp (struct rsocket *rs) { - if (rs->rr_epoll_ref) { - uint32_t events = rr_rs_poll_udp(rs); - if (events) - (void)rr_notify_event(rs->rr_epoll_pdata, events); + if (rs->rr_epoll_ref) + { + uint32_t events = rr_rs_poll_udp (rs); + if (events) + (void) rr_notify_event (rs->rr_epoll_pdata, events); } } #ifndef HANDLE__RSOCKET_RS_H_ #define HANDLE__RSOCKET_RS_H_ -inline static void rr_rs_handle_tcp(struct rsocket *rs) +inline static void +rr_rs_handle_tcp (struct rsocket *rs) { - int ret; + int ret; - RR_DBG("(%d)@ state:0x%x\n", rs->index, rs->state); + RR_DBG ("(%d)@ state:0x%x\n", rs->index, rs->state); - if (!(rs->state & (rs_connected | rs_opening))) - return; + if (!(rs->state & (rs_connected | rs_opening))) + return; - fastlock_acquire(&rs->cq_wait_lock); - ret = rs_get_cq_event(rs); - RR_DBG("rs_get_cq_event({%d})=%d,%d\n", rs->index, ret, errno); - fastlock_release(&rs->cq_wait_lock); + fastlock_acquire (&rs->cq_wait_lock); + ret = rs_get_cq_event (rs); + RR_DBG ("rs_get_cq_event({%d})=%d,%d\n", rs->index, ret, errno); + fastlock_release (&rs->cq_wait_lock); - fastlock_acquire(&rs->cq_lock); + fastlock_acquire (&rs->cq_lock); - if (rs->state & rs_connected) { - rs_update_credits(rs); - ret = rs_poll_cq(rs); - RR_DBG("rs_poll_cq({%d})=%d,%d {ref:%d, armed:%d}\n", - rs->index, ret, errno, rs->rr_epoll_ref, rs->cq_armed); + if (rs->state & rs_connected) + { + rs_update_credits (rs); + ret = rs_poll_cq (rs); + RR_DBG ("rs_poll_cq({%d})=%d,%d {ref:%d, armed:%d}\n", + rs->index, ret, errno, rs->rr_epoll_ref, rs->cq_armed); } - if (rs->rr_epoll_ref && rs->cq_armed < 1) { - ret = ibv_req_notify_cq(rs->cm_id->recv_cq, 0); - RR_DBG("ibv_req_notify_cq({%d})=%d,%d\n", rs->index, ret, errno); - if (0 == ret) - __sync_fetch_and_add(&rs->cq_armed, 1); + if (rs->rr_epoll_ref && rs->cq_armed < 1) + { + ret = ibv_req_notify_cq (rs->cm_id->recv_cq, 0); + RR_DBG ("ibv_req_notify_cq({%d})=%d,%d\n", rs->index, ret, errno); + if (0 == ret) + __sync_fetch_and_add (&rs->cq_armed, 1); } - if (rs->state & rs_connected) { - ret = rs_poll_cq(rs); - RR_DBG("rs_poll_cq({%d})=%d,%d\n", rs->index, ret, errno); - rs_update_credits(rs); + if (rs->state & rs_connected) + { + ret = rs_poll_cq (rs); + RR_DBG ("rs_poll_cq({%d})=%d,%d\n", rs->index, ret, errno); + rs_update_credits (rs); } - fastlock_release(&rs->cq_lock); + fastlock_release (&rs->cq_lock); - RR_DBG("(%d)=\n", rs->index); + RR_DBG ("(%d)=\n", rs->index); } -inline static void rr_rs_handle_udp(struct rsocket *rs) +inline static void +rr_rs_handle_udp (struct rsocket *rs) { - fastlock_acquire(&rs->cq_wait_lock); - ds_get_cq_event(rs); - fastlock_release(&rs->cq_wait_lock); - - fastlock_acquire(&rs->cq_lock); - ds_poll_cqs(rs); - if (rs->rr_epoll_ref && !rs->cq_armed) { - ds_req_notify_cqs(rs); - rs->cq_armed = 1; + fastlock_acquire (&rs->cq_wait_lock); + ds_get_cq_event (rs); + fastlock_release (&rs->cq_wait_lock); + + fastlock_acquire (&rs->cq_lock); + ds_poll_cqs (rs); + if (rs->rr_epoll_ref && !rs->cq_armed) + { + ds_req_notify_cqs (rs); + rs->cq_armed = 1; } - fastlock_release(&rs->cq_lock); + fastlock_release (&rs->cq_lock); } -inline static void rr_rs_handle_rs(struct rsocket *rs) +inline static void +rr_rs_handle_rs (struct rsocket *rs) { - if (rs->state & rs_opening) { - int ret = rs_do_connect(rs); - RR_DBG("rs_do_connect(%p{%d}):%d:%d\n", rs, rs->index, ret, errno); - return; + if (rs->state & rs_opening) + { + int ret = rs_do_connect (rs); + RR_DBG ("rs_do_connect(%p{%d}):%d:%d\n", rs, rs->index, ret, errno); + return; } - if (rs->type == SOCK_STREAM) { - rr_rs_handle_tcp(rs); + if (rs->type == SOCK_STREAM) + { + rr_rs_handle_tcp (rs); } - if (rs->type == SOCK_DGRAM) { - rr_rs_handle_udp(rs); + if (rs->type == SOCK_DGRAM) + { + rr_rs_handle_udp (rs); } } -int rr_rs_handle(int fd, uint32_t events) +int +rr_rs_handle (int fd, uint32_t events) { - struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd); + struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd); - RR_DBG("(fd:%d, events:0x%x):rs:%p\n", fd, events, rs); + RR_DBG ("(fd:%d, events:0x%x):rs:%p\n", fd, events, rs); - if (!rs) - return _err(EBADF); + if (!rs) + return _err (EBADF); - if (rs->state == rs_listening) { - if (events & EPOLLIN) { - (void)rr_notify_event(rs->rr_epoll_pdata, events); + if (rs->state == rs_listening) + { + if (events & EPOLLIN) + { + (void) rr_notify_event (rs->rr_epoll_pdata, events); } - return 0; + return 0; } - rr_rs_handle_rs(rs); + rr_rs_handle_rs (rs); - return 0; + return 0; } - -#endif/* #ifndef HANDLE__RSOCKET_RS_H_ */ +#endif /* #ifndef HANDLE__RSOCKET_RS_H_ */ #ifndef ADPT__RSOCKET_RS_H_ #define ADPT__RSOCKET_RS_H_ -inline static int rr_rs_evfd(struct rsocket *rs) +inline static int +rr_rs_evfd (struct rsocket *rs) { - if (rs->type == SOCK_STREAM) { - if (rs->state >= rs_connected) - return rs->cm_id->recv_cq_channel->fd; - else - return rs->cm_id->channel->fd; - } else { - return rs->epfd; + if (rs->type == SOCK_STREAM) + { + if (rs->state >= rs_connected) + return rs->cm_id->recv_cq_channel->fd; + else + return rs->cm_id->channel->fd; + } + else + { + return rs->epfd; } - return -1; + return -1; } -int rr_rs_ep_add(int fd, void *pdata, uint32_t *revent) +int +rr_rs_ep_add (int fd, void *pdata, uint32_t * revent) { - int ref; - struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd); - RR_DBG("(%d(%p),)\n", fd, rs); - if (!rs) - return _err(EBADF); - - ref = __sync_add_and_fetch(&rs->rr_epoll_ref, 1); - if (1 == ref) { - rs->rr_epoll_fd = rr_rs_evfd(rs); - (void)rr_ep_add(rs->rr_epoll_fd, rs->index); + int ref; + struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd); + RR_DBG ("(%d(%p),)\n", fd, rs); + if (!rs) + return _err (EBADF); + + ref = __sync_add_and_fetch (&rs->rr_epoll_ref, 1); + if (1 == ref) + { + rs->rr_epoll_fd = rr_rs_evfd (rs); + (void) rr_ep_add (rs->rr_epoll_fd, rs->index); } - (void)rr_rs_handle_rs(rs); - *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both(rs); + (void) rr_rs_handle_rs (rs); + *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both (rs); - rs->rr_epoll_pdata = pdata; + rs->rr_epoll_pdata = pdata; - RR_DBG("*revent=0x%x\n", *revent); - return 0; + RR_DBG ("*revent=0x%x\n", *revent); + return 0; } -int rr_rs_ep_mod(int fd, void *pdata, uint32_t *revent) +int +rr_rs_ep_mod (int fd, void *pdata, uint32_t * revent) { - struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd); - RR_DBG("(%d(%p),)\n", fd, rs); - if (!rs) - return _err(EBADF); + struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd); + RR_DBG ("(%d(%p),)\n", fd, rs); + if (!rs) + return _err (EBADF); - if (rs->rr_epoll_ref <= 0) - return _err(ENOENT); + if (rs->rr_epoll_ref <= 0) + return _err (ENOENT); - (void)rr_rs_handle_rs(rs); - *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both(rs); + (void) rr_rs_handle_rs (rs); + *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both (rs); - rs->rr_epoll_pdata = pdata; + rs->rr_epoll_pdata = pdata; - RR_DBG("*revent=0x%x\n", *revent); - return 0; + RR_DBG ("*revent=0x%x\n", *revent); + return 0; } -int rr_rs_ep_del(int fd) +int +rr_rs_ep_del (int fd) { - int ref; - struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd); - RR_DBG("(%d(%p))\n", fd, rs); - - if (!rs) - return _err(EBADF); - - ref = __sync_sub_and_fetch(&rs->rr_epoll_ref, 1); - if (0 == ref) { - (void)rr_ep_del(rs->rr_epoll_fd); - rs->rr_epoll_fd = -1; + int ref; + struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd); + RR_DBG ("(%d(%p))\n", fd, rs); + + if (!rs) + return _err (EBADF); + + ref = __sync_sub_and_fetch (&rs->rr_epoll_ref, 1); + if (0 == ref) + { + (void) rr_ep_del (rs->rr_epoll_fd); + rs->rr_epoll_fd = -1; } - return 0; + return 0; } -#endif/* #ifndef ADPT__RSOCKET_RS_H_ */ +#endif /* #ifndef ADPT__RSOCKET_RS_H_ */ -inline static void rr_rs_connected(struct rsocket *rs) +inline static void +rr_rs_connected (struct rsocket *rs) { - RR_DBG("rsfd:%d ref:%d evfd:%d->%d state:0x%x\n", rs->index, - rs->rr_epoll_ref, rs->rr_epoll_fd, rr_rs_evfd(rs), rs->state); + RR_DBG ("rsfd:%d ref:%d evfd:%d->%d state:0x%x\n", rs->index, + rs->rr_epoll_ref, rs->rr_epoll_fd, rr_rs_evfd (rs), rs->state); - if (!(rs->state & rs_connected)) { - rr_rs_notify_tcp(rs); - return; + if (!(rs->state & rs_connected)) + { + rr_rs_notify_tcp (rs); + return; } - if (rs->rr_epoll_ref) { - int evfd = rr_rs_evfd(rs); + if (rs->rr_epoll_ref) + { + int evfd = rr_rs_evfd (rs); - if (evfd != rs->rr_epoll_fd) { - (void)rr_ep_del(rs->rr_epoll_fd); - rs->rr_epoll_fd = evfd; - (void)rr_ep_add(evfd, rs->index); + if (evfd != rs->rr_epoll_fd) + { + (void) rr_ep_del (rs->rr_epoll_fd); + rs->rr_epoll_fd = evfd; + (void) rr_ep_add (evfd, rs->index); } - rr_rs_handle_tcp(rs); + rr_rs_handle_tcp (rs); } } -int raccept4(int socket, struct sockaddr *addr, socklen_t *addrlen, int flags) +int +raccept4 (int socket, struct sockaddr *addr, socklen_t * addrlen, int flags) { - int ret, fd; - struct rsocket *rs; - - RR_DBG("(%d, %p, %p, %d)@\n", socket, addr, addrlen, flags); - fd = raccept(socket, addr, addrlen); - RR_DBG("(%d, , , %d):%d:%d\n", socket, flags, fd, errno); - if (fd < 0) - return fd; - - rs = (struct rsocket*)idm_lookup(&idm, fd); - if (!rs) { - RR_ERR("panic\n"); - return -1; + int ret, fd; + struct rsocket *rs; + + RR_DBG ("(%d, %p, %p, %d)@\n", socket, addr, addrlen, flags); + fd = raccept (socket, addr, addrlen); + RR_DBG ("(%d, , , %d):%d:%d\n", socket, flags, fd, errno); + if (fd < 0) + return fd; + + rs = (struct rsocket *) idm_lookup (&idm, fd); + if (!rs) + { + RR_ERR ("panic\n"); + return -1; } - if (flags & SOCK_NONBLOCK) { - if (0 == (rs->fd_flags & O_NONBLOCK)) { - RR_DBG("orig flag:%x\n", GSAPI(fcntl)(rs->cm_id->channel->fd, F_GETFL)); - ret = GSAPI(fcntl)(rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK); - if (0 == ret) - rs->fd_flags |= O_NONBLOCK; + if (flags & SOCK_NONBLOCK) + { + if (0 == (rs->fd_flags & O_NONBLOCK)) + { + RR_DBG ("orig flag:%x\n", + GSAPI (fcntl) (rs->cm_id->channel->fd, F_GETFL)); + ret = GSAPI (fcntl) (rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK); + if (0 == ret) + rs->fd_flags |= O_NONBLOCK; } } - if (flags & SOCK_CLOEXEC) { - RR_LOG("ignore flag:SOCK_CLOEXEC\n"); + if (flags & SOCK_CLOEXEC) + { + RR_LOG ("ignore flag:SOCK_CLOEXEC\n"); } - return fd; + return fd; } - -#endif/* #ifndef _RSOCKET_RS_C_ */ - +#endif /* #ifndef _RSOCKET_RS_C_ */ |