diff options
Diffstat (limited to 'stacks/rsocket/src')
-rw-r--r-- | stacks/rsocket/src/rsocket_adpt.c | 231 | ||||
-rw-r--r-- | stacks/rsocket/src/rsocket_adpt.h | 31 | ||||
-rw-r--r-- | stacks/rsocket/src/rsocket_in.h | 17 | ||||
-rw-r--r-- | stacks/rsocket/src/rsocket_rdma.h | 57 | ||||
-rw-r--r-- | stacks/rsocket/src/rsocket_rs.c | 457 | ||||
-rw-r--r-- | stacks/rsocket/src/rsocket_sapi.h | 41 |
6 files changed, 446 insertions, 388 deletions
diff --git a/stacks/rsocket/src/rsocket_adpt.c b/stacks/rsocket/src/rsocket_adpt.c index eda7fd7..ea22c76 100644 --- a/stacks/rsocket/src/rsocket_adpt.c +++ b/stacks/rsocket/src/rsocket_adpt.c @@ -24,78 +24,89 @@ #include "rsocket_adpt.h" #include "rdma/rsocket.h" - #define RR_EVFD(u64) ((int)((u64) >> 32)) #define RR_RSFD(u64) ((int)((u64) & 0xFFFFFFFF)) #define RR_DATA(evfd, rsfd) ((((uint64_t)(evfd)) << 32) | (uint64_t)(uint32_t)(rsfd)) #define RR_EV_NUM 64 -rsocket_var_t g_rr_var = {0}; +rsocket_var_t g_rr_var = { 0 }; -rr_sapi_t g_sapi = {0}; +rr_sapi_t g_sapi = { 0 }; int g_rr_log_level = -1; - -int rr_notify_event(void *pdata, int events) +int +rr_notify_event (void *pdata, int events) { - int ret; + int ret; - ret = g_rr_var.event_cb(pdata, events); + ret = g_rr_var.event_cb (pdata, events); - RR_DBG("event_cb(%p, 0x%x)=%d,%d\n", pdata, events, ret, errno); + RR_DBG ("event_cb(%p, 0x%x)=%d,%d\n", pdata, events, ret, errno); - return ret; + return ret; } -int rr_epoll_ctl(int op, int evfd, uint32_t events, int rsfd) +int +rr_epoll_ctl (int op, int evfd, uint32_t events, int rsfd) { - int ret; - struct epoll_event event; - event.events = events; - event.data.u64 = RR_DATA(evfd, rsfd); - ret = GSAPI(epoll_ctl)(g_rr_var.epfd, op, evfd, &event); - return ret; + int ret; + struct epoll_event event; + event.events = events; + event.data.u64 = RR_DATA (evfd, rsfd); + ret = GSAPI (epoll_ctl) (g_rr_var.epfd, op, evfd, &event); + return ret; } -static void *rr_epoll_thread(void *arg) +static void * +rr_epoll_thread (void *arg) { - int i, ret, e; - struct epoll_event events[RR_EV_NUM]; - - while (1) { - ret = GSAPI(epoll_wait)(g_rr_var.epfd, events, RR_EV_NUM, 100); - e = errno; - - for (i = 0; i < ret; ++i) { - if (rr_rs_handle(RR_RSFD(events[i].data.u64), events[i].events)) { - (void)rr_ep_del(RR_EVFD(events[i].data.u64)); + int i, ret, e; + struct epoll_event events[RR_EV_NUM]; + + while (1) + { + ret = GSAPI (epoll_wait) (g_rr_var.epfd, events, RR_EV_NUM, 100); + e = errno; + + for (i = 0; i < ret; ++i) + { + if (rr_rs_handle (RR_RSFD (events[i].data.u64), events[i].events)) + { + (void) rr_ep_del (RR_EVFD (events[i].data.u64)); } } - if (ret < 0) { - RR_STAT_INC(RR_STAT_EPW_ERR); - if (e == EINTR) { - RR_STAT_INC(RR_STAT_EPW_EINTR); - } else if ( e == ETIMEDOUT) { - RR_STAT_INC(RR_STAT_EPW_ETIMEOUT); - } else { - RR_ERR("epoll_wait()=%d:%d\n", ret, errno); + if (ret < 0) + { + RR_STAT_INC (RR_STAT_EPW_ERR); + if (e == EINTR) + { + RR_STAT_INC (RR_STAT_EPW_EINTR); + } + else if (e == ETIMEDOUT) + { + RR_STAT_INC (RR_STAT_EPW_ETIMEOUT); + } + else + { + RR_ERR ("epoll_wait()=%d:%d\n", ret, errno); } } } - return NULL; + return NULL; } - -static int rr_init_sapi() +static int +rr_init_sapi () { - void *handle = dlopen("libc.so.6", RTLD_NOW | RTLD_GLOBAL); - if (!handle) { - RR_ERR("dlopen(libc.so.6):NULL\n"); - return -1; + void *handle = dlopen ("libc.so.6", RTLD_NOW | RTLD_GLOBAL); + if (!handle) + { + RR_ERR ("dlopen(libc.so.6):NULL\n"); + return -1; } #define RR_SAPI(name) \ @@ -105,97 +116,112 @@ static int rr_init_sapi() #include "rsocket_sapi.h" #undef RR_SAPI - return 0; + return 0; } -static void rr_init_log() +static void +rr_init_log () { - int level; - char *log; + int level; + char *log; - if (g_rr_log_level >= 0) - return; + if (g_rr_log_level >= 0) + return; - log = getenv("RSOCKET_LOG"); - if (!log || !log[0]) { - g_rr_log_level = RR_LOG_OFF; - return; + log = getenv ("RSOCKET_LOG"); + if (!log || !log[0]) + { + g_rr_log_level = RR_LOG_OFF; + return; } - level = atoi(log); - if (level < 0 || level > 99999) { - g_rr_log_level = RR_LOG_OFF; - return; + level = atoi (log); + if (level < 0 || level > 99999) + { + g_rr_log_level = RR_LOG_OFF; + return; } - g_rr_log_level = level; + g_rr_log_level = level; } -static int rsocket_init() +static int +rsocket_init () { - int ret; + int ret; - rr_init_log(); + rr_init_log (); - if (rr_init_sapi()) { - return -1; + if (rr_init_sapi ()) + { + return -1; } - g_rr_var.epfd = GSAPI(epoll_create)(1); - if (g_rr_var.epfd < 0) - return g_rr_var.epfd; - - ret = pthread_create(&g_rr_var.epoll_threadid, NULL, rr_epoll_thread, NULL); - if (ret) { - GSAPI(close)(g_rr_var.epfd); - g_rr_var.epfd = -1; - return ret; + g_rr_var.epfd = GSAPI (epoll_create) (1); + if (g_rr_var.epfd < 0) + return g_rr_var.epfd; + + ret = + pthread_create (&g_rr_var.epoll_threadid, NULL, rr_epoll_thread, NULL); + if (ret) + { + GSAPI (close) (g_rr_var.epfd); + g_rr_var.epfd = -1; + return ret; } - (void)pthread_setname_np(g_rr_var.epoll_threadid, "rsocket_epoll"); + (void) pthread_setname_np (g_rr_var.epoll_threadid, "rsocket_epoll"); - return 0; + return 0; } -int rsocket_exit() +int +rsocket_exit () { - if (g_rr_var.epfd >= 0) { - (void)GSAPI(close)(g_rr_var.epfd); - g_rr_var.epfd = -1; + if (g_rr_var.epfd >= 0) + { + (void) GSAPI (close) (g_rr_var.epfd); + g_rr_var.epfd = -1; } - return 0; + return 0; } -unsigned int rsocket_ep_ctl(int epFD, int proFD, int ctl_ops, struct epoll_event * event, void *pdata) +unsigned int +rsocket_ep_ctl (int epFD, int proFD, int ctl_ops, struct epoll_event *event, + void *pdata) { - int ret; - struct eventpoll *ep; - unsigned int revents = 0; + int ret; + struct eventpoll *ep; + unsigned int revents = 0; - RR_DBG("(%d, %d, %d, 0x%x, %p)\n", epFD, proFD, ctl_ops, event->events, pdata); + RR_DBG ("(%d, %d, %d, 0x%x, %p)\n", epFD, proFD, ctl_ops, event->events, + pdata); - switch (ctl_ops) { + switch (ctl_ops) + { case nstack_ep_triggle_add: - ret = rr_rs_ep_add(proFD, pdata, &revents); - if (ret) - return -1; - return revents; + ret = rr_rs_ep_add (proFD, pdata, &revents); + if (ret) + return -1; + return revents; case nstack_ep_triggle_mod: - ret = rr_rs_ep_mod(proFD, pdata, &revents); - if (ret) - return -1; - return revents; + ret = rr_rs_ep_mod (proFD, pdata, &revents); + if (ret) + return -1; + return revents; case nstack_ep_triggle_del: - return rr_rs_ep_del(proFD); + return rr_rs_ep_del (proFD); } - return _err(EPERM); + return _err (EPERM); } -int rsocket_stack_register(nstack_proc_cb *proc_fun, nstack_event_cb *event_ops) +int +rsocket_stack_register (nstack_proc_cb * proc_fun, + nstack_event_cb * event_ops) { - rr_init_log(); + rr_init_log (); #define NSTACK_MK_DECL(ret, fn, args) \ do { \ @@ -206,13 +232,12 @@ int rsocket_stack_register(nstack_proc_cb *proc_fun, nstack_event_cb *event_ops) #include "declare_syscalls.h" #undef NSTACK_MK_DECL - proc_fun->extern_ops.module_init = rsocket_init; - proc_fun->extern_ops.ep_ctl = rsocket_ep_ctl; - proc_fun->extern_ops.ep_getevt = NULL; + proc_fun->extern_ops.module_init = rsocket_init; + proc_fun->extern_ops.ep_ctl = rsocket_ep_ctl; + proc_fun->extern_ops.ep_getevt = NULL; - g_rr_var.type = event_ops->type; - g_rr_var.event_cb = event_ops->event_cb; + g_rr_var.type = event_ops->type; + g_rr_var.event_cb = event_ops->event_cb; - return 0; + return 0; } - diff --git a/stacks/rsocket/src/rsocket_adpt.h b/stacks/rsocket/src/rsocket_adpt.h index 43ec6cf..9c53330 100644 --- a/stacks/rsocket/src/rsocket_adpt.h +++ b/stacks/rsocket/src/rsocket_adpt.h @@ -20,12 +20,13 @@ #include "indexer.h" #include "rsocket_rdma.h" -enum { - RR_STAT_EPW_ERR, - RR_STAT_EPW_EINTR, - RR_STAT_EPW_ETIMEOUT, +enum +{ + RR_STAT_EPW_ERR, + RR_STAT_EPW_EINTR, + RR_STAT_EPW_ETIMEOUT, - RR_STAT_NUM + RR_STAT_NUM }; #define RR_STAT_ADD(id, num) __sync_add_and_fetch(&g_rr_var.stat[(id)], num) @@ -35,21 +36,19 @@ enum { #define RSRDMA_EXIT 1 -typedef struct rsocket_var { - pthread_t epoll_threadid; +typedef struct rsocket_var +{ + pthread_t epoll_threadid; - int epfd; - int type; - int (*event_cb) (void *pdata, int events); + int epfd; + int type; + int (*event_cb) (void *pdata, int events); - uint64_t stat[RR_STAT_NUM]; + uint64_t stat[RR_STAT_NUM]; } rsocket_var_t; extern rsocket_var_t g_rr_var; +int rr_rs_handle (int fd, uint32_t events); -int rr_rs_handle(int fd, uint32_t events); - - -#endif/* #ifndef _RSOCKET_ADPT_H_ */ - +#endif /* #ifndef _RSOCKET_ADPT_H_ */ diff --git a/stacks/rsocket/src/rsocket_in.h b/stacks/rsocket/src/rsocket_in.h index b4e1ae6..1e868d4 100644 --- a/stacks/rsocket/src/rsocket_in.h +++ b/stacks/rsocket/src/rsocket_in.h @@ -19,15 +19,14 @@ #include "rsocket_rdma.h" -inline static void rr_rs_init(struct rsocket *rs); -inline static void rr_rs_dest(struct rsocket *rs); +inline static void rr_rs_init (struct rsocket *rs); +inline static void rr_rs_dest (struct rsocket *rs); -static inline void rr_rs_notify_tcp(struct rsocket *rs); -static inline void rr_rs_notify_udp(struct rsocket *rs); +static inline void rr_rs_notify_tcp (struct rsocket *rs); +static inline void rr_rs_notify_udp (struct rsocket *rs); -inline static void rr_rs_handle_tcp(struct rsocket *rs); -inline static int rr_rs_evfd(struct rsocket *rs); -inline static void rr_rs_connected(struct rsocket *rs); - -#endif/* #ifndef _RSOCKET_IN_H_ */ +inline static void rr_rs_handle_tcp (struct rsocket *rs); +inline static int rr_rs_evfd (struct rsocket *rs); +inline static void rr_rs_connected (struct rsocket *rs); +#endif /* #ifndef _RSOCKET_IN_H_ */ diff --git a/stacks/rsocket/src/rsocket_rdma.h b/stacks/rsocket/src/rsocket_rdma.h index 076b6c9..75f4268 100644 --- a/stacks/rsocket/src/rsocket_rdma.h +++ b/stacks/rsocket/src/rsocket_rdma.h @@ -29,12 +29,13 @@ #include <fcntl.h> #include <time.h> -enum { - RR_LOG_OFF = 0x00, - RR_LOG_ERR = 0x01, - RR_LOG_WRN = 0x02, - RR_LOG_LOG = 0x03, - RR_LOG_DBG = 0x04, +enum +{ + RR_LOG_OFF = 0x00, + RR_LOG_ERR = 0x01, + RR_LOG_WRN = 0x02, + RR_LOG_LOG = 0x03, + RR_LOG_DBG = 0x04, }; #define RR_OUT(level, name, fmt, arg...) do { \ @@ -54,44 +55,42 @@ enum { #define RR_DBG(fmt, arg...) ((void)0) #endif - #define _err(err_no) ((errno = (err_no)), -1) +int rr_rs_ep_add (int fd, void *pdata, uint32_t * revent); +int rr_rs_ep_mod (int fd, void *pdata, uint32_t * revent); +int rr_rs_ep_del (int fd); -int rr_rs_ep_add(int fd, void *pdata, uint32_t *revent); -int rr_rs_ep_mod(int fd, void *pdata, uint32_t *revent); -int rr_rs_ep_del(int fd); - -uint32_t rr_rs_poll(int fd, uint32_t revents); - -int rr_notify_event(void *pdata, int events); +uint32_t rr_rs_poll (int fd, uint32_t revents); +int rr_notify_event (void *pdata, int events); -typedef struct rr_socket_api { - #define RR_SAPI(name) typeof(name) *n_##name;/* native api */ - #include "rsocket_sapi.h" - #undef RR_SAPI +typedef struct rr_socket_api +{ +#define RR_SAPI(name) typeof(name) *n_##name; /* native api */ +#include "rsocket_sapi.h" +#undef RR_SAPI } rr_sapi_t; extern rr_sapi_t g_sapi; #define GSAPI(name) g_sapi.n_##name +int rr_epoll_ctl (int op, int evfd, uint32_t events, int rsfd); -int rr_epoll_ctl(int op, int evfd, uint32_t events, int rsfd); - -inline static int rr_ep_add(int evfd, int rsfd) +inline static int +rr_ep_add (int evfd, int rsfd) { - return rr_epoll_ctl(EPOLL_CTL_ADD, evfd, EPOLLET | EPOLLIN | EPOLLOUT, rsfd); + return rr_epoll_ctl (EPOLL_CTL_ADD, evfd, EPOLLET | EPOLLIN | EPOLLOUT, + rsfd); } -inline static int rr_ep_del(int evfd) +inline static int +rr_ep_del (int evfd) { - if (evfd < 0) - return 0; - return rr_epoll_ctl(EPOLL_CTL_DEL, evfd, 0, 0); + if (evfd < 0) + return 0; + return rr_epoll_ctl (EPOLL_CTL_DEL, evfd, 0, 0); } - -#endif/* #ifndef _RSOCKET_RDMA_H_ */ - +#endif /* #ifndef _RSOCKET_RDMA_H_ */ diff --git a/stacks/rsocket/src/rsocket_rs.c b/stacks/rsocket/src/rsocket_rs.c index ca92c3d..0f4e73f 100644 --- a/stacks/rsocket/src/rsocket_rs.c +++ b/stacks/rsocket/src/rsocket_rs.c @@ -17,338 +17,377 @@ #ifndef _RSOCKET_RS_C_ #define _RSOCKET_RS_C_ - -inline static void rr_rs_init(struct rsocket *rs) +inline static void +rr_rs_init (struct rsocket *rs) { - RR_DBG("(rs:%p{index:%d})\n", rs, rs->index); - rs->rr_epoll_ref = 0; - rs->rr_epoll_fd = -1; - rs->rr_epoll_pdata = NULL; + RR_DBG ("(rs:%p{index:%d})\n", rs, rs->index); + rs->rr_epoll_ref = 0; + rs->rr_epoll_fd = -1; + rs->rr_epoll_pdata = NULL; } -inline static void rr_rs_dest(struct rsocket *rs) +inline static void +rr_rs_dest (struct rsocket *rs) { - RR_DBG("(rs:%p{index:%d})\n", rs, rs->index); - - if (rs->rr_epoll_ref) { - (void)rr_ep_del(rs->rr_epoll_fd); - rs->rr_epoll_ref = 0; - rs->rr_epoll_fd = -1; - rs->rr_epoll_pdata = NULL; + RR_DBG ("(rs:%p{index:%d})\n", rs, rs->index); + + if (rs->rr_epoll_ref) + { + (void) rr_ep_del (rs->rr_epoll_fd); + rs->rr_epoll_ref = 0; + rs->rr_epoll_fd = -1; + rs->rr_epoll_pdata = NULL; } } #ifndef POLL__RSOCKET_RS_H_ #define POLL__RSOCKET_RS_H_ -static inline uint32_t rr_rs_poll_tcp(struct rsocket *rs) +static inline uint32_t +rr_rs_poll_tcp (struct rsocket *rs) { - uint32_t events = 0; - if (rs->state & rs_connected) { - if (rs_have_rdata(rs)) - events |= EPOLLIN; - if (rs_can_send(rs)) - events |= EPOLLOUT; + uint32_t events = 0; + if (rs->state & rs_connected) + { + if (rs_have_rdata (rs)) + events |= EPOLLIN; + if (rs_can_send (rs)) + events |= EPOLLOUT; } - if (rs->state & (rs_error | rs_connect_error)) - events |= EPOLLERR; - if (rs->state & rs_disconnected) - events |= EPOLLHUP; - return events; + if (rs->state & (rs_error | rs_connect_error)) + events |= EPOLLERR; + if (rs->state & rs_disconnected) + events |= EPOLLHUP; + return events; } -static inline uint32_t rr_rs_poll_udp(struct rsocket *rs) +static inline uint32_t +rr_rs_poll_udp (struct rsocket *rs) { - uint32_t events = 0; - if (rs_have_rdata(rs)) - events |= EPOLLIN; - if (ds_can_send(rs)) - events |= EPOLLOUT; - if (rs->state & rs_error) - events |= EPOLLERR; - return events; + uint32_t events = 0; + if (rs_have_rdata (rs)) + events |= EPOLLIN; + if (ds_can_send (rs)) + events |= EPOLLOUT; + if (rs->state & rs_error) + events |= EPOLLERR; + return events; } -static inline uint32_t rr_rs_poll_both(struct rsocket *rs) +static inline uint32_t +rr_rs_poll_both (struct rsocket *rs) { - if (rs->type == SOCK_STREAM) - return rr_rs_poll_tcp(rs); + if (rs->type == SOCK_STREAM) + return rr_rs_poll_tcp (rs); - if (rs->type == SOCK_DGRAM) - return rr_rs_poll_udp(rs); + if (rs->type == SOCK_DGRAM) + return rr_rs_poll_udp (rs); - return 0; + return 0; } -uint32_t rr_rs_poll(int fd, uint32_t revents) +uint32_t +rr_rs_poll (int fd, uint32_t revents) { - struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd); + struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd); - if (!rs) - return 0; + if (!rs) + return 0; - if (rs->state == rs_listening) - return revents; + if (rs->state == rs_listening) + return revents; - return rr_rs_poll_both(rs); + return rr_rs_poll_both (rs); } -#endif/* #ifndef POLL__RSOCKET_RS_H_ */ - +#endif /* #ifndef POLL__RSOCKET_RS_H_ */ -static inline void rr_rs_notify_tcp(struct rsocket *rs) +static inline void +rr_rs_notify_tcp (struct rsocket *rs) { - if (rs->rr_epoll_ref) { - uint32_t events = rr_rs_poll_tcp(rs); - if (events) - (void)rr_notify_event(rs->rr_epoll_pdata, events); + if (rs->rr_epoll_ref) + { + uint32_t events = rr_rs_poll_tcp (rs); + if (events) + (void) rr_notify_event (rs->rr_epoll_pdata, events); } } -static inline void rr_rs_notify_udp(struct rsocket *rs) +static inline void +rr_rs_notify_udp (struct rsocket *rs) { - if (rs->rr_epoll_ref) { - uint32_t events = rr_rs_poll_udp(rs); - if (events) - (void)rr_notify_event(rs->rr_epoll_pdata, events); + if (rs->rr_epoll_ref) + { + uint32_t events = rr_rs_poll_udp (rs); + if (events) + (void) rr_notify_event (rs->rr_epoll_pdata, events); } } #ifndef HANDLE__RSOCKET_RS_H_ #define HANDLE__RSOCKET_RS_H_ -inline static void rr_rs_handle_tcp(struct rsocket *rs) +inline static void +rr_rs_handle_tcp (struct rsocket *rs) { - int ret; + int ret; - RR_DBG("(%d)@ state:0x%x\n", rs->index, rs->state); + RR_DBG ("(%d)@ state:0x%x\n", rs->index, rs->state); - if (!(rs->state & (rs_connected | rs_opening))) - return; + if (!(rs->state & (rs_connected | rs_opening))) + return; - fastlock_acquire(&rs->cq_wait_lock); - ret = rs_get_cq_event(rs); - RR_DBG("rs_get_cq_event({%d})=%d,%d\n", rs->index, ret, errno); - fastlock_release(&rs->cq_wait_lock); + fastlock_acquire (&rs->cq_wait_lock); + ret = rs_get_cq_event (rs); + RR_DBG ("rs_get_cq_event({%d})=%d,%d\n", rs->index, ret, errno); + fastlock_release (&rs->cq_wait_lock); - fastlock_acquire(&rs->cq_lock); + fastlock_acquire (&rs->cq_lock); - if (rs->state & rs_connected) { - rs_update_credits(rs); - ret = rs_poll_cq(rs); - RR_DBG("rs_poll_cq({%d})=%d,%d {ref:%d, armed:%d}\n", - rs->index, ret, errno, rs->rr_epoll_ref, rs->cq_armed); + if (rs->state & rs_connected) + { + rs_update_credits (rs); + ret = rs_poll_cq (rs); + RR_DBG ("rs_poll_cq({%d})=%d,%d {ref:%d, armed:%d}\n", + rs->index, ret, errno, rs->rr_epoll_ref, rs->cq_armed); } - if (rs->rr_epoll_ref && rs->cq_armed < 1) { - ret = ibv_req_notify_cq(rs->cm_id->recv_cq, 0); - RR_DBG("ibv_req_notify_cq({%d})=%d,%d\n", rs->index, ret, errno); - if (0 == ret) - __sync_fetch_and_add(&rs->cq_armed, 1); + if (rs->rr_epoll_ref && rs->cq_armed < 1) + { + ret = ibv_req_notify_cq (rs->cm_id->recv_cq, 0); + RR_DBG ("ibv_req_notify_cq({%d})=%d,%d\n", rs->index, ret, errno); + if (0 == ret) + __sync_fetch_and_add (&rs->cq_armed, 1); } - if (rs->state & rs_connected) { - ret = rs_poll_cq(rs); - RR_DBG("rs_poll_cq({%d})=%d,%d\n", rs->index, ret, errno); - rs_update_credits(rs); + if (rs->state & rs_connected) + { + ret = rs_poll_cq (rs); + RR_DBG ("rs_poll_cq({%d})=%d,%d\n", rs->index, ret, errno); + rs_update_credits (rs); } - fastlock_release(&rs->cq_lock); + fastlock_release (&rs->cq_lock); - RR_DBG("(%d)=\n", rs->index); + RR_DBG ("(%d)=\n", rs->index); } -inline static void rr_rs_handle_udp(struct rsocket *rs) +inline static void +rr_rs_handle_udp (struct rsocket *rs) { - fastlock_acquire(&rs->cq_wait_lock); - ds_get_cq_event(rs); - fastlock_release(&rs->cq_wait_lock); - - fastlock_acquire(&rs->cq_lock); - ds_poll_cqs(rs); - if (rs->rr_epoll_ref && !rs->cq_armed) { - ds_req_notify_cqs(rs); - rs->cq_armed = 1; + fastlock_acquire (&rs->cq_wait_lock); + ds_get_cq_event (rs); + fastlock_release (&rs->cq_wait_lock); + + fastlock_acquire (&rs->cq_lock); + ds_poll_cqs (rs); + if (rs->rr_epoll_ref && !rs->cq_armed) + { + ds_req_notify_cqs (rs); + rs->cq_armed = 1; } - fastlock_release(&rs->cq_lock); + fastlock_release (&rs->cq_lock); } -inline static void rr_rs_handle_rs(struct rsocket *rs) +inline static void +rr_rs_handle_rs (struct rsocket *rs) { - if (rs->state & rs_opening) { - int ret = rs_do_connect(rs); - RR_DBG("rs_do_connect(%p{%d}):%d:%d\n", rs, rs->index, ret, errno); - return; + if (rs->state & rs_opening) + { + int ret = rs_do_connect (rs); + RR_DBG ("rs_do_connect(%p{%d}):%d:%d\n", rs, rs->index, ret, errno); + return; } - if (rs->type == SOCK_STREAM) { - rr_rs_handle_tcp(rs); + if (rs->type == SOCK_STREAM) + { + rr_rs_handle_tcp (rs); } - if (rs->type == SOCK_DGRAM) { - rr_rs_handle_udp(rs); + if (rs->type == SOCK_DGRAM) + { + rr_rs_handle_udp (rs); } } -int rr_rs_handle(int fd, uint32_t events) +int +rr_rs_handle (int fd, uint32_t events) { - struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd); + struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd); - RR_DBG("(fd:%d, events:0x%x):rs:%p\n", fd, events, rs); + RR_DBG ("(fd:%d, events:0x%x):rs:%p\n", fd, events, rs); - if (!rs) - return _err(EBADF); + if (!rs) + return _err (EBADF); - if (rs->state == rs_listening) { - if (events & EPOLLIN) { - (void)rr_notify_event(rs->rr_epoll_pdata, events); + if (rs->state == rs_listening) + { + if (events & EPOLLIN) + { + (void) rr_notify_event (rs->rr_epoll_pdata, events); } - return 0; + return 0; } - rr_rs_handle_rs(rs); + rr_rs_handle_rs (rs); - return 0; + return 0; } - -#endif/* #ifndef HANDLE__RSOCKET_RS_H_ */ +#endif /* #ifndef HANDLE__RSOCKET_RS_H_ */ #ifndef ADPT__RSOCKET_RS_H_ #define ADPT__RSOCKET_RS_H_ -inline static int rr_rs_evfd(struct rsocket *rs) +inline static int +rr_rs_evfd (struct rsocket *rs) { - if (rs->type == SOCK_STREAM) { - if (rs->state >= rs_connected) - return rs->cm_id->recv_cq_channel->fd; - else - return rs->cm_id->channel->fd; - } else { - return rs->epfd; + if (rs->type == SOCK_STREAM) + { + if (rs->state >= rs_connected) + return rs->cm_id->recv_cq_channel->fd; + else + return rs->cm_id->channel->fd; + } + else + { + return rs->epfd; } - return -1; + return -1; } -int rr_rs_ep_add(int fd, void *pdata, uint32_t *revent) +int +rr_rs_ep_add (int fd, void *pdata, uint32_t * revent) { - int ref; - struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd); - RR_DBG("(%d(%p),)\n", fd, rs); - if (!rs) - return _err(EBADF); - - ref = __sync_add_and_fetch(&rs->rr_epoll_ref, 1); - if (1 == ref) { - rs->rr_epoll_fd = rr_rs_evfd(rs); - (void)rr_ep_add(rs->rr_epoll_fd, rs->index); + int ref; + struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd); + RR_DBG ("(%d(%p),)\n", fd, rs); + if (!rs) + return _err (EBADF); + + ref = __sync_add_and_fetch (&rs->rr_epoll_ref, 1); + if (1 == ref) + { + rs->rr_epoll_fd = rr_rs_evfd (rs); + (void) rr_ep_add (rs->rr_epoll_fd, rs->index); } - (void)rr_rs_handle_rs(rs); - *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both(rs); + (void) rr_rs_handle_rs (rs); + *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both (rs); - rs->rr_epoll_pdata = pdata; + rs->rr_epoll_pdata = pdata; - RR_DBG("*revent=0x%x\n", *revent); - return 0; + RR_DBG ("*revent=0x%x\n", *revent); + return 0; } -int rr_rs_ep_mod(int fd, void *pdata, uint32_t *revent) +int +rr_rs_ep_mod (int fd, void *pdata, uint32_t * revent) { - struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd); - RR_DBG("(%d(%p),)\n", fd, rs); - if (!rs) - return _err(EBADF); + struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd); + RR_DBG ("(%d(%p),)\n", fd, rs); + if (!rs) + return _err (EBADF); - if (rs->rr_epoll_ref <= 0) - return _err(ENOENT); + if (rs->rr_epoll_ref <= 0) + return _err (ENOENT); - (void)rr_rs_handle_rs(rs); - *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both(rs); + (void) rr_rs_handle_rs (rs); + *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both (rs); - rs->rr_epoll_pdata = pdata; + rs->rr_epoll_pdata = pdata; - RR_DBG("*revent=0x%x\n", *revent); - return 0; + RR_DBG ("*revent=0x%x\n", *revent); + return 0; } -int rr_rs_ep_del(int fd) +int +rr_rs_ep_del (int fd) { - int ref; - struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd); - RR_DBG("(%d(%p))\n", fd, rs); - - if (!rs) - return _err(EBADF); - - ref = __sync_sub_and_fetch(&rs->rr_epoll_ref, 1); - if (0 == ref) { - (void)rr_ep_del(rs->rr_epoll_fd); - rs->rr_epoll_fd = -1; + int ref; + struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd); + RR_DBG ("(%d(%p))\n", fd, rs); + + if (!rs) + return _err (EBADF); + + ref = __sync_sub_and_fetch (&rs->rr_epoll_ref, 1); + if (0 == ref) + { + (void) rr_ep_del (rs->rr_epoll_fd); + rs->rr_epoll_fd = -1; } - return 0; + return 0; } -#endif/* #ifndef ADPT__RSOCKET_RS_H_ */ +#endif /* #ifndef ADPT__RSOCKET_RS_H_ */ -inline static void rr_rs_connected(struct rsocket *rs) +inline static void +rr_rs_connected (struct rsocket *rs) { - RR_DBG("rsfd:%d ref:%d evfd:%d->%d state:0x%x\n", rs->index, - rs->rr_epoll_ref, rs->rr_epoll_fd, rr_rs_evfd(rs), rs->state); + RR_DBG ("rsfd:%d ref:%d evfd:%d->%d state:0x%x\n", rs->index, + rs->rr_epoll_ref, rs->rr_epoll_fd, rr_rs_evfd (rs), rs->state); - if (!(rs->state & rs_connected)) { - rr_rs_notify_tcp(rs); - return; + if (!(rs->state & rs_connected)) + { + rr_rs_notify_tcp (rs); + return; } - if (rs->rr_epoll_ref) { - int evfd = rr_rs_evfd(rs); + if (rs->rr_epoll_ref) + { + int evfd = rr_rs_evfd (rs); - if (evfd != rs->rr_epoll_fd) { - (void)rr_ep_del(rs->rr_epoll_fd); - rs->rr_epoll_fd = evfd; - (void)rr_ep_add(evfd, rs->index); + if (evfd != rs->rr_epoll_fd) + { + (void) rr_ep_del (rs->rr_epoll_fd); + rs->rr_epoll_fd = evfd; + (void) rr_ep_add (evfd, rs->index); } - rr_rs_handle_tcp(rs); + rr_rs_handle_tcp (rs); } } -int raccept4(int socket, struct sockaddr *addr, socklen_t *addrlen, int flags) +int +raccept4 (int socket, struct sockaddr *addr, socklen_t * addrlen, int flags) { - int ret, fd; - struct rsocket *rs; - - RR_DBG("(%d, %p, %p, %d)@\n", socket, addr, addrlen, flags); - fd = raccept(socket, addr, addrlen); - RR_DBG("(%d, , , %d):%d:%d\n", socket, flags, fd, errno); - if (fd < 0) - return fd; - - rs = (struct rsocket*)idm_lookup(&idm, fd); - if (!rs) { - RR_ERR("panic\n"); - return -1; + int ret, fd; + struct rsocket *rs; + + RR_DBG ("(%d, %p, %p, %d)@\n", socket, addr, addrlen, flags); + fd = raccept (socket, addr, addrlen); + RR_DBG ("(%d, , , %d):%d:%d\n", socket, flags, fd, errno); + if (fd < 0) + return fd; + + rs = (struct rsocket *) idm_lookup (&idm, fd); + if (!rs) + { + RR_ERR ("panic\n"); + return -1; } - if (flags & SOCK_NONBLOCK) { - if (0 == (rs->fd_flags & O_NONBLOCK)) { - RR_DBG("orig flag:%x\n", GSAPI(fcntl)(rs->cm_id->channel->fd, F_GETFL)); - ret = GSAPI(fcntl)(rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK); - if (0 == ret) - rs->fd_flags |= O_NONBLOCK; + if (flags & SOCK_NONBLOCK) + { + if (0 == (rs->fd_flags & O_NONBLOCK)) + { + RR_DBG ("orig flag:%x\n", + GSAPI (fcntl) (rs->cm_id->channel->fd, F_GETFL)); + ret = GSAPI (fcntl) (rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK); + if (0 == ret) + rs->fd_flags |= O_NONBLOCK; } } - if (flags & SOCK_CLOEXEC) { - RR_LOG("ignore flag:SOCK_CLOEXEC\n"); + if (flags & SOCK_CLOEXEC) + { + RR_LOG ("ignore flag:SOCK_CLOEXEC\n"); } - return fd; + return fd; } - -#endif/* #ifndef _RSOCKET_RS_C_ */ - +#endif /* #ifndef _RSOCKET_RS_C_ */ diff --git a/stacks/rsocket/src/rsocket_sapi.h b/stacks/rsocket/src/rsocket_sapi.h index a9ca932..21a7481 100644 --- a/stacks/rsocket/src/rsocket_sapi.h +++ b/stacks/rsocket/src/rsocket_sapi.h @@ -14,25 +14,22 @@ * limitations under the License. */ -RR_SAPI(socket) -RR_SAPI(close) -RR_SAPI(bind) -RR_SAPI(connect) -RR_SAPI(getpeername) -RR_SAPI(getsockname) - -RR_SAPI(fcntl) -RR_SAPI(setsockopt) -RR_SAPI(getsockopt) - -RR_SAPI(read) -RR_SAPI(write) - -RR_SAPI(sendmsg) -RR_SAPI(recvfrom) - -RR_SAPI(poll) - -RR_SAPI(epoll_create) -RR_SAPI(epoll_ctl) -RR_SAPI(epoll_wait) +/* *INDENT-OFF* */ +RR_SAPI (socket) +RR_SAPI (close) +RR_SAPI (bind) +RR_SAPI (connect) +RR_SAPI (getpeername) +RR_SAPI (getsockname) +RR_SAPI (fcntl) +RR_SAPI (setsockopt) +RR_SAPI (getsockopt) +RR_SAPI (read) +RR_SAPI (write) +RR_SAPI (sendmsg) +RR_SAPI (recvfrom) +RR_SAPI (poll) +RR_SAPI (epoll_create) +RR_SAPI (epoll_ctl) +RR_SAPI (epoll_wait) +/* *INDENT-ON* */ |