aboutsummaryrefslogtreecommitdiffstats
path: root/stacks/rsocket/src/rsocket_rs.c
diff options
context:
space:
mode:
Diffstat (limited to 'stacks/rsocket/src/rsocket_rs.c')
-rw-r--r--stacks/rsocket/src/rsocket_rs.c457
1 files changed, 248 insertions, 209 deletions
diff --git a/stacks/rsocket/src/rsocket_rs.c b/stacks/rsocket/src/rsocket_rs.c
index ca92c3d..0f4e73f 100644
--- a/stacks/rsocket/src/rsocket_rs.c
+++ b/stacks/rsocket/src/rsocket_rs.c
@@ -17,338 +17,377 @@
#ifndef _RSOCKET_RS_C_
#define _RSOCKET_RS_C_
-
-inline static void rr_rs_init(struct rsocket *rs)
+inline static void
+rr_rs_init (struct rsocket *rs)
{
- RR_DBG("(rs:%p{index:%d})\n", rs, rs->index);
- rs->rr_epoll_ref = 0;
- rs->rr_epoll_fd = -1;
- rs->rr_epoll_pdata = NULL;
+ RR_DBG ("(rs:%p{index:%d})\n", rs, rs->index);
+ rs->rr_epoll_ref = 0;
+ rs->rr_epoll_fd = -1;
+ rs->rr_epoll_pdata = NULL;
}
-inline static void rr_rs_dest(struct rsocket *rs)
+inline static void
+rr_rs_dest (struct rsocket *rs)
{
- RR_DBG("(rs:%p{index:%d})\n", rs, rs->index);
-
- if (rs->rr_epoll_ref) {
- (void)rr_ep_del(rs->rr_epoll_fd);
- rs->rr_epoll_ref = 0;
- rs->rr_epoll_fd = -1;
- rs->rr_epoll_pdata = NULL;
+ RR_DBG ("(rs:%p{index:%d})\n", rs, rs->index);
+
+ if (rs->rr_epoll_ref)
+ {
+ (void) rr_ep_del (rs->rr_epoll_fd);
+ rs->rr_epoll_ref = 0;
+ rs->rr_epoll_fd = -1;
+ rs->rr_epoll_pdata = NULL;
}
}
#ifndef POLL__RSOCKET_RS_H_
#define POLL__RSOCKET_RS_H_
-static inline uint32_t rr_rs_poll_tcp(struct rsocket *rs)
+static inline uint32_t
+rr_rs_poll_tcp (struct rsocket *rs)
{
- uint32_t events = 0;
- if (rs->state & rs_connected) {
- if (rs_have_rdata(rs))
- events |= EPOLLIN;
- if (rs_can_send(rs))
- events |= EPOLLOUT;
+ uint32_t events = 0;
+ if (rs->state & rs_connected)
+ {
+ if (rs_have_rdata (rs))
+ events |= EPOLLIN;
+ if (rs_can_send (rs))
+ events |= EPOLLOUT;
}
- if (rs->state & (rs_error | rs_connect_error))
- events |= EPOLLERR;
- if (rs->state & rs_disconnected)
- events |= EPOLLHUP;
- return events;
+ if (rs->state & (rs_error | rs_connect_error))
+ events |= EPOLLERR;
+ if (rs->state & rs_disconnected)
+ events |= EPOLLHUP;
+ return events;
}
-static inline uint32_t rr_rs_poll_udp(struct rsocket *rs)
+static inline uint32_t
+rr_rs_poll_udp (struct rsocket *rs)
{
- uint32_t events = 0;
- if (rs_have_rdata(rs))
- events |= EPOLLIN;
- if (ds_can_send(rs))
- events |= EPOLLOUT;
- if (rs->state & rs_error)
- events |= EPOLLERR;
- return events;
+ uint32_t events = 0;
+ if (rs_have_rdata (rs))
+ events |= EPOLLIN;
+ if (ds_can_send (rs))
+ events |= EPOLLOUT;
+ if (rs->state & rs_error)
+ events |= EPOLLERR;
+ return events;
}
-static inline uint32_t rr_rs_poll_both(struct rsocket *rs)
+static inline uint32_t
+rr_rs_poll_both (struct rsocket *rs)
{
- if (rs->type == SOCK_STREAM)
- return rr_rs_poll_tcp(rs);
+ if (rs->type == SOCK_STREAM)
+ return rr_rs_poll_tcp (rs);
- if (rs->type == SOCK_DGRAM)
- return rr_rs_poll_udp(rs);
+ if (rs->type == SOCK_DGRAM)
+ return rr_rs_poll_udp (rs);
- return 0;
+ return 0;
}
-uint32_t rr_rs_poll(int fd, uint32_t revents)
+uint32_t
+rr_rs_poll (int fd, uint32_t revents)
{
- struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
+ struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd);
- if (!rs)
- return 0;
+ if (!rs)
+ return 0;
- if (rs->state == rs_listening)
- return revents;
+ if (rs->state == rs_listening)
+ return revents;
- return rr_rs_poll_both(rs);
+ return rr_rs_poll_both (rs);
}
-#endif/* #ifndef POLL__RSOCKET_RS_H_ */
-
+#endif /* #ifndef POLL__RSOCKET_RS_H_ */
-static inline void rr_rs_notify_tcp(struct rsocket *rs)
+static inline void
+rr_rs_notify_tcp (struct rsocket *rs)
{
- if (rs->rr_epoll_ref) {
- uint32_t events = rr_rs_poll_tcp(rs);
- if (events)
- (void)rr_notify_event(rs->rr_epoll_pdata, events);
+ if (rs->rr_epoll_ref)
+ {
+ uint32_t events = rr_rs_poll_tcp (rs);
+ if (events)
+ (void) rr_notify_event (rs->rr_epoll_pdata, events);
}
}
-static inline void rr_rs_notify_udp(struct rsocket *rs)
+static inline void
+rr_rs_notify_udp (struct rsocket *rs)
{
- if (rs->rr_epoll_ref) {
- uint32_t events = rr_rs_poll_udp(rs);
- if (events)
- (void)rr_notify_event(rs->rr_epoll_pdata, events);
+ if (rs->rr_epoll_ref)
+ {
+ uint32_t events = rr_rs_poll_udp (rs);
+ if (events)
+ (void) rr_notify_event (rs->rr_epoll_pdata, events);
}
}
#ifndef HANDLE__RSOCKET_RS_H_
#define HANDLE__RSOCKET_RS_H_
-inline static void rr_rs_handle_tcp(struct rsocket *rs)
+inline static void
+rr_rs_handle_tcp (struct rsocket *rs)
{
- int ret;
+ int ret;
- RR_DBG("(%d)@ state:0x%x\n", rs->index, rs->state);
+ RR_DBG ("(%d)@ state:0x%x\n", rs->index, rs->state);
- if (!(rs->state & (rs_connected | rs_opening)))
- return;
+ if (!(rs->state & (rs_connected | rs_opening)))
+ return;
- fastlock_acquire(&rs->cq_wait_lock);
- ret = rs_get_cq_event(rs);
- RR_DBG("rs_get_cq_event({%d})=%d,%d\n", rs->index, ret, errno);
- fastlock_release(&rs->cq_wait_lock);
+ fastlock_acquire (&rs->cq_wait_lock);
+ ret = rs_get_cq_event (rs);
+ RR_DBG ("rs_get_cq_event({%d})=%d,%d\n", rs->index, ret, errno);
+ fastlock_release (&rs->cq_wait_lock);
- fastlock_acquire(&rs->cq_lock);
+ fastlock_acquire (&rs->cq_lock);
- if (rs->state & rs_connected) {
- rs_update_credits(rs);
- ret = rs_poll_cq(rs);
- RR_DBG("rs_poll_cq({%d})=%d,%d {ref:%d, armed:%d}\n",
- rs->index, ret, errno, rs->rr_epoll_ref, rs->cq_armed);
+ if (rs->state & rs_connected)
+ {
+ rs_update_credits (rs);
+ ret = rs_poll_cq (rs);
+ RR_DBG ("rs_poll_cq({%d})=%d,%d {ref:%d, armed:%d}\n",
+ rs->index, ret, errno, rs->rr_epoll_ref, rs->cq_armed);
}
- if (rs->rr_epoll_ref && rs->cq_armed < 1) {
- ret = ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
- RR_DBG("ibv_req_notify_cq({%d})=%d,%d\n", rs->index, ret, errno);
- if (0 == ret)
- __sync_fetch_and_add(&rs->cq_armed, 1);
+ if (rs->rr_epoll_ref && rs->cq_armed < 1)
+ {
+ ret = ibv_req_notify_cq (rs->cm_id->recv_cq, 0);
+ RR_DBG ("ibv_req_notify_cq({%d})=%d,%d\n", rs->index, ret, errno);
+ if (0 == ret)
+ __sync_fetch_and_add (&rs->cq_armed, 1);
}
- if (rs->state & rs_connected) {
- ret = rs_poll_cq(rs);
- RR_DBG("rs_poll_cq({%d})=%d,%d\n", rs->index, ret, errno);
- rs_update_credits(rs);
+ if (rs->state & rs_connected)
+ {
+ ret = rs_poll_cq (rs);
+ RR_DBG ("rs_poll_cq({%d})=%d,%d\n", rs->index, ret, errno);
+ rs_update_credits (rs);
}
- fastlock_release(&rs->cq_lock);
+ fastlock_release (&rs->cq_lock);
- RR_DBG("(%d)=\n", rs->index);
+ RR_DBG ("(%d)=\n", rs->index);
}
-inline static void rr_rs_handle_udp(struct rsocket *rs)
+inline static void
+rr_rs_handle_udp (struct rsocket *rs)
{
- fastlock_acquire(&rs->cq_wait_lock);
- ds_get_cq_event(rs);
- fastlock_release(&rs->cq_wait_lock);
-
- fastlock_acquire(&rs->cq_lock);
- ds_poll_cqs(rs);
- if (rs->rr_epoll_ref && !rs->cq_armed) {
- ds_req_notify_cqs(rs);
- rs->cq_armed = 1;
+ fastlock_acquire (&rs->cq_wait_lock);
+ ds_get_cq_event (rs);
+ fastlock_release (&rs->cq_wait_lock);
+
+ fastlock_acquire (&rs->cq_lock);
+ ds_poll_cqs (rs);
+ if (rs->rr_epoll_ref && !rs->cq_armed)
+ {
+ ds_req_notify_cqs (rs);
+ rs->cq_armed = 1;
}
- fastlock_release(&rs->cq_lock);
+ fastlock_release (&rs->cq_lock);
}
-inline static void rr_rs_handle_rs(struct rsocket *rs)
+inline static void
+rr_rs_handle_rs (struct rsocket *rs)
{
- if (rs->state & rs_opening) {
- int ret = rs_do_connect(rs);
- RR_DBG("rs_do_connect(%p{%d}):%d:%d\n", rs, rs->index, ret, errno);
- return;
+ if (rs->state & rs_opening)
+ {
+ int ret = rs_do_connect (rs);
+ RR_DBG ("rs_do_connect(%p{%d}):%d:%d\n", rs, rs->index, ret, errno);
+ return;
}
- if (rs->type == SOCK_STREAM) {
- rr_rs_handle_tcp(rs);
+ if (rs->type == SOCK_STREAM)
+ {
+ rr_rs_handle_tcp (rs);
}
- if (rs->type == SOCK_DGRAM) {
- rr_rs_handle_udp(rs);
+ if (rs->type == SOCK_DGRAM)
+ {
+ rr_rs_handle_udp (rs);
}
}
-int rr_rs_handle(int fd, uint32_t events)
+int
+rr_rs_handle (int fd, uint32_t events)
{
- struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
+ struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd);
- RR_DBG("(fd:%d, events:0x%x):rs:%p\n", fd, events, rs);
+ RR_DBG ("(fd:%d, events:0x%x):rs:%p\n", fd, events, rs);
- if (!rs)
- return _err(EBADF);
+ if (!rs)
+ return _err (EBADF);
- if (rs->state == rs_listening) {
- if (events & EPOLLIN) {
- (void)rr_notify_event(rs->rr_epoll_pdata, events);
+ if (rs->state == rs_listening)
+ {
+ if (events & EPOLLIN)
+ {
+ (void) rr_notify_event (rs->rr_epoll_pdata, events);
}
- return 0;
+ return 0;
}
- rr_rs_handle_rs(rs);
+ rr_rs_handle_rs (rs);
- return 0;
+ return 0;
}
-
-#endif/* #ifndef HANDLE__RSOCKET_RS_H_ */
+#endif /* #ifndef HANDLE__RSOCKET_RS_H_ */
#ifndef ADPT__RSOCKET_RS_H_
#define ADPT__RSOCKET_RS_H_
-inline static int rr_rs_evfd(struct rsocket *rs)
+inline static int
+rr_rs_evfd (struct rsocket *rs)
{
- if (rs->type == SOCK_STREAM) {
- if (rs->state >= rs_connected)
- return rs->cm_id->recv_cq_channel->fd;
- else
- return rs->cm_id->channel->fd;
- } else {
- return rs->epfd;
+ if (rs->type == SOCK_STREAM)
+ {
+ if (rs->state >= rs_connected)
+ return rs->cm_id->recv_cq_channel->fd;
+ else
+ return rs->cm_id->channel->fd;
+ }
+ else
+ {
+ return rs->epfd;
}
- return -1;
+ return -1;
}
-int rr_rs_ep_add(int fd, void *pdata, uint32_t *revent)
+int
+rr_rs_ep_add (int fd, void *pdata, uint32_t * revent)
{
- int ref;
- struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
- RR_DBG("(%d(%p),)\n", fd, rs);
- if (!rs)
- return _err(EBADF);
-
- ref = __sync_add_and_fetch(&rs->rr_epoll_ref, 1);
- if (1 == ref) {
- rs->rr_epoll_fd = rr_rs_evfd(rs);
- (void)rr_ep_add(rs->rr_epoll_fd, rs->index);
+ int ref;
+ struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd);
+ RR_DBG ("(%d(%p),)\n", fd, rs);
+ if (!rs)
+ return _err (EBADF);
+
+ ref = __sync_add_and_fetch (&rs->rr_epoll_ref, 1);
+ if (1 == ref)
+ {
+ rs->rr_epoll_fd = rr_rs_evfd (rs);
+ (void) rr_ep_add (rs->rr_epoll_fd, rs->index);
}
- (void)rr_rs_handle_rs(rs);
- *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both(rs);
+ (void) rr_rs_handle_rs (rs);
+ *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both (rs);
- rs->rr_epoll_pdata = pdata;
+ rs->rr_epoll_pdata = pdata;
- RR_DBG("*revent=0x%x\n", *revent);
- return 0;
+ RR_DBG ("*revent=0x%x\n", *revent);
+ return 0;
}
-int rr_rs_ep_mod(int fd, void *pdata, uint32_t *revent)
+int
+rr_rs_ep_mod (int fd, void *pdata, uint32_t * revent)
{
- struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
- RR_DBG("(%d(%p),)\n", fd, rs);
- if (!rs)
- return _err(EBADF);
+ struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd);
+ RR_DBG ("(%d(%p),)\n", fd, rs);
+ if (!rs)
+ return _err (EBADF);
- if (rs->rr_epoll_ref <= 0)
- return _err(ENOENT);
+ if (rs->rr_epoll_ref <= 0)
+ return _err (ENOENT);
- (void)rr_rs_handle_rs(rs);
- *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both(rs);
+ (void) rr_rs_handle_rs (rs);
+ *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both (rs);
- rs->rr_epoll_pdata = pdata;
+ rs->rr_epoll_pdata = pdata;
- RR_DBG("*revent=0x%x\n", *revent);
- return 0;
+ RR_DBG ("*revent=0x%x\n", *revent);
+ return 0;
}
-int rr_rs_ep_del(int fd)
+int
+rr_rs_ep_del (int fd)
{
- int ref;
- struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
- RR_DBG("(%d(%p))\n", fd, rs);
-
- if (!rs)
- return _err(EBADF);
-
- ref = __sync_sub_and_fetch(&rs->rr_epoll_ref, 1);
- if (0 == ref) {
- (void)rr_ep_del(rs->rr_epoll_fd);
- rs->rr_epoll_fd = -1;
+ int ref;
+ struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd);
+ RR_DBG ("(%d(%p))\n", fd, rs);
+
+ if (!rs)
+ return _err (EBADF);
+
+ ref = __sync_sub_and_fetch (&rs->rr_epoll_ref, 1);
+ if (0 == ref)
+ {
+ (void) rr_ep_del (rs->rr_epoll_fd);
+ rs->rr_epoll_fd = -1;
}
- return 0;
+ return 0;
}
-#endif/* #ifndef ADPT__RSOCKET_RS_H_ */
+#endif /* #ifndef ADPT__RSOCKET_RS_H_ */
-inline static void rr_rs_connected(struct rsocket *rs)
+inline static void
+rr_rs_connected (struct rsocket *rs)
{
- RR_DBG("rsfd:%d ref:%d evfd:%d->%d state:0x%x\n", rs->index,
- rs->rr_epoll_ref, rs->rr_epoll_fd, rr_rs_evfd(rs), rs->state);
+ RR_DBG ("rsfd:%d ref:%d evfd:%d->%d state:0x%x\n", rs->index,
+ rs->rr_epoll_ref, rs->rr_epoll_fd, rr_rs_evfd (rs), rs->state);
- if (!(rs->state & rs_connected)) {
- rr_rs_notify_tcp(rs);
- return;
+ if (!(rs->state & rs_connected))
+ {
+ rr_rs_notify_tcp (rs);
+ return;
}
- if (rs->rr_epoll_ref) {
- int evfd = rr_rs_evfd(rs);
+ if (rs->rr_epoll_ref)
+ {
+ int evfd = rr_rs_evfd (rs);
- if (evfd != rs->rr_epoll_fd) {
- (void)rr_ep_del(rs->rr_epoll_fd);
- rs->rr_epoll_fd = evfd;
- (void)rr_ep_add(evfd, rs->index);
+ if (evfd != rs->rr_epoll_fd)
+ {
+ (void) rr_ep_del (rs->rr_epoll_fd);
+ rs->rr_epoll_fd = evfd;
+ (void) rr_ep_add (evfd, rs->index);
}
- rr_rs_handle_tcp(rs);
+ rr_rs_handle_tcp (rs);
}
}
-int raccept4(int socket, struct sockaddr *addr, socklen_t *addrlen, int flags)
+int
+raccept4 (int socket, struct sockaddr *addr, socklen_t * addrlen, int flags)
{
- int ret, fd;
- struct rsocket *rs;
-
- RR_DBG("(%d, %p, %p, %d)@\n", socket, addr, addrlen, flags);
- fd = raccept(socket, addr, addrlen);
- RR_DBG("(%d, , , %d):%d:%d\n", socket, flags, fd, errno);
- if (fd < 0)
- return fd;
-
- rs = (struct rsocket*)idm_lookup(&idm, fd);
- if (!rs) {
- RR_ERR("panic\n");
- return -1;
+ int ret, fd;
+ struct rsocket *rs;
+
+ RR_DBG ("(%d, %p, %p, %d)@\n", socket, addr, addrlen, flags);
+ fd = raccept (socket, addr, addrlen);
+ RR_DBG ("(%d, , , %d):%d:%d\n", socket, flags, fd, errno);
+ if (fd < 0)
+ return fd;
+
+ rs = (struct rsocket *) idm_lookup (&idm, fd);
+ if (!rs)
+ {
+ RR_ERR ("panic\n");
+ return -1;
}
- if (flags & SOCK_NONBLOCK) {
- if (0 == (rs->fd_flags & O_NONBLOCK)) {
- RR_DBG("orig flag:%x\n", GSAPI(fcntl)(rs->cm_id->channel->fd, F_GETFL));
- ret = GSAPI(fcntl)(rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK);
- if (0 == ret)
- rs->fd_flags |= O_NONBLOCK;
+ if (flags & SOCK_NONBLOCK)
+ {
+ if (0 == (rs->fd_flags & O_NONBLOCK))
+ {
+ RR_DBG ("orig flag:%x\n",
+ GSAPI (fcntl) (rs->cm_id->channel->fd, F_GETFL));
+ ret = GSAPI (fcntl) (rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK);
+ if (0 == ret)
+ rs->fd_flags |= O_NONBLOCK;
}
}
- if (flags & SOCK_CLOEXEC) {
- RR_LOG("ignore flag:SOCK_CLOEXEC\n");
+ if (flags & SOCK_CLOEXEC)
+ {
+ RR_LOG ("ignore flag:SOCK_CLOEXEC\n");
}
- return fd;
+ return fd;
}
-
-#endif/* #ifndef _RSOCKET_RS_C_ */
-
+#endif /* #ifndef _RSOCKET_RS_C_ */