diff options
author | charan makkina <charan795m@gmail.com> | 2019-04-30 17:40:53 +0530 |
---|---|---|
committer | charan makkina <charan795m@gmail.com> | 2019-05-20 18:14:40 +0530 |
commit | a826fe833d3f2a8fe2673fa05811fe1a22baf045 (patch) | |
tree | da11a17c46ca9b8a002a52a290628574fa3f5eda /stacks/rsocket | |
parent | 3e6bf7b64eea418c59959c18750261b815b2892c (diff) |
Feature: 19.04 part 1
Change-Id: Ibba924b8deca1f246b9dcb12d89d085b6fd33046
Signed-off-by: charan makkina <charan795m@gmail.com>
Diffstat (limited to 'stacks/rsocket')
-rw-r--r-- | stacks/rsocket/CMakeLists.txt | 42 | ||||
-rw-r--r-- | stacks/rsocket/build/.gitkeep | 0 | ||||
-rw-r--r-- | stacks/rsocket/configure/module_config.json | 10 | ||||
-rw-r--r-- | stacks/rsocket/configure/rd_config.json | 31 | ||||
-rw-r--r-- | stacks/rsocket/doc/README.md | 8 | ||||
-rw-r--r-- | stacks/rsocket/src/rsocket_adpt.c | 258 | ||||
-rw-r--r-- | stacks/rsocket/src/rsocket_adpt.h | 23 | ||||
-rw-r--r-- | stacks/rsocket/src/rsocket_rdma.h | 40 | ||||
-rw-r--r-- | stacks/rsocket/src/rsocket_rs.c | 422 |
9 files changed, 442 insertions, 392 deletions
diff --git a/stacks/rsocket/CMakeLists.txt b/stacks/rsocket/CMakeLists.txt index 6ba868b..6516961 100644 --- a/stacks/rsocket/CMakeLists.txt +++ b/stacks/rsocket/CMakeLists.txt @@ -13,40 +13,62 @@ # See the License for the specific language governing permissions and # limitations under the License. ######################################################################### +CMAKE_MINIMUM_REQUIRED(VERSION 2.8.11) +#PROJECT(nStack) +SET(CMAKE_SKIP_RPATH TRUE) +SET(CMAKE_C_COMPILER "gcc") +SET(OS_RELEASE "" CACHE STRING "User-specified OS release.") +SET(EXECUTABLE_PATH ${CMAKE_CURRENT_LIST_DIR}/../../release/bin) +SET(LIB_PATH_STATIC ${CMAKE_SOURCE_DIR}/build) +SET(LIB_PATH_SHARED ${CMAKE_CURRENT_LIST_DIR}/../../release/lib64) +SET(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${LIB_PATH_STATIC}) +SET(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${EXECUTABLE_PATH}) +SET(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${LIB_PATH_SHARED}) + +SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O0 -g -fPIE -pie -fPIC -m64 -mssse3 -std=gnu89") +SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -Wshadow -Wfloat-equal -Wformat=2") +SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fstack-protector -fstack-protector-all") +SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wl,-z,relro,-z,now -Wl,--disable-new-dtags") +SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wl,-z,noexecstack -mcmodel=medium") SET(rdmacm_dir librdmacm-1.1.0) -SET(dmm_inc_dir ${DMM_REL_INC_DIR}) +SET(dmm_inc_dir ${CMAKE_CURRENT_LIST_DIR}/../../release/include/) +SET(DMM_REL_INC_DIR ${dmm_inc_dir}) +SET(dmm_src_inc_dir ${CMAKE_SOURCE_DIR}/src/include/) SET(RSOCKET_DEBUG 1) ######################## - SET(rdmacm_url https://github.com/ofiwg/librdmacm/archive/v1.1.0.tar.gz) - +if(EXISTS "${CMAKE_CURRENT_LIST_DIR}/v1.1.0.tar.gz") + SET(RDMA_DOWNLOAD_CMD tar -xvf ${CMAKE_CURRENT_LIST_DIR}/v1.1.0.tar.gz) +else() + SET(RDMA_DOWNLOAD_CMD wget --no-check-certificate ${rdmacm_url} && tar -xvf ${CMAKE_CURRENT_LIST_DIR}/v1.1.0.tar.gz) +endif() INCLUDE(ExternalProject) ExternalProject_Add( rdmacm - URL ${rdmacm_url} SOURCE_DIR ${CMAKE_CURRENT_LIST_DIR}/${rdmacm_dir} DOWNLOAD_DIR ${CMAKE_CURRENT_LIST_DIR} + DOWNLOAD_COMMAND ${RDMA_DOWNLOAD_CMD} PATCH_COMMAND patch -p1 -i ../rsocket.patch CONFIGURE_COMMAND ./autogen.sh && ./configure dmm_inc_dir=${DMM_REL_INC_DIR} RSOCKET_DEBUG=${RSOCKET_DEBUG} BUILD_IN_SOURCE 1 BUILD_COMMAND make INSTALL_COMMAND cp -f libdmm_rdmacm.a ${LIB_PATH_STATIC}/ - DEPENDS DPDK ) -set_target_properties(rdmacm PROPERTIES EXCLUDE_FROM_ALL TRUE) ######################## -SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O2 -g -fPIC -m64 -pthread") +SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O0 -g -fPIC -m64 -pthread") ADD_LIBRARY(dmm_rsocket SHARED src/rsocket_adpt.c) ADD_DEFINITIONS(-D_GNU_SOURCE -DRSOCKET_DEBUG=${RSOCKET_DEBUG}) -INCLUDE_DIRECTORIES(${DMM_REL_INC_DIR}) +INCLUDE_DIRECTORIES( + ${DMM_REL_INC_DIR} ${dmm_src_inc_dir} +) INCLUDE_DIRECTORIES(./src ${rdmacm_dir} ${rdmacm_dir}/include ${rdmacm_dir}/src) TARGET_LINK_LIBRARIES(dmm_rsocket @@ -56,6 +78,4 @@ TARGET_LINK_LIBRARIES(dmm_rsocket ibverbs pthread dl rt ) -ADD_DEPENDENCIES(dmm_rsocket rdmacm DPDK) - -set_target_properties(dmm_rsocket PROPERTIES EXCLUDE_FROM_ALL TRUE) +ADD_DEPENDENCIES(dmm_rsocket rdmacm) diff --git a/stacks/rsocket/build/.gitkeep b/stacks/rsocket/build/.gitkeep new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/stacks/rsocket/build/.gitkeep diff --git a/stacks/rsocket/configure/module_config.json b/stacks/rsocket/configure/module_config.json index 736de24..2df82cd 100644 --- a/stacks/rsocket/configure/module_config.json +++ b/stacks/rsocket/configure/module_config.json @@ -3,17 +3,27 @@ "module_list": [ { "stack_name": "kernel", /*stack name*/ + "function_name": "kernel_stack_register", /*function name*/ "libname": "./", /*library name, if loadtype is static, this maybe null, else must give a library name*/ + "loadtype": "static", /*library load type: static or dynamic*/ "deploytype": "1", /*deploy model type:model type1, model type2, model type3. Indicating single or multi process deployment. Used during shared memory initialization.*/ + "maxfd": "1024", /*the max fd supported*/ + "minfd": "0", /*the min fd supported*/ + "priorty": "1", /*priorty when executing, reserv*/ "stackid": "0", /*stack id, this must be ordered and not be repeated*/ }, { "stack_name": "rsocket", + "function_name": "rsocket_stack_register", "libname": "libdmm_rsocket.so", + "loadtype": "dynmic", "deploytype": "1", + "maxfd": "1024", + "minfd": "0", + "priorty": "1", "stackid": "1", }, ] diff --git a/stacks/rsocket/configure/rd_config.json b/stacks/rsocket/configure/rd_config.json index 5c6f861..d89afcf 100644 --- a/stacks/rsocket/configure/rd_config.json +++ b/stacks/rsocket/configure/rd_config.json @@ -1,10 +1,25 @@ { - "ip_route": [ - { - "subnet": "192.168.1.1/24", - "stack_name": "rsocket", - }, - ], - "prot_route": [ - ], + "modules": [ + { + "name": "kernel", + "ip_route": [ + "127.0.0.1/24" + ], + "protocol_route": [ + ], + "type_route": [ + ] + }, + { + "name": "rsocket", + "ip_route": [ + "192.168.21.1/24" + + ], + "protocol_route": [ + ], + "type_route": [ + ] + } + ] } diff --git a/stacks/rsocket/doc/README.md b/stacks/rsocket/doc/README.md index 6cdacfb..237db68 100644 --- a/stacks/rsocket/doc/README.md +++ b/stacks/rsocket/doc/README.md @@ -34,10 +34,10 @@ dmm/release/lib64/libdmm_rsocket.so ```sh #export LD_LIBRARY_PATH=${dmm}/release/lib64 #export LD_PRELOAD=${dmm}/release/lib64/libnStackAPI.so - #export NSTACK_MOD_CFG_FILE=${dmm}/stacks/rsocket/configure/module_config.json - #export NSTACK_MOD_CFG_RD=${dmm}/stacks/rsocket/configure/rd_config.json + #export NSTACK_MOD_CFG_FILE=${dmm}/stacks/rsocket/config/module_config.json + #export NSTACK_MOD_CFG_RD=${dmm}/stacks/rsocket/config/rd_config.json ``` -- Steps 2: Modify rd_config.json(located at dmm/stacks/rsocket/configure/) +- Steps 2: Modify rd_config.json(located at dmm/stacks/rsocket/config/) ```sh #vim rd_config.json //set "subnet": "192.168.21.1/24" @@ -107,4 +107,4 @@ all using GSAPI macro control. https://wiki.fd.io/view/DMM https://github.com/ofiwg/librdmacm/blob/master/docs/rsocket https://github.com/rsocket/rsocket -http://www.mellanox.com/page/products_dyn?product_family=26 +http://www.mellanox.com/page/products_dyn?product_family=26
\ No newline at end of file diff --git a/stacks/rsocket/src/rsocket_adpt.c b/stacks/rsocket/src/rsocket_adpt.c index 2839909..9496e96 100644 --- a/stacks/rsocket/src/rsocket_adpt.c +++ b/stacks/rsocket/src/rsocket_adpt.c @@ -19,10 +19,11 @@ #include <errno.h> #include <dlfcn.h> -#include "nstack_dmm_api.h" - +#include "nstack_callback_ops.h" #include "rsocket_adpt.h" #include "rdma/rsocket.h" +#include "nstack_epoll_api.h" +#include "nstack_rd_api.h" #define RR_EVFD(u64) ((int)((u64) >> 32)) #define RR_RSFD(u64) ((int)((u64) & 0xFFFFFFFF)) @@ -36,77 +37,69 @@ rr_sapi_t g_sapi = { 0 }; int g_rr_log_level = -1; -int -rr_notify_event (void *pdata, int events) -{ - int ret; - - ret = g_rr_var.event_cb (pdata, events); - - RR_DBG ("event_cb(%p, 0x%x)=%d,%d\n", pdata, events, ret, errno); +void *rrd_table = NULL; - return ret; +void rr_notify_event(void *pdata, int events) +{ + g_rr_var.event_cb(pdata, events, EVENT_INFORM_APP); } -int -rr_epoll_ctl (int op, int evfd, uint32_t events, int rsfd) +int rr_epoll_ctl(int op, int evfd, uint32_t events, int rsfd) { - int ret; - struct epoll_event event; - event.events = events; - event.data.u64 = RR_DATA (evfd, rsfd); - ret = GSAPI (epoll_ctl) (g_rr_var.epfd, op, evfd, &event); - return ret; + int ret; + struct epoll_event event; + event.events = events; + event.data.u64 = RR_DATA(evfd, rsfd); + ret = GSAPI(epoll_ctl) (g_rr_var.epfd, op, evfd, &event); + return ret; } -static void * -rr_epoll_thread (void *arg) +static void *rr_epoll_thread(void *arg) { - int i, ret, e; - struct epoll_event events[RR_EV_NUM]; + int i, ret, e; + struct epoll_event events[RR_EV_NUM]; - while (1) + while (1) { - ret = GSAPI (epoll_wait) (g_rr_var.epfd, events, RR_EV_NUM, 100); - e = errno; + ret = GSAPI(epoll_wait) (g_rr_var.epfd, events, RR_EV_NUM, 100); + e = errno; - for (i = 0; i < ret; ++i) + for (i = 0; i < ret; ++i) { - if (rr_rs_handle (RR_RSFD (events[i].data.u64), events[i].events)) + if (rr_rs_handle(RR_RSFD(events[i].data.u64), events[i].events)) { - (void) rr_ep_del (RR_EVFD (events[i].data.u64)); + (void) rr_ep_del(RR_EVFD(events[i].data.u64)); } } - if (ret < 0) + if (ret < 0) { - RR_STAT_INC (RR_STAT_EPW_ERR); - if (e == EINTR) + RR_STAT_INC(RR_STAT_EPW_ERR); + if (e == EINTR) { - RR_STAT_INC (RR_STAT_EPW_EINTR); + RR_STAT_INC(RR_STAT_EPW_EINTR); } - else if (e == ETIMEDOUT) + else if (e == ETIMEDOUT) { - RR_STAT_INC (RR_STAT_EPW_ETIMEOUT); + RR_STAT_INC(RR_STAT_EPW_ETIMEOUT); } - else + else { - RR_ERR ("epoll_wait()=%d:%d\n", ret, errno); + RR_ERR("epoll_wait()=%d:%d\n", ret, errno); } } } - return NULL; + return NULL; } -static int -rr_init_sapi () +static int rr_init_sapi() { - void *handle = dlopen ("libc.so.6", RTLD_NOW | RTLD_GLOBAL); - if (!handle) + void *handle = dlopen("libc.so.6", RTLD_NOW | RTLD_GLOBAL); + if (!handle) { - RR_ERR ("dlopen(libc.so.6):NULL\n"); - return -1; + RR_ERR("dlopen(libc.so.6):NULL\n"); + return -1; } #define RR_SAPI(name) \ @@ -116,129 +109,148 @@ rr_init_sapi () #include "rsocket_sapi.h" #undef RR_SAPI - return 0; + return 0; } -static void -rr_init_log () +static void rr_init_log() { - int level; - char *log; + int level; + char *log; - if (g_rr_log_level >= 0) - return; + if (g_rr_log_level >= 0) + return; - log = getenv ("RSOCKET_LOG"); - if (!log || !log[0]) + log = getenv("RSOCKET_LOG"); + if (!log || !log[0]) { - g_rr_log_level = RR_LOG_OFF; - return; + g_rr_log_level = RR_LOG_OFF; + return; } - level = atoi (log); - if (level < 0 || level > 99999) + level = atoi(log); + if (level < 0 || level > 99999) { - g_rr_log_level = RR_LOG_OFF; - return; + g_rr_log_level = RR_LOG_OFF; + return; } - g_rr_log_level = level; + g_rr_log_level = level; } -static int -rsocket_init () +void *rsocket_get_ip_shmem() { - int ret; + return rrd_table; +} - rr_init_log (); +static int rsocket_init() +{ + int ret; + rrd_table = nstack_local_rd_malloc(); + if (!rrd_table) + { + RR_ERR("rsocket rd table create failed!"); + return -1; + } - if (rr_init_sapi ()) + if (nstack_rd_parse("rsocket", rrd_table)) { - return -1; + RR_WRN("no rd data got!"); + RR_WRN("rsocket parse rd data failed"); + nstack_rd_table_clear(rrd_table); + return -1; } - g_rr_var.epfd = GSAPI (epoll_create) (1); - if (g_rr_var.epfd < 0) - return g_rr_var.epfd; + rr_init_log(); - ret = - pthread_create (&g_rr_var.epoll_threadid, NULL, rr_epoll_thread, NULL); - if (ret) + if (rr_init_sapi()) { - GSAPI (close) (g_rr_var.epfd); - g_rr_var.epfd = -1; - return ret; + return -1; } - (void) pthread_setname_np (g_rr_var.epoll_threadid, "rsocket_epoll"); - return 0; -} + g_rr_var.epfd = GSAPI(epoll_create) (1); -int -rsocket_exit () -{ - if (g_rr_var.epfd >= 0) + if (g_rr_var.epfd < 0) { - (void) GSAPI (close) (g_rr_var.epfd); - g_rr_var.epfd = -1; + return g_rr_var.epfd; } - return 0; + ret = + pthread_create(&g_rr_var.epoll_threadid, NULL, rr_epoll_thread, NULL); + if (ret) + { + GSAPI(close) (g_rr_var.epfd); + g_rr_var.epfd = -1; + return ret; + } + (void) pthread_setname_np(g_rr_var.epoll_threadid, "rsocket_epoll"); + + return 0; } -unsigned int -rsocket_ep_ctl (int epFD, int proFD, int ctl_ops, struct epoll_event *event, - void *pdata) +int rsocket_exit() { - int ret; - struct eventpoll *ep; - unsigned int revents = 0; + if (g_rr_var.epfd >= 0) + { + (void) GSAPI(close) (g_rr_var.epfd); + g_rr_var.epfd = -1; + } - RR_DBG ("(%d, %d, %d, 0x%x, %p)\n", epFD, proFD, ctl_ops, event->events, - pdata); + return 0; +} - switch (ctl_ops) +void *rsocket_ep_ctl(int proFD, int ctl_ops, void *pdata, void *event) +{ + int ret; + unsigned int revents = 0; + switch (ctl_ops) { - case nstack_ep_triggle_add: - ret = rr_rs_ep_add (proFD, pdata, &revents); - if (ret) - return -1; - return revents; - - case nstack_ep_triggle_mod: - ret = rr_rs_ep_mod (proFD, pdata, &revents); - if (ret) - return -1; - return revents; - case nstack_ep_triggle_del: - return rr_rs_ep_del (proFD); + case nstack_ep_triggle_add: + ret = rr_rs_ep_add(proFD, pdata, &revents); + if (ret) + return NULL; + *(int *) event = revents; + return pdata; + case nstack_ep_triggle_mod: + ret = rr_rs_ep_mod(proFD, pdata, &revents); + if (ret) + return NULL; + *(int *) event = revents; + return pdata; + case nstack_ep_triggle_del: + rr_rs_ep_del(proFD); } - return _err (EPERM); + return pdata; + } -int -rsocket_stack_register (nstack_proc_cb * proc_fun, - nstack_event_cb * event_ops) +int rsocket_getEvt(int fd) { - rr_init_log (); + return rr_getEvt(fd); +} + +int rsocket_stack_register(nstack_socket_ops * ops, + nstack_event_ops * event_ops, + nstack_proc_ops * proc_fun) +{ + rr_init_log(); #define NSTACK_MK_DECL(ret, fn, args) \ do { \ - proc_fun->socket_ops.pf##fn = dlsym(event_ops->handle, "r"#fn); \ - if (!proc_fun->socket_ops.pf##fn) \ + ops->pf##fn = dlsym(event_ops->handle, "r"#fn); \ + if (!ops->pf##fn) \ RR_LOG("socket API '" #fn "' not found\n"); \ } while (0) -#include "declare_syscalls.h" +#include "declare_syscalls.h.tmpl" #undef NSTACK_MK_DECL - proc_fun->extern_ops.module_init = rsocket_init; - proc_fun->extern_ops.ep_ctl = rsocket_ep_ctl; - proc_fun->extern_ops.ep_getevt = NULL; - proc_fun->extern_ops.module_init_child = rsocket_init; - - g_rr_var.type = event_ops->type; - g_rr_var.event_cb = event_ops->event_cb; + proc_fun->module_init = rsocket_init; + proc_fun->ep_triggle = rsocket_ep_ctl; + proc_fun->ep_getEvt = rsocket_getEvt; + proc_fun->get_ip_shmem = rsocket_get_ip_shmem; + proc_fun->fork_init_child = rsocket_init; + g_rr_var.type = event_ops->type; + g_rr_var.event_cb = event_ops->event_cb; - return 0; + return 0; } diff --git a/stacks/rsocket/src/rsocket_adpt.h b/stacks/rsocket/src/rsocket_adpt.h index 9c53330..37a2e88 100644 --- a/stacks/rsocket/src/rsocket_adpt.h +++ b/stacks/rsocket/src/rsocket_adpt.h @@ -22,11 +22,11 @@ enum { - RR_STAT_EPW_ERR, - RR_STAT_EPW_EINTR, - RR_STAT_EPW_ETIMEOUT, + RR_STAT_EPW_ERR, + RR_STAT_EPW_EINTR, + RR_STAT_EPW_ETIMEOUT, - RR_STAT_NUM + RR_STAT_NUM }; #define RR_STAT_ADD(id, num) __sync_add_and_fetch(&g_rr_var.stat[(id)], num) @@ -38,17 +38,16 @@ enum typedef struct rsocket_var { - pthread_t epoll_threadid; + pthread_t epoll_threadid; - int epfd; - int type; - int (*event_cb) (void *pdata, int events); + int epfd; + int type; + void (*event_cb) (void *pdata, int events, int postFlag); - uint64_t stat[RR_STAT_NUM]; + uint64_t stat[RR_STAT_NUM]; } rsocket_var_t; extern rsocket_var_t g_rr_var; - -int rr_rs_handle (int fd, uint32_t events); - +int rsocket_getEvt(int fd); +int rr_rs_handle(int fd, uint32_t events); #endif /* #ifndef _RSOCKET_ADPT_H_ */ diff --git a/stacks/rsocket/src/rsocket_rdma.h b/stacks/rsocket/src/rsocket_rdma.h index 75f4268..af66285 100644 --- a/stacks/rsocket/src/rsocket_rdma.h +++ b/stacks/rsocket/src/rsocket_rdma.h @@ -31,11 +31,11 @@ enum { - RR_LOG_OFF = 0x00, - RR_LOG_ERR = 0x01, - RR_LOG_WRN = 0x02, - RR_LOG_LOG = 0x03, - RR_LOG_DBG = 0x04, + RR_LOG_OFF = 0x00, + RR_LOG_ERR = 0x01, + RR_LOG_WRN = 0x02, + RR_LOG_LOG = 0x03, + RR_LOG_DBG = 0x04, }; #define RR_OUT(level, name, fmt, arg...) do { \ @@ -57,13 +57,13 @@ enum #define _err(err_no) ((errno = (err_no)), -1) -int rr_rs_ep_add (int fd, void *pdata, uint32_t * revent); -int rr_rs_ep_mod (int fd, void *pdata, uint32_t * revent); -int rr_rs_ep_del (int fd); +int rr_rs_ep_add(int fd, void *pdata, uint32_t * revent); +int rr_rs_ep_mod(int fd, void *pdata, uint32_t * revent); +int rr_rs_ep_del(int fd); -uint32_t rr_rs_poll (int fd, uint32_t revents); - -int rr_notify_event (void *pdata, int events); +uint32_t rr_rs_poll(int fd, uint32_t revents); +int rr_getEvt(int fd); +void rr_notify_event(void *pdata, int events); typedef struct rr_socket_api { @@ -76,21 +76,19 @@ extern rr_sapi_t g_sapi; #define GSAPI(name) g_sapi.n_##name -int rr_epoll_ctl (int op, int evfd, uint32_t events, int rsfd); +int rr_epoll_ctl(int op, int evfd, uint32_t events, int rsfd); -inline static int -rr_ep_add (int evfd, int rsfd) +inline static int rr_ep_add(int evfd, int rsfd) { - return rr_epoll_ctl (EPOLL_CTL_ADD, evfd, EPOLLET | EPOLLIN | EPOLLOUT, - rsfd); + return rr_epoll_ctl(EPOLL_CTL_ADD, evfd, EPOLLET | EPOLLIN | EPOLLOUT, + rsfd); } -inline static int -rr_ep_del (int evfd) +inline static int rr_ep_del(int evfd) { - if (evfd < 0) - return 0; - return rr_epoll_ctl (EPOLL_CTL_DEL, evfd, 0, 0); + if (evfd < 0) + return 0; + return rr_epoll_ctl(EPOLL_CTL_DEL, evfd, 0, 0); } #endif /* #ifndef _RSOCKET_RDMA_H_ */ diff --git a/stacks/rsocket/src/rsocket_rs.c b/stacks/rsocket/src/rsocket_rs.c index 0f4e73f..79c4914 100644 --- a/stacks/rsocket/src/rsocket_rs.c +++ b/stacks/rsocket/src/rsocket_rs.c @@ -17,221 +17,222 @@ #ifndef _RSOCKET_RS_C_ #define _RSOCKET_RS_C_ -inline static void -rr_rs_init (struct rsocket *rs) +inline static void rr_rs_init(struct rsocket *rs) { - RR_DBG ("(rs:%p{index:%d})\n", rs, rs->index); - rs->rr_epoll_ref = 0; - rs->rr_epoll_fd = -1; - rs->rr_epoll_pdata = NULL; + RR_DBG("(rs:%p{index:%d})\n", rs, rs->index); + rs->rr_epoll_ref = 0; + rs->rr_epoll_fd = -1; + rs->rr_epoll_pdata = NULL; } -inline static void -rr_rs_dest (struct rsocket *rs) +inline static void rr_rs_dest(struct rsocket *rs) { - RR_DBG ("(rs:%p{index:%d})\n", rs, rs->index); + RR_DBG("(rs:%p{index:%d})\n", rs, rs->index); - if (rs->rr_epoll_ref) + if (rs->rr_epoll_ref) { - (void) rr_ep_del (rs->rr_epoll_fd); - rs->rr_epoll_ref = 0; - rs->rr_epoll_fd = -1; - rs->rr_epoll_pdata = NULL; + (void) rr_ep_del(rs->rr_epoll_fd); + rs->rr_epoll_ref = 0; + rs->rr_epoll_fd = -1; + rs->rr_epoll_pdata = NULL; } } #ifndef POLL__RSOCKET_RS_H_ #define POLL__RSOCKET_RS_H_ -static inline uint32_t -rr_rs_poll_tcp (struct rsocket *rs) +static inline uint32_t rr_rs_poll_tcp(struct rsocket *rs) { - uint32_t events = 0; - if (rs->state & rs_connected) + uint32_t events = 0; + if (rs->state & rs_connected) { - if (rs_have_rdata (rs)) - events |= EPOLLIN; - if (rs_can_send (rs)) - events |= EPOLLOUT; + if (rs_have_rdata(rs)) + events |= EPOLLIN; + if (rs_can_send(rs)) + events |= EPOLLOUT; } - if (rs->state & (rs_error | rs_connect_error)) - events |= EPOLLERR; - if (rs->state & rs_disconnected) - events |= EPOLLHUP; - return events; + if (rs->state & (rs_error | rs_connect_error)) + events |= EPOLLERR; + if (rs->state & rs_disconnected) + events |= EPOLLHUP; + return events; } -static inline uint32_t -rr_rs_poll_udp (struct rsocket *rs) +static inline uint32_t rr_rs_poll_udp(struct rsocket *rs) { - uint32_t events = 0; - if (rs_have_rdata (rs)) - events |= EPOLLIN; - if (ds_can_send (rs)) - events |= EPOLLOUT; - if (rs->state & rs_error) - events |= EPOLLERR; - return events; + uint32_t events = 0; + if (rs_have_rdata(rs)) + events |= EPOLLIN; + if (ds_can_send(rs)) + events |= EPOLLOUT; + if (rs->state & rs_error) + events |= EPOLLERR; + return events; } -static inline uint32_t -rr_rs_poll_both (struct rsocket *rs) +static inline uint32_t rr_rs_poll_both(struct rsocket *rs) { - if (rs->type == SOCK_STREAM) - return rr_rs_poll_tcp (rs); + if (rs->type == SOCK_STREAM) + return rr_rs_poll_tcp(rs); - if (rs->type == SOCK_DGRAM) - return rr_rs_poll_udp (rs); + if (rs->type == SOCK_DGRAM) + return rr_rs_poll_udp(rs); - return 0; + return 0; } -uint32_t -rr_rs_poll (int fd, uint32_t revents) +uint32_t rr_rs_poll(int fd, uint32_t revents) { - struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd); + struct rsocket *rs = (struct rsocket *) idm_lookup(&idm, fd); - if (!rs) - return 0; + if (!rs) + return 0; - if (rs->state == rs_listening) - return revents; + if (rs->state == rs_listening) + return revents; - return rr_rs_poll_both (rs); + return rr_rs_poll_both(rs); } +int rr_getEvt(int fd) +{ + struct rsocket *rs = (struct rsocket *) idm_lookup(&idm, fd); + if (rs->state == rs_listening) + { + return EPOLLIN; + } + if (!rs) + { + return 0; + } + return rr_rs_poll_both(rs); +} #endif /* #ifndef POLL__RSOCKET_RS_H_ */ -static inline void -rr_rs_notify_tcp (struct rsocket *rs) +static inline void rr_rs_notify_tcp(struct rsocket *rs) { - if (rs->rr_epoll_ref) + if (rs->rr_epoll_ref) { - uint32_t events = rr_rs_poll_tcp (rs); - if (events) - (void) rr_notify_event (rs->rr_epoll_pdata, events); + uint32_t events = rr_rs_poll_tcp(rs); + if (events) + (void) rr_notify_event(rs->rr_epoll_pdata, events); } } -static inline void -rr_rs_notify_udp (struct rsocket *rs) +static inline void rr_rs_notify_udp(struct rsocket *rs) { - if (rs->rr_epoll_ref) + if (rs->rr_epoll_ref) { - uint32_t events = rr_rs_poll_udp (rs); - if (events) - (void) rr_notify_event (rs->rr_epoll_pdata, events); + uint32_t events = rr_rs_poll_udp(rs); + if (events) + (void) rr_notify_event(rs->rr_epoll_pdata, events); } } #ifndef HANDLE__RSOCKET_RS_H_ #define HANDLE__RSOCKET_RS_H_ -inline static void -rr_rs_handle_tcp (struct rsocket *rs) +inline static void rr_rs_handle_tcp(struct rsocket *rs) { - int ret; + int ret; - RR_DBG ("(%d)@ state:0x%x\n", rs->index, rs->state); + RR_DBG("(%d)@ state:0x%x\n", rs->index, rs->state); - if (!(rs->state & (rs_connected | rs_opening))) - return; + if (!(rs->state & (rs_connected | rs_opening))) + return; - fastlock_acquire (&rs->cq_wait_lock); - ret = rs_get_cq_event (rs); - RR_DBG ("rs_get_cq_event({%d})=%d,%d\n", rs->index, ret, errno); - fastlock_release (&rs->cq_wait_lock); + fastlock_acquire(&rs->cq_wait_lock); + ret = rs_get_cq_event(rs); + RR_DBG("rs_get_cq_event({%d})=%d,%d\n", rs->index, ret, errno); + fastlock_release(&rs->cq_wait_lock); - fastlock_acquire (&rs->cq_lock); + fastlock_acquire(&rs->cq_lock); - if (rs->state & rs_connected) + if (rs->state & rs_connected) { - rs_update_credits (rs); - ret = rs_poll_cq (rs); - RR_DBG ("rs_poll_cq({%d})=%d,%d {ref:%d, armed:%d}\n", - rs->index, ret, errno, rs->rr_epoll_ref, rs->cq_armed); + rs_update_credits(rs); + ret = rs_poll_cq(rs); + RR_DBG("rs_poll_cq({%d})=%d,%d {ref:%d, armed:%d}\n", + rs->index, ret, errno, rs->rr_epoll_ref, rs->cq_armed); } - if (rs->rr_epoll_ref && rs->cq_armed < 1) + if (rs->rr_epoll_ref && rs->cq_armed < 1) { - ret = ibv_req_notify_cq (rs->cm_id->recv_cq, 0); - RR_DBG ("ibv_req_notify_cq({%d})=%d,%d\n", rs->index, ret, errno); - if (0 == ret) - __sync_fetch_and_add (&rs->cq_armed, 1); + ret = ibv_req_notify_cq(rs->cm_id->recv_cq, 0); + RR_DBG("ibv_req_notify_cq({%d})=%d,%d\n", rs->index, ret, errno); + if (0 == ret) + __sync_fetch_and_add(&rs->cq_armed, 1); } - if (rs->state & rs_connected) + if (rs->state & rs_connected) { - ret = rs_poll_cq (rs); - RR_DBG ("rs_poll_cq({%d})=%d,%d\n", rs->index, ret, errno); - rs_update_credits (rs); + ret = rs_poll_cq(rs); + RR_DBG("rs_poll_cq({%d})=%d,%d\n", rs->index, ret, errno); + rs_update_credits(rs); } - fastlock_release (&rs->cq_lock); + fastlock_release(&rs->cq_lock); - RR_DBG ("(%d)=\n", rs->index); + RR_DBG("(%d)=\n", rs->index); } -inline static void -rr_rs_handle_udp (struct rsocket *rs) +inline static void rr_rs_handle_udp(struct rsocket *rs) { - fastlock_acquire (&rs->cq_wait_lock); - ds_get_cq_event (rs); - fastlock_release (&rs->cq_wait_lock); + fastlock_acquire(&rs->cq_wait_lock); + ds_get_cq_event(rs); + fastlock_release(&rs->cq_wait_lock); - fastlock_acquire (&rs->cq_lock); - ds_poll_cqs (rs); - if (rs->rr_epoll_ref && !rs->cq_armed) + fastlock_acquire(&rs->cq_lock); + ds_poll_cqs(rs); + if (rs->rr_epoll_ref && !rs->cq_armed) { - ds_req_notify_cqs (rs); - rs->cq_armed = 1; + ds_req_notify_cqs(rs); + rs->cq_armed = 1; } - fastlock_release (&rs->cq_lock); + fastlock_release(&rs->cq_lock); } -inline static void -rr_rs_handle_rs (struct rsocket *rs) +inline static void rr_rs_handle_rs(struct rsocket *rs) { - if (rs->state & rs_opening) + if (rs->state & rs_opening) { - int ret = rs_do_connect (rs); - RR_DBG ("rs_do_connect(%p{%d}):%d:%d\n", rs, rs->index, ret, errno); - return; + int ret = rs_do_connect(rs); + RR_DBG("rs_do_connect(%p{%d}):%d:%d\n", rs, rs->index, ret, errno); + return; } - if (rs->type == SOCK_STREAM) + if (rs->type == SOCK_STREAM) { - rr_rs_handle_tcp (rs); + rr_rs_handle_tcp(rs); } - if (rs->type == SOCK_DGRAM) + if (rs->type == SOCK_DGRAM) { - rr_rs_handle_udp (rs); + rr_rs_handle_udp(rs); } } -int -rr_rs_handle (int fd, uint32_t events) +int rr_rs_handle(int fd, uint32_t events) { - struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd); + struct rsocket *rs = (struct rsocket *) idm_lookup(&idm, fd); - RR_DBG ("(fd:%d, events:0x%x):rs:%p\n", fd, events, rs); + RR_DBG("(fd:%d, events:0x%x):rs:%p\n", fd, events, rs); - if (!rs) - return _err (EBADF); + if (!rs) + return _err(EBADF); - if (rs->state == rs_listening) + if (rs->state == rs_listening) { - if (events & EPOLLIN) + if (events & EPOLLIN) { - (void) rr_notify_event (rs->rr_epoll_pdata, events); + (void) rr_notify_event(rs->rr_epoll_pdata, events); } - return 0; + return 0; } - rr_rs_handle_rs (rs); + rr_rs_handle_rs(rs); - return 0; + return 0; } #endif /* #ifndef HANDLE__RSOCKET_RS_H_ */ @@ -239,155 +240,150 @@ rr_rs_handle (int fd, uint32_t events) #ifndef ADPT__RSOCKET_RS_H_ #define ADPT__RSOCKET_RS_H_ -inline static int -rr_rs_evfd (struct rsocket *rs) +inline static int rr_rs_evfd(struct rsocket *rs) { - if (rs->type == SOCK_STREAM) + if (rs->type == SOCK_STREAM) { - if (rs->state >= rs_connected) - return rs->cm_id->recv_cq_channel->fd; - else - return rs->cm_id->channel->fd; + if (rs->state >= rs_connected) + return rs->cm_id->recv_cq_channel->fd; + else + return rs->cm_id->channel->fd; } - else + else { - return rs->epfd; + return rs->epfd; } - return -1; + return -1; } -int -rr_rs_ep_add (int fd, void *pdata, uint32_t * revent) +int rr_rs_ep_add(int fd, void *pdata, uint32_t * revent) { - int ref; - struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd); - RR_DBG ("(%d(%p),)\n", fd, rs); - if (!rs) - return _err (EBADF); - - ref = __sync_add_and_fetch (&rs->rr_epoll_ref, 1); - if (1 == ref) + int ref; + struct rsocket *rs = (struct rsocket *) idm_lookup(&idm, fd); + RR_DBG("(%d(%p),)\n", fd, rs); + if (!rs) + return _err(EBADF); + + ref = __sync_add_and_fetch(&rs->rr_epoll_ref, 1); + if (1 == ref) { - rs->rr_epoll_fd = rr_rs_evfd (rs); - (void) rr_ep_add (rs->rr_epoll_fd, rs->index); + rs->rr_epoll_fd = rr_rs_evfd(rs); + (void) rr_ep_add(rs->rr_epoll_fd, rs->index); } - (void) rr_rs_handle_rs (rs); - *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both (rs); + (void) rr_rs_handle_rs(rs); + *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both(rs); - rs->rr_epoll_pdata = pdata; + rs->rr_epoll_pdata = pdata; - RR_DBG ("*revent=0x%x\n", *revent); - return 0; + RR_DBG("*revent=0x%x\n", *revent); + return 0; } -int -rr_rs_ep_mod (int fd, void *pdata, uint32_t * revent) +int rr_rs_ep_mod(int fd, void *pdata, uint32_t * revent) { - struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd); - RR_DBG ("(%d(%p),)\n", fd, rs); - if (!rs) - return _err (EBADF); + struct rsocket *rs = (struct rsocket *) idm_lookup(&idm, fd); + RR_DBG("(%d(%p),)\n", fd, rs); + if (!rs) + return _err(EBADF); - if (rs->rr_epoll_ref <= 0) - return _err (ENOENT); + if (rs->rr_epoll_ref <= 0) + return _err(ENOENT); - (void) rr_rs_handle_rs (rs); - *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both (rs); + (void) rr_rs_handle_rs(rs); + *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both(rs); - rs->rr_epoll_pdata = pdata; + rs->rr_epoll_pdata = pdata; - RR_DBG ("*revent=0x%x\n", *revent); - return 0; + RR_DBG("*revent=0x%x\n", *revent); + return 0; } -int -rr_rs_ep_del (int fd) +int rr_rs_ep_del(int fd) { - int ref; - struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd); - RR_DBG ("(%d(%p))\n", fd, rs); + int ref; + struct rsocket *rs = (struct rsocket *) idm_lookup(&idm, fd); + RR_DBG("(%d(%p))\n", fd, rs); - if (!rs) - return _err (EBADF); + if (!rs) + return _err(EBADF); - ref = __sync_sub_and_fetch (&rs->rr_epoll_ref, 1); - if (0 == ref) + ref = __sync_sub_and_fetch(&rs->rr_epoll_ref, 1); + if (0 == ref) { - (void) rr_ep_del (rs->rr_epoll_fd); - rs->rr_epoll_fd = -1; + (void) rr_ep_del(rs->rr_epoll_fd); + rs->rr_epoll_fd = -1; } - return 0; + return 0; } #endif /* #ifndef ADPT__RSOCKET_RS_H_ */ -inline static void -rr_rs_connected (struct rsocket *rs) +inline static void rr_rs_connected(struct rsocket *rs) { - RR_DBG ("rsfd:%d ref:%d evfd:%d->%d state:0x%x\n", rs->index, - rs->rr_epoll_ref, rs->rr_epoll_fd, rr_rs_evfd (rs), rs->state); + RR_DBG("rsfd:%d ref:%d evfd:%d->%d state:0x%x\n", rs->index, + rs->rr_epoll_ref, rs->rr_epoll_fd, rr_rs_evfd(rs), rs->state); - if (!(rs->state & rs_connected)) + if (!(rs->state & rs_connected)) { - rr_rs_notify_tcp (rs); - return; + rr_rs_notify_tcp(rs); + return; } - if (rs->rr_epoll_ref) + if (rs->rr_epoll_ref) { - int evfd = rr_rs_evfd (rs); + int evfd = rr_rs_evfd(rs); - if (evfd != rs->rr_epoll_fd) + if (evfd != rs->rr_epoll_fd) { - (void) rr_ep_del (rs->rr_epoll_fd); - rs->rr_epoll_fd = evfd; - (void) rr_ep_add (evfd, rs->index); + (void) rr_ep_del(rs->rr_epoll_fd); + rs->rr_epoll_fd = evfd; + (void) rr_ep_add(evfd, rs->index); } - rr_rs_handle_tcp (rs); + rr_rs_handle_tcp(rs); } } -int -raccept4 (int socket, struct sockaddr *addr, socklen_t * addrlen, int flags) +int raccept4(int socket, struct sockaddr *addr, socklen_t * addrlen, + int flags) { - int ret, fd; - struct rsocket *rs; + int ret, fd; + struct rsocket *rs; - RR_DBG ("(%d, %p, %p, %d)@\n", socket, addr, addrlen, flags); - fd = raccept (socket, addr, addrlen); - RR_DBG ("(%d, , , %d):%d:%d\n", socket, flags, fd, errno); - if (fd < 0) - return fd; + RR_DBG("(%d, %p, %p, %d)@\n", socket, addr, addrlen, flags); + fd = raccept(socket, addr, addrlen); + RR_DBG("(%d, , , %d):%d:%d\n", socket, flags, fd, errno); + if (fd < 0) + return fd; - rs = (struct rsocket *) idm_lookup (&idm, fd); - if (!rs) + rs = (struct rsocket *) idm_lookup(&idm, fd); + if (!rs) { - RR_ERR ("panic\n"); - return -1; + RR_ERR("panic\n"); + return -1; } - if (flags & SOCK_NONBLOCK) + if (flags & SOCK_NONBLOCK) { - if (0 == (rs->fd_flags & O_NONBLOCK)) + if (0 == (rs->fd_flags & O_NONBLOCK)) { - RR_DBG ("orig flag:%x\n", - GSAPI (fcntl) (rs->cm_id->channel->fd, F_GETFL)); - ret = GSAPI (fcntl) (rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK); - if (0 == ret) - rs->fd_flags |= O_NONBLOCK; + RR_DBG("orig flag:%x\n", + GSAPI(fcntl) (rs->cm_id->channel->fd, F_GETFL)); + ret = GSAPI(fcntl) (rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK); + if (0 == ret) + rs->fd_flags |= O_NONBLOCK; } } - if (flags & SOCK_CLOEXEC) + if (flags & SOCK_CLOEXEC) { - RR_LOG ("ignore flag:SOCK_CLOEXEC\n"); + RR_LOG("ignore flag:SOCK_CLOEXEC\n"); } - return fd; + return fd; } #endif /* #ifndef _RSOCKET_RS_C_ */ |