aboutsummaryrefslogtreecommitdiffstats
path: root/stacks/rsocket/src
diff options
context:
space:
mode:
Diffstat (limited to 'stacks/rsocket/src')
-rw-r--r--stacks/rsocket/src/rsocket_adpt.c231
-rw-r--r--stacks/rsocket/src/rsocket_adpt.h31
-rw-r--r--stacks/rsocket/src/rsocket_in.h17
-rw-r--r--stacks/rsocket/src/rsocket_rdma.h57
-rw-r--r--stacks/rsocket/src/rsocket_rs.c457
-rw-r--r--stacks/rsocket/src/rsocket_sapi.h41
6 files changed, 446 insertions, 388 deletions
diff --git a/stacks/rsocket/src/rsocket_adpt.c b/stacks/rsocket/src/rsocket_adpt.c
index eda7fd7..ea22c76 100644
--- a/stacks/rsocket/src/rsocket_adpt.c
+++ b/stacks/rsocket/src/rsocket_adpt.c
@@ -24,78 +24,89 @@
#include "rsocket_adpt.h"
#include "rdma/rsocket.h"
-
#define RR_EVFD(u64) ((int)((u64) >> 32))
#define RR_RSFD(u64) ((int)((u64) & 0xFFFFFFFF))
#define RR_DATA(evfd, rsfd) ((((uint64_t)(evfd)) << 32) | (uint64_t)(uint32_t)(rsfd))
#define RR_EV_NUM 64
-rsocket_var_t g_rr_var = {0};
+rsocket_var_t g_rr_var = { 0 };
-rr_sapi_t g_sapi = {0};
+rr_sapi_t g_sapi = { 0 };
int g_rr_log_level = -1;
-
-int rr_notify_event(void *pdata, int events)
+int
+rr_notify_event (void *pdata, int events)
{
- int ret;
+ int ret;
- ret = g_rr_var.event_cb(pdata, events);
+ ret = g_rr_var.event_cb (pdata, events);
- RR_DBG("event_cb(%p, 0x%x)=%d,%d\n", pdata, events, ret, errno);
+ RR_DBG ("event_cb(%p, 0x%x)=%d,%d\n", pdata, events, ret, errno);
- return ret;
+ return ret;
}
-int rr_epoll_ctl(int op, int evfd, uint32_t events, int rsfd)
+int
+rr_epoll_ctl (int op, int evfd, uint32_t events, int rsfd)
{
- int ret;
- struct epoll_event event;
- event.events = events;
- event.data.u64 = RR_DATA(evfd, rsfd);
- ret = GSAPI(epoll_ctl)(g_rr_var.epfd, op, evfd, &event);
- return ret;
+ int ret;
+ struct epoll_event event;
+ event.events = events;
+ event.data.u64 = RR_DATA (evfd, rsfd);
+ ret = GSAPI (epoll_ctl) (g_rr_var.epfd, op, evfd, &event);
+ return ret;
}
-static void *rr_epoll_thread(void *arg)
+static void *
+rr_epoll_thread (void *arg)
{
- int i, ret, e;
- struct epoll_event events[RR_EV_NUM];
-
- while (1) {
- ret = GSAPI(epoll_wait)(g_rr_var.epfd, events, RR_EV_NUM, 100);
- e = errno;
-
- for (i = 0; i < ret; ++i) {
- if (rr_rs_handle(RR_RSFD(events[i].data.u64), events[i].events)) {
- (void)rr_ep_del(RR_EVFD(events[i].data.u64));
+ int i, ret, e;
+ struct epoll_event events[RR_EV_NUM];
+
+ while (1)
+ {
+ ret = GSAPI (epoll_wait) (g_rr_var.epfd, events, RR_EV_NUM, 100);
+ e = errno;
+
+ for (i = 0; i < ret; ++i)
+ {
+ if (rr_rs_handle (RR_RSFD (events[i].data.u64), events[i].events))
+ {
+ (void) rr_ep_del (RR_EVFD (events[i].data.u64));
}
}
- if (ret < 0) {
- RR_STAT_INC(RR_STAT_EPW_ERR);
- if (e == EINTR) {
- RR_STAT_INC(RR_STAT_EPW_EINTR);
- } else if ( e == ETIMEDOUT) {
- RR_STAT_INC(RR_STAT_EPW_ETIMEOUT);
- } else {
- RR_ERR("epoll_wait()=%d:%d\n", ret, errno);
+ if (ret < 0)
+ {
+ RR_STAT_INC (RR_STAT_EPW_ERR);
+ if (e == EINTR)
+ {
+ RR_STAT_INC (RR_STAT_EPW_EINTR);
+ }
+ else if (e == ETIMEDOUT)
+ {
+ RR_STAT_INC (RR_STAT_EPW_ETIMEOUT);
+ }
+ else
+ {
+ RR_ERR ("epoll_wait()=%d:%d\n", ret, errno);
}
}
}
- return NULL;
+ return NULL;
}
-
-static int rr_init_sapi()
+static int
+rr_init_sapi ()
{
- void *handle = dlopen("libc.so.6", RTLD_NOW | RTLD_GLOBAL);
- if (!handle) {
- RR_ERR("dlopen(libc.so.6):NULL\n");
- return -1;
+ void *handle = dlopen ("libc.so.6", RTLD_NOW | RTLD_GLOBAL);
+ if (!handle)
+ {
+ RR_ERR ("dlopen(libc.so.6):NULL\n");
+ return -1;
}
#define RR_SAPI(name) \
@@ -105,97 +116,112 @@ static int rr_init_sapi()
#include "rsocket_sapi.h"
#undef RR_SAPI
- return 0;
+ return 0;
}
-static void rr_init_log()
+static void
+rr_init_log ()
{
- int level;
- char *log;
+ int level;
+ char *log;
- if (g_rr_log_level >= 0)
- return;
+ if (g_rr_log_level >= 0)
+ return;
- log = getenv("RSOCKET_LOG");
- if (!log || !log[0]) {
- g_rr_log_level = RR_LOG_OFF;
- return;
+ log = getenv ("RSOCKET_LOG");
+ if (!log || !log[0])
+ {
+ g_rr_log_level = RR_LOG_OFF;
+ return;
}
- level = atoi(log);
- if (level < 0 || level > 99999) {
- g_rr_log_level = RR_LOG_OFF;
- return;
+ level = atoi (log);
+ if (level < 0 || level > 99999)
+ {
+ g_rr_log_level = RR_LOG_OFF;
+ return;
}
- g_rr_log_level = level;
+ g_rr_log_level = level;
}
-static int rsocket_init()
+static int
+rsocket_init ()
{
- int ret;
+ int ret;
- rr_init_log();
+ rr_init_log ();
- if (rr_init_sapi()) {
- return -1;
+ if (rr_init_sapi ())
+ {
+ return -1;
}
- g_rr_var.epfd = GSAPI(epoll_create)(1);
- if (g_rr_var.epfd < 0)
- return g_rr_var.epfd;
-
- ret = pthread_create(&g_rr_var.epoll_threadid, NULL, rr_epoll_thread, NULL);
- if (ret) {
- GSAPI(close)(g_rr_var.epfd);
- g_rr_var.epfd = -1;
- return ret;
+ g_rr_var.epfd = GSAPI (epoll_create) (1);
+ if (g_rr_var.epfd < 0)
+ return g_rr_var.epfd;
+
+ ret =
+ pthread_create (&g_rr_var.epoll_threadid, NULL, rr_epoll_thread, NULL);
+ if (ret)
+ {
+ GSAPI (close) (g_rr_var.epfd);
+ g_rr_var.epfd = -1;
+ return ret;
}
- (void)pthread_setname_np(g_rr_var.epoll_threadid, "rsocket_epoll");
+ (void) pthread_setname_np (g_rr_var.epoll_threadid, "rsocket_epoll");
- return 0;
+ return 0;
}
-int rsocket_exit()
+int
+rsocket_exit ()
{
- if (g_rr_var.epfd >= 0) {
- (void)GSAPI(close)(g_rr_var.epfd);
- g_rr_var.epfd = -1;
+ if (g_rr_var.epfd >= 0)
+ {
+ (void) GSAPI (close) (g_rr_var.epfd);
+ g_rr_var.epfd = -1;
}
- return 0;
+ return 0;
}
-unsigned int rsocket_ep_ctl(int epFD, int proFD, int ctl_ops, struct epoll_event * event, void *pdata)
+unsigned int
+rsocket_ep_ctl (int epFD, int proFD, int ctl_ops, struct epoll_event *event,
+ void *pdata)
{
- int ret;
- struct eventpoll *ep;
- unsigned int revents = 0;
+ int ret;
+ struct eventpoll *ep;
+ unsigned int revents = 0;
- RR_DBG("(%d, %d, %d, 0x%x, %p)\n", epFD, proFD, ctl_ops, event->events, pdata);
+ RR_DBG ("(%d, %d, %d, 0x%x, %p)\n", epFD, proFD, ctl_ops, event->events,
+ pdata);
- switch (ctl_ops) {
+ switch (ctl_ops)
+ {
case nstack_ep_triggle_add:
- ret = rr_rs_ep_add(proFD, pdata, &revents);
- if (ret)
- return -1;
- return revents;
+ ret = rr_rs_ep_add (proFD, pdata, &revents);
+ if (ret)
+ return -1;
+ return revents;
case nstack_ep_triggle_mod:
- ret = rr_rs_ep_mod(proFD, pdata, &revents);
- if (ret)
- return -1;
- return revents;
+ ret = rr_rs_ep_mod (proFD, pdata, &revents);
+ if (ret)
+ return -1;
+ return revents;
case nstack_ep_triggle_del:
- return rr_rs_ep_del(proFD);
+ return rr_rs_ep_del (proFD);
}
- return _err(EPERM);
+ return _err (EPERM);
}
-int rsocket_stack_register(nstack_proc_cb *proc_fun, nstack_event_cb *event_ops)
+int
+rsocket_stack_register (nstack_proc_cb * proc_fun,
+ nstack_event_cb * event_ops)
{
- rr_init_log();
+ rr_init_log ();
#define NSTACK_MK_DECL(ret, fn, args) \
do { \
@@ -206,13 +232,12 @@ int rsocket_stack_register(nstack_proc_cb *proc_fun, nstack_event_cb *event_ops)
#include "declare_syscalls.h"
#undef NSTACK_MK_DECL
- proc_fun->extern_ops.module_init = rsocket_init;
- proc_fun->extern_ops.ep_ctl = rsocket_ep_ctl;
- proc_fun->extern_ops.ep_getevt = NULL;
+ proc_fun->extern_ops.module_init = rsocket_init;
+ proc_fun->extern_ops.ep_ctl = rsocket_ep_ctl;
+ proc_fun->extern_ops.ep_getevt = NULL;
- g_rr_var.type = event_ops->type;
- g_rr_var.event_cb = event_ops->event_cb;
+ g_rr_var.type = event_ops->type;
+ g_rr_var.event_cb = event_ops->event_cb;
- return 0;
+ return 0;
}
-
diff --git a/stacks/rsocket/src/rsocket_adpt.h b/stacks/rsocket/src/rsocket_adpt.h
index 43ec6cf..9c53330 100644
--- a/stacks/rsocket/src/rsocket_adpt.h
+++ b/stacks/rsocket/src/rsocket_adpt.h
@@ -20,12 +20,13 @@
#include "indexer.h"
#include "rsocket_rdma.h"
-enum {
- RR_STAT_EPW_ERR,
- RR_STAT_EPW_EINTR,
- RR_STAT_EPW_ETIMEOUT,
+enum
+{
+ RR_STAT_EPW_ERR,
+ RR_STAT_EPW_EINTR,
+ RR_STAT_EPW_ETIMEOUT,
- RR_STAT_NUM
+ RR_STAT_NUM
};
#define RR_STAT_ADD(id, num) __sync_add_and_fetch(&g_rr_var.stat[(id)], num)
@@ -35,21 +36,19 @@ enum {
#define RSRDMA_EXIT 1
-typedef struct rsocket_var {
- pthread_t epoll_threadid;
+typedef struct rsocket_var
+{
+ pthread_t epoll_threadid;
- int epfd;
- int type;
- int (*event_cb) (void *pdata, int events);
+ int epfd;
+ int type;
+ int (*event_cb) (void *pdata, int events);
- uint64_t stat[RR_STAT_NUM];
+ uint64_t stat[RR_STAT_NUM];
} rsocket_var_t;
extern rsocket_var_t g_rr_var;
+int rr_rs_handle (int fd, uint32_t events);
-int rr_rs_handle(int fd, uint32_t events);
-
-
-#endif/* #ifndef _RSOCKET_ADPT_H_ */
-
+#endif /* #ifndef _RSOCKET_ADPT_H_ */
diff --git a/stacks/rsocket/src/rsocket_in.h b/stacks/rsocket/src/rsocket_in.h
index b4e1ae6..1e868d4 100644
--- a/stacks/rsocket/src/rsocket_in.h
+++ b/stacks/rsocket/src/rsocket_in.h
@@ -19,15 +19,14 @@
#include "rsocket_rdma.h"
-inline static void rr_rs_init(struct rsocket *rs);
-inline static void rr_rs_dest(struct rsocket *rs);
+inline static void rr_rs_init (struct rsocket *rs);
+inline static void rr_rs_dest (struct rsocket *rs);
-static inline void rr_rs_notify_tcp(struct rsocket *rs);
-static inline void rr_rs_notify_udp(struct rsocket *rs);
+static inline void rr_rs_notify_tcp (struct rsocket *rs);
+static inline void rr_rs_notify_udp (struct rsocket *rs);
-inline static void rr_rs_handle_tcp(struct rsocket *rs);
-inline static int rr_rs_evfd(struct rsocket *rs);
-inline static void rr_rs_connected(struct rsocket *rs);
-
-#endif/* #ifndef _RSOCKET_IN_H_ */
+inline static void rr_rs_handle_tcp (struct rsocket *rs);
+inline static int rr_rs_evfd (struct rsocket *rs);
+inline static void rr_rs_connected (struct rsocket *rs);
+#endif /* #ifndef _RSOCKET_IN_H_ */
diff --git a/stacks/rsocket/src/rsocket_rdma.h b/stacks/rsocket/src/rsocket_rdma.h
index 076b6c9..75f4268 100644
--- a/stacks/rsocket/src/rsocket_rdma.h
+++ b/stacks/rsocket/src/rsocket_rdma.h
@@ -29,12 +29,13 @@
#include <fcntl.h>
#include <time.h>
-enum {
- RR_LOG_OFF = 0x00,
- RR_LOG_ERR = 0x01,
- RR_LOG_WRN = 0x02,
- RR_LOG_LOG = 0x03,
- RR_LOG_DBG = 0x04,
+enum
+{
+ RR_LOG_OFF = 0x00,
+ RR_LOG_ERR = 0x01,
+ RR_LOG_WRN = 0x02,
+ RR_LOG_LOG = 0x03,
+ RR_LOG_DBG = 0x04,
};
#define RR_OUT(level, name, fmt, arg...) do { \
@@ -54,44 +55,42 @@ enum {
#define RR_DBG(fmt, arg...) ((void)0)
#endif
-
#define _err(err_no) ((errno = (err_no)), -1)
+int rr_rs_ep_add (int fd, void *pdata, uint32_t * revent);
+int rr_rs_ep_mod (int fd, void *pdata, uint32_t * revent);
+int rr_rs_ep_del (int fd);
-int rr_rs_ep_add(int fd, void *pdata, uint32_t *revent);
-int rr_rs_ep_mod(int fd, void *pdata, uint32_t *revent);
-int rr_rs_ep_del(int fd);
-
-uint32_t rr_rs_poll(int fd, uint32_t revents);
-
-int rr_notify_event(void *pdata, int events);
+uint32_t rr_rs_poll (int fd, uint32_t revents);
+int rr_notify_event (void *pdata, int events);
-typedef struct rr_socket_api {
- #define RR_SAPI(name) typeof(name) *n_##name;/* native api */
- #include "rsocket_sapi.h"
- #undef RR_SAPI
+typedef struct rr_socket_api
+{
+#define RR_SAPI(name) typeof(name) *n_##name; /* native api */
+#include "rsocket_sapi.h"
+#undef RR_SAPI
} rr_sapi_t;
extern rr_sapi_t g_sapi;
#define GSAPI(name) g_sapi.n_##name
+int rr_epoll_ctl (int op, int evfd, uint32_t events, int rsfd);
-int rr_epoll_ctl(int op, int evfd, uint32_t events, int rsfd);
-
-inline static int rr_ep_add(int evfd, int rsfd)
+inline static int
+rr_ep_add (int evfd, int rsfd)
{
- return rr_epoll_ctl(EPOLL_CTL_ADD, evfd, EPOLLET | EPOLLIN | EPOLLOUT, rsfd);
+ return rr_epoll_ctl (EPOLL_CTL_ADD, evfd, EPOLLET | EPOLLIN | EPOLLOUT,
+ rsfd);
}
-inline static int rr_ep_del(int evfd)
+inline static int
+rr_ep_del (int evfd)
{
- if (evfd < 0)
- return 0;
- return rr_epoll_ctl(EPOLL_CTL_DEL, evfd, 0, 0);
+ if (evfd < 0)
+ return 0;
+ return rr_epoll_ctl (EPOLL_CTL_DEL, evfd, 0, 0);
}
-
-#endif/* #ifndef _RSOCKET_RDMA_H_ */
-
+#endif /* #ifndef _RSOCKET_RDMA_H_ */
diff --git a/stacks/rsocket/src/rsocket_rs.c b/stacks/rsocket/src/rsocket_rs.c
index ca92c3d..0f4e73f 100644
--- a/stacks/rsocket/src/rsocket_rs.c
+++ b/stacks/rsocket/src/rsocket_rs.c
@@ -17,338 +17,377 @@
#ifndef _RSOCKET_RS_C_
#define _RSOCKET_RS_C_
-
-inline static void rr_rs_init(struct rsocket *rs)
+inline static void
+rr_rs_init (struct rsocket *rs)
{
- RR_DBG("(rs:%p{index:%d})\n", rs, rs->index);
- rs->rr_epoll_ref = 0;
- rs->rr_epoll_fd = -1;
- rs->rr_epoll_pdata = NULL;
+ RR_DBG ("(rs:%p{index:%d})\n", rs, rs->index);
+ rs->rr_epoll_ref = 0;
+ rs->rr_epoll_fd = -1;
+ rs->rr_epoll_pdata = NULL;
}
-inline static void rr_rs_dest(struct rsocket *rs)
+inline static void
+rr_rs_dest (struct rsocket *rs)
{
- RR_DBG("(rs:%p{index:%d})\n", rs, rs->index);
-
- if (rs->rr_epoll_ref) {
- (void)rr_ep_del(rs->rr_epoll_fd);
- rs->rr_epoll_ref = 0;
- rs->rr_epoll_fd = -1;
- rs->rr_epoll_pdata = NULL;
+ RR_DBG ("(rs:%p{index:%d})\n", rs, rs->index);
+
+ if (rs->rr_epoll_ref)
+ {
+ (void) rr_ep_del (rs->rr_epoll_fd);
+ rs->rr_epoll_ref = 0;
+ rs->rr_epoll_fd = -1;
+ rs->rr_epoll_pdata = NULL;
}
}
#ifndef POLL__RSOCKET_RS_H_
#define POLL__RSOCKET_RS_H_
-static inline uint32_t rr_rs_poll_tcp(struct rsocket *rs)
+static inline uint32_t
+rr_rs_poll_tcp (struct rsocket *rs)
{
- uint32_t events = 0;
- if (rs->state & rs_connected) {
- if (rs_have_rdata(rs))
- events |= EPOLLIN;
- if (rs_can_send(rs))
- events |= EPOLLOUT;
+ uint32_t events = 0;
+ if (rs->state & rs_connected)
+ {
+ if (rs_have_rdata (rs))
+ events |= EPOLLIN;
+ if (rs_can_send (rs))
+ events |= EPOLLOUT;
}
- if (rs->state & (rs_error | rs_connect_error))
- events |= EPOLLERR;
- if (rs->state & rs_disconnected)
- events |= EPOLLHUP;
- return events;
+ if (rs->state & (rs_error | rs_connect_error))
+ events |= EPOLLERR;
+ if (rs->state & rs_disconnected)
+ events |= EPOLLHUP;
+ return events;
}
-static inline uint32_t rr_rs_poll_udp(struct rsocket *rs)
+static inline uint32_t
+rr_rs_poll_udp (struct rsocket *rs)
{
- uint32_t events = 0;
- if (rs_have_rdata(rs))
- events |= EPOLLIN;
- if (ds_can_send(rs))
- events |= EPOLLOUT;
- if (rs->state & rs_error)
- events |= EPOLLERR;
- return events;
+ uint32_t events = 0;
+ if (rs_have_rdata (rs))
+ events |= EPOLLIN;
+ if (ds_can_send (rs))
+ events |= EPOLLOUT;
+ if (rs->state & rs_error)
+ events |= EPOLLERR;
+ return events;
}
-static inline uint32_t rr_rs_poll_both(struct rsocket *rs)
+static inline uint32_t
+rr_rs_poll_both (struct rsocket *rs)
{
- if (rs->type == SOCK_STREAM)
- return rr_rs_poll_tcp(rs);
+ if (rs->type == SOCK_STREAM)
+ return rr_rs_poll_tcp (rs);
- if (rs->type == SOCK_DGRAM)
- return rr_rs_poll_udp(rs);
+ if (rs->type == SOCK_DGRAM)
+ return rr_rs_poll_udp (rs);
- return 0;
+ return 0;
}
-uint32_t rr_rs_poll(int fd, uint32_t revents)
+uint32_t
+rr_rs_poll (int fd, uint32_t revents)
{
- struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
+ struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd);
- if (!rs)
- return 0;
+ if (!rs)
+ return 0;
- if (rs->state == rs_listening)
- return revents;
+ if (rs->state == rs_listening)
+ return revents;
- return rr_rs_poll_both(rs);
+ return rr_rs_poll_both (rs);
}
-#endif/* #ifndef POLL__RSOCKET_RS_H_ */
-
+#endif /* #ifndef POLL__RSOCKET_RS_H_ */
-static inline void rr_rs_notify_tcp(struct rsocket *rs)
+static inline void
+rr_rs_notify_tcp (struct rsocket *rs)
{
- if (rs->rr_epoll_ref) {
- uint32_t events = rr_rs_poll_tcp(rs);
- if (events)
- (void)rr_notify_event(rs->rr_epoll_pdata, events);
+ if (rs->rr_epoll_ref)
+ {
+ uint32_t events = rr_rs_poll_tcp (rs);
+ if (events)
+ (void) rr_notify_event (rs->rr_epoll_pdata, events);
}
}
-static inline void rr_rs_notify_udp(struct rsocket *rs)
+static inline void
+rr_rs_notify_udp (struct rsocket *rs)
{
- if (rs->rr_epoll_ref) {
- uint32_t events = rr_rs_poll_udp(rs);
- if (events)
- (void)rr_notify_event(rs->rr_epoll_pdata, events);
+ if (rs->rr_epoll_ref)
+ {
+ uint32_t events = rr_rs_poll_udp (rs);
+ if (events)
+ (void) rr_notify_event (rs->rr_epoll_pdata, events);
}
}
#ifndef HANDLE__RSOCKET_RS_H_
#define HANDLE__RSOCKET_RS_H_
-inline static void rr_rs_handle_tcp(struct rsocket *rs)
+inline static void
+rr_rs_handle_tcp (struct rsocket *rs)
{
- int ret;
+ int ret;
- RR_DBG("(%d)@ state:0x%x\n", rs->index, rs->state);
+ RR_DBG ("(%d)@ state:0x%x\n", rs->index, rs->state);
- if (!(rs->state & (rs_connected | rs_opening)))
- return;
+ if (!(rs->state & (rs_connected | rs_opening)))
+ return;
- fastlock_acquire(&rs->cq_wait_lock);
- ret = rs_get_cq_event(rs);
- RR_DBG("rs_get_cq_event({%d})=%d,%d\n", rs->index, ret, errno);
- fastlock_release(&rs->cq_wait_lock);
+ fastlock_acquire (&rs->cq_wait_lock);
+ ret = rs_get_cq_event (rs);
+ RR_DBG ("rs_get_cq_event({%d})=%d,%d\n", rs->index, ret, errno);
+ fastlock_release (&rs->cq_wait_lock);
- fastlock_acquire(&rs->cq_lock);
+ fastlock_acquire (&rs->cq_lock);
- if (rs->state & rs_connected) {
- rs_update_credits(rs);
- ret = rs_poll_cq(rs);
- RR_DBG("rs_poll_cq({%d})=%d,%d {ref:%d, armed:%d}\n",
- rs->index, ret, errno, rs->rr_epoll_ref, rs->cq_armed);
+ if (rs->state & rs_connected)
+ {
+ rs_update_credits (rs);
+ ret = rs_poll_cq (rs);
+ RR_DBG ("rs_poll_cq({%d})=%d,%d {ref:%d, armed:%d}\n",
+ rs->index, ret, errno, rs->rr_epoll_ref, rs->cq_armed);
}
- if (rs->rr_epoll_ref && rs->cq_armed < 1) {
- ret = ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
- RR_DBG("ibv_req_notify_cq({%d})=%d,%d\n", rs->index, ret, errno);
- if (0 == ret)
- __sync_fetch_and_add(&rs->cq_armed, 1);
+ if (rs->rr_epoll_ref && rs->cq_armed < 1)
+ {
+ ret = ibv_req_notify_cq (rs->cm_id->recv_cq, 0);
+ RR_DBG ("ibv_req_notify_cq({%d})=%d,%d\n", rs->index, ret, errno);
+ if (0 == ret)
+ __sync_fetch_and_add (&rs->cq_armed, 1);
}
- if (rs->state & rs_connected) {
- ret = rs_poll_cq(rs);
- RR_DBG("rs_poll_cq({%d})=%d,%d\n", rs->index, ret, errno);
- rs_update_credits(rs);
+ if (rs->state & rs_connected)
+ {
+ ret = rs_poll_cq (rs);
+ RR_DBG ("rs_poll_cq({%d})=%d,%d\n", rs->index, ret, errno);
+ rs_update_credits (rs);
}
- fastlock_release(&rs->cq_lock);
+ fastlock_release (&rs->cq_lock);
- RR_DBG("(%d)=\n", rs->index);
+ RR_DBG ("(%d)=\n", rs->index);
}
-inline static void rr_rs_handle_udp(struct rsocket *rs)
+inline static void
+rr_rs_handle_udp (struct rsocket *rs)
{
- fastlock_acquire(&rs->cq_wait_lock);
- ds_get_cq_event(rs);
- fastlock_release(&rs->cq_wait_lock);
-
- fastlock_acquire(&rs->cq_lock);
- ds_poll_cqs(rs);
- if (rs->rr_epoll_ref && !rs->cq_armed) {
- ds_req_notify_cqs(rs);
- rs->cq_armed = 1;
+ fastlock_acquire (&rs->cq_wait_lock);
+ ds_get_cq_event (rs);
+ fastlock_release (&rs->cq_wait_lock);
+
+ fastlock_acquire (&rs->cq_lock);
+ ds_poll_cqs (rs);
+ if (rs->rr_epoll_ref && !rs->cq_armed)
+ {
+ ds_req_notify_cqs (rs);
+ rs->cq_armed = 1;
}
- fastlock_release(&rs->cq_lock);
+ fastlock_release (&rs->cq_lock);
}
-inline static void rr_rs_handle_rs(struct rsocket *rs)
+inline static void
+rr_rs_handle_rs (struct rsocket *rs)
{
- if (rs->state & rs_opening) {
- int ret = rs_do_connect(rs);
- RR_DBG("rs_do_connect(%p{%d}):%d:%d\n", rs, rs->index, ret, errno);
- return;
+ if (rs->state & rs_opening)
+ {
+ int ret = rs_do_connect (rs);
+ RR_DBG ("rs_do_connect(%p{%d}):%d:%d\n", rs, rs->index, ret, errno);
+ return;
}
- if (rs->type == SOCK_STREAM) {
- rr_rs_handle_tcp(rs);
+ if (rs->type == SOCK_STREAM)
+ {
+ rr_rs_handle_tcp (rs);
}
- if (rs->type == SOCK_DGRAM) {
- rr_rs_handle_udp(rs);
+ if (rs->type == SOCK_DGRAM)
+ {
+ rr_rs_handle_udp (rs);
}
}
-int rr_rs_handle(int fd, uint32_t events)
+int
+rr_rs_handle (int fd, uint32_t events)
{
- struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
+ struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd);
- RR_DBG("(fd:%d, events:0x%x):rs:%p\n", fd, events, rs);
+ RR_DBG ("(fd:%d, events:0x%x):rs:%p\n", fd, events, rs);
- if (!rs)
- return _err(EBADF);
+ if (!rs)
+ return _err (EBADF);
- if (rs->state == rs_listening) {
- if (events & EPOLLIN) {
- (void)rr_notify_event(rs->rr_epoll_pdata, events);
+ if (rs->state == rs_listening)
+ {
+ if (events & EPOLLIN)
+ {
+ (void) rr_notify_event (rs->rr_epoll_pdata, events);
}
- return 0;
+ return 0;
}
- rr_rs_handle_rs(rs);
+ rr_rs_handle_rs (rs);
- return 0;
+ return 0;
}
-
-#endif/* #ifndef HANDLE__RSOCKET_RS_H_ */
+#endif /* #ifndef HANDLE__RSOCKET_RS_H_ */
#ifndef ADPT__RSOCKET_RS_H_
#define ADPT__RSOCKET_RS_H_
-inline static int rr_rs_evfd(struct rsocket *rs)
+inline static int
+rr_rs_evfd (struct rsocket *rs)
{
- if (rs->type == SOCK_STREAM) {
- if (rs->state >= rs_connected)
- return rs->cm_id->recv_cq_channel->fd;
- else
- return rs->cm_id->channel->fd;
- } else {
- return rs->epfd;
+ if (rs->type == SOCK_STREAM)
+ {
+ if (rs->state >= rs_connected)
+ return rs->cm_id->recv_cq_channel->fd;
+ else
+ return rs->cm_id->channel->fd;
+ }
+ else
+ {
+ return rs->epfd;
}
- return -1;
+ return -1;
}
-int rr_rs_ep_add(int fd, void *pdata, uint32_t *revent)
+int
+rr_rs_ep_add (int fd, void *pdata, uint32_t * revent)
{
- int ref;
- struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
- RR_DBG("(%d(%p),)\n", fd, rs);
- if (!rs)
- return _err(EBADF);
-
- ref = __sync_add_and_fetch(&rs->rr_epoll_ref, 1);
- if (1 == ref) {
- rs->rr_epoll_fd = rr_rs_evfd(rs);
- (void)rr_ep_add(rs->rr_epoll_fd, rs->index);
+ int ref;
+ struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd);
+ RR_DBG ("(%d(%p),)\n", fd, rs);
+ if (!rs)
+ return _err (EBADF);
+
+ ref = __sync_add_and_fetch (&rs->rr_epoll_ref, 1);
+ if (1 == ref)
+ {
+ rs->rr_epoll_fd = rr_rs_evfd (rs);
+ (void) rr_ep_add (rs->rr_epoll_fd, rs->index);
}
- (void)rr_rs_handle_rs(rs);
- *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both(rs);
+ (void) rr_rs_handle_rs (rs);
+ *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both (rs);
- rs->rr_epoll_pdata = pdata;
+ rs->rr_epoll_pdata = pdata;
- RR_DBG("*revent=0x%x\n", *revent);
- return 0;
+ RR_DBG ("*revent=0x%x\n", *revent);
+ return 0;
}
-int rr_rs_ep_mod(int fd, void *pdata, uint32_t *revent)
+int
+rr_rs_ep_mod (int fd, void *pdata, uint32_t * revent)
{
- struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
- RR_DBG("(%d(%p),)\n", fd, rs);
- if (!rs)
- return _err(EBADF);
+ struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd);
+ RR_DBG ("(%d(%p),)\n", fd, rs);
+ if (!rs)
+ return _err (EBADF);
- if (rs->rr_epoll_ref <= 0)
- return _err(ENOENT);
+ if (rs->rr_epoll_ref <= 0)
+ return _err (ENOENT);
- (void)rr_rs_handle_rs(rs);
- *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both(rs);
+ (void) rr_rs_handle_rs (rs);
+ *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both (rs);
- rs->rr_epoll_pdata = pdata;
+ rs->rr_epoll_pdata = pdata;
- RR_DBG("*revent=0x%x\n", *revent);
- return 0;
+ RR_DBG ("*revent=0x%x\n", *revent);
+ return 0;
}
-int rr_rs_ep_del(int fd)
+int
+rr_rs_ep_del (int fd)
{
- int ref;
- struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
- RR_DBG("(%d(%p))\n", fd, rs);
-
- if (!rs)
- return _err(EBADF);
-
- ref = __sync_sub_and_fetch(&rs->rr_epoll_ref, 1);
- if (0 == ref) {
- (void)rr_ep_del(rs->rr_epoll_fd);
- rs->rr_epoll_fd = -1;
+ int ref;
+ struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd);
+ RR_DBG ("(%d(%p))\n", fd, rs);
+
+ if (!rs)
+ return _err (EBADF);
+
+ ref = __sync_sub_and_fetch (&rs->rr_epoll_ref, 1);
+ if (0 == ref)
+ {
+ (void) rr_ep_del (rs->rr_epoll_fd);
+ rs->rr_epoll_fd = -1;
}
- return 0;
+ return 0;
}
-#endif/* #ifndef ADPT__RSOCKET_RS_H_ */
+#endif /* #ifndef ADPT__RSOCKET_RS_H_ */
-inline static void rr_rs_connected(struct rsocket *rs)
+inline static void
+rr_rs_connected (struct rsocket *rs)
{
- RR_DBG("rsfd:%d ref:%d evfd:%d->%d state:0x%x\n", rs->index,
- rs->rr_epoll_ref, rs->rr_epoll_fd, rr_rs_evfd(rs), rs->state);
+ RR_DBG ("rsfd:%d ref:%d evfd:%d->%d state:0x%x\n", rs->index,
+ rs->rr_epoll_ref, rs->rr_epoll_fd, rr_rs_evfd (rs), rs->state);
- if (!(rs->state & rs_connected)) {
- rr_rs_notify_tcp(rs);
- return;
+ if (!(rs->state & rs_connected))
+ {
+ rr_rs_notify_tcp (rs);
+ return;
}
- if (rs->rr_epoll_ref) {
- int evfd = rr_rs_evfd(rs);
+ if (rs->rr_epoll_ref)
+ {
+ int evfd = rr_rs_evfd (rs);
- if (evfd != rs->rr_epoll_fd) {
- (void)rr_ep_del(rs->rr_epoll_fd);
- rs->rr_epoll_fd = evfd;
- (void)rr_ep_add(evfd, rs->index);
+ if (evfd != rs->rr_epoll_fd)
+ {
+ (void) rr_ep_del (rs->rr_epoll_fd);
+ rs->rr_epoll_fd = evfd;
+ (void) rr_ep_add (evfd, rs->index);
}
- rr_rs_handle_tcp(rs);
+ rr_rs_handle_tcp (rs);
}
}
-int raccept4(int socket, struct sockaddr *addr, socklen_t *addrlen, int flags)
+int
+raccept4 (int socket, struct sockaddr *addr, socklen_t * addrlen, int flags)
{
- int ret, fd;
- struct rsocket *rs;
-
- RR_DBG("(%d, %p, %p, %d)@\n", socket, addr, addrlen, flags);
- fd = raccept(socket, addr, addrlen);
- RR_DBG("(%d, , , %d):%d:%d\n", socket, flags, fd, errno);
- if (fd < 0)
- return fd;
-
- rs = (struct rsocket*)idm_lookup(&idm, fd);
- if (!rs) {
- RR_ERR("panic\n");
- return -1;
+ int ret, fd;
+ struct rsocket *rs;
+
+ RR_DBG ("(%d, %p, %p, %d)@\n", socket, addr, addrlen, flags);
+ fd = raccept (socket, addr, addrlen);
+ RR_DBG ("(%d, , , %d):%d:%d\n", socket, flags, fd, errno);
+ if (fd < 0)
+ return fd;
+
+ rs = (struct rsocket *) idm_lookup (&idm, fd);
+ if (!rs)
+ {
+ RR_ERR ("panic\n");
+ return -1;
}
- if (flags & SOCK_NONBLOCK) {
- if (0 == (rs->fd_flags & O_NONBLOCK)) {
- RR_DBG("orig flag:%x\n", GSAPI(fcntl)(rs->cm_id->channel->fd, F_GETFL));
- ret = GSAPI(fcntl)(rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK);
- if (0 == ret)
- rs->fd_flags |= O_NONBLOCK;
+ if (flags & SOCK_NONBLOCK)
+ {
+ if (0 == (rs->fd_flags & O_NONBLOCK))
+ {
+ RR_DBG ("orig flag:%x\n",
+ GSAPI (fcntl) (rs->cm_id->channel->fd, F_GETFL));
+ ret = GSAPI (fcntl) (rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK);
+ if (0 == ret)
+ rs->fd_flags |= O_NONBLOCK;
}
}
- if (flags & SOCK_CLOEXEC) {
- RR_LOG("ignore flag:SOCK_CLOEXEC\n");
+ if (flags & SOCK_CLOEXEC)
+ {
+ RR_LOG ("ignore flag:SOCK_CLOEXEC\n");
}
- return fd;
+ return fd;
}
-
-#endif/* #ifndef _RSOCKET_RS_C_ */
-
+#endif /* #ifndef _RSOCKET_RS_C_ */
diff --git a/stacks/rsocket/src/rsocket_sapi.h b/stacks/rsocket/src/rsocket_sapi.h
index a9ca932..21a7481 100644
--- a/stacks/rsocket/src/rsocket_sapi.h
+++ b/stacks/rsocket/src/rsocket_sapi.h
@@ -14,25 +14,22 @@
* limitations under the License.
*/
-RR_SAPI(socket)
-RR_SAPI(close)
-RR_SAPI(bind)
-RR_SAPI(connect)
-RR_SAPI(getpeername)
-RR_SAPI(getsockname)
-
-RR_SAPI(fcntl)
-RR_SAPI(setsockopt)
-RR_SAPI(getsockopt)
-
-RR_SAPI(read)
-RR_SAPI(write)
-
-RR_SAPI(sendmsg)
-RR_SAPI(recvfrom)
-
-RR_SAPI(poll)
-
-RR_SAPI(epoll_create)
-RR_SAPI(epoll_ctl)
-RR_SAPI(epoll_wait)
+/* *INDENT-OFF* */
+RR_SAPI (socket)
+RR_SAPI (close)
+RR_SAPI (bind)
+RR_SAPI (connect)
+RR_SAPI (getpeername)
+RR_SAPI (getsockname)
+RR_SAPI (fcntl)
+RR_SAPI (setsockopt)
+RR_SAPI (getsockopt)
+RR_SAPI (read)
+RR_SAPI (write)
+RR_SAPI (sendmsg)
+RR_SAPI (recvfrom)
+RR_SAPI (poll)
+RR_SAPI (epoll_create)
+RR_SAPI (epoll_ctl)
+RR_SAPI (epoll_wait)
+/* *INDENT-ON* */