/* * * Copyright (c) 2018 Huawei Technologies Co.,Ltd. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef _RSOCKET_RS_C_ #define _RSOCKET_RS_C_ inline static void rr_rs_init (struct rsocket *rs) { RR_DBG ("(rs:%p{index:%d})\n", rs, rs->index); rs->rr_epoll_ref = 0; rs->rr_epoll_fd = -1; rs->rr_epoll_pdata = NULL; } inline static void rr_rs_dest (struct rsocket *rs) { RR_DBG ("(rs:%p{index:%d})\n", rs, rs->index); if (rs->rr_epoll_ref) { (void) rr_ep_del (rs->rr_epoll_fd); rs->rr_epoll_ref = 0; rs->rr_epoll_fd = -1; rs->rr_epoll_pdata = NULL; } } #ifndef POLL__RSOCKET_RS_H_ #define POLL__RSOCKET_RS_H_ static inline uint32_t rr_rs_poll_tcp (struct rsocket *rs) { uint32_t events = 0; if (rs->state & rs_connected) { if (rs_have_rdata (rs)) events |= EPOLLIN; if (rs_can_send (rs)) events |= EPOLLOUT; } if (rs->state & (rs_error | rs_connect_error)) events |= EPOLLERR; if (rs->state & rs_disconnected) events |= EPOLLHUP; return events; } static inline uint32_t rr_rs_poll_udp (struct rsocket *rs) { uint32_t events = 0; if (rs_have_rdata (rs)) events |= EPOLLIN; if (ds_can_send (rs)) events |= EPOLLOUT; if (rs->state & rs_error) events |= EPOLLERR; return events; } static inline uint32_t rr_rs_poll_both (struct rsocket *rs) { if (rs->type == SOCK_STREAM) return rr_rs_poll_tcp (rs); if (rs->type == SOCK_DGRAM) return rr_rs_poll_udp (rs); return 0; } uint32_t rr_rs_poll (int fd, uint32_t revents) { struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd); if (!rs) return 0; if (rs->state == rs_listening) return revents; return rr_rs_poll_both (rs); } #endif /* #ifndef POLL__RSOCKET_RS_H_ */ static inline void rr_rs_notify_tcp (struct rsocket *rs) { if (rs->rr_epoll_ref) { uint32_t events = rr_rs_poll_tcp (rs); if (events) (void) rr_notify_event (rs->rr_epoll_pdata, events); } } static inline void rr_rs_notify_udp (struct rsocket *rs) { if (rs->rr_epoll_ref) { uint32_t events = rr_rs_poll_udp (rs); if (events) (void) rr_notify_event (rs->rr_epoll_pdata, events); } } #ifndef HANDLE__RSOCKET_RS_H_ #define HANDLE__RSOCKET_RS_H_ inline static void rr_rs_handle_tcp (struct rsocket *rs) { int ret; RR_DBG ("(%d)@ state:0x%x\n", rs->index, rs->state); if (!(rs->state & (rs_connected | rs_opening))) return; fastlock_acquire (&rs->cq_wait_lock); ret = rs_get_cq_event (rs); RR_DBG ("rs_get_cq_event({%d})=%d,%d\n", rs->index, ret, errno); fastlock_release (&rs->cq_wait_lock); fastlock_acquire (&rs->cq_lock); if (rs->state & rs_connected) { rs_update_credits (rs); ret = rs_poll_cq (rs); RR_DBG ("rs_poll_cq({%d})=%d,%d {ref:%d, armed:%d}\n", rs->index, ret, errno, rs->rr_epoll_ref, rs->cq_armed); } if (rs->rr_epoll_ref && rs->cq_armed < 1) { ret = ibv_req_notify_cq (rs->cm_id->recv_cq, 0); RR_DBG ("ibv_req_notify_cq({%d})=%d,%d\n", rs->index, ret, errno); if (0 == ret) __sync_fetch_and_add (&rs->cq_armed, 1); } if (rs->state & rs_connected) { ret = rs_poll_cq (rs); RR_DBG ("rs_poll_cq({%d})=%d,%d\n", rs->index, ret, errno); rs_update_credits (rs); } fastlock_release (&rs->cq_lock); RR_DBG ("(%d)=\n", rs->index); } inline static void rr_rs_handle_udp (struct rsocket *rs) { fastlock_acquire (&rs->cq_wait_lock); ds_get_cq_event (rs); fastlock_release (&rs->cq_wait_lock); fastlock_acquire (&rs->cq_lock); ds_poll_cqs (rs); if (rs->rr_epoll_ref && !rs->cq_armed) { ds_req_notify_cqs (rs); rs->cq_armed = 1; } fastlock_release (&rs->cq_lock); } inline static void rr_rs_handle_rs (struct rsocket *rs) { if (rs->state & rs_opening) { int ret = rs_do_connect (rs); RR_DBG ("rs_do_connect(%p{%d}):%d:%d\n", rs, rs->index, ret, errno); return; } if (rs->type == SOCK_STREAM) { rr_rs_handle_tcp (rs); } if (rs->type == SOCK_DGRAM) { rr_rs_handle_udp (rs); } } int rr_rs_handle (int fd, uint32_t events) { struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd); RR_DBG ("(fd:%d, events:0x%x):rs:%p\n", fd, events, rs); if (!rs) return _err (EBADF); if (rs->state == rs_listening) { if (events & EPOLLIN) { (void) rr_notify_event (rs->rr_epoll_pdata, events); } return 0; } rr_rs_handle_rs (rs); return 0; } #endif /* #ifndef HANDLE__RSOCKET_RS_H_ */ #ifndef ADPT__RSOCKET_RS_H_ #define ADPT__RSOCKET_RS_H_ inline static int rr_rs_evfd (struct rsocket *rs) { if (rs->type == SOCK_STREAM) { if (rs->state >= rs_connected) return rs->cm_id->recv_cq_channel->fd; else return rs->cm_id->channel->fd; } else { return rs->epfd; } return -1; } int rr_rs_ep_add (int fd, void *pdata, uint32_t * revent) { int ref; struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd); RR_DBG ("(%d(%p),)\n", fd, rs); if (!rs) return _err (EBADF); ref = __sync_add_and_fetch (&rs->rr_epoll_ref, 1); if (1 == ref) { rs->rr_epoll_fd = rr_rs_evfd (rs); (void) rr_ep_add (rs->rr_epoll_fd, rs->index); } (void) rr_rs_handle_rs (rs); *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both (rs); rs->rr_epoll_pdata = pdata; RR_DBG ("*revent=0x%x\n", *revent); return 0; } int rr_rs_ep_mod (int fd, void *pdata, uint32_t * revent) { struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd); RR_DBG ("(%d(%p),)\n", fd, rs); if (!rs) return _err (EBADF); if (rs->rr_epoll_ref <= 0) return _err (ENOENT); (void) rr_rs_handle_rs (rs); *revent = rs->state == rs_listening ? 0 : rr_rs_poll_both (rs); rs->rr_epoll_pdata = pdata; RR_DBG ("*revent=0x%x\n", *revent); return 0; } int rr_rs_ep_del (int fd) { int ref; struct rsocket *rs = (struct rsocket *) idm_lookup (&idm, fd); RR_DBG ("(%d(%p))\n", fd, rs); if (!rs) return _err (EBADF); ref = __sync_sub_and_fetch (&rs->rr_epoll_ref, 1); if (0 == ref) { (void) rr_ep_del (rs->rr_epoll_fd); rs->rr_epoll_fd = -1; } return 0; } #endif /* #ifndef ADPT__RSOCKET_RS_H_ */ inline static void rr_rs_connected (struct rsocket *rs) { RR_DBG ("rsfd:%d ref:%d evfd:%d->%d state:0x%x\n", rs->index, rs->rr_epoll_ref, rs->rr_epoll_fd, rr_rs_evfd (rs), rs->state); if (!(rs->state & rs_connected)) { rr_rs_notify_tcp (rs); return; } if (rs->rr_epoll_ref) { int evfd = rr_rs_evfd (rs); if (evfd != rs->rr_epoll_fd) { (void) rr_ep_del (rs->rr_epoll_fd); rs->rr_epoll_fd = evfd; (void) rr_ep_add (evfd, rs->index); } rr_rs_handle_tcp (rs); } } int raccept4 (int socket, struct sockaddr *addr, socklen_t * addrlen, int flags) { int ret, fd; struct rsocket *rs; RR_DBG ("(%d, %p, %p, %d)@\n", socket, addr, addrlen, flags); fd = raccept (socket, addr, addrlen); RR_DBG ("(%d, , , %d):%d:%d\n", socket, flags, fd, errno); if (fd < 0) return fd; rs = (struct rsocket *) idm_lookup (&idm, fd); if (!rs) { RR_ERR ("panic\n"); return -1; } if (flags & SOCK_NONBLOCK) { if (0 == (rs->fd_flags & O_NONBLOCK)) { RR_DBG ("orig flag:%x\n", GSAPI (fcntl) (rs->cm_id->channel->fd, F_GETFL)); ret = GSAPI (fcntl) (rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK); if (0 == ret) rs->fd_flags |= O_NONBLOCK; } } if (flags & SOCK_CLOEXEC) { RR_LOG ("ignore flag:SOCK_CLOEXEC\n"); } return fd; } #endif /* #ifndef _RSOCKET_RS_C_ */