aboutsummaryrefslogtreecommitdiffstats
path: root/thirdparty/apps/testapp/cps/cps_s.c
diff options
context:
space:
mode:
Diffstat (limited to 'thirdparty/apps/testapp/cps/cps_s.c')
-rw-r--r--thirdparty/apps/testapp/cps/cps_s.c320
1 files changed, 320 insertions, 0 deletions
diff --git a/thirdparty/apps/testapp/cps/cps_s.c b/thirdparty/apps/testapp/cps/cps_s.c
new file mode 100644
index 0000000..57d41c7
--- /dev/null
+++ b/thirdparty/apps/testapp/cps/cps_s.c
@@ -0,0 +1,320 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "lb.h"
+#include "cps.h"
+
+inline static void
+cps_s_close (struct cps_thread *thread)
+{
+ int fd;
+ struct epoll_event ev = { 0 };
+
+ for (fd = thread->server; fd >= 0; fd = CPS_CONN (fd)->next)
+ {
+ (void) _epoll_ctl (thread->epfd, EPOLL_CTL_DEL, fd, &ev);
+ _close (fd);
+ }
+
+ thread->server = -1;
+}
+
+int
+cps_s_listen (struct cps_thread *thread)
+{
+ int i;
+
+ for (i = 0; i < thread->server_num; ++i)
+ {
+ int fd, ret;
+ struct timespec dummy;
+ struct epoll_event event;
+ struct sockaddr_in *server = &thread->s_addr[i];
+
+ fd = _socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ ERR_RETURN (fd < 0, -1, "socket()=%d:%d\n", fd, errno);
+
+ ret = _bind (fd, (struct sockaddr *) server, sizeof (*server));
+ ERR_RETURN (ret, -1, "bind(%d)=%d:%d\n", fd, ret, errno);
+
+ ret = set_nonblock (fd);
+ ERR_RETURN (ret, -1, "set_nonblock(%d)=%d:%d\n", fd, ret, errno);
+
+ ret = _listen (fd, SOMAXCONN);
+ ERR_RETURN (ret, -1, "listen(%d)=%d:%d\n", fd, ret, errno);
+
+ event.events = EPOLLIN | EPOLLET;
+ event.data.u64 = CPS_EV_DATA (i, fd);
+ ret = _epoll_ctl (thread->epfd, EPOLL_CTL_ADD, fd, &event);
+ ERR_RETURN (ret, -1, "epoll_ctl(%d, %d)=%d:%d\n", thread->epfd, fd, ret,
+ errno);
+
+ out ("[%d.%d:%d] listen on %s\n", thread->index, i, fd,
+ f_inaddr (server));
+
+ cps_add_server (thread, fd, i);
+ }
+
+ return 0;
+}
+
+int
+cps_s_accept (struct cps_thread *thread, int server_fd)
+{
+ while (cps.run_state == CPS_RUNNING)
+ {
+ int fd, ret;
+ struct timespec begin;
+ struct epoll_event event;
+#if defined(DEBUG)
+ struct sockaddr_in addr;
+ socklen_t len = sizeof (addr);
+#endif
+
+ LB_TIME (begin);
+
+#if defined(DEBUG) && 0
+ fd =
+ _accept4 (server_fd, (struct sockaddr *) &addr, &len, SOCK_NONBLOCK);
+#else
+ fd = _accept4 (server_fd, NULL, NULL, SOCK_NONBLOCK);
+#endif
+ if (fd < 0)
+ {
+ int e = errno;
+ if (e == EAGAIN || e == EWOULDBLOCK)
+ return 0;
+ DBG ("->accept4(%d)=%d:%d\n", server_fd, fd, e);
+ CPS_CNT_INC_E (thread, CPS_CNT_ACCEPT_ERR, errno);
+ return -1;
+ }
+
+ CPS_REC_INC (thread, CPS_REC_INIT);
+// DBG("(%d, %d) -> accepted(%d) %d: %s", thread->index, sid, server->fd, fd, f_inaddr(&addr));
+
+ if (fd >= CPS_MAX_FD)
+ {
+ _close (fd);
+ CPS_REC_INC (thread, CPS_REC_FAIL);
+ continue;
+ }
+
+ ret = set_nodelay (fd, 1);
+ if (ret)
+ CPS_CNT_INC_E (thread, CPS_CNT_NODELAY_ERR, errno);
+
+ event.events = EPOLLIN | EPOLLET;
+ event.data.u64 = CPS_EV_DATA (CPS_CONN_SID, fd);
+ ret = _epoll_ctl (thread->epfd, EPOLL_CTL_ADD, fd, &event);
+ if (ret)
+ {
+ _close (fd);
+ CPS_CNT_INC_E (thread, CPS_CNT_EPOLL_ERR, errno);
+ CPS_REC_INC (thread, CPS_REC_FAIL);
+ DBG ("epoll_ctl(%d, %d)=%d:%d\n", thread->epfd, fd, ret, errno);
+ continue;
+ }
+
+ cps_add_conn (thread, fd, -cps.req_len, &begin);
+ }
+
+ return 0;
+}
+
+int
+cps_s_io (struct cps_thread *thread, int fd, uint32_t events)
+{
+ static char buf[CPS_DATA_MAX];
+
+ int ret;
+ struct cps_conn *conn = CPS_CONN (fd);
+
+ DBG ("(%d, %d, %x) conn:{size:%d next:%d prev:%p:%d}\n",
+ thread->index, fd, events, conn->size, conn->next, conn->prev,
+ *conn->prev);
+
+ if (events & EPOLLERR)
+ {
+ CPS_CNT_INC (thread, CPS_CNT_ERR_EVENT);
+ DBG ("(%d, %d, 0x%x)\n", thread->index, fd, events);
+ goto ERR;
+ }
+
+ if (0 == (events & EPOLLIN))
+ return 0;
+
+ while (1)
+ {
+ if (cps.run_state <= CPS_INIT)
+ goto ERR;
+
+ ret = _recv (fd, buf, sizeof (buf), 0);
+ if (ret > 0)
+ {
+ conn->size += ret;
+ if (conn->size >= 0)
+ {
+ CPS_REC_TIMED_INC (thread, CPS_REC_RECV, conn->last);
+ break; /* receive success */
+ }
+ }
+ else
+ {
+ if (ret < 0)
+ {
+ const int e = errno;
+ if (e == EWOULDBLOCK || e == EAGAIN)
+ return 0;
+ if (e == EINTR)
+ continue;
+ CPS_CNT_INC_E (thread, CPS_CNT_RECV_ERR, e);
+ }
+ else
+ {
+ CPS_CNT_INC (thread, CPS_CNT_RECV_ERR);
+ }
+ DBG ("->recv(%d,, %ld)=%d:%d\n", fd, sizeof (buf), ret, errno);
+ goto ERR;
+ }
+ }
+
+ conn->size = 0;
+
+ while (cps.run_state > CPS_INIT)
+ {
+ ret = _send (fd, buf, cps.res_len - conn->size, 0);
+ if (ret > 0)
+ {
+ conn->size += ret;
+ if (conn->size >= cps.res_len)
+ {
+ _close (fd);
+ CPS_REC_TIMED_INC (thread, CPS_REC_SEND, conn->last);
+ cps_rem_conn (thread, fd, conn);
+ return 0;
+ }
+ }
+ else
+ {
+ if (ret < 0)
+ {
+ const int e = errno;
+ if (e == EWOULDBLOCK || e == EAGAIN || e == EINTR)
+ continue;
+ CPS_CNT_INC_E (thread, CPS_CNT_SEND_ERR, e);
+ }
+ else
+ {
+ CPS_CNT_INC (thread, CPS_CNT_SEND_ERR);
+ }
+ DBG ("->send(%d,, %d)=%d:%d\n", fd, cps.res_len - conn->size, ret,
+ errno);
+ goto ERR;
+ }
+ }
+
+ return 0;
+
+ERR:
+ _close (fd);
+ cps_rem_conn (thread, fd, conn);
+ CPS_REC_INC (thread, CPS_REC_FAIL);
+ return -1;
+}
+
+void *
+cps_s_thread (void *arg)
+{
+ int num = 0;
+ struct cps_thread *thread = (struct cps_thread *) arg;
+ struct epoll_event *event = thread->event;
+
+ out ("[%d] initialize thread %ld server:%d core:%d epfd:%d\n",
+ thread->index, pthread_self (), thread->server_num, thread->core,
+ thread->epfd);
+
+ if (cps_s_listen (thread))
+ {
+ cps.run_state = CPS_ERROR;
+ return NULL;
+ }
+
+ futex_wait (&cps.run_state, CPS_INIT);
+
+ while (1)
+ {
+
+ if (num > 0)
+ {
+ int sid = CPS_EV_SID (event->data.u64);
+ int fd = CPS_EV_FD (event->data.u64);
+ DBG ("epoll evnet{sid:%d fd:%d event:%x}\n", sid, fd,
+ event->events);
+ if (sid >= 0)
+ {
+ if (event->events & EPOLLIN)
+ {
+ (void) cps_s_accept (thread, fd);
+ }
+ if (event->events & EPOLLERR)
+ {
+ wrn ("Error event for server %d\n", fd);
+ }
+ }
+ else
+ {
+ if (fd >= CPS_MAX_FD)
+ {
+ err ("Error connection index %d\n", fd);
+ }
+ else
+ {
+ (void) cps_s_io (thread, fd, event->events);
+ }
+ }
+
+ num--;
+ event++;
+ }
+
+ if (num <= 0)
+ {
+ if (cps.run_state == CPS_CLOSING)
+ {
+ if (thread->server >= 0)
+ cps_s_close (thread);
+ if (thread->conn_num <= 0)
+ break;
+ }
+ else if (cps.run_state <= CPS_INIT)
+ {
+ break;
+ }
+
+ event = thread->event;
+ num = _epoll_wait (thread->epfd, event, cps.evnum, CPS_EPWAIT_MS);
+ if (num < 0)
+ {
+ int e = errno;
+ if (e != EINTR && e != ETIMEDOUT)
+ CPS_CNT_INC_E (thread, CPS_CNT_EPOLL_ERR, e);
+ }
+ }
+ }
+
+ __sync_fetch_and_sub (&cps.active_thread, 1);
+ return NULL;
+}