aboutsummaryrefslogtreecommitdiffstats
path: root/stacks/rsocket/src
diff options
context:
space:
mode:
Diffstat (limited to 'stacks/rsocket/src')
-rw-r--r--stacks/rsocket/src/rsocket_adpt.c218
-rw-r--r--stacks/rsocket/src/rsocket_adpt.h55
-rw-r--r--stacks/rsocket/src/rsocket_in.h33
-rw-r--r--stacks/rsocket/src/rsocket_rdma.h97
-rw-r--r--stacks/rsocket/src/rsocket_rs.c354
-rw-r--r--stacks/rsocket/src/rsocket_sapi.h38
6 files changed, 795 insertions, 0 deletions
diff --git a/stacks/rsocket/src/rsocket_adpt.c b/stacks/rsocket/src/rsocket_adpt.c
new file mode 100644
index 0000000..eda7fd7
--- /dev/null
+++ b/stacks/rsocket/src/rsocket_adpt.c
@@ -0,0 +1,218 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <dlfcn.h>
+
+#include "nstack_dmm_api.h"
+
+#include "rsocket_adpt.h"
+#include "rdma/rsocket.h"
+
+
+#define RR_EVFD(u64) ((int)((u64) >> 32))
+#define RR_RSFD(u64) ((int)((u64) & 0xFFFFFFFF))
+#define RR_DATA(evfd, rsfd) ((((uint64_t)(evfd)) << 32) | (uint64_t)(uint32_t)(rsfd))
+
+#define RR_EV_NUM 64
+
+rsocket_var_t g_rr_var = {0};
+
+rr_sapi_t g_sapi = {0};
+
+int g_rr_log_level = -1;
+
+
+int rr_notify_event(void *pdata, int events)
+{
+ int ret;
+
+ ret = g_rr_var.event_cb(pdata, events);
+
+ RR_DBG("event_cb(%p, 0x%x)=%d,%d\n", pdata, events, ret, errno);
+
+ return ret;
+}
+
+int rr_epoll_ctl(int op, int evfd, uint32_t events, int rsfd)
+{
+ int ret;
+ struct epoll_event event;
+ event.events = events;
+ event.data.u64 = RR_DATA(evfd, rsfd);
+ ret = GSAPI(epoll_ctl)(g_rr_var.epfd, op, evfd, &event);
+ return ret;
+}
+
+static void *rr_epoll_thread(void *arg)
+{
+ int i, ret, e;
+ struct epoll_event events[RR_EV_NUM];
+
+ while (1) {
+ ret = GSAPI(epoll_wait)(g_rr_var.epfd, events, RR_EV_NUM, 100);
+ e = errno;
+
+ for (i = 0; i < ret; ++i) {
+ if (rr_rs_handle(RR_RSFD(events[i].data.u64), events[i].events)) {
+ (void)rr_ep_del(RR_EVFD(events[i].data.u64));
+ }
+ }
+
+ if (ret < 0) {
+ RR_STAT_INC(RR_STAT_EPW_ERR);
+ if (e == EINTR) {
+ RR_STAT_INC(RR_STAT_EPW_EINTR);
+ } else if ( e == ETIMEDOUT) {
+ RR_STAT_INC(RR_STAT_EPW_ETIMEOUT);
+ } else {
+ RR_ERR("epoll_wait()=%d:%d\n", ret, errno);
+ }
+ }
+ }
+
+ return NULL;
+}
+
+
+static int rr_init_sapi()
+{
+ void *handle = dlopen("libc.so.6", RTLD_NOW | RTLD_GLOBAL);
+ if (!handle) {
+ RR_ERR("dlopen(libc.so.6):NULL\n");
+ return -1;
+ }
+
+#define RR_SAPI(name) \
+ GSAPI(name) = dlsym(handle, #name); \
+ if (!GSAPI(name)) \
+ RR_ERR("dlsym(" #name "):NULL\n");
+#include "rsocket_sapi.h"
+#undef RR_SAPI
+
+ return 0;
+}
+
+static void rr_init_log()
+{
+ int level;
+ char *log;
+
+ if (g_rr_log_level >= 0)
+ return;
+
+ log = getenv("RSOCKET_LOG");
+ if (!log || !log[0]) {
+ g_rr_log_level = RR_LOG_OFF;
+ return;
+ }
+
+ level = atoi(log);
+ if (level < 0 || level > 99999) {
+ g_rr_log_level = RR_LOG_OFF;
+ return;
+ }
+
+ g_rr_log_level = level;
+}
+
+static int rsocket_init()
+{
+ int ret;
+
+ rr_init_log();
+
+ if (rr_init_sapi()) {
+ return -1;
+ }
+
+ g_rr_var.epfd = GSAPI(epoll_create)(1);
+ if (g_rr_var.epfd < 0)
+ return g_rr_var.epfd;
+
+ ret = pthread_create(&g_rr_var.epoll_threadid, NULL, rr_epoll_thread, NULL);
+ if (ret) {
+ GSAPI(close)(g_rr_var.epfd);
+ g_rr_var.epfd = -1;
+ return ret;
+ }
+ (void)pthread_setname_np(g_rr_var.epoll_threadid, "rsocket_epoll");
+
+ return 0;
+}
+
+int rsocket_exit()
+{
+ if (g_rr_var.epfd >= 0) {
+ (void)GSAPI(close)(g_rr_var.epfd);
+ g_rr_var.epfd = -1;
+ }
+
+ return 0;
+}
+
+unsigned int rsocket_ep_ctl(int epFD, int proFD, int ctl_ops, struct epoll_event * event, void *pdata)
+{
+ int ret;
+ struct eventpoll *ep;
+ unsigned int revents = 0;
+
+ RR_DBG("(%d, %d, %d, 0x%x, %p)\n", epFD, proFD, ctl_ops, event->events, pdata);
+
+ switch (ctl_ops) {
+ case nstack_ep_triggle_add:
+ ret = rr_rs_ep_add(proFD, pdata, &revents);
+ if (ret)
+ return -1;
+ return revents;
+
+ case nstack_ep_triggle_mod:
+ ret = rr_rs_ep_mod(proFD, pdata, &revents);
+ if (ret)
+ return -1;
+ return revents;
+ case nstack_ep_triggle_del:
+ return rr_rs_ep_del(proFD);
+ }
+
+ return _err(EPERM);
+}
+
+int rsocket_stack_register(nstack_proc_cb *proc_fun, nstack_event_cb *event_ops)
+{
+ rr_init_log();
+
+#define NSTACK_MK_DECL(ret, fn, args) \
+ do { \
+ proc_fun->socket_ops.pf##fn = dlsym(event_ops->handle, "r"#fn); \
+ if (!proc_fun->socket_ops.pf##fn) \
+ RR_LOG("socket API '" #fn "' not found\n"); \
+ } while (0)
+#include "declare_syscalls.h"
+#undef NSTACK_MK_DECL
+
+ proc_fun->extern_ops.module_init = rsocket_init;
+ proc_fun->extern_ops.ep_ctl = rsocket_ep_ctl;
+ proc_fun->extern_ops.ep_getevt = NULL;
+
+ g_rr_var.type = event_ops->type;
+ g_rr_var.event_cb = event_ops->event_cb;
+
+ return 0;
+}
+
diff --git a/stacks/rsocket/src/rsocket_adpt.h b/stacks/rsocket/src/rsocket_adpt.h
new file mode 100644
index 0000000..43ec6cf
--- /dev/null
+++ b/stacks/rsocket/src/rsocket_adpt.h
@@ -0,0 +1,55 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef _RSOCKET_ADPT_H_
+#define _RSOCKET_ADPT_H_
+
+#include "indexer.h"
+#include "rsocket_rdma.h"
+
+enum {
+ RR_STAT_EPW_ERR,
+ RR_STAT_EPW_EINTR,
+ RR_STAT_EPW_ETIMEOUT,
+
+ RR_STAT_NUM
+};
+
+#define RR_STAT_ADD(id, num) __sync_add_and_fetch(&g_rr_var.stat[(id)], num)
+#define RR_STAT_SUB(id, num) __sync_sub_and_fetch(&g_rr_var.stat[(id)], num)
+#define RR_STAT_INC(id) RR_STAT_ADD((id), 1)
+#define RR_STAT_DEC(id) RR_STAT_SUB((id), 1)
+
+#define RSRDMA_EXIT 1
+
+typedef struct rsocket_var {
+ pthread_t epoll_threadid;
+
+ int epfd;
+ int type;
+ int (*event_cb) (void *pdata, int events);
+
+ uint64_t stat[RR_STAT_NUM];
+} rsocket_var_t;
+
+extern rsocket_var_t g_rr_var;
+
+
+int rr_rs_handle(int fd, uint32_t events);
+
+
+#endif/* #ifndef _RSOCKET_ADPT_H_ */
+
diff --git a/stacks/rsocket/src/rsocket_in.h b/stacks/rsocket/src/rsocket_in.h
new file mode 100644
index 0000000..b4e1ae6
--- /dev/null
+++ b/stacks/rsocket/src/rsocket_in.h
@@ -0,0 +1,33 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef _RSOCKET_IN_H_
+#define _RSOCKET_IN_H_
+
+#include "rsocket_rdma.h"
+
+inline static void rr_rs_init(struct rsocket *rs);
+inline static void rr_rs_dest(struct rsocket *rs);
+
+static inline void rr_rs_notify_tcp(struct rsocket *rs);
+static inline void rr_rs_notify_udp(struct rsocket *rs);
+
+inline static void rr_rs_handle_tcp(struct rsocket *rs);
+inline static int rr_rs_evfd(struct rsocket *rs);
+inline static void rr_rs_connected(struct rsocket *rs);
+
+#endif/* #ifndef _RSOCKET_IN_H_ */
+
diff --git a/stacks/rsocket/src/rsocket_rdma.h b/stacks/rsocket/src/rsocket_rdma.h
new file mode 100644
index 0000000..076b6c9
--- /dev/null
+++ b/stacks/rsocket/src/rsocket_rdma.h
@@ -0,0 +1,97 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef _RSOCKET_RDMA_H_
+#define _RSOCKET_RDMA_H_
+
+#include <stdint.h>
+#include <unistd.h>
+#include <stddef.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/epoll.h>
+#include <netinet/in.h>
+#include <pthread.h>
+#include <poll.h>
+#include <fcntl.h>
+#include <time.h>
+
+enum {
+ RR_LOG_OFF = 0x00,
+ RR_LOG_ERR = 0x01,
+ RR_LOG_WRN = 0x02,
+ RR_LOG_LOG = 0x03,
+ RR_LOG_DBG = 0x04,
+};
+
+#define RR_OUT(level, name, fmt, arg...) do { \
+ extern int g_rr_log_level; \
+ if (g_rr_log_level >= level) { \
+ (void)printf("#RSOCKET:%s# %s:%d> " fmt "", \
+ name, __func__, __LINE__, ##arg); \
+ } \
+} while (0)
+
+#define RR_ERR(fmt, arg...) RR_OUT(RR_LOG_ERR, "ERR", fmt, ##arg)
+#define RR_WRN(fmt, arg...) RR_OUT(RR_LOG_WRN, "WRN", fmt, ##arg)
+#define RR_LOG(fmt, arg...) RR_OUT(RR_LOG_LOG, "LOG", fmt, ##arg)
+#if defined(RSOCKET_DEBUG) && RSOCKET_DEBUG > 0
+#define RR_DBG(fmt, arg...) RR_OUT(RR_LOG_DBG, "DBG", fmt, ##arg)
+#else
+#define RR_DBG(fmt, arg...) ((void)0)
+#endif
+
+
+#define _err(err_no) ((errno = (err_no)), -1)
+
+
+int rr_rs_ep_add(int fd, void *pdata, uint32_t *revent);
+int rr_rs_ep_mod(int fd, void *pdata, uint32_t *revent);
+int rr_rs_ep_del(int fd);
+
+uint32_t rr_rs_poll(int fd, uint32_t revents);
+
+int rr_notify_event(void *pdata, int events);
+
+
+typedef struct rr_socket_api {
+ #define RR_SAPI(name) typeof(name) *n_##name;/* native api */
+ #include "rsocket_sapi.h"
+ #undef RR_SAPI
+} rr_sapi_t;
+
+extern rr_sapi_t g_sapi;
+
+#define GSAPI(name) g_sapi.n_##name
+
+
+int rr_epoll_ctl(int op, int evfd, uint32_t events, int rsfd);
+
+inline static int rr_ep_add(int evfd, int rsfd)
+{
+ return rr_epoll_ctl(EPOLL_CTL_ADD, evfd, EPOLLET | EPOLLIN | EPOLLOUT, rsfd);
+}
+
+inline static int rr_ep_del(int evfd)
+{
+ if (evfd < 0)
+ return 0;
+ return rr_epoll_ctl(EPOLL_CTL_DEL, evfd, 0, 0);
+}
+
+
+#endif/* #ifndef _RSOCKET_RDMA_H_ */
+
diff --git a/stacks/rsocket/src/rsocket_rs.c b/stacks/rsocket/src/rsocket_rs.c
new file mode 100644
index 0000000..ca92c3d
--- /dev/null
+++ b/stacks/rsocket/src/rsocket_rs.c
@@ -0,0 +1,354 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef _RSOCKET_RS_C_
+#define _RSOCKET_RS_C_
+
+
+inline static void rr_rs_init(struct rsocket *rs)
+{
+ RR_DBG("(rs:%p{index:%d})\n", rs, rs->index);
+ rs->rr_epoll_ref = 0;
+ rs->rr_epoll_fd = -1;
+ rs->rr_epoll_pdata = NULL;
+}
+
+inline static void rr_rs_dest(struct rsocket *rs)
+{
+ RR_DBG("(rs:%p{index:%d})\n", rs, rs->index);
+
+ if (rs->rr_epoll_ref) {
+ (void)rr_ep_del(rs->rr_epoll_fd);
+ rs->rr_epoll_ref = 0;
+ rs->rr_epoll_fd = -1;
+ rs->rr_epoll_pdata = NULL;
+ }
+}
+
+#ifndef POLL__RSOCKET_RS_H_
+#define POLL__RSOCKET_RS_H_
+
+static inline uint32_t rr_rs_poll_tcp(struct rsocket *rs)
+{
+ uint32_t events = 0;
+ if (rs->state & rs_connected) {
+ if (rs_have_rdata(rs))
+ events |= EPOLLIN;
+ if (rs_can_send(rs))
+ events |= EPOLLOUT;
+ }
+ if (rs->state & (rs_error | rs_connect_error))
+ events |= EPOLLERR;
+ if (rs->state & rs_disconnected)
+ events |= EPOLLHUP;
+ return events;
+}
+
+static inline uint32_t rr_rs_poll_udp(struct rsocket *rs)
+{
+ uint32_t events = 0;
+ if (rs_have_rdata(rs))
+ events |= EPOLLIN;
+ if (ds_can_send(rs))
+ events |= EPOLLOUT;
+ if (rs->state & rs_error)
+ events |= EPOLLERR;
+ return events;
+}
+
+static inline uint32_t rr_rs_poll_both(struct rsocket *rs)
+{
+ if (rs->type == SOCK_STREAM)
+ return rr_rs_poll_tcp(rs);
+
+ if (rs->type == SOCK_DGRAM)
+ return rr_rs_poll_udp(rs);
+
+ return 0;
+}
+
+uint32_t rr_rs_poll(int fd, uint32_t revents)
+{
+ struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
+
+ if (!rs)
+ return 0;
+
+ if (rs->state == rs_listening)
+ return revents;
+
+ return rr_rs_poll_both(rs);
+}
+
+#endif/* #ifndef POLL__RSOCKET_RS_H_ */
+
+
+static inline void rr_rs_notify_tcp(struct rsocket *rs)
+{
+ if (rs->rr_epoll_ref) {
+ uint32_t events = rr_rs_poll_tcp(rs);
+ if (events)
+ (void)rr_notify_event(rs->rr_epoll_pdata, events);
+ }
+}
+
+static inline void rr_rs_notify_udp(struct rsocket *rs)
+{
+ if (rs->rr_epoll_ref) {
+ uint32_t events = rr_rs_poll_udp(rs);
+ if (events)
+ (void)rr_notify_event(rs->rr_epoll_pdata, events);
+ }
+}
+
+#ifndef HANDLE__RSOCKET_RS_H_
+#define HANDLE__RSOCKET_RS_H_
+
+inline static void rr_rs_handle_tcp(struct rsocket *rs)
+{
+ int ret;
+
+ RR_DBG("(%d)@ state:0x%x\n", rs->index, rs->state);
+
+ if (!(rs->state & (rs_connected | rs_opening)))
+ return;
+
+ fastlock_acquire(&rs->cq_wait_lock);
+ ret = rs_get_cq_event(rs);
+ RR_DBG("rs_get_cq_event({%d})=%d,%d\n", rs->index, ret, errno);
+ fastlock_release(&rs->cq_wait_lock);
+
+ fastlock_acquire(&rs->cq_lock);
+
+ if (rs->state & rs_connected) {
+ rs_update_credits(rs);
+ ret = rs_poll_cq(rs);
+ RR_DBG("rs_poll_cq({%d})=%d,%d {ref:%d, armed:%d}\n",
+ rs->index, ret, errno, rs->rr_epoll_ref, rs->cq_armed);
+ }
+
+ if (rs->rr_epoll_ref && rs->cq_armed < 1) {
+ ret = ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
+ RR_DBG("ibv_req_notify_cq({%d})=%d,%d\n", rs->index, ret, errno);
+ if (0 == ret)
+ __sync_fetch_and_add(&rs->cq_armed, 1);
+ }
+
+ if (rs->state & rs_connected) {
+ ret = rs_poll_cq(rs);
+ RR_DBG("rs_poll_cq({%d})=%d,%d\n", rs->index, ret, errno);
+ rs_update_credits(rs);
+ }
+
+ fastlock_release(&rs->cq_lock);
+
+ RR_DBG("(%d)=\n", rs->index);
+}
+
+inline static void rr_rs_handle_udp(struct rsocket *rs)
+{
+ fastlock_acquire(&rs->cq_wait_lock);
+ ds_get_cq_event(rs);
+ fastlock_release(&rs->cq_wait_lock);
+
+ fastlock_acquire(&rs->cq_lock);
+ ds_poll_cqs(rs);
+ if (rs->rr_epoll_ref && !rs->cq_armed) {
+ ds_req_notify_cqs(rs);
+ rs->cq_armed = 1;
+ }
+ fastlock_release(&rs->cq_lock);
+}
+
+inline static void rr_rs_handle_rs(struct rsocket *rs)
+{
+ if (rs->state & rs_opening) {
+ int ret = rs_do_connect(rs);
+ RR_DBG("rs_do_connect(%p{%d}):%d:%d\n", rs, rs->index, ret, errno);
+ return;
+ }
+
+ if (rs->type == SOCK_STREAM) {
+ rr_rs_handle_tcp(rs);
+ }
+
+ if (rs->type == SOCK_DGRAM) {
+ rr_rs_handle_udp(rs);
+ }
+}
+
+int rr_rs_handle(int fd, uint32_t events)
+{
+ struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
+
+ RR_DBG("(fd:%d, events:0x%x):rs:%p\n", fd, events, rs);
+
+ if (!rs)
+ return _err(EBADF);
+
+ if (rs->state == rs_listening) {
+ if (events & EPOLLIN) {
+ (void)rr_notify_event(rs->rr_epoll_pdata, events);
+ }
+ return 0;
+ }
+
+ rr_rs_handle_rs(rs);
+
+ return 0;
+}
+
+
+#endif/* #ifndef HANDLE__RSOCKET_RS_H_ */
+
+#ifndef ADPT__RSOCKET_RS_H_
+#define ADPT__RSOCKET_RS_H_
+
+inline static int rr_rs_evfd(struct rsocket *rs)
+{
+ if (rs->type == SOCK_STREAM) {
+ if (rs->state >= rs_connected)
+ return rs->cm_id->recv_cq_channel->fd;
+ else
+ return rs->cm_id->channel->fd;
+ } else {
+ return rs->epfd;
+ }
+
+ return -1;
+}
+
+int rr_rs_ep_add(int fd, void *pdata, uint32_t *revent)
+{
+ int ref;
+ struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
+ RR_DBG("(%d(%p),)\n", fd, rs);
+ if (!rs)
+ return _err(EBADF);
+
+ ref = __sync_add_and_fetch(&rs->rr_epoll_ref, 1);
+ if (1 == ref) {
+ rs->rr_epoll_fd = rr_rs_evfd(rs);
+ (void)rr_ep_add(rs->rr_epoll_fd, rs->index);
+ }
+
+ (void)rr_rs_handle_rs(rs);
+ *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both(rs);
+
+ rs->rr_epoll_pdata = pdata;
+
+ RR_DBG("*revent=0x%x\n", *revent);
+ return 0;
+}
+
+int rr_rs_ep_mod(int fd, void *pdata, uint32_t *revent)
+{
+ struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
+ RR_DBG("(%d(%p),)\n", fd, rs);
+ if (!rs)
+ return _err(EBADF);
+
+ if (rs->rr_epoll_ref <= 0)
+ return _err(ENOENT);
+
+ (void)rr_rs_handle_rs(rs);
+ *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both(rs);
+
+ rs->rr_epoll_pdata = pdata;
+
+ RR_DBG("*revent=0x%x\n", *revent);
+ return 0;
+}
+
+int rr_rs_ep_del(int fd)
+{
+ int ref;
+ struct rsocket *rs = (struct rsocket*)idm_lookup(&idm, fd);
+ RR_DBG("(%d(%p))\n", fd, rs);
+
+ if (!rs)
+ return _err(EBADF);
+
+ ref = __sync_sub_and_fetch(&rs->rr_epoll_ref, 1);
+ if (0 == ref) {
+ (void)rr_ep_del(rs->rr_epoll_fd);
+ rs->rr_epoll_fd = -1;
+ }
+
+ return 0;
+}
+
+#endif/* #ifndef ADPT__RSOCKET_RS_H_ */
+
+inline static void rr_rs_connected(struct rsocket *rs)
+{
+ RR_DBG("rsfd:%d ref:%d evfd:%d->%d state:0x%x\n", rs->index,
+ rs->rr_epoll_ref, rs->rr_epoll_fd, rr_rs_evfd(rs), rs->state);
+
+ if (!(rs->state & rs_connected)) {
+ rr_rs_notify_tcp(rs);
+ return;
+ }
+
+ if (rs->rr_epoll_ref) {
+ int evfd = rr_rs_evfd(rs);
+
+ if (evfd != rs->rr_epoll_fd) {
+ (void)rr_ep_del(rs->rr_epoll_fd);
+ rs->rr_epoll_fd = evfd;
+ (void)rr_ep_add(evfd, rs->index);
+ }
+
+ rr_rs_handle_tcp(rs);
+ }
+}
+
+int raccept4(int socket, struct sockaddr *addr, socklen_t *addrlen, int flags)
+{
+ int ret, fd;
+ struct rsocket *rs;
+
+ RR_DBG("(%d, %p, %p, %d)@\n", socket, addr, addrlen, flags);
+ fd = raccept(socket, addr, addrlen);
+ RR_DBG("(%d, , , %d):%d:%d\n", socket, flags, fd, errno);
+ if (fd < 0)
+ return fd;
+
+ rs = (struct rsocket*)idm_lookup(&idm, fd);
+ if (!rs) {
+ RR_ERR("panic\n");
+ return -1;
+ }
+
+ if (flags & SOCK_NONBLOCK) {
+ if (0 == (rs->fd_flags & O_NONBLOCK)) {
+ RR_DBG("orig flag:%x\n", GSAPI(fcntl)(rs->cm_id->channel->fd, F_GETFL));
+ ret = GSAPI(fcntl)(rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK);
+ if (0 == ret)
+ rs->fd_flags |= O_NONBLOCK;
+ }
+ }
+
+ if (flags & SOCK_CLOEXEC) {
+ RR_LOG("ignore flag:SOCK_CLOEXEC\n");
+ }
+
+ return fd;
+}
+
+
+#endif/* #ifndef _RSOCKET_RS_C_ */
+
diff --git a/stacks/rsocket/src/rsocket_sapi.h b/stacks/rsocket/src/rsocket_sapi.h
new file mode 100644
index 0000000..a9ca932
--- /dev/null
+++ b/stacks/rsocket/src/rsocket_sapi.h
@@ -0,0 +1,38 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+RR_SAPI(socket)
+RR_SAPI(close)
+RR_SAPI(bind)
+RR_SAPI(connect)
+RR_SAPI(getpeername)
+RR_SAPI(getsockname)
+
+RR_SAPI(fcntl)
+RR_SAPI(setsockopt)
+RR_SAPI(getsockopt)
+
+RR_SAPI(read)
+RR_SAPI(write)
+
+RR_SAPI(sendmsg)
+RR_SAPI(recvfrom)
+
+RR_SAPI(poll)
+
+RR_SAPI(epoll_create)
+RR_SAPI(epoll_ctl)
+RR_SAPI(epoll_wait)