aboutsummaryrefslogtreecommitdiffstats
path: root/thirdparty/apps/testapp/cps
diff options
context:
space:
mode:
Diffstat (limited to 'thirdparty/apps/testapp/cps')
-rw-r--r--thirdparty/apps/testapp/cps/cps.c833
-rw-r--r--thirdparty/apps/testapp/cps/cps.h283
-rw-r--r--thirdparty/apps/testapp/cps/cps_c.c394
-rw-r--r--thirdparty/apps/testapp/cps/cps_s.c320
4 files changed, 1830 insertions, 0 deletions
diff --git a/thirdparty/apps/testapp/cps/cps.c b/thirdparty/apps/testapp/cps/cps.c
new file mode 100644
index 0000000..377d0de
--- /dev/null
+++ b/thirdparty/apps/testapp/cps/cps.c
@@ -0,0 +1,833 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "lb.h"
+#include "cps.h"
+
+void *cps_s_thread (void *arg);
+void *cps_c_thread (void *arg);
+
+static const char *const cps_stat_name[CPS_CNT_NUM] = {
+ [CPS_CNT_CONN_MAX] = "max",
+ [CPS_CNT_CONN_NUM] = "conn",
+
+ [CPS_CNT_GTFD] = "GREATER-THAN-MAX-FD",
+ [CPS_CNT_SOCKET_ERR] = "socket-err",
+ [CPS_CNT_BIND_ERR] = "bind-err",
+ [CPS_CNT_CONNECT_ERR] = "connect-err",
+ [CPS_CNT_REUSEADDR_ERR] = "reuseaddr-err",
+ [CPS_CNT_NODELAY_ERR] = "nodelay-err",
+ [CPS_CNT_NONBLOCK_ERR] = "nonblock-err",
+ [CPS_CNT_ACCEPT_ERR] = "accept-err",
+ [CPS_CNT_SEND_ERR] = "send-err",
+ [CPS_CNT_RECV_ERR] = "recv-err",
+ [CPS_CNT_EPOLL_ERR] = "epoll-err",
+ [CPS_CNT_ERR_EVENT] = "err-event",
+ [CPS_CNT_FD_ERR] = "cid-err",
+};
+
+struct cps_var cps = { 0 };
+
+void
+cps_title ()
+{
+ out ("------------------------------------------------------------\n");
+ out (" cps test %s\n", cps.client ? "client" : "server");
+ out
+ (" total server: %d total thread: %d report interval: %ds request:%d response: %d CPU NUM: %d\n",
+ cps.server_num, cps.thread_num, cps.interval, cps.req_len, cps.res_len,
+ cps.CPU_NUM);
+ if (cps.client)
+ {
+ out (" total client: %d test time: %ds defalut rate:%lu\n",
+ cps.client_num, cps.test_time, cps.rate);
+ }
+ else
+ {
+ }
+ out ("------------------------------------------------------------\n");
+}
+
+inline static char *
+cps_tip (char *pos, char tip)
+{
+ *pos++ = '|';
+ *pos++ = tip;
+ *pos++ = ':';
+ *pos++ = ' ';
+ return pos;
+}
+
+inline static char *
+cps_fmt (char *pos, uint64_t val, int *size)
+{
+ int s = r_uint (pos, val, *size);
+ pos += s;
+ *pos++ = ' ';
+ if (s > *size)
+ *size = s;
+ return pos;
+}
+
+inline static char *
+cps_format (char *pos, char tip, uint64_t val, uint64_t nsec, int *size)
+{
+ pos = cps_tip (pos, tip);
+
+ if (cps.more)
+ pos = cps_fmt (pos, val, size);
+
+ pos = cps_fmt (pos, lb_gdiv (val, nsec), size + 1);
+
+ return pos;
+}
+
+void
+cps_output (int index, struct cps_stat *stat, uint64_t nsec)
+{
+ struct fmtsize
+ {
+ int s_init[2];
+ int s_conn[3];
+ int s_recv[3];
+ int s_send[3];
+ int s_fail[2];
+ };
+ static struct fmtsize size = { 0 };
+ static int space_line = 0;
+ static char buf[512];
+
+ char *pos = buf;
+ int i, cnt_num = 0;
+
+ if (!stat->rec[CPS_REC_INIT] && !stat->rec[CPS_REC_CONN] &&
+ !stat->rec[CPS_REC_RECV] && !stat->rec[CPS_REC_SEND]
+ && !stat->rec[CPS_REC_FAIL])
+ {
+ for (i = 0; i < CPS_CNT_NUM; ++i)
+ {
+ if (stat->cnt[i])
+ break;
+ }
+ if (i >= CPS_CNT_NUM)
+ {
+ if (index < 0 && space_line++ == 0)
+ {
+ out ("\n");
+ memset (&size, 0, sizeof (size));
+ }
+ return;
+ }
+ }
+
+ space_line = 0;
+
+ if (index < 0)
+ pos += sprintf (pos, " sum ");
+ else
+ pos += sprintf (pos, " %3d ", index);
+
+ pos =
+ cps_format (pos, (cps.client ? 'C' : 'A'), stat->rec[CPS_REC_INIT], nsec,
+ size.s_init);
+ pos = cps_format (pos, 'E', stat->rec[CPS_REC_CONN], nsec, size.s_conn);
+ if (cps.client)
+ {
+ pos = cps_format (pos, 'S', stat->rec[CPS_REC_SEND], nsec, size.s_send);
+ pos = cps_format (pos, 'R', stat->rec[CPS_REC_RECV], nsec, size.s_recv);
+ }
+ else
+ {
+ pos = cps_format (pos, 'R', stat->rec[CPS_REC_RECV], nsec, size.s_recv);
+ pos = cps_format (pos, 'S', stat->rec[CPS_REC_SEND], nsec, size.s_send);
+ }
+ pos = cps_format (pos, 'F', stat->rec[CPS_REC_FAIL], nsec, size.s_fail);
+
+ pos = cps_tip (pos, 'T');
+ pos =
+ cps_fmt (pos,
+ lb_sdiv (stat->rec[CPS_REC_CONN_TIME], stat->rec[CPS_REC_CONN]),
+ &size.s_conn[2]);
+ if (cps.client)
+ {
+ pos =
+ cps_fmt (pos,
+ lb_sdiv (stat->rec[CPS_REC_SEND_TIME],
+ stat->rec[CPS_REC_SEND]), &size.s_send[2]);
+ pos =
+ cps_fmt (pos,
+ lb_sdiv (stat->rec[CPS_REC_RECV_TIME],
+ stat->rec[CPS_REC_RECV]), &size.s_recv[2]);
+ }
+ else
+ {
+ pos =
+ cps_fmt (pos,
+ lb_sdiv (stat->rec[CPS_REC_RECV_TIME],
+ stat->rec[CPS_REC_RECV]), &size.s_recv[2]);
+ pos =
+ cps_fmt (pos,
+ lb_sdiv (stat->rec[CPS_REC_SEND_TIME],
+ stat->rec[CPS_REC_SEND]), &size.s_send[2]);
+ }
+
+ *pos++ = '|';
+ *pos = 0;
+
+ out ("%s", buf);
+
+ for (i = 0; i < CPS_CNT_NUM; ++i)
+ {
+ if (stat->cnt[i])
+ {
+ if (cnt_num++ == 0)
+ out (" { %s:%s", cps_stat_name[i], f_uint (stat->cnt[i]));
+ else
+ out (" %s:%s", cps_stat_name[i], f_uint (stat->cnt[i]));
+ }
+ }
+
+ if (cnt_num)
+ out (" }\n");
+ else
+ out ("\n");
+
+ for (i = 1; i < CPS_ERR_NUM; ++i)
+ {
+ if (stat->err[i])
+ out ("<E%d:%s> %s\n", i, f_uint (stat->err[i]), strerror (i));
+ }
+ if (stat->err[0])
+ out ("<E-:%s> Other error\n", f_uint (stat->err[0]));
+}
+
+void
+cps_close ()
+{
+ cps.run_state = CPS_CLOSING;
+}
+
+void
+cps_timer (uint64_t nsec)
+{
+ const static struct timespec delay = {.tv_sec = 0,.tv_nsec =
+ CPS_DELAY_MS * 1000 * 1000
+ };
+
+ int i, j;
+ struct cps_stat sum = { 0 };
+ struct cps_stat *curr = cps.curr;
+
+ cps.curr = cps.next;
+ cps.next = curr;
+
+ /*wait for cps.curr use */
+ (void) nanosleep (&delay, NULL);
+
+ for (i = 0; i < cps.thread_num; ++i, ++curr)
+ {
+ struct cps_thread *thread = cps.thread[i];
+
+ curr->cnt[CPS_CNT_CONN_NUM] = thread->conn_num;
+ if (cps.verbose)
+ cps_output (thread->index, curr, nsec);
+
+ for (j = 0; j < CPS_REC_NUM; ++j)
+ {
+ sum.rec[j] += curr->rec[j];
+ curr->rec[j] = 0;
+ }
+
+ for (j = 0; j < CPS_CNT_NUM; ++j)
+ {
+ sum.cnt[j] += curr->cnt[j];
+ curr->cnt[j] = 0;
+ }
+
+ for (j = 0; j < CPS_ERR_NUM; ++j)
+ {
+ sum.err[j] += curr->err[j];
+ curr->err[j] = 0;
+ }
+ }
+
+ cps_output (-1, &sum, nsec);
+}
+
+int
+cps_loop ()
+{
+ const static struct timespec timeout = {.tv_sec = 0,.tv_nsec =
+ CPS_TIMER_MS * 1000 * 1000
+ };
+
+ struct timespec begin, from;
+ time_t next_time = cps.interval;
+
+ LB_TIME (begin);
+ from = begin;
+
+ while (cps.run_state == CPS_RUNNING)
+ {
+ struct timespec now;
+
+ (void) nanosleep (&timeout, NULL);
+
+ LB_TIME (now);
+
+ if (cps.client)
+ {
+ if (LB_CMP_S (now, begin, cps.test_time))
+ cps_close ();
+ }
+
+ if (!LB_CMP_S (now, begin, next_time))
+ continue;
+
+ cps_timer (LB_SUB_NS (now, from));
+
+ from = now;
+ next_time += cps.interval;
+ }
+
+ while (cps.run_state == CPS_CLOSING && cps.active_thread)
+ {
+ (void) nanosleep (&timeout, NULL);
+ }
+
+ return 0;
+}
+
+int
+cps_start ()
+{
+ int i;
+ void *(*proc) (void *);
+ const char *name;
+
+ cps.conn =
+ (struct cps_conn *) malloc (sizeof (struct cps_conn) * CPS_MAX_FD);
+ ERR_RETURN (!cps.conn, -1, "Out of memory\n");
+
+ if (cps.thread_num <= 0)
+ {
+ struct cps_thread *thread = cps_new_thread ();
+ ERR_RETURN (!thread, -1, "Out of memory\n");
+
+ cps.server_num = 1;
+ thread->server_num = 1;
+ thread->s_addr[0].sin_family = AF_INET;
+ thread->s_addr[0].sin_port = htons (CPS_PORT_DEF);
+ if (cps.client)
+ {
+ cps.client_num = 1;
+ thread->client_num = 1;
+ thread->c_addr_num = 1;
+ thread->s_addr[0].sin_addr.s_addr = htonl (0x7F000001);
+ }
+ else
+ {
+ thread->s_addr[0].sin_addr.s_addr = INADDR_ANY;
+ }
+ }
+ else if (cps.client)
+ {
+ for (i = 0; i < cps.thread_num; ++i)
+ {
+ if (cps.thread[i]->client_num)
+ continue;
+ cps.thread[i]->client_num = 1;
+ cps.thread[i]->c_addr[0].ip = INADDR_ANY;
+ cps.thread[i]->c_addr[0].ip_num = 1;
+ cps.client_num++;
+ }
+ }
+
+ if (cps.req_len <= 0)
+ cps.req_len = CPS_REQ_DEF;
+ if (cps.res_len <= 0)
+ cps.res_len = CPS_RES_DEF;
+ if (cps.evnum <= 0)
+ cps.evnum = CPS_EVNUM_DEF;
+ if (cps.interval <= 0)
+ cps.interval = CPS_INTERVAL_DEF;
+ if (cps.test_time <= 0)
+ cps.test_time = CPS_TIME_DEF;
+ if (cps.rate == 0)
+ cps.rate = CPS_RATE_DEF;
+
+ cps.curr = cps.records[0];
+ cps.next = cps.records[1];
+
+ if (cps.client)
+ {
+ proc = cps_c_thread;
+ name = "client";
+ }
+ else
+ {
+ proc = cps_s_thread;
+ name = "server";
+ }
+
+ cps_title ();
+
+ for (i = 0; i < cps.thread_num; ++i)
+ {
+ if (cps.thread[i]->rate == 0)
+ cps.thread[i]->rate = cps.rate;
+
+ cps.thread[i]->epfd = _epoll_create (CPS_EPSIZE);
+ ERR_RETURN (cps.thread[i]->epfd < 0, -1, "epoll_create(%d)=%d:%d\n",
+ CPS_EPSIZE, cps.thread[i]->epfd, errno);
+
+ cps.thread[i]->tid =
+ lb_thread (proc, cps.thread[i], "cps-%s-%d", name, i);
+ ERR_RETURN (cps.thread[i]->tid == 0, -1, "Create thread %s-%d failed",
+ name, i);
+
+ if (cps.thread[i]->core >= 0)
+ {
+ int ret = lb_setcpu (cps.thread[i]->tid, cps.thread[i]->core);
+ WRN (ret != 0, "Bind core error thread:%d\n", i);
+ }
+
+ __sync_fetch_and_add (&cps.active_thread, 1);
+ }
+
+ cps.run_state = CPS_RUNNING;
+ futex_wake (&cps.run_state, cps.thread_num);
+
+ return 0;
+}
+
+void
+cps_exit ()
+{
+ int i;
+
+ cps.run_state = CPS_EXIT;
+
+ for (i = 0; i < cps.thread_num; ++i)
+ {
+ int fd;
+ struct cps_thread *thread = cps.thread[i];
+
+ if (!thread)
+ continue;
+
+ if (thread->tid)
+ pthread_join (thread->tid, NULL);
+
+ if (thread->epfd >= 0)
+ _close (thread->epfd);
+
+ for (fd = thread->server; fd >= 0; fd = CPS_CONN (fd)->next)
+ _close (fd);
+
+ for (fd = thread->conn; fd >= 0; fd = CPS_CONN (fd)->next)
+ _close (fd);
+
+ cps.thread[i] = NULL;
+ free (thread);
+ }
+
+ if (cps.conn)
+ free (cps.conn);
+}
+
+void
+cps_break (int s)
+{
+ DBG (" SIGNALED %d running:%d\n", s, cps.run_state);
+ out ("\n");
+
+ if (cps.run_state == CPS_INIT || cps.run_state == CPS_RUNNING)
+ cps_close ();
+ else if (cps.run_state != CPS_EXIT)
+ cps_exit ();
+ else
+ exit (1);
+}
+
+void
+cps_sigpipe (int s)
+{
+ DBG ("SIGPIPE\n");
+}
+
+int
+cps_init ()
+{
+ struct sigaction s = { 0 };
+
+ (void) sigemptyset (&s.sa_mask);
+
+ s.sa_flags = SA_NODEFER;
+ s.sa_handler = (void *) cps_break;
+ (void) sigaction (SIGINT, &s, NULL);
+ (void) sigaction (SIGQUIT, &s, NULL);
+
+ s.sa_handler = cps_sigpipe;
+ (void) sigaction (SIGPIPE, &s, NULL);
+
+// lb_sigsegv_setup();
+
+ cps.CPU_NUM = get_nprocs ();
+
+ if (cps.CPU_NUM <= 0)
+ cps.CPU_NUM = 1;
+
+ return 0;
+}
+
+#ifndef EXEC_CPS_C_
+#define EXEC_CPS_C_
+
+#define CPS_OPTIONS "d:e:T:t:r:ci:" DBGOPT "mvh"
+
+static const struct option cps_options[] = {
+ {"data", 1, 0, 'd'},
+ {"interval", 1, 0, 'i'},
+ {"evnum", 1, 0, 'e'},
+ {"client", 0, 0, 'c'},
+ {"thread", 1, 0, 't'},
+ {"rate", 1, 0, 'r'},
+ {"time", 1, 0, 'T'},
+ DBGOPT_LONG {"more", 0, 0, 'm'},
+ {"verbose", 0, 0, 'v'},
+ {"help", 0, 0, 'h'},
+ {0, 0, 0, 0}
+};
+
+enum
+{
+ CPSOPT_SERVER = 0,
+ CPSOPT_S,
+ CPSOPT_CLIENT,
+ CPSOPT_C,
+ CPSOPT_RATE,
+ CPSOPT_CORE,
+ CPSOPT_CF,
+ CPSOPT_SF,
+};
+
+char *const cps_tokens[] = {
+ [CPSOPT_SERVER] = "server",
+ [CPSOPT_S] = "s",
+ [CPSOPT_CLIENT] = "client",
+ [CPSOPT_C] = "c",
+ [CPSOPT_RATE] = "rate",
+ [CPSOPT_CORE] = "core",
+ [CPSOPT_CF] = "cf",
+ [CPSOPT_SF] = "sf",
+ NULL
+};
+
+void
+cps_usage (const char *name)
+{
+ out ("USAGE: %s [OPTIONS] [SERVER-ADDRESS] # %s version\n", name,
+ VERSION_NAME);
+ out (" Options:\n");
+ out
+ (" -i, --interval=# report time(default: %ds max:%ds)\n",
+ CPS_INTERVAL_DEF, CPS_INTERVAL_MAX);
+ out
+ (" -c, --client server address list for one thread\n");
+ out
+ (" -e, --evnum epoll event number(default:%d max:%d)\n",
+ CPS_EVNUM_DEF, CPS_EVNUM_MAX);
+ out
+ (" -T, --time=# C test time(default: %ds max:%ds)\n",
+ CPS_TIME_DEF, CPS_TIME_MAX);
+ out
+ (" -d, --data=#[:#] C request and response data length(default:%d:%d max:%d)\n",
+ CPS_REQ_DEF, CPS_RES_DEF, CPS_DATA_MAX);
+ out
+ (" -r, --rate=#[k|m|w] C global connect rate per each thread(CPS, default: %d max:%d)\n",
+ CPS_RATE_DEF, CPS_RATE_MAX);
+ out
+ (" -t, --thread=CONFIG set one net and thread(max: %d)\n",
+ CPS_THREAD_MAX);
+ out
+ (" server=X.X.X.X:P server address set(max: %d)\n",
+ CPS_SERVER_MAX);
+ out
+ (" core=# bind to core\n");
+ out
+ (" client=X.X.X.X C client ip address set(max: %d max ip: %d)\n",
+ CPS_CLIENT_IAS_MAX, CPS_CLIENT_MAX);
+ out
+ (" rate=# C set connect rate for this thread(default: use global set)\n");
+ out
+ (" cf C client loop first(default: both)\n");
+ out
+ (" sf C server loop first(default: both)\n");
+#ifdef DEBUG
+ out
+ (" -D, --debug show debug information\n");
+#endif
+ out
+ (" -m, --more show more statistics\n");
+ out
+ (" -v, --verbose show thread statistics\n");
+ out (" -h, --help help\n");
+ out (" IMPORTANT:\n");
+ out
+ (" socket() EMFILE(%d) error: ulimit -n 1048576\n",
+ EMFILE);
+ out
+ (" bind() EADDRINUSE(%d) error: echo 1 > /proc/sys/net/ipv4/tcp_tw_recycle\n",
+ EADDRINUSE);
+ out
+ (" connect() EADDRNOTAVAIL(%d) error: echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse\n",
+ EADDRNOTAVAIL);
+ out
+ (" echo \"3000 65534\" > /proc/sys/net/ipv4/ip_local_port_range\n");
+}
+
+inline static uint64_t
+cps_p_rate (const char *arg)
+{
+ uint64_t rate = p_uint (arg, CPS_RATE_MAX, &arg);
+
+ if (!arg)
+ return (uint64_t) - 1ul;
+
+ switch (*arg)
+ {
+ case 'm': /* fall through */
+ case 'M':
+ rate *= 100; /* fall through */
+ case 'w': /* fall through */
+ case 'W':
+ rate *= 10; /* fall through */
+ case 'k': /* fall through */
+ case 'K':
+ rate *= 1000;
+ arg++;
+ break;
+ }
+
+ if (*arg)
+ return (uint64_t) - 1ul;
+
+ return rate;
+}
+
+int
+cps_opts (char *opts)
+{
+ struct cps_thread *thread;
+ struct inaddrs *client;
+ struct sockaddr_in *server;
+
+ ERR_RETURN (cps.thread_num >= CPS_THREAD_MAX, -1,
+ "Too many thread, max %d\n", CPS_THREAD_MAX);
+
+ thread = cps_new_thread ();
+ ERR_RETURN (!thread, -1, "Out of memory\n");
+ server = thread->s_addr;
+ client = thread->c_addr;
+
+ while (*opts)
+ {
+ char *value;
+ const char *end;
+ int ret = getsubopt (&opts, cps_tokens, &value);
+ switch (ret)
+ {
+ case CPSOPT_SERVER:
+ case CPSOPT_S:
+ {
+ int i;
+ uint64_t num;
+ struct inaddrs addr;
+
+ end = p_addr_set (value, &addr, PA_DEF_PORT | CPS_PORT_DEF);
+ ERR_RETURN (!end
+ || *end, -1, "Invalid server address '%s'.\n", value);
+
+ num = (uint64_t) addr.ip_num * addr.port_num;
+ ERR_RETURN (num > CPS_SERVER_MAX - thread->server_num, -1,
+ "Too many server, max %d\n", CPS_SERVER_MAX);
+
+ for (i = 0; i < addr.ip_num; ++i)
+ {
+ uint32_t ip = addr.ip + i;
+ uint16_t j;
+ for (j = 0; j < addr.port_num; ++j)
+ {
+ server->sin_family = AF_INET;
+ server->sin_addr.s_addr = htonl (ip);
+ server->sin_port = htons (addr.port + j);
+ server++;
+ }
+ }
+ thread->server_num += num;
+ break;
+ }
+ case CPSOPT_CLIENT:
+ case CPSOPT_C:
+ {
+ ERR_RETURN (thread->c_addr_num >= CPS_CLIENT_IAS_MAX, -1,
+ "Too many client set, max %d\n", CPS_CLIENT_IAS_MAX);
+
+ end = p_addr_set (value, client, PA_NO_PORT);
+ ERR_RETURN (!end
+ || *end, -1, "Invalid client address '%s'.\n", value);
+ ERR_RETURN (client->ip_num > CPS_CLIENT_MAX - thread->client_num,
+ -1, "Too many client, max %d\n", CPS_CLIENT_MAX);
+
+ client->port = 0;
+ client->port_num = 1;
+ thread->c_addr_num++;
+ thread->client_num += client->ip_num;
+ client++;
+ break;
+ }
+ case CPSOPT_RATE:
+ thread->rate = cps_p_rate (value);
+ ERR_RETURN (thread->rate > CPS_RATE_MAX, -1,
+ "Invalid thread rate '%s'\n", value);
+ break;
+
+ case CPSOPT_CF:
+ thread->loop = CPS_LOOP_CF;
+ break;
+ case CPSOPT_SF:
+ thread->loop = CPS_LOOP_SF;
+ break;
+
+ case CPSOPT_CORE:
+ thread->core = (int) p_int (value, cps.CPU_NUM - 1, &end);
+ ERR_RETURN (!end || *end
+ || thread->core <= 0, -1, "Invalid bind core '%s'.\n",
+ value);
+ break;
+
+ default:
+ ERR_RETURN (1, -1, "Unknown thread option '%s'\n", value);
+ }
+ }
+
+ ERR_RETURN (!thread->server_num, -1, "No server set for net %d\n",
+ thread->index);
+
+ cps.server_num += thread->server_num;
+ cps.client_num += thread->client_num;
+ return 0;
+}
+
+int
+cps_args (int argc, char *argv[])
+{
+ int opt, index;
+
+ while (EOF !=
+ (opt = getopt_long (argc, argv, CPS_OPTIONS, cps_options, &index)))
+ {
+ const char *end;
+
+ switch (opt)
+ {
+ case 't':
+ if (cps_opts (optarg))
+ return -1;
+ break;
+
+ case 'c':
+ cps.client = 1;
+ break;
+
+ case 'd':
+ cps.req_len = (int) p_int (optarg, CPS_DATA_MAX, &end);
+ ERR_RETURN (!end, -1, "Invalid data length '%s'\n", optarg);
+ if (*end == ':')
+ {
+ end++;
+ cps.res_len = (int) p_int (end, CPS_DATA_MAX, &end);
+ ERR_RETURN (!end, -1, "Invalid response data length '%s'\n",
+ optarg);
+ }
+ else
+ {
+ cps.res_len = cps.req_len;
+ }
+ ERR_RETURN (*end != 0, -1, "Invalid data length '%s'\n", optarg);
+ break;
+
+ case 'i':
+ cps.interval = (int) p_int (optarg, CPS_INTERVAL_MAX, &end);
+ ERR_RETURN (!end || *end, -1, "Invalid interval '%s'\n", optarg);
+ break;
+ case 'e':
+ cps.evnum = (int) p_int (optarg, CPS_EVNUM_MAX, &end);
+ ERR_RETURN (!end
+ || *end, -1, "Invalid event number '%s'\n", optarg);
+ break;
+ case 'T':
+ cps.test_time = (int) p_int (optarg, CPS_TIME_MAX, &end);
+ ERR_RETURN (!end || *end, -1, "Invalid test time '%s'\n", optarg);
+ break;
+ case 'r':
+ cps.rate = cps_p_rate (optarg);
+ ERR_RETURN (cps.rate > CPS_RATE_MAX, -1, "Invalid rate '%s'\n",
+ optarg);
+ break;
+ case 'v':
+ cps.verbose = 1;
+ break;
+ case 'm':
+ cps.more = 1;
+ break;
+
+#ifdef DEBUG
+ case 'D':
+ enable_debug = 1;
+ break;
+#endif
+ case 'h':
+ cps_usage (argv[0]);
+ exit (0);
+ case '?':
+ err ("Invalid arguments\n");
+ return -1;
+ default:
+ err ("Unknown option '%c'.\n", opt);
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+#endif /* #ifndef EXEC_CPS_C_ */
+
+int
+main (int argc, char *argv[])
+{
+ int ret;
+
+ if (cps_init ())
+ return 1;
+
+ cps_args (argc, argv) || cps_start () || cps_loop ();
+
+ cps_exit ();
+ return 0;
+}
diff --git a/thirdparty/apps/testapp/cps/cps.h b/thirdparty/apps/testapp/cps/cps.h
new file mode 100644
index 0000000..9b949f3
--- /dev/null
+++ b/thirdparty/apps/testapp/cps/cps.h
@@ -0,0 +1,283 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef _CPS_H_
+#define _CPS_H_
+
+#define CPS_FRAG_NUM 64
+#define CPS_FRAG_NS (20 * 1000 * 1000) /* 20 ms */
+#define CPS_FRAG_LOOP (CPS_FRAG_NS * CPS_FRAG_NUM) /* 1.28 s */
+
+#define CPS_TIMER_MS 100
+#define CPS_DELAY_MS 5
+
+#define CPS_MAX_FD (16 * 1024 * 1024) /* 10M */
+#define CPS_CONN_MAX (256 * 1024)
+
+#define CPS_EPSIZE (1 * 1000) /* =1k */
+#define CPS_EPWAIT_MS 200 /* ms */
+
+#define CPS_THREAD_MAX 128
+#define CPS_SERVER_MAX 32
+#define CPS_CLIENT_MAX 256
+#define CPS_CLIENT_IAS_MAX 32
+
+#define CPS_PORT_DEF 58166
+#define CPS_EVNUM_DEF 256
+#define CPS_EVNUM_MAX 1024
+#define CPS_TIME_DEF 300
+#define CPS_TIME_MAX (60 * 60 * 24 * 7) /* 604800s = 1 week */
+#define CPS_RATE_DEF 10000 /* 1w */
+#define CPS_RATE_MAX (100 * 1000 * 1000) /* 100m */
+#define CPS_REQ_DEF 1
+#define CPS_RES_DEF 1
+#define CPS_DATA_MAX 4096
+#define CPS_INTERVAL_DEF 10 /* s */
+#define CPS_INTERVAL_MAX 3600 /* s */
+
+#define CPS_ERR_NUM 256
+
+#define CPS_CONN_SID (-1)
+#define CPS_EV_DATA(sid, fd) (((uint64_t)(uint32_t)(sid) << 32) | (uint64_t)(uint32_t)(fd))
+#define CPS_EV_FD(u64) ((int)(uint32_t)(u64))
+#define CPS_EV_SID(u64) ((int)(uint32_t)((u64) >> 32))
+
+enum
+{
+ CPS_LOOP_BOTH = 0,
+ CPS_LOOP_CF,
+ CPS_LOOP_SF,
+};
+
+enum
+{
+ CPS_ERROR = -2,
+ CPS_EXIT = -1,
+
+ CPS_INIT = 0,
+
+ CPS_RUNNING = 1,
+ CPS_CLOSING = 2,
+};
+
+enum
+{
+ CPS_CNT_CONN_MAX,
+ CPS_CNT_CONN_NUM,
+
+ CPS_CNT_GTFD,
+ CPS_CNT_SOCKET_ERR,
+ CPS_CNT_BIND_ERR,
+ CPS_CNT_ACCEPT_ERR,
+ CPS_CNT_CONNECT_ERR,
+ CPS_CNT_REUSEADDR_ERR,
+ CPS_CNT_NODELAY_ERR,
+ CPS_CNT_NONBLOCK_ERR,
+ CPS_CNT_SEND_ERR,
+ CPS_CNT_RECV_ERR,
+ CPS_CNT_EPOLL_ERR,
+ CPS_CNT_ERR_EVENT,
+ CPS_CNT_FD_ERR,
+
+ CPS_CNT_NUM
+};
+
+#define CPS_CNT_ITEM(thread, id) (cps.curr[(thread)->index].cnt[(id)])
+
+#define CPS_CNT_INC(thread, id) (++CPS_CNT_ITEM((thread), (id)))
+#define CPS_CNT_INC_E(thread, id, e) do { \
+ struct cps_stat *_stat = cps.curr + (thread)->index; \
+ ++_stat->cnt[(id)]; \
+ if ((e) >= CPS_ERR_NUM) ++_stat->err[0];\
+ else ++_stat->err[(e)]; \
+} while(0)
+
+enum
+{
+ CPS_REC_INIT,
+
+ CPS_REC_CONN,
+ CPS_REC_CONN_TIME,
+
+ CPS_REC_RECV,
+ CPS_REC_RECV_TIME,
+
+ CPS_REC_SEND,
+ CPS_REC_SEND_TIME,
+
+ CPS_REC_FAIL,
+
+ CPS_REC_NUM
+};
+
+#define CPS_REC_INC(thread, id) (++cps.curr[(thread)->index].rec[(id)])
+
+#define CPS_REC_TIMED_INC(thread, id, last) do { \
+ struct timespec _time; \
+ struct cps_stat *_stat = cps.curr + (thread)->index; \
+ LB_TIME(_time); \
+ _stat->rec[(id)]++; \
+ _stat->rec[(id) + 1] += LB_SUB_NS(_time, (last)); \
+ (last) = _time; \
+} while (0)
+
+struct cps_conn
+{
+ union
+ {
+ int size;
+ int sid;
+ };
+ int next;
+ int *prev;
+ struct timespec last;
+
+ struct timespec create_time;
+};
+
+struct cps_stat
+{
+ uint64_t cnt[CPS_CNT_NUM];
+ uint64_t rec[CPS_REC_NUM];
+ uint64_t err[CPS_ERR_NUM];
+};
+
+struct cps_thread
+{
+ int epfd;
+ int index;
+ int core;
+ int loop;
+
+ int server;
+ int conn;
+ int conn_num;
+
+ int server_num;
+ int client_num;
+ int c_addr_num;
+
+ uint64_t rate;
+ pthread_t tid;
+
+ struct sockaddr_in s_addr[CPS_SERVER_MAX];
+ struct inaddrs c_addr[CPS_CLIENT_MAX];
+ struct epoll_event event[CPS_EVNUM_MAX];
+};
+
+struct cps_var
+{
+ int run_state;
+ int CPU_NUM;
+
+ int verbose;
+ int more;
+ int client;
+ int evnum;
+
+ int req_len;
+ int res_len;
+
+ int test_time;
+ int interval;
+
+ int active_thread;
+ int thread_num;
+ int server_num;
+ int client_num;
+
+ uint64_t rate;
+
+ struct cps_stat *curr;
+ struct cps_stat *next;
+
+ struct cps_conn *conn;
+ struct cps_thread *thread[CPS_THREAD_MAX];
+
+ struct cps_stat records[2][CPS_THREAD_MAX];
+};
+
+extern struct cps_var cps;
+
+inline static struct cps_conn *
+CPS_CONN (int fd)
+{
+ return cps.conn + fd;
+}
+
+inline static void
+cps_add_server (struct cps_thread *thread, int fd, int sid)
+{
+ struct cps_conn *conn = CPS_CONN (fd);
+
+ conn->sid = sid;
+ conn->next = thread->server;
+ conn->prev = &thread->server;
+ LB_TIME (conn->last);
+ if (thread->server >= 0)
+ CPS_CONN (thread->server)->prev = &conn->next;
+ thread->server = fd;
+}
+
+inline static void
+cps_add_conn (struct cps_thread *thread, int fd, int size,
+ struct timespec *begin)
+{
+ struct cps_conn *conn = CPS_CONN (fd);
+
+ conn->size = size;
+ conn->last = *begin;
+ conn->next = thread->conn;
+ conn->prev = &thread->conn;
+ if (thread->conn >= 0)
+ CPS_CONN (thread->conn)->prev = &conn->next;
+ thread->conn = fd;
+
+ if (++thread->conn_num > CPS_CNT_ITEM (thread, CPS_CNT_CONN_MAX))
+ CPS_CNT_ITEM (thread, CPS_CNT_CONN_MAX) = thread->conn_num;
+ CPS_REC_TIMED_INC (thread, CPS_REC_CONN, conn->last);
+}
+
+inline static void
+cps_rem_conn (struct cps_thread *thread, int fd, struct cps_conn *conn)
+{
+ --thread->conn_num;
+
+ *conn->prev = conn->next;
+ if (conn->next >= 0)
+ CPS_CONN (conn->next)->prev = conn->prev;
+}
+
+inline static struct cps_thread *
+cps_new_thread ()
+{
+ struct cps_thread *thread = calloc (1, sizeof (struct cps_thread));
+
+ if (thread)
+ {
+ thread->index = cps.thread_num;
+ thread->epfd = -1;
+ thread->core = -1;
+ thread->conn = -1;
+ thread->server = -1;
+
+ cps.thread[cps.thread_num++] = thread;
+ }
+
+ return thread;
+}
+
+#endif /* #ifndef _CPS_H_ */
diff --git a/thirdparty/apps/testapp/cps/cps_c.c b/thirdparty/apps/testapp/cps/cps_c.c
new file mode 100644
index 0000000..dfcc341
--- /dev/null
+++ b/thirdparty/apps/testapp/cps/cps_c.c
@@ -0,0 +1,394 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "lb.h"
+#include "cps.h"
+
+struct cps_frag
+{
+ struct timespec begin;
+ uint64_t count;
+};
+
+struct cps_run
+{
+ struct cps_frag frag[CPS_FRAG_NUM];
+ uint64_t total;
+ uint32_t fid;
+ int sid, ci, ii;
+ struct sockaddr_in addr;
+};
+
+inline static void
+cps_c_next (struct cps_thread *thread, struct cps_run *run)
+{
+ if (thread->loop == CPS_LOOP_CF)
+ {
+ if (++run->ii >= thread->c_addr[run->ci].ip_num)
+ {
+ run->ii = 0;
+ if (++run->ci == thread->client_num)
+ {
+ run->ci = 0;
+ if (++run->sid >= thread->server_num)
+ run->sid = 0;
+ }
+ }
+ run->addr.sin_addr.s_addr =
+ htonl (thread->c_addr[run->ci].ip + run->ii);
+ }
+ else if (thread->loop == CPS_LOOP_SF)
+ {
+ if (++run->sid >= thread->server_num)
+ {
+ run->sid = 0;
+ if (++run->ii >= thread->c_addr[run->ci].ip_num)
+ {
+ run->ii = 0;
+ if (++run->ci >= thread->client_num)
+ run->ci = 0;
+ }
+ run->addr.sin_addr.s_addr =
+ htonl (thread->c_addr[run->ci].ip + run->ii);
+ }
+ }
+ else
+ {
+ if (++run->sid == thread->server_num)
+ run->sid = 0;
+ if (++run->ii >= thread->c_addr[run->ci].ip_num)
+ {
+ run->ii = 0;
+ if (++run->ci == thread->client_num)
+ run->ci = 0;
+ }
+ run->addr.sin_addr.s_addr =
+ htonl (thread->c_addr[run->ci].ip + run->ii);
+ }
+}
+
+inline static int
+cps_c_trigger (struct cps_thread *thread, struct cps_run *run)
+{
+ uint64_t nsec, num;
+ struct timespec now;
+ struct cps_frag *frag = &run->frag[run->fid % CPS_FRAG_NUM];
+ struct cps_frag *from = &run->frag[(run->fid + 1) % CPS_FRAG_NUM];
+
+ LB_TIME (now);
+
+ if (LB_CMP_NS (now, frag->begin, CPS_FRAG_NS))
+ {
+ /* move to next fragment */
+ frag = from;
+ from = &run->frag[++run->fid % CPS_FRAG_NUM];
+ run->total -= frag->count;
+ frag->count = 0;
+ frag->begin = now;
+ }
+
+ nsec = LB_SUB_NS (now, from->begin);
+ num = thread->rate * nsec / NSOFS;
+ if (num >= run->total)
+ {
+ run->total++;
+ frag->count++;
+ return 1;
+ }
+
+ return 0;
+}
+
+int
+cps_c_create (struct cps_thread *thread, struct cps_run *run)
+{
+ int fd, ret;
+ struct timespec begin;
+ struct epoll_event event;
+
+ CPS_REC_INC (thread, CPS_REC_INIT);
+ LB_TIME (begin);
+
+ fd = _socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ if (fd < 0)
+ {
+ CPS_CNT_INC_E (thread, CPS_CNT_SOCKET_ERR, errno);
+ CPS_REC_INC (thread, CPS_REC_FAIL);
+ DBG ("->socket(...)=%d:%d\n", fd, errno);
+ return -1;
+ }
+
+ if (fd >= CPS_MAX_FD)
+ {
+ CPS_CNT_INC (thread, CPS_CNT_GTFD);
+ goto ERR;
+ }
+
+ ret = set_reuseaddr (fd, 1);
+ if (ret)
+ CPS_CNT_INC_E (thread, CPS_CNT_REUSEADDR_ERR, errno);
+
+ ret = _bind (fd, (struct sockaddr *) &run->addr, sizeof (run->addr));
+ if (ret)
+ {
+ CPS_CNT_INC_E (thread, CPS_CNT_BIND_ERR, errno);
+ DBG ("->bind(%d, %s)=%d:%d\n", fd, f_inaddr (&run->addr), ret, errno);
+ goto ERR;
+ }
+ ret = set_nodelay (fd, 1);
+ if (ret)
+ CPS_CNT_INC_E (thread, CPS_CNT_NODELAY_ERR, errno);
+
+ ret = set_nonblock (fd);
+ if (ret)
+ {
+ CPS_CNT_INC_E (thread, CPS_CNT_NONBLOCK_ERR, errno);
+ goto ERR;
+ }
+
+ ret =
+ _connect (fd, (struct sockaddr *) &thread->s_addr[run->sid],
+ sizeof (thread->s_addr[run->sid]));
+ if (ret)
+ {
+ const int e = errno;
+ if (e != EINPROGRESS)
+ {
+ CPS_CNT_INC_E (thread, CPS_CNT_CONNECT_ERR, e);
+ DBG ("->connect(%d, %s)=%d:%d\n", fd,
+ f_inaddr (&thread->s_addr[run->sid]), ret, errno);
+ goto ERR;
+ }
+ }
+
+ event.events = EPOLLIN | EPOLLOUT | EPOLLET;
+ event.data.u64 = CPS_EV_DATA (CPS_CONN_SID, fd);
+ ret = _epoll_ctl (thread->epfd, EPOLL_CTL_ADD, fd, &event);
+ if (ret)
+ {
+ CPS_CNT_INC_E (thread, CPS_CNT_EPOLL_ERR, errno);
+ DBG ("->epoll_ctl(%d, ADD, %d)=%d:%d\n", thread->epfd, fd, ret, errno);
+ goto ERR;
+ }
+
+ cps_add_conn (thread, fd, 0, &begin);
+
+ return 0;
+
+ERR:
+ _close (fd);
+ CPS_REC_INC (thread, CPS_REC_FAIL);
+ return -1;
+}
+
+int
+cps_c_io (struct cps_thread *thread, int fd, uint32_t events)
+{
+ int ret;
+ static char buf[CPS_DATA_MAX];
+
+// struct cps_server *server = &thread->server[sid];
+ struct cps_conn *conn = CPS_CONN (fd);
+
+ if (events & EPOLLERR)
+ {
+ CPS_CNT_INC (thread, CPS_CNT_ERR_EVENT);
+ DBG ("(%d, %d, %x) EPOLLERR\n", thread->index, fd, events);
+ goto ERR;
+ }
+
+ if (conn->size >= 0)
+ {
+ if (0 == (events & EPOLLOUT))
+ return 0;
+
+ while (1)
+ {
+ if (cps.run_state <= CPS_INIT)
+ return -1;
+
+ ret = _send (fd, buf, cps.req_len - conn->size, 0);
+ if (ret > 0)
+ {
+ conn->size += ret;
+ if (conn->size >= cps.req_len)
+ {
+ struct epoll_event event;
+ event.events = EPOLLIN;
+ event.data.u64 = CPS_EV_DATA (CPS_CONN_SID, fd);
+ conn->size = -cps.res_len;
+ ret = _epoll_ctl (thread->epfd, EPOLL_CTL_MOD, fd, &event);
+ if (ret)
+ {
+ CPS_CNT_INC_E (thread, CPS_CNT_EPOLL_ERR, errno);
+ DBG ("->epoll_ctl(%d, MOD, %d)=%d:%d\n", thread->epfd,
+ fd, ret, errno);
+ goto ERR;
+ }
+ CPS_REC_TIMED_INC (thread, CPS_REC_SEND, conn->last);
+ break;
+ }
+ }
+ else
+ {
+ if (ret < 0)
+ {
+ const int e = errno;
+ if (e == EWOULDBLOCK || e == EAGAIN)
+ return 0;
+ if (e == EINTR)
+ continue;
+ CPS_CNT_INC_E (thread, CPS_CNT_SEND_ERR, e);
+ }
+ else
+ {
+ CPS_CNT_INC (thread, CPS_CNT_SEND_ERR);
+ }
+ DBG ("->send(%d,, %d)=%d:%d\n", fd, cps.req_len - conn->size,
+ ret, errno);
+ goto ERR;
+ }
+ }
+ }
+
+ if (0 == (events & EPOLLIN))
+ return 0;
+
+ while (cps.run_state > CPS_INIT)
+ {
+ ret = _recv (fd, buf, -conn->size, 0);
+ if (ret > 0)
+ {
+ conn->size += ret;
+ if (conn->size >= 0)
+ {
+ /* receive success */
+ _close (fd);
+ CPS_REC_TIMED_INC (thread, CPS_REC_RECV, conn->last);
+ cps_rem_conn (thread, fd, conn);
+ return 0;
+ }
+ }
+ else
+ {
+ if (ret < 0)
+ {
+ const int e = errno;
+ if (e == EWOULDBLOCK || e == EAGAIN)
+ return 0; /*wait event */
+ if (e == EINTR) /* The receive was interrupted by delivery of a signal... */
+ continue; /*recv again */
+ CPS_CNT_INC_E (thread, CPS_CNT_RECV_ERR, e);
+ }
+ else
+ {
+ CPS_CNT_INC (thread, CPS_CNT_RECV_ERR);
+ }
+ DBG ("->recv(%d,, %d)=%d:%d\n", fd, -conn->size, ret, errno);
+ goto ERR; /* ret == 0 and not block meaning error */
+ }
+ }
+
+ DBG ("(%d, %d) cannot run there\n", thread->index, fd);
+
+ERR:
+ _close (fd);
+ CPS_REC_INC (thread, CPS_REC_FAIL);
+ cps_rem_conn (thread, fd, conn);
+ return -1;
+}
+
+void *
+cps_c_thread (void *arg)
+{
+ int i, num = 0;
+ struct cps_run run = { 0 };
+ struct cps_thread *thread = (struct cps_thread *) arg;
+ struct epoll_event *event = thread->event;
+
+ run.addr.sin_family = AF_INET;
+ run.addr.sin_port = htons (0);
+
+ out
+ ("[%d] initialize thread %ld client:%d server:%d rate:%lu core:%d epfd:%d\n",
+ thread->index, pthread_self (), thread->client_num, thread->server_num,
+ thread->rate, thread->core, thread->epfd);
+
+ futex_wait (&cps.run_state, CPS_INIT);
+
+ LB_TIME (run.frag[0].begin);
+ for (i = 1; i < CPS_FRAG_NUM; ++i)
+ run.frag[i].begin = run.frag[0].begin;
+
+ while (1)
+ {
+ /* open 1 connect */
+ if (cps.run_state == CPS_RUNNING)
+ {
+ if (cps_c_trigger (thread, &run))
+ {
+ cps_c_next (thread, &run);
+ cps_c_create (thread, &run);
+ }
+ }
+ else if (cps.run_state == CPS_CLOSING)
+ {
+ if (thread->conn_num <= 0)
+ break;
+ }
+ else
+ {
+ break;
+ }
+
+ /* process 1 event */
+ if (num > 0)
+ {
+ int fd = CPS_EV_FD (event->data.u64);
+ DBG ("epoll event:{sid:%d fd:%d e:%x}\n",
+ CPS_EV_SID (event->data.u64), fd, event->events);
+
+ if ((uint32_t) fd >= CPS_MAX_FD)
+ {
+ CPS_CNT_INC (thread, CPS_CNT_FD_ERR);
+ }
+ else
+ {
+ (void) cps_c_io (thread, fd, event->events);
+ }
+
+ num--;
+ event++;
+ }
+
+ /* wait events */
+ if (num <= 0)
+ {
+ event = thread->event;
+ num = _epoll_wait (thread->epfd, event, cps.evnum, 0); /* no wait */
+ if (num < 0)
+ {
+ int e = errno;
+ if (e != EINTR)
+ CPS_CNT_INC_E (thread, CPS_CNT_EPOLL_ERR, e);
+ }
+ }
+
+ }
+
+ __sync_fetch_and_sub (&cps.active_thread, 1);
+ return NULL;
+}
diff --git a/thirdparty/apps/testapp/cps/cps_s.c b/thirdparty/apps/testapp/cps/cps_s.c
new file mode 100644
index 0000000..57d41c7
--- /dev/null
+++ b/thirdparty/apps/testapp/cps/cps_s.c
@@ -0,0 +1,320 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "lb.h"
+#include "cps.h"
+
+inline static void
+cps_s_close (struct cps_thread *thread)
+{
+ int fd;
+ struct epoll_event ev = { 0 };
+
+ for (fd = thread->server; fd >= 0; fd = CPS_CONN (fd)->next)
+ {
+ (void) _epoll_ctl (thread->epfd, EPOLL_CTL_DEL, fd, &ev);
+ _close (fd);
+ }
+
+ thread->server = -1;
+}
+
+int
+cps_s_listen (struct cps_thread *thread)
+{
+ int i;
+
+ for (i = 0; i < thread->server_num; ++i)
+ {
+ int fd, ret;
+ struct timespec dummy;
+ struct epoll_event event;
+ struct sockaddr_in *server = &thread->s_addr[i];
+
+ fd = _socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ ERR_RETURN (fd < 0, -1, "socket()=%d:%d\n", fd, errno);
+
+ ret = _bind (fd, (struct sockaddr *) server, sizeof (*server));
+ ERR_RETURN (ret, -1, "bind(%d)=%d:%d\n", fd, ret, errno);
+
+ ret = set_nonblock (fd);
+ ERR_RETURN (ret, -1, "set_nonblock(%d)=%d:%d\n", fd, ret, errno);
+
+ ret = _listen (fd, SOMAXCONN);
+ ERR_RETURN (ret, -1, "listen(%d)=%d:%d\n", fd, ret, errno);
+
+ event.events = EPOLLIN | EPOLLET;
+ event.data.u64 = CPS_EV_DATA (i, fd);
+ ret = _epoll_ctl (thread->epfd, EPOLL_CTL_ADD, fd, &event);
+ ERR_RETURN (ret, -1, "epoll_ctl(%d, %d)=%d:%d\n", thread->epfd, fd, ret,
+ errno);
+
+ out ("[%d.%d:%d] listen on %s\n", thread->index, i, fd,
+ f_inaddr (server));
+
+ cps_add_server (thread, fd, i);
+ }
+
+ return 0;
+}
+
+int
+cps_s_accept (struct cps_thread *thread, int server_fd)
+{
+ while (cps.run_state == CPS_RUNNING)
+ {
+ int fd, ret;
+ struct timespec begin;
+ struct epoll_event event;
+#if defined(DEBUG)
+ struct sockaddr_in addr;
+ socklen_t len = sizeof (addr);
+#endif
+
+ LB_TIME (begin);
+
+#if defined(DEBUG) && 0
+ fd =
+ _accept4 (server_fd, (struct sockaddr *) &addr, &len, SOCK_NONBLOCK);
+#else
+ fd = _accept4 (server_fd, NULL, NULL, SOCK_NONBLOCK);
+#endif
+ if (fd < 0)
+ {
+ int e = errno;
+ if (e == EAGAIN || e == EWOULDBLOCK)
+ return 0;
+ DBG ("->accept4(%d)=%d:%d\n", server_fd, fd, e);
+ CPS_CNT_INC_E (thread, CPS_CNT_ACCEPT_ERR, errno);
+ return -1;
+ }
+
+ CPS_REC_INC (thread, CPS_REC_INIT);
+// DBG("(%d, %d) -> accepted(%d) %d: %s", thread->index, sid, server->fd, fd, f_inaddr(&addr));
+
+ if (fd >= CPS_MAX_FD)
+ {
+ _close (fd);
+ CPS_REC_INC (thread, CPS_REC_FAIL);
+ continue;
+ }
+
+ ret = set_nodelay (fd, 1);
+ if (ret)
+ CPS_CNT_INC_E (thread, CPS_CNT_NODELAY_ERR, errno);
+
+ event.events = EPOLLIN | EPOLLET;
+ event.data.u64 = CPS_EV_DATA (CPS_CONN_SID, fd);
+ ret = _epoll_ctl (thread->epfd, EPOLL_CTL_ADD, fd, &event);
+ if (ret)
+ {
+ _close (fd);
+ CPS_CNT_INC_E (thread, CPS_CNT_EPOLL_ERR, errno);
+ CPS_REC_INC (thread, CPS_REC_FAIL);
+ DBG ("epoll_ctl(%d, %d)=%d:%d\n", thread->epfd, fd, ret, errno);
+ continue;
+ }
+
+ cps_add_conn (thread, fd, -cps.req_len, &begin);
+ }
+
+ return 0;
+}
+
+int
+cps_s_io (struct cps_thread *thread, int fd, uint32_t events)
+{
+ static char buf[CPS_DATA_MAX];
+
+ int ret;
+ struct cps_conn *conn = CPS_CONN (fd);
+
+ DBG ("(%d, %d, %x) conn:{size:%d next:%d prev:%p:%d}\n",
+ thread->index, fd, events, conn->size, conn->next, conn->prev,
+ *conn->prev);
+
+ if (events & EPOLLERR)
+ {
+ CPS_CNT_INC (thread, CPS_CNT_ERR_EVENT);
+ DBG ("(%d, %d, 0x%x)\n", thread->index, fd, events);
+ goto ERR;
+ }
+
+ if (0 == (events & EPOLLIN))
+ return 0;
+
+ while (1)
+ {
+ if (cps.run_state <= CPS_INIT)
+ goto ERR;
+
+ ret = _recv (fd, buf, sizeof (buf), 0);
+ if (ret > 0)
+ {
+ conn->size += ret;
+ if (conn->size >= 0)
+ {
+ CPS_REC_TIMED_INC (thread, CPS_REC_RECV, conn->last);
+ break; /* receive success */
+ }
+ }
+ else
+ {
+ if (ret < 0)
+ {
+ const int e = errno;
+ if (e == EWOULDBLOCK || e == EAGAIN)
+ return 0;
+ if (e == EINTR)
+ continue;
+ CPS_CNT_INC_E (thread, CPS_CNT_RECV_ERR, e);
+ }
+ else
+ {
+ CPS_CNT_INC (thread, CPS_CNT_RECV_ERR);
+ }
+ DBG ("->recv(%d,, %ld)=%d:%d\n", fd, sizeof (buf), ret, errno);
+ goto ERR;
+ }
+ }
+
+ conn->size = 0;
+
+ while (cps.run_state > CPS_INIT)
+ {
+ ret = _send (fd, buf, cps.res_len - conn->size, 0);
+ if (ret > 0)
+ {
+ conn->size += ret;
+ if (conn->size >= cps.res_len)
+ {
+ _close (fd);
+ CPS_REC_TIMED_INC (thread, CPS_REC_SEND, conn->last);
+ cps_rem_conn (thread, fd, conn);
+ return 0;
+ }
+ }
+ else
+ {
+ if (ret < 0)
+ {
+ const int e = errno;
+ if (e == EWOULDBLOCK || e == EAGAIN || e == EINTR)
+ continue;
+ CPS_CNT_INC_E (thread, CPS_CNT_SEND_ERR, e);
+ }
+ else
+ {
+ CPS_CNT_INC (thread, CPS_CNT_SEND_ERR);
+ }
+ DBG ("->send(%d,, %d)=%d:%d\n", fd, cps.res_len - conn->size, ret,
+ errno);
+ goto ERR;
+ }
+ }
+
+ return 0;
+
+ERR:
+ _close (fd);
+ cps_rem_conn (thread, fd, conn);
+ CPS_REC_INC (thread, CPS_REC_FAIL);
+ return -1;
+}
+
+void *
+cps_s_thread (void *arg)
+{
+ int num = 0;
+ struct cps_thread *thread = (struct cps_thread *) arg;
+ struct epoll_event *event = thread->event;
+
+ out ("[%d] initialize thread %ld server:%d core:%d epfd:%d\n",
+ thread->index, pthread_self (), thread->server_num, thread->core,
+ thread->epfd);
+
+ if (cps_s_listen (thread))
+ {
+ cps.run_state = CPS_ERROR;
+ return NULL;
+ }
+
+ futex_wait (&cps.run_state, CPS_INIT);
+
+ while (1)
+ {
+
+ if (num > 0)
+ {
+ int sid = CPS_EV_SID (event->data.u64);
+ int fd = CPS_EV_FD (event->data.u64);
+ DBG ("epoll evnet{sid:%d fd:%d event:%x}\n", sid, fd,
+ event->events);
+ if (sid >= 0)
+ {
+ if (event->events & EPOLLIN)
+ {
+ (void) cps_s_accept (thread, fd);
+ }
+ if (event->events & EPOLLERR)
+ {
+ wrn ("Error event for server %d\n", fd);
+ }
+ }
+ else
+ {
+ if (fd >= CPS_MAX_FD)
+ {
+ err ("Error connection index %d\n", fd);
+ }
+ else
+ {
+ (void) cps_s_io (thread, fd, event->events);
+ }
+ }
+
+ num--;
+ event++;
+ }
+
+ if (num <= 0)
+ {
+ if (cps.run_state == CPS_CLOSING)
+ {
+ if (thread->server >= 0)
+ cps_s_close (thread);
+ if (thread->conn_num <= 0)
+ break;
+ }
+ else if (cps.run_state <= CPS_INIT)
+ {
+ break;
+ }
+
+ event = thread->event;
+ num = _epoll_wait (thread->epfd, event, cps.evnum, CPS_EPWAIT_MS);
+ if (num < 0)
+ {
+ int e = errno;
+ if (e != EINTR && e != ETIMEDOUT)
+ CPS_CNT_INC_E (thread, CPS_CNT_EPOLL_ERR, e);
+ }
+ }
+ }
+
+ __sync_fetch_and_sub (&cps.active_thread, 1);
+ return NULL;
+}