aboutsummaryrefslogtreecommitdiffstats
path: root/stacks/rsocket
diff options
context:
space:
mode:
Diffstat (limited to 'stacks/rsocket')
-rw-r--r--stacks/rsocket/CMakeLists.txt42
-rw-r--r--stacks/rsocket/build/.gitkeep0
-rw-r--r--stacks/rsocket/configure/module_config.json10
-rw-r--r--stacks/rsocket/configure/rd_config.json31
-rw-r--r--stacks/rsocket/doc/README.md8
-rw-r--r--stacks/rsocket/src/rsocket_adpt.c258
-rw-r--r--stacks/rsocket/src/rsocket_adpt.h23
-rw-r--r--stacks/rsocket/src/rsocket_rdma.h40
-rw-r--r--stacks/rsocket/src/rsocket_rs.c422
9 files changed, 442 insertions, 392 deletions
diff --git a/stacks/rsocket/CMakeLists.txt b/stacks/rsocket/CMakeLists.txt
index 6ba868b..6516961 100644
--- a/stacks/rsocket/CMakeLists.txt
+++ b/stacks/rsocket/CMakeLists.txt
@@ -13,40 +13,62 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#########################################################################
+CMAKE_MINIMUM_REQUIRED(VERSION 2.8.11)
+#PROJECT(nStack)
+SET(CMAKE_SKIP_RPATH TRUE)
+SET(CMAKE_C_COMPILER "gcc")
+SET(OS_RELEASE "" CACHE STRING "User-specified OS release.")
+SET(EXECUTABLE_PATH ${CMAKE_CURRENT_LIST_DIR}/../../release/bin)
+SET(LIB_PATH_STATIC ${CMAKE_SOURCE_DIR}/build)
+SET(LIB_PATH_SHARED ${CMAKE_CURRENT_LIST_DIR}/../../release/lib64)
+SET(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${LIB_PATH_STATIC})
+SET(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${EXECUTABLE_PATH})
+SET(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${LIB_PATH_SHARED})
+
+SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O0 -g -fPIE -pie -fPIC -m64 -mssse3 -std=gnu89")
+SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -Wshadow -Wfloat-equal -Wformat=2")
+SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fstack-protector -fstack-protector-all")
+SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wl,-z,relro,-z,now -Wl,--disable-new-dtags")
+SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wl,-z,noexecstack -mcmodel=medium")
SET(rdmacm_dir librdmacm-1.1.0)
-SET(dmm_inc_dir ${DMM_REL_INC_DIR})
+SET(dmm_inc_dir ${CMAKE_CURRENT_LIST_DIR}/../../release/include/)
+SET(DMM_REL_INC_DIR ${dmm_inc_dir})
+SET(dmm_src_inc_dir ${CMAKE_SOURCE_DIR}/src/include/)
SET(RSOCKET_DEBUG 1)
########################
-
SET(rdmacm_url https://github.com/ofiwg/librdmacm/archive/v1.1.0.tar.gz)
-
+if(EXISTS "${CMAKE_CURRENT_LIST_DIR}/v1.1.0.tar.gz")
+ SET(RDMA_DOWNLOAD_CMD tar -xvf ${CMAKE_CURRENT_LIST_DIR}/v1.1.0.tar.gz)
+else()
+ SET(RDMA_DOWNLOAD_CMD wget --no-check-certificate ${rdmacm_url} && tar -xvf ${CMAKE_CURRENT_LIST_DIR}/v1.1.0.tar.gz)
+endif()
INCLUDE(ExternalProject)
ExternalProject_Add(
rdmacm
- URL ${rdmacm_url}
SOURCE_DIR ${CMAKE_CURRENT_LIST_DIR}/${rdmacm_dir}
DOWNLOAD_DIR ${CMAKE_CURRENT_LIST_DIR}
+ DOWNLOAD_COMMAND ${RDMA_DOWNLOAD_CMD}
PATCH_COMMAND patch -p1 -i ../rsocket.patch
CONFIGURE_COMMAND ./autogen.sh && ./configure dmm_inc_dir=${DMM_REL_INC_DIR} RSOCKET_DEBUG=${RSOCKET_DEBUG}
BUILD_IN_SOURCE 1
BUILD_COMMAND make
INSTALL_COMMAND cp -f libdmm_rdmacm.a ${LIB_PATH_STATIC}/
- DEPENDS DPDK
)
-set_target_properties(rdmacm PROPERTIES EXCLUDE_FROM_ALL TRUE)
########################
-SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O2 -g -fPIC -m64 -pthread")
+SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O0 -g -fPIC -m64 -pthread")
ADD_LIBRARY(dmm_rsocket SHARED src/rsocket_adpt.c)
ADD_DEFINITIONS(-D_GNU_SOURCE -DRSOCKET_DEBUG=${RSOCKET_DEBUG})
-INCLUDE_DIRECTORIES(${DMM_REL_INC_DIR})
+INCLUDE_DIRECTORIES(
+ ${DMM_REL_INC_DIR} ${dmm_src_inc_dir}
+)
INCLUDE_DIRECTORIES(./src ${rdmacm_dir} ${rdmacm_dir}/include ${rdmacm_dir}/src)
TARGET_LINK_LIBRARIES(dmm_rsocket
@@ -56,6 +78,4 @@ TARGET_LINK_LIBRARIES(dmm_rsocket
ibverbs pthread dl rt
)
-ADD_DEPENDENCIES(dmm_rsocket rdmacm DPDK)
-
-set_target_properties(dmm_rsocket PROPERTIES EXCLUDE_FROM_ALL TRUE)
+ADD_DEPENDENCIES(dmm_rsocket rdmacm)
diff --git a/stacks/rsocket/build/.gitkeep b/stacks/rsocket/build/.gitkeep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/stacks/rsocket/build/.gitkeep
diff --git a/stacks/rsocket/configure/module_config.json b/stacks/rsocket/configure/module_config.json
index 736de24..2df82cd 100644
--- a/stacks/rsocket/configure/module_config.json
+++ b/stacks/rsocket/configure/module_config.json
@@ -3,17 +3,27 @@
"module_list": [
{
"stack_name": "kernel", /*stack name*/
+ "function_name": "kernel_stack_register", /*function name*/
"libname": "./", /*library name, if loadtype is static, this maybe
null, else must give a library name*/
+ "loadtype": "static", /*library load type: static or dynamic*/
"deploytype": "1", /*deploy model type:model type1, model type2,
model type3. Indicating single or multi process
deployment. Used during shared memory initialization.*/
+ "maxfd": "1024", /*the max fd supported*/
+ "minfd": "0", /*the min fd supported*/
+ "priorty": "1", /*priorty when executing, reserv*/
"stackid": "0", /*stack id, this must be ordered and not be repeated*/
},
{
"stack_name": "rsocket",
+ "function_name": "rsocket_stack_register",
"libname": "libdmm_rsocket.so",
+ "loadtype": "dynmic",
"deploytype": "1",
+ "maxfd": "1024",
+ "minfd": "0",
+ "priorty": "1",
"stackid": "1",
},
]
diff --git a/stacks/rsocket/configure/rd_config.json b/stacks/rsocket/configure/rd_config.json
index 5c6f861..d89afcf 100644
--- a/stacks/rsocket/configure/rd_config.json
+++ b/stacks/rsocket/configure/rd_config.json
@@ -1,10 +1,25 @@
{
- "ip_route": [
- {
- "subnet": "192.168.1.1/24",
- "stack_name": "rsocket",
- },
- ],
- "prot_route": [
- ],
+ "modules": [
+ {
+ "name": "kernel",
+ "ip_route": [
+ "127.0.0.1/24"
+ ],
+ "protocol_route": [
+ ],
+ "type_route": [
+ ]
+ },
+ {
+ "name": "rsocket",
+ "ip_route": [
+ "192.168.21.1/24"
+
+ ],
+ "protocol_route": [
+ ],
+ "type_route": [
+ ]
+ }
+ ]
}
diff --git a/stacks/rsocket/doc/README.md b/stacks/rsocket/doc/README.md
index 6cdacfb..237db68 100644
--- a/stacks/rsocket/doc/README.md
+++ b/stacks/rsocket/doc/README.md
@@ -34,10 +34,10 @@ dmm/release/lib64/libdmm_rsocket.so
```sh
#export LD_LIBRARY_PATH=${dmm}/release/lib64
#export LD_PRELOAD=${dmm}/release/lib64/libnStackAPI.so
- #export NSTACK_MOD_CFG_FILE=${dmm}/stacks/rsocket/configure/module_config.json
- #export NSTACK_MOD_CFG_RD=${dmm}/stacks/rsocket/configure/rd_config.json
+ #export NSTACK_MOD_CFG_FILE=${dmm}/stacks/rsocket/config/module_config.json
+ #export NSTACK_MOD_CFG_RD=${dmm}/stacks/rsocket/config/rd_config.json
```
-- Steps 2: Modify rd_config.json(located at dmm/stacks/rsocket/configure/)
+- Steps 2: Modify rd_config.json(located at dmm/stacks/rsocket/config/)
```sh
#vim rd_config.json
//set "subnet": "192.168.21.1/24"
@@ -107,4 +107,4 @@ all using GSAPI macro control.
https://wiki.fd.io/view/DMM
https://github.com/ofiwg/librdmacm/blob/master/docs/rsocket
https://github.com/rsocket/rsocket
-http://www.mellanox.com/page/products_dyn?product_family=26
+http://www.mellanox.com/page/products_dyn?product_family=26 \ No newline at end of file
diff --git a/stacks/rsocket/src/rsocket_adpt.c b/stacks/rsocket/src/rsocket_adpt.c
index 2839909..9496e96 100644
--- a/stacks/rsocket/src/rsocket_adpt.c
+++ b/stacks/rsocket/src/rsocket_adpt.c
@@ -19,10 +19,11 @@
#include <errno.h>
#include <dlfcn.h>
-#include "nstack_dmm_api.h"
-
+#include "nstack_callback_ops.h"
#include "rsocket_adpt.h"
#include "rdma/rsocket.h"
+#include "nstack_epoll_api.h"
+#include "nstack_rd_api.h"
#define RR_EVFD(u64) ((int)((u64) >> 32))
#define RR_RSFD(u64) ((int)((u64) & 0xFFFFFFFF))
@@ -36,77 +37,69 @@ rr_sapi_t g_sapi = { 0 };
int g_rr_log_level = -1;
-int
-rr_notify_event (void *pdata, int events)
-{
- int ret;
-
- ret = g_rr_var.event_cb (pdata, events);
-
- RR_DBG ("event_cb(%p, 0x%x)=%d,%d\n", pdata, events, ret, errno);
+void *rrd_table = NULL;
- return ret;
+void rr_notify_event(void *pdata, int events)
+{
+ g_rr_var.event_cb(pdata, events, EVENT_INFORM_APP);
}
-int
-rr_epoll_ctl (int op, int evfd, uint32_t events, int rsfd)
+int rr_epoll_ctl(int op, int evfd, uint32_t events, int rsfd)
{
- int ret;
- struct epoll_event event;
- event.events = events;
- event.data.u64 = RR_DATA (evfd, rsfd);
- ret = GSAPI (epoll_ctl) (g_rr_var.epfd, op, evfd, &event);
- return ret;
+ int ret;
+ struct epoll_event event;
+ event.events = events;
+ event.data.u64 = RR_DATA(evfd, rsfd);
+ ret = GSAPI(epoll_ctl) (g_rr_var.epfd, op, evfd, &event);
+ return ret;
}
-static void *
-rr_epoll_thread (void *arg)
+static void *rr_epoll_thread(void *arg)
{
- int i, ret, e;
- struct epoll_event events[RR_EV_NUM];
+ int i, ret, e;
+ struct epoll_event events[RR_EV_NUM];
- while (1)
+ while (1)
{
- ret = GSAPI (epoll_wait) (g_rr_var.epfd, events, RR_EV_NUM, 100);
- e = errno;
+ ret = GSAPI(epoll_wait) (g_rr_var.epfd, events, RR_EV_NUM, 100);
+ e = errno;
- for (i = 0; i < ret; ++i)
+ for (i = 0; i < ret; ++i)
{
- if (rr_rs_handle (RR_RSFD (events[i].data.u64), events[i].events))
+ if (rr_rs_handle(RR_RSFD(events[i].data.u64), events[i].events))
{
- (void) rr_ep_del (RR_EVFD (events[i].data.u64));
+ (void) rr_ep_del(RR_EVFD(events[i].data.u64));
}
}
- if (ret < 0)
+ if (ret < 0)
{
- RR_STAT_INC (RR_STAT_EPW_ERR);
- if (e == EINTR)
+ RR_STAT_INC(RR_STAT_EPW_ERR);
+ if (e == EINTR)
{
- RR_STAT_INC (RR_STAT_EPW_EINTR);
+ RR_STAT_INC(RR_STAT_EPW_EINTR);
}
- else if (e == ETIMEDOUT)
+ else if (e == ETIMEDOUT)
{
- RR_STAT_INC (RR_STAT_EPW_ETIMEOUT);
+ RR_STAT_INC(RR_STAT_EPW_ETIMEOUT);
}
- else
+ else
{
- RR_ERR ("epoll_wait()=%d:%d\n", ret, errno);
+ RR_ERR("epoll_wait()=%d:%d\n", ret, errno);
}
}
}
- return NULL;
+ return NULL;
}
-static int
-rr_init_sapi ()
+static int rr_init_sapi()
{
- void *handle = dlopen ("libc.so.6", RTLD_NOW | RTLD_GLOBAL);
- if (!handle)
+ void *handle = dlopen("libc.so.6", RTLD_NOW | RTLD_GLOBAL);
+ if (!handle)
{
- RR_ERR ("dlopen(libc.so.6):NULL\n");
- return -1;
+ RR_ERR("dlopen(libc.so.6):NULL\n");
+ return -1;
}
#define RR_SAPI(name) \
@@ -116,129 +109,148 @@ rr_init_sapi ()
#include "rsocket_sapi.h"
#undef RR_SAPI
- return 0;
+ return 0;
}
-static void
-rr_init_log ()
+static void rr_init_log()
{
- int level;
- char *log;
+ int level;
+ char *log;
- if (g_rr_log_level >= 0)
- return;
+ if (g_rr_log_level >= 0)
+ return;
- log = getenv ("RSOCKET_LOG");
- if (!log || !log[0])
+ log = getenv("RSOCKET_LOG");
+ if (!log || !log[0])
{
- g_rr_log_level = RR_LOG_OFF;
- return;
+ g_rr_log_level = RR_LOG_OFF;
+ return;
}
- level = atoi (log);
- if (level < 0 || level > 99999)
+ level = atoi(log);
+ if (level < 0 || level > 99999)
{
- g_rr_log_level = RR_LOG_OFF;
- return;
+ g_rr_log_level = RR_LOG_OFF;
+ return;
}
- g_rr_log_level = level;
+ g_rr_log_level = level;
}
-static int
-rsocket_init ()
+void *rsocket_get_ip_shmem()
{
- int ret;
+ return rrd_table;
+}
- rr_init_log ();
+static int rsocket_init()
+{
+ int ret;
+ rrd_table = nstack_local_rd_malloc();
+ if (!rrd_table)
+ {
+ RR_ERR("rsocket rd table create failed!");
+ return -1;
+ }
- if (rr_init_sapi ())
+ if (nstack_rd_parse("rsocket", rrd_table))
{
- return -1;
+ RR_WRN("no rd data got!");
+ RR_WRN("rsocket parse rd data failed");
+ nstack_rd_table_clear(rrd_table);
+ return -1;
}
- g_rr_var.epfd = GSAPI (epoll_create) (1);
- if (g_rr_var.epfd < 0)
- return g_rr_var.epfd;
+ rr_init_log();
- ret =
- pthread_create (&g_rr_var.epoll_threadid, NULL, rr_epoll_thread, NULL);
- if (ret)
+ if (rr_init_sapi())
{
- GSAPI (close) (g_rr_var.epfd);
- g_rr_var.epfd = -1;
- return ret;
+ return -1;
}
- (void) pthread_setname_np (g_rr_var.epoll_threadid, "rsocket_epoll");
- return 0;
-}
+ g_rr_var.epfd = GSAPI(epoll_create) (1);
-int
-rsocket_exit ()
-{
- if (g_rr_var.epfd >= 0)
+ if (g_rr_var.epfd < 0)
{
- (void) GSAPI (close) (g_rr_var.epfd);
- g_rr_var.epfd = -1;
+ return g_rr_var.epfd;
}
- return 0;
+ ret =
+ pthread_create(&g_rr_var.epoll_threadid, NULL, rr_epoll_thread, NULL);
+ if (ret)
+ {
+ GSAPI(close) (g_rr_var.epfd);
+ g_rr_var.epfd = -1;
+ return ret;
+ }
+ (void) pthread_setname_np(g_rr_var.epoll_threadid, "rsocket_epoll");
+
+ return 0;
}
-unsigned int
-rsocket_ep_ctl (int epFD, int proFD, int ctl_ops, struct epoll_event *event,
- void *pdata)
+int rsocket_exit()
{
- int ret;
- struct eventpoll *ep;
- unsigned int revents = 0;
+ if (g_rr_var.epfd >= 0)
+ {
+ (void) GSAPI(close) (g_rr_var.epfd);
+ g_rr_var.epfd = -1;
+ }
- RR_DBG ("(%d, %d, %d, 0x%x, %p)\n", epFD, proFD, ctl_ops, event->events,
- pdata);
+ return 0;
+}
- switch (ctl_ops)
+void *rsocket_ep_ctl(int proFD, int ctl_ops, void *pdata, void *event)
+{
+ int ret;
+ unsigned int revents = 0;
+ switch (ctl_ops)
{
- case nstack_ep_triggle_add:
- ret = rr_rs_ep_add (proFD, pdata, &revents);
- if (ret)
- return -1;
- return revents;
-
- case nstack_ep_triggle_mod:
- ret = rr_rs_ep_mod (proFD, pdata, &revents);
- if (ret)
- return -1;
- return revents;
- case nstack_ep_triggle_del:
- return rr_rs_ep_del (proFD);
+ case nstack_ep_triggle_add:
+ ret = rr_rs_ep_add(proFD, pdata, &revents);
+ if (ret)
+ return NULL;
+ *(int *) event = revents;
+ return pdata;
+ case nstack_ep_triggle_mod:
+ ret = rr_rs_ep_mod(proFD, pdata, &revents);
+ if (ret)
+ return NULL;
+ *(int *) event = revents;
+ return pdata;
+ case nstack_ep_triggle_del:
+ rr_rs_ep_del(proFD);
}
- return _err (EPERM);
+ return pdata;
+
}
-int
-rsocket_stack_register (nstack_proc_cb * proc_fun,
- nstack_event_cb * event_ops)
+int rsocket_getEvt(int fd)
{
- rr_init_log ();
+ return rr_getEvt(fd);
+}
+
+int rsocket_stack_register(nstack_socket_ops * ops,
+ nstack_event_ops * event_ops,
+ nstack_proc_ops * proc_fun)
+{
+ rr_init_log();
#define NSTACK_MK_DECL(ret, fn, args) \
do { \
- proc_fun->socket_ops.pf##fn = dlsym(event_ops->handle, "r"#fn); \
- if (!proc_fun->socket_ops.pf##fn) \
+ ops->pf##fn = dlsym(event_ops->handle, "r"#fn); \
+ if (!ops->pf##fn) \
RR_LOG("socket API '" #fn "' not found\n"); \
} while (0)
-#include "declare_syscalls.h"
+#include "declare_syscalls.h.tmpl"
#undef NSTACK_MK_DECL
- proc_fun->extern_ops.module_init = rsocket_init;
- proc_fun->extern_ops.ep_ctl = rsocket_ep_ctl;
- proc_fun->extern_ops.ep_getevt = NULL;
- proc_fun->extern_ops.module_init_child = rsocket_init;
-
- g_rr_var.type = event_ops->type;
- g_rr_var.event_cb = event_ops->event_cb;
+ proc_fun->module_init = rsocket_init;
+ proc_fun->ep_triggle = rsocket_ep_ctl;
+ proc_fun->ep_getEvt = rsocket_getEvt;
+ proc_fun->get_ip_shmem = rsocket_get_ip_shmem;
+ proc_fun->fork_init_child = rsocket_init;
+ g_rr_var.type = event_ops->type;
+ g_rr_var.event_cb = event_ops->event_cb;
- return 0;
+ return 0;
}
diff --git a/stacks/rsocket/src/rsocket_adpt.h b/stacks/rsocket/src/rsocket_adpt.h
index 9c53330..37a2e88 100644
--- a/stacks/rsocket/src/rsocket_adpt.h
+++ b/stacks/rsocket/src/rsocket_adpt.h
@@ -22,11 +22,11 @@
enum
{
- RR_STAT_EPW_ERR,
- RR_STAT_EPW_EINTR,
- RR_STAT_EPW_ETIMEOUT,
+ RR_STAT_EPW_ERR,
+ RR_STAT_EPW_EINTR,
+ RR_STAT_EPW_ETIMEOUT,
- RR_STAT_NUM
+ RR_STAT_NUM
};
#define RR_STAT_ADD(id, num) __sync_add_and_fetch(&g_rr_var.stat[(id)], num)
@@ -38,17 +38,16 @@ enum
typedef struct rsocket_var
{
- pthread_t epoll_threadid;
+ pthread_t epoll_threadid;
- int epfd;
- int type;
- int (*event_cb) (void *pdata, int events);
+ int epfd;
+ int type;
+ void (*event_cb) (void *pdata, int events, int postFlag);
- uint64_t stat[RR_STAT_NUM];
+ uint64_t stat[RR_STAT_NUM];
} rsocket_var_t;
extern rsocket_var_t g_rr_var;
-
-int rr_rs_handle (int fd, uint32_t events);
-
+int rsocket_getEvt(int fd);
+int rr_rs_handle(int fd, uint32_t events);
#endif /* #ifndef _RSOCKET_ADPT_H_ */
diff --git a/stacks/rsocket/src/rsocket_rdma.h b/stacks/rsocket/src/rsocket_rdma.h
index 75f4268..af66285 100644
--- a/stacks/rsocket/src/rsocket_rdma.h
+++ b/stacks/rsocket/src/rsocket_rdma.h
@@ -31,11 +31,11 @@
enum
{
- RR_LOG_OFF = 0x00,
- RR_LOG_ERR = 0x01,
- RR_LOG_WRN = 0x02,
- RR_LOG_LOG = 0x03,
- RR_LOG_DBG = 0x04,
+ RR_LOG_OFF = 0x00,
+ RR_LOG_ERR = 0x01,
+ RR_LOG_WRN = 0x02,
+ RR_LOG_LOG = 0x03,
+ RR_LOG_DBG = 0x04,
};
#define RR_OUT(level, name, fmt, arg...) do { \
@@ -57,13 +57,13 @@ enum
#define _err(err_no) ((errno = (err_no)), -1)
-int rr_rs_ep_add (int fd, void *pdata, uint32_t * revent);
-int rr_rs_ep_mod (int fd, void *pdata, uint32_t * revent);
-int rr_rs_ep_del (int fd);
+int rr_rs_ep_add(int fd, void *pdata, uint32_t * revent);
+int rr_rs_ep_mod(int fd, void *pdata, uint32_t * revent);
+int rr_rs_ep_del(int fd);
-uint32_t rr_rs_poll (int fd, uint32_t revents);
-
-int rr_notify_event (void *pdata, int events);
+uint32_t rr_rs_poll(int fd, uint32_t revents);
+int rr_getEvt(int fd);
+void rr_notify_event(void *pdata, int events);
typedef struct rr_socket_api
{
@@ -76,21 +76,19 @@ extern rr_sapi_t g_sapi;
#define GSAPI(name) g_sapi.n_##name
-int rr_epoll_ctl (int op, int evfd, uint32_t events, int rsfd);
+int rr_epoll_ctl(int op, int evfd, uint32_t events, int rsfd);
-inline static int
-rr_ep_add (int evfd, int rsfd)
+inline static int rr_ep_add(int evfd, int rsfd)
{
- return rr_epoll_ctl (EPOLL_CTL_ADD, evfd, EPOLLET | EPOLLIN | EPOLLOUT,
- rsfd);
+ return rr_epoll_ctl(EPOLL_CTL_ADD, evfd, EPOLLET | EPOLLIN | EPOLLOUT,
+ rsfd);
}
-inline static int
-rr_ep_del (int evfd)
+inline static int rr_ep_del(int evfd)
{
- if (evfd < 0)
- return 0;
- return rr_epoll_ctl (EPOLL_CTL_DEL, evfd, 0, 0);
+ if (evfd < 0)
+ return 0;
+ return rr_epoll_ctl(EPOLL_CTL_DEL, evfd, 0, 0);
}
#endif /* #ifndef _RSOCKET_RDMA_H_ */
diff --git a/stacks/rsocket/src/rsocket_rs.c b/stacks/rsocket/src/rsocket_rs.c
index 0f4e73f..79c4914 100644
--- a/stacks/rsocket/src/rsocket_rs.c
+++ b/stacks/rsocket/src/rsocket_rs.c
@@ -17,221 +17,222 @@
#ifndef _RSOCKET_RS_C_
#define _RSOCKET_RS_C_
-inline static void
-rr_rs_init (struct rsocket *rs)
+inline static void rr_rs_init(struct rsocket *rs)
{
- RR_DBG ("(rs:%p{index:%d})\n", rs, rs->index);
- rs->rr_epoll_ref = 0;
- rs->rr_epoll_fd = -1;
- rs->rr_epoll_pdata = NULL;
+ RR_DBG("(rs:%p{index:%d})\n", rs, rs->index);
+ rs->rr_epoll_ref = 0;
+ rs->rr_epoll_fd = -1;
+ rs->rr_epoll_pdata = NULL;
}
-inline static void
-rr_rs_dest (struct rsocket *rs)
+inline static void rr_rs_dest(struct rsocket *rs)
{
- RR_DBG ("(rs:%p{index:%d})\n", rs, rs->index);
+ RR_DBG("(rs:%p{index:%d})\n", rs, rs->index);
- if (rs->rr_epoll_ref)
+ if (rs->rr_epoll_ref)
{
- (void) rr_ep_del (rs->rr_epoll_fd);
- rs->rr_epoll_ref = 0;
- rs->rr_epoll_fd = -1;
- rs->rr_epoll_pdata = NULL;
+ (void) rr_ep_del(rs->rr_epoll_fd);
+ rs->rr_epoll_ref = 0;
+ rs->rr_epoll_fd = -1;
+ rs->rr_epoll_pdata = NULL;
}
}
#ifndef POLL__RSOCKET_RS_H_
#define POLL__RSOCKET_RS_H_
-static inline uint32_t
-rr_rs_poll_tcp (struct rsocket *rs)
+static inline uint32_t rr_rs_poll_tcp(struct rsocket *rs)
{
- uint32_t events = 0;
- if (rs->state & rs_connected)
+ uint32_t events = 0;
+ if (rs->state & rs_connected)
{
- if (rs_have_rdata (rs))
- events |= EPOLLIN;
- if (rs_can_send (rs))
- events |= EPOLLOUT;
+ if (rs_have_rdata(rs))
+ events |= EPOLLIN;
+ if (rs_can_send(rs))
+ events |= EPOLLOUT;
}
- if (rs->state & (rs_error | rs_connect_error))
- events |= EPOLLERR;
- if (rs->state & rs_disconnected)
- events |= EPOLLHUP;
- return events;
+ if (rs->state & (rs_error | rs_connect_error))
+ events |= EPOLLERR;
+ if (rs->state & rs_disconnected)
+ events |= EPOLLHUP;
+ return events;
}
-static inline uint32_t
-rr_rs_poll_udp (struct rsocket *rs)
+static inline uint32_t rr_rs_poll_udp(struct rsocket *rs)
{
- uint32_t events = 0;
- if (rs_have_rdata (rs))
- events |= EPOLLIN;
- if (ds_can_send (rs))
- events |= EPOLLOUT;
- if (rs->state & rs_error)
- events |= EPOLLERR;
- return events;
+ uint32_t events = 0;
+ if (rs_have_rdata(rs))
+ events |= EPOLLIN;
+ if (ds_can_send(rs))
+ events |= EPOLLOUT;
+ if (rs->state & rs_error)
+ events |= EPOLLERR;
+ return events;
}
-static inline uint32_t
-rr_rs_poll_both (struct rsocket *rs)
+static inline uint32_t rr_rs_poll_both(struct rsocket *rs)
{
- if (rs->type == SOCK_STREAM)
- return rr_rs_poll_tcp (rs);
+ if (rs->type == SOCK_STREAM)
+ return rr_rs_poll_tcp(rs);
- if (rs->type == SOCK_DGRAM)
- return rr_rs_poll_udp (rs);
+ if (rs->type == SOCK_DGRAM)
+ return rr_rs_poll_udp(rs);
- return 0;
+ return 0;
}
-uint32_t
-rr_rs_poll (int fd, uint32_t revents)
+uint32_t rr_rs_poll(int fd, uint32_t revents)
{
- struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd);
+ struct rsocket *rs = (struct rsocket *) idm_lookup(&idm, fd);
- if (!rs)
- return 0;
+ if (!rs)
+ return 0;
- if (rs->state == rs_listening)
- return revents;
+ if (rs->state == rs_listening)
+ return revents;
- return rr_rs_poll_both (rs);
+ return rr_rs_poll_both(rs);
}
+int rr_getEvt(int fd)
+{
+ struct rsocket *rs = (struct rsocket *) idm_lookup(&idm, fd);
+ if (rs->state == rs_listening)
+ {
+ return EPOLLIN;
+ }
+ if (!rs)
+ {
+ return 0;
+ }
+ return rr_rs_poll_both(rs);
+}
#endif /* #ifndef POLL__RSOCKET_RS_H_ */
-static inline void
-rr_rs_notify_tcp (struct rsocket *rs)
+static inline void rr_rs_notify_tcp(struct rsocket *rs)
{
- if (rs->rr_epoll_ref)
+ if (rs->rr_epoll_ref)
{
- uint32_t events = rr_rs_poll_tcp (rs);
- if (events)
- (void) rr_notify_event (rs->rr_epoll_pdata, events);
+ uint32_t events = rr_rs_poll_tcp(rs);
+ if (events)
+ (void) rr_notify_event(rs->rr_epoll_pdata, events);
}
}
-static inline void
-rr_rs_notify_udp (struct rsocket *rs)
+static inline void rr_rs_notify_udp(struct rsocket *rs)
{
- if (rs->rr_epoll_ref)
+ if (rs->rr_epoll_ref)
{
- uint32_t events = rr_rs_poll_udp (rs);
- if (events)
- (void) rr_notify_event (rs->rr_epoll_pdata, events);
+ uint32_t events = rr_rs_poll_udp(rs);
+ if (events)
+ (void) rr_notify_event(rs->rr_epoll_pdata, events);
}
}
#ifndef HANDLE__RSOCKET_RS_H_
#define HANDLE__RSOCKET_RS_H_
-inline static void
-rr_rs_handle_tcp (struct rsocket *rs)
+inline static void rr_rs_handle_tcp(struct rsocket *rs)
{
- int ret;
+ int ret;
- RR_DBG ("(%d)@ state:0x%x\n", rs->index, rs->state);
+ RR_DBG("(%d)@ state:0x%x\n", rs->index, rs->state);
- if (!(rs->state & (rs_connected | rs_opening)))
- return;
+ if (!(rs->state & (rs_connected | rs_opening)))
+ return;
- fastlock_acquire (&rs->cq_wait_lock);
- ret = rs_get_cq_event (rs);
- RR_DBG ("rs_get_cq_event({%d})=%d,%d\n", rs->index, ret, errno);
- fastlock_release (&rs->cq_wait_lock);
+ fastlock_acquire(&rs->cq_wait_lock);
+ ret = rs_get_cq_event(rs);
+ RR_DBG("rs_get_cq_event({%d})=%d,%d\n", rs->index, ret, errno);
+ fastlock_release(&rs->cq_wait_lock);
- fastlock_acquire (&rs->cq_lock);
+ fastlock_acquire(&rs->cq_lock);
- if (rs->state & rs_connected)
+ if (rs->state & rs_connected)
{
- rs_update_credits (rs);
- ret = rs_poll_cq (rs);
- RR_DBG ("rs_poll_cq({%d})=%d,%d {ref:%d, armed:%d}\n",
- rs->index, ret, errno, rs->rr_epoll_ref, rs->cq_armed);
+ rs_update_credits(rs);
+ ret = rs_poll_cq(rs);
+ RR_DBG("rs_poll_cq({%d})=%d,%d {ref:%d, armed:%d}\n",
+ rs->index, ret, errno, rs->rr_epoll_ref, rs->cq_armed);
}
- if (rs->rr_epoll_ref && rs->cq_armed < 1)
+ if (rs->rr_epoll_ref && rs->cq_armed < 1)
{
- ret = ibv_req_notify_cq (rs->cm_id->recv_cq, 0);
- RR_DBG ("ibv_req_notify_cq({%d})=%d,%d\n", rs->index, ret, errno);
- if (0 == ret)
- __sync_fetch_and_add (&rs->cq_armed, 1);
+ ret = ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
+ RR_DBG("ibv_req_notify_cq({%d})=%d,%d\n", rs->index, ret, errno);
+ if (0 == ret)
+ __sync_fetch_and_add(&rs->cq_armed, 1);
}
- if (rs->state & rs_connected)
+ if (rs->state & rs_connected)
{
- ret = rs_poll_cq (rs);
- RR_DBG ("rs_poll_cq({%d})=%d,%d\n", rs->index, ret, errno);
- rs_update_credits (rs);
+ ret = rs_poll_cq(rs);
+ RR_DBG("rs_poll_cq({%d})=%d,%d\n", rs->index, ret, errno);
+ rs_update_credits(rs);
}
- fastlock_release (&rs->cq_lock);
+ fastlock_release(&rs->cq_lock);
- RR_DBG ("(%d)=\n", rs->index);
+ RR_DBG("(%d)=\n", rs->index);
}
-inline static void
-rr_rs_handle_udp (struct rsocket *rs)
+inline static void rr_rs_handle_udp(struct rsocket *rs)
{
- fastlock_acquire (&rs->cq_wait_lock);
- ds_get_cq_event (rs);
- fastlock_release (&rs->cq_wait_lock);
+ fastlock_acquire(&rs->cq_wait_lock);
+ ds_get_cq_event(rs);
+ fastlock_release(&rs->cq_wait_lock);
- fastlock_acquire (&rs->cq_lock);
- ds_poll_cqs (rs);
- if (rs->rr_epoll_ref && !rs->cq_armed)
+ fastlock_acquire(&rs->cq_lock);
+ ds_poll_cqs(rs);
+ if (rs->rr_epoll_ref && !rs->cq_armed)
{
- ds_req_notify_cqs (rs);
- rs->cq_armed = 1;
+ ds_req_notify_cqs(rs);
+ rs->cq_armed = 1;
}
- fastlock_release (&rs->cq_lock);
+ fastlock_release(&rs->cq_lock);
}
-inline static void
-rr_rs_handle_rs (struct rsocket *rs)
+inline static void rr_rs_handle_rs(struct rsocket *rs)
{
- if (rs->state & rs_opening)
+ if (rs->state & rs_opening)
{
- int ret = rs_do_connect (rs);
- RR_DBG ("rs_do_connect(%p{%d}):%d:%d\n", rs, rs->index, ret, errno);
- return;
+ int ret = rs_do_connect(rs);
+ RR_DBG("rs_do_connect(%p{%d}):%d:%d\n", rs, rs->index, ret, errno);
+ return;
}
- if (rs->type == SOCK_STREAM)
+ if (rs->type == SOCK_STREAM)
{
- rr_rs_handle_tcp (rs);
+ rr_rs_handle_tcp(rs);
}
- if (rs->type == SOCK_DGRAM)
+ if (rs->type == SOCK_DGRAM)
{
- rr_rs_handle_udp (rs);
+ rr_rs_handle_udp(rs);
}
}
-int
-rr_rs_handle (int fd, uint32_t events)
+int rr_rs_handle(int fd, uint32_t events)
{
- struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd);
+ struct rsocket *rs = (struct rsocket *) idm_lookup(&idm, fd);
- RR_DBG ("(fd:%d, events:0x%x):rs:%p\n", fd, events, rs);
+ RR_DBG("(fd:%d, events:0x%x):rs:%p\n", fd, events, rs);
- if (!rs)
- return _err (EBADF);
+ if (!rs)
+ return _err(EBADF);
- if (rs->state == rs_listening)
+ if (rs->state == rs_listening)
{
- if (events & EPOLLIN)
+ if (events & EPOLLIN)
{
- (void) rr_notify_event (rs->rr_epoll_pdata, events);
+ (void) rr_notify_event(rs->rr_epoll_pdata, events);
}
- return 0;
+ return 0;
}
- rr_rs_handle_rs (rs);
+ rr_rs_handle_rs(rs);
- return 0;
+ return 0;
}
#endif /* #ifndef HANDLE__RSOCKET_RS_H_ */
@@ -239,155 +240,150 @@ rr_rs_handle (int fd, uint32_t events)
#ifndef ADPT__RSOCKET_RS_H_
#define ADPT__RSOCKET_RS_H_
-inline static int
-rr_rs_evfd (struct rsocket *rs)
+inline static int rr_rs_evfd(struct rsocket *rs)
{
- if (rs->type == SOCK_STREAM)
+ if (rs->type == SOCK_STREAM)
{
- if (rs->state >= rs_connected)
- return rs->cm_id->recv_cq_channel->fd;
- else
- return rs->cm_id->channel->fd;
+ if (rs->state >= rs_connected)
+ return rs->cm_id->recv_cq_channel->fd;
+ else
+ return rs->cm_id->channel->fd;
}
- else
+ else
{
- return rs->epfd;
+ return rs->epfd;
}
- return -1;
+ return -1;
}
-int
-rr_rs_ep_add (int fd, void *pdata, uint32_t * revent)
+int rr_rs_ep_add(int fd, void *pdata, uint32_t * revent)
{
- int ref;
- struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd);
- RR_DBG ("(%d(%p),)\n", fd, rs);
- if (!rs)
- return _err (EBADF);
-
- ref = __sync_add_and_fetch (&rs->rr_epoll_ref, 1);
- if (1 == ref)
+ int ref;
+ struct rsocket *rs = (struct rsocket *) idm_lookup(&idm, fd);
+ RR_DBG("(%d(%p),)\n", fd, rs);
+ if (!rs)
+ return _err(EBADF);
+
+ ref = __sync_add_and_fetch(&rs->rr_epoll_ref, 1);
+ if (1 == ref)
{
- rs->rr_epoll_fd = rr_rs_evfd (rs);
- (void) rr_ep_add (rs->rr_epoll_fd, rs->index);
+ rs->rr_epoll_fd = rr_rs_evfd(rs);
+ (void) rr_ep_add(rs->rr_epoll_fd, rs->index);
}
- (void) rr_rs_handle_rs (rs);
- *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both (rs);
+ (void) rr_rs_handle_rs(rs);
+ *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both(rs);
- rs->rr_epoll_pdata = pdata;
+ rs->rr_epoll_pdata = pdata;
- RR_DBG ("*revent=0x%x\n", *revent);
- return 0;
+ RR_DBG("*revent=0x%x\n", *revent);
+ return 0;
}
-int
-rr_rs_ep_mod (int fd, void *pdata, uint32_t * revent)
+int rr_rs_ep_mod(int fd, void *pdata, uint32_t * revent)
{
- struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd);
- RR_DBG ("(%d(%p),)\n", fd, rs);
- if (!rs)
- return _err (EBADF);
+ struct rsocket *rs = (struct rsocket *) idm_lookup(&idm, fd);
+ RR_DBG("(%d(%p),)\n", fd, rs);
+ if (!rs)
+ return _err(EBADF);
- if (rs->rr_epoll_ref <= 0)
- return _err (ENOENT);
+ if (rs->rr_epoll_ref <= 0)
+ return _err(ENOENT);
- (void) rr_rs_handle_rs (rs);
- *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both (rs);
+ (void) rr_rs_handle_rs(rs);
+ *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both(rs);
- rs->rr_epoll_pdata = pdata;
+ rs->rr_epoll_pdata = pdata;
- RR_DBG ("*revent=0x%x\n", *revent);
- return 0;
+ RR_DBG("*revent=0x%x\n", *revent);
+ return 0;
}
-int
-rr_rs_ep_del (int fd)
+int rr_rs_ep_del(int fd)
{
- int ref;
- struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd);
- RR_DBG ("(%d(%p))\n", fd, rs);
+ int ref;
+ struct rsocket *rs = (struct rsocket *) idm_lookup(&idm, fd);
+ RR_DBG("(%d(%p))\n", fd, rs);
- if (!rs)
- return _err (EBADF);
+ if (!rs)
+ return _err(EBADF);
- ref = __sync_sub_and_fetch (&rs->rr_epoll_ref, 1);
- if (0 == ref)
+ ref = __sync_sub_and_fetch(&rs->rr_epoll_ref, 1);
+ if (0 == ref)
{
- (void) rr_ep_del (rs->rr_epoll_fd);
- rs->rr_epoll_fd = -1;
+ (void) rr_ep_del(rs->rr_epoll_fd);
+ rs->rr_epoll_fd = -1;
}
- return 0;
+ return 0;
}
#endif /* #ifndef ADPT__RSOCKET_RS_H_ */
-inline static void
-rr_rs_connected (struct rsocket *rs)
+inline static void rr_rs_connected(struct rsocket *rs)
{
- RR_DBG ("rsfd:%d ref:%d evfd:%d->%d state:0x%x\n", rs->index,
- rs->rr_epoll_ref, rs->rr_epoll_fd, rr_rs_evfd (rs), rs->state);
+ RR_DBG("rsfd:%d ref:%d evfd:%d->%d state:0x%x\n", rs->index,
+ rs->rr_epoll_ref, rs->rr_epoll_fd, rr_rs_evfd(rs), rs->state);
- if (!(rs->state & rs_connected))
+ if (!(rs->state & rs_connected))
{
- rr_rs_notify_tcp (rs);
- return;
+ rr_rs_notify_tcp(rs);
+ return;
}
- if (rs->rr_epoll_ref)
+ if (rs->rr_epoll_ref)
{
- int evfd = rr_rs_evfd (rs);
+ int evfd = rr_rs_evfd(rs);
- if (evfd != rs->rr_epoll_fd)
+ if (evfd != rs->rr_epoll_fd)
{
- (void) rr_ep_del (rs->rr_epoll_fd);
- rs->rr_epoll_fd = evfd;
- (void) rr_ep_add (evfd, rs->index);
+ (void) rr_ep_del(rs->rr_epoll_fd);
+ rs->rr_epoll_fd = evfd;
+ (void) rr_ep_add(evfd, rs->index);
}
- rr_rs_handle_tcp (rs);
+ rr_rs_handle_tcp(rs);
}
}
-int
-raccept4 (int socket, struct sockaddr *addr, socklen_t * addrlen, int flags)
+int raccept4(int socket, struct sockaddr *addr, socklen_t * addrlen,
+ int flags)
{
- int ret, fd;
- struct rsocket *rs;
+ int ret, fd;
+ struct rsocket *rs;
- RR_DBG ("(%d, %p, %p, %d)@\n", socket, addr, addrlen, flags);
- fd = raccept (socket, addr, addrlen);
- RR_DBG ("(%d, , , %d):%d:%d\n", socket, flags, fd, errno);
- if (fd < 0)
- return fd;
+ RR_DBG("(%d, %p, %p, %d)@\n", socket, addr, addrlen, flags);
+ fd = raccept(socket, addr, addrlen);
+ RR_DBG("(%d, , , %d):%d:%d\n", socket, flags, fd, errno);
+ if (fd < 0)
+ return fd;
- rs = (struct rsocket *) idm_lookup (&idm, fd);
- if (!rs)
+ rs = (struct rsocket *) idm_lookup(&idm, fd);
+ if (!rs)
{
- RR_ERR ("panic\n");
- return -1;
+ RR_ERR("panic\n");
+ return -1;
}
- if (flags & SOCK_NONBLOCK)
+ if (flags & SOCK_NONBLOCK)
{
- if (0 == (rs->fd_flags & O_NONBLOCK))
+ if (0 == (rs->fd_flags & O_NONBLOCK))
{
- RR_DBG ("orig flag:%x\n",
- GSAPI (fcntl) (rs->cm_id->channel->fd, F_GETFL));
- ret = GSAPI (fcntl) (rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK);
- if (0 == ret)
- rs->fd_flags |= O_NONBLOCK;
+ RR_DBG("orig flag:%x\n",
+ GSAPI(fcntl) (rs->cm_id->channel->fd, F_GETFL));
+ ret = GSAPI(fcntl) (rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK);
+ if (0 == ret)
+ rs->fd_flags |= O_NONBLOCK;
}
}
- if (flags & SOCK_CLOEXEC)
+ if (flags & SOCK_CLOEXEC)
{
- RR_LOG ("ignore flag:SOCK_CLOEXEC\n");
+ RR_LOG("ignore flag:SOCK_CLOEXEC\n");
}
- return fd;
+ return fd;
}
#endif /* #ifndef _RSOCKET_RS_C_ */