diff options
Diffstat (limited to 'thirdparty/apps/testapp/cps')
-rw-r--r-- | thirdparty/apps/testapp/cps/cps.c | 833 | ||||
-rw-r--r-- | thirdparty/apps/testapp/cps/cps.h | 283 | ||||
-rw-r--r-- | thirdparty/apps/testapp/cps/cps_c.c | 394 | ||||
-rw-r--r-- | thirdparty/apps/testapp/cps/cps_s.c | 320 |
4 files changed, 1830 insertions, 0 deletions
diff --git a/thirdparty/apps/testapp/cps/cps.c b/thirdparty/apps/testapp/cps/cps.c new file mode 100644 index 0000000..377d0de --- /dev/null +++ b/thirdparty/apps/testapp/cps/cps.c @@ -0,0 +1,833 @@ +/* +* +* Copyright (c) 2018 Huawei Technologies Co.,Ltd. +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at: +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "lb.h" +#include "cps.h" + +void *cps_s_thread (void *arg); +void *cps_c_thread (void *arg); + +static const char *const cps_stat_name[CPS_CNT_NUM] = { + [CPS_CNT_CONN_MAX] = "max", + [CPS_CNT_CONN_NUM] = "conn", + + [CPS_CNT_GTFD] = "GREATER-THAN-MAX-FD", + [CPS_CNT_SOCKET_ERR] = "socket-err", + [CPS_CNT_BIND_ERR] = "bind-err", + [CPS_CNT_CONNECT_ERR] = "connect-err", + [CPS_CNT_REUSEADDR_ERR] = "reuseaddr-err", + [CPS_CNT_NODELAY_ERR] = "nodelay-err", + [CPS_CNT_NONBLOCK_ERR] = "nonblock-err", + [CPS_CNT_ACCEPT_ERR] = "accept-err", + [CPS_CNT_SEND_ERR] = "send-err", + [CPS_CNT_RECV_ERR] = "recv-err", + [CPS_CNT_EPOLL_ERR] = "epoll-err", + [CPS_CNT_ERR_EVENT] = "err-event", + [CPS_CNT_FD_ERR] = "cid-err", +}; + +struct cps_var cps = { 0 }; + +void +cps_title () +{ + out ("------------------------------------------------------------\n"); + out (" cps test %s\n", cps.client ? "client" : "server"); + out + (" total server: %d total thread: %d report interval: %ds request:%d response: %d CPU NUM: %d\n", + cps.server_num, cps.thread_num, cps.interval, cps.req_len, cps.res_len, + cps.CPU_NUM); + if (cps.client) + { + out (" total client: %d test time: %ds defalut rate:%lu\n", + cps.client_num, cps.test_time, cps.rate); + } + else + { + } + out ("------------------------------------------------------------\n"); +} + +inline static char * +cps_tip (char *pos, char tip) +{ + *pos++ = '|'; + *pos++ = tip; + *pos++ = ':'; + *pos++ = ' '; + return pos; +} + +inline static char * +cps_fmt (char *pos, uint64_t val, int *size) +{ + int s = r_uint (pos, val, *size); + pos += s; + *pos++ = ' '; + if (s > *size) + *size = s; + return pos; +} + +inline static char * +cps_format (char *pos, char tip, uint64_t val, uint64_t nsec, int *size) +{ + pos = cps_tip (pos, tip); + + if (cps.more) + pos = cps_fmt (pos, val, size); + + pos = cps_fmt (pos, lb_gdiv (val, nsec), size + 1); + + return pos; +} + +void +cps_output (int index, struct cps_stat *stat, uint64_t nsec) +{ + struct fmtsize + { + int s_init[2]; + int s_conn[3]; + int s_recv[3]; + int s_send[3]; + int s_fail[2]; + }; + static struct fmtsize size = { 0 }; + static int space_line = 0; + static char buf[512]; + + char *pos = buf; + int i, cnt_num = 0; + + if (!stat->rec[CPS_REC_INIT] && !stat->rec[CPS_REC_CONN] && + !stat->rec[CPS_REC_RECV] && !stat->rec[CPS_REC_SEND] + && !stat->rec[CPS_REC_FAIL]) + { + for (i = 0; i < CPS_CNT_NUM; ++i) + { + if (stat->cnt[i]) + break; + } + if (i >= CPS_CNT_NUM) + { + if (index < 0 && space_line++ == 0) + { + out ("\n"); + memset (&size, 0, sizeof (size)); + } + return; + } + } + + space_line = 0; + + if (index < 0) + pos += sprintf (pos, " sum "); + else + pos += sprintf (pos, " %3d ", index); + + pos = + cps_format (pos, (cps.client ? 'C' : 'A'), stat->rec[CPS_REC_INIT], nsec, + size.s_init); + pos = cps_format (pos, 'E', stat->rec[CPS_REC_CONN], nsec, size.s_conn); + if (cps.client) + { + pos = cps_format (pos, 'S', stat->rec[CPS_REC_SEND], nsec, size.s_send); + pos = cps_format (pos, 'R', stat->rec[CPS_REC_RECV], nsec, size.s_recv); + } + else + { + pos = cps_format (pos, 'R', stat->rec[CPS_REC_RECV], nsec, size.s_recv); + pos = cps_format (pos, 'S', stat->rec[CPS_REC_SEND], nsec, size.s_send); + } + pos = cps_format (pos, 'F', stat->rec[CPS_REC_FAIL], nsec, size.s_fail); + + pos = cps_tip (pos, 'T'); + pos = + cps_fmt (pos, + lb_sdiv (stat->rec[CPS_REC_CONN_TIME], stat->rec[CPS_REC_CONN]), + &size.s_conn[2]); + if (cps.client) + { + pos = + cps_fmt (pos, + lb_sdiv (stat->rec[CPS_REC_SEND_TIME], + stat->rec[CPS_REC_SEND]), &size.s_send[2]); + pos = + cps_fmt (pos, + lb_sdiv (stat->rec[CPS_REC_RECV_TIME], + stat->rec[CPS_REC_RECV]), &size.s_recv[2]); + } + else + { + pos = + cps_fmt (pos, + lb_sdiv (stat->rec[CPS_REC_RECV_TIME], + stat->rec[CPS_REC_RECV]), &size.s_recv[2]); + pos = + cps_fmt (pos, + lb_sdiv (stat->rec[CPS_REC_SEND_TIME], + stat->rec[CPS_REC_SEND]), &size.s_send[2]); + } + + *pos++ = '|'; + *pos = 0; + + out ("%s", buf); + + for (i = 0; i < CPS_CNT_NUM; ++i) + { + if (stat->cnt[i]) + { + if (cnt_num++ == 0) + out (" { %s:%s", cps_stat_name[i], f_uint (stat->cnt[i])); + else + out (" %s:%s", cps_stat_name[i], f_uint (stat->cnt[i])); + } + } + + if (cnt_num) + out (" }\n"); + else + out ("\n"); + + for (i = 1; i < CPS_ERR_NUM; ++i) + { + if (stat->err[i]) + out ("<E%d:%s> %s\n", i, f_uint (stat->err[i]), strerror (i)); + } + if (stat->err[0]) + out ("<E-:%s> Other error\n", f_uint (stat->err[0])); +} + +void +cps_close () +{ + cps.run_state = CPS_CLOSING; +} + +void +cps_timer (uint64_t nsec) +{ + const static struct timespec delay = {.tv_sec = 0,.tv_nsec = + CPS_DELAY_MS * 1000 * 1000 + }; + + int i, j; + struct cps_stat sum = { 0 }; + struct cps_stat *curr = cps.curr; + + cps.curr = cps.next; + cps.next = curr; + + /*wait for cps.curr use */ + (void) nanosleep (&delay, NULL); + + for (i = 0; i < cps.thread_num; ++i, ++curr) + { + struct cps_thread *thread = cps.thread[i]; + + curr->cnt[CPS_CNT_CONN_NUM] = thread->conn_num; + if (cps.verbose) + cps_output (thread->index, curr, nsec); + + for (j = 0; j < CPS_REC_NUM; ++j) + { + sum.rec[j] += curr->rec[j]; + curr->rec[j] = 0; + } + + for (j = 0; j < CPS_CNT_NUM; ++j) + { + sum.cnt[j] += curr->cnt[j]; + curr->cnt[j] = 0; + } + + for (j = 0; j < CPS_ERR_NUM; ++j) + { + sum.err[j] += curr->err[j]; + curr->err[j] = 0; + } + } + + cps_output (-1, &sum, nsec); +} + +int +cps_loop () +{ + const static struct timespec timeout = {.tv_sec = 0,.tv_nsec = + CPS_TIMER_MS * 1000 * 1000 + }; + + struct timespec begin, from; + time_t next_time = cps.interval; + + LB_TIME (begin); + from = begin; + + while (cps.run_state == CPS_RUNNING) + { + struct timespec now; + + (void) nanosleep (&timeout, NULL); + + LB_TIME (now); + + if (cps.client) + { + if (LB_CMP_S (now, begin, cps.test_time)) + cps_close (); + } + + if (!LB_CMP_S (now, begin, next_time)) + continue; + + cps_timer (LB_SUB_NS (now, from)); + + from = now; + next_time += cps.interval; + } + + while (cps.run_state == CPS_CLOSING && cps.active_thread) + { + (void) nanosleep (&timeout, NULL); + } + + return 0; +} + +int +cps_start () +{ + int i; + void *(*proc) (void *); + const char *name; + + cps.conn = + (struct cps_conn *) malloc (sizeof (struct cps_conn) * CPS_MAX_FD); + ERR_RETURN (!cps.conn, -1, "Out of memory\n"); + + if (cps.thread_num <= 0) + { + struct cps_thread *thread = cps_new_thread (); + ERR_RETURN (!thread, -1, "Out of memory\n"); + + cps.server_num = 1; + thread->server_num = 1; + thread->s_addr[0].sin_family = AF_INET; + thread->s_addr[0].sin_port = htons (CPS_PORT_DEF); + if (cps.client) + { + cps.client_num = 1; + thread->client_num = 1; + thread->c_addr_num = 1; + thread->s_addr[0].sin_addr.s_addr = htonl (0x7F000001); + } + else + { + thread->s_addr[0].sin_addr.s_addr = INADDR_ANY; + } + } + else if (cps.client) + { + for (i = 0; i < cps.thread_num; ++i) + { + if (cps.thread[i]->client_num) + continue; + cps.thread[i]->client_num = 1; + cps.thread[i]->c_addr[0].ip = INADDR_ANY; + cps.thread[i]->c_addr[0].ip_num = 1; + cps.client_num++; + } + } + + if (cps.req_len <= 0) + cps.req_len = CPS_REQ_DEF; + if (cps.res_len <= 0) + cps.res_len = CPS_RES_DEF; + if (cps.evnum <= 0) + cps.evnum = CPS_EVNUM_DEF; + if (cps.interval <= 0) + cps.interval = CPS_INTERVAL_DEF; + if (cps.test_time <= 0) + cps.test_time = CPS_TIME_DEF; + if (cps.rate == 0) + cps.rate = CPS_RATE_DEF; + + cps.curr = cps.records[0]; + cps.next = cps.records[1]; + + if (cps.client) + { + proc = cps_c_thread; + name = "client"; + } + else + { + proc = cps_s_thread; + name = "server"; + } + + cps_title (); + + for (i = 0; i < cps.thread_num; ++i) + { + if (cps.thread[i]->rate == 0) + cps.thread[i]->rate = cps.rate; + + cps.thread[i]->epfd = _epoll_create (CPS_EPSIZE); + ERR_RETURN (cps.thread[i]->epfd < 0, -1, "epoll_create(%d)=%d:%d\n", + CPS_EPSIZE, cps.thread[i]->epfd, errno); + + cps.thread[i]->tid = + lb_thread (proc, cps.thread[i], "cps-%s-%d", name, i); + ERR_RETURN (cps.thread[i]->tid == 0, -1, "Create thread %s-%d failed", + name, i); + + if (cps.thread[i]->core >= 0) + { + int ret = lb_setcpu (cps.thread[i]->tid, cps.thread[i]->core); + WRN (ret != 0, "Bind core error thread:%d\n", i); + } + + __sync_fetch_and_add (&cps.active_thread, 1); + } + + cps.run_state = CPS_RUNNING; + futex_wake (&cps.run_state, cps.thread_num); + + return 0; +} + +void +cps_exit () +{ + int i; + + cps.run_state = CPS_EXIT; + + for (i = 0; i < cps.thread_num; ++i) + { + int fd; + struct cps_thread *thread = cps.thread[i]; + + if (!thread) + continue; + + if (thread->tid) + pthread_join (thread->tid, NULL); + + if (thread->epfd >= 0) + _close (thread->epfd); + + for (fd = thread->server; fd >= 0; fd = CPS_CONN (fd)->next) + _close (fd); + + for (fd = thread->conn; fd >= 0; fd = CPS_CONN (fd)->next) + _close (fd); + + cps.thread[i] = NULL; + free (thread); + } + + if (cps.conn) + free (cps.conn); +} + +void +cps_break (int s) +{ + DBG (" SIGNALED %d running:%d\n", s, cps.run_state); + out ("\n"); + + if (cps.run_state == CPS_INIT || cps.run_state == CPS_RUNNING) + cps_close (); + else if (cps.run_state != CPS_EXIT) + cps_exit (); + else + exit (1); +} + +void +cps_sigpipe (int s) +{ + DBG ("SIGPIPE\n"); +} + +int +cps_init () +{ + struct sigaction s = { 0 }; + + (void) sigemptyset (&s.sa_mask); + + s.sa_flags = SA_NODEFER; + s.sa_handler = (void *) cps_break; + (void) sigaction (SIGINT, &s, NULL); + (void) sigaction (SIGQUIT, &s, NULL); + + s.sa_handler = cps_sigpipe; + (void) sigaction (SIGPIPE, &s, NULL); + +// lb_sigsegv_setup(); + + cps.CPU_NUM = get_nprocs (); + + if (cps.CPU_NUM <= 0) + cps.CPU_NUM = 1; + + return 0; +} + +#ifndef EXEC_CPS_C_ +#define EXEC_CPS_C_ + +#define CPS_OPTIONS "d:e:T:t:r:ci:" DBGOPT "mvh" + +static const struct option cps_options[] = { + {"data", 1, 0, 'd'}, + {"interval", 1, 0, 'i'}, + {"evnum", 1, 0, 'e'}, + {"client", 0, 0, 'c'}, + {"thread", 1, 0, 't'}, + {"rate", 1, 0, 'r'}, + {"time", 1, 0, 'T'}, + DBGOPT_LONG {"more", 0, 0, 'm'}, + {"verbose", 0, 0, 'v'}, + {"help", 0, 0, 'h'}, + {0, 0, 0, 0} +}; + +enum +{ + CPSOPT_SERVER = 0, + CPSOPT_S, + CPSOPT_CLIENT, + CPSOPT_C, + CPSOPT_RATE, + CPSOPT_CORE, + CPSOPT_CF, + CPSOPT_SF, +}; + +char *const cps_tokens[] = { + [CPSOPT_SERVER] = "server", + [CPSOPT_S] = "s", + [CPSOPT_CLIENT] = "client", + [CPSOPT_C] = "c", + [CPSOPT_RATE] = "rate", + [CPSOPT_CORE] = "core", + [CPSOPT_CF] = "cf", + [CPSOPT_SF] = "sf", + NULL +}; + +void +cps_usage (const char *name) +{ + out ("USAGE: %s [OPTIONS] [SERVER-ADDRESS] # %s version\n", name, + VERSION_NAME); + out (" Options:\n"); + out + (" -i, --interval=# report time(default: %ds max:%ds)\n", + CPS_INTERVAL_DEF, CPS_INTERVAL_MAX); + out + (" -c, --client server address list for one thread\n"); + out + (" -e, --evnum epoll event number(default:%d max:%d)\n", + CPS_EVNUM_DEF, CPS_EVNUM_MAX); + out + (" -T, --time=# C test time(default: %ds max:%ds)\n", + CPS_TIME_DEF, CPS_TIME_MAX); + out + (" -d, --data=#[:#] C request and response data length(default:%d:%d max:%d)\n", + CPS_REQ_DEF, CPS_RES_DEF, CPS_DATA_MAX); + out + (" -r, --rate=#[k|m|w] C global connect rate per each thread(CPS, default: %d max:%d)\n", + CPS_RATE_DEF, CPS_RATE_MAX); + out + (" -t, --thread=CONFIG set one net and thread(max: %d)\n", + CPS_THREAD_MAX); + out + (" server=X.X.X.X:P server address set(max: %d)\n", + CPS_SERVER_MAX); + out + (" core=# bind to core\n"); + out + (" client=X.X.X.X C client ip address set(max: %d max ip: %d)\n", + CPS_CLIENT_IAS_MAX, CPS_CLIENT_MAX); + out + (" rate=# C set connect rate for this thread(default: use global set)\n"); + out + (" cf C client loop first(default: both)\n"); + out + (" sf C server loop first(default: both)\n"); +#ifdef DEBUG + out + (" -D, --debug show debug information\n"); +#endif + out + (" -m, --more show more statistics\n"); + out + (" -v, --verbose show thread statistics\n"); + out (" -h, --help help\n"); + out (" IMPORTANT:\n"); + out + (" socket() EMFILE(%d) error: ulimit -n 1048576\n", + EMFILE); + out + (" bind() EADDRINUSE(%d) error: echo 1 > /proc/sys/net/ipv4/tcp_tw_recycle\n", + EADDRINUSE); + out + (" connect() EADDRNOTAVAIL(%d) error: echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse\n", + EADDRNOTAVAIL); + out + (" echo \"3000 65534\" > /proc/sys/net/ipv4/ip_local_port_range\n"); +} + +inline static uint64_t +cps_p_rate (const char *arg) +{ + uint64_t rate = p_uint (arg, CPS_RATE_MAX, &arg); + + if (!arg) + return (uint64_t) - 1ul; + + switch (*arg) + { + case 'm': /* fall through */ + case 'M': + rate *= 100; /* fall through */ + case 'w': /* fall through */ + case 'W': + rate *= 10; /* fall through */ + case 'k': /* fall through */ + case 'K': + rate *= 1000; + arg++; + break; + } + + if (*arg) + return (uint64_t) - 1ul; + + return rate; +} + +int +cps_opts (char *opts) +{ + struct cps_thread *thread; + struct inaddrs *client; + struct sockaddr_in *server; + + ERR_RETURN (cps.thread_num >= CPS_THREAD_MAX, -1, + "Too many thread, max %d\n", CPS_THREAD_MAX); + + thread = cps_new_thread (); + ERR_RETURN (!thread, -1, "Out of memory\n"); + server = thread->s_addr; + client = thread->c_addr; + + while (*opts) + { + char *value; + const char *end; + int ret = getsubopt (&opts, cps_tokens, &value); + switch (ret) + { + case CPSOPT_SERVER: + case CPSOPT_S: + { + int i; + uint64_t num; + struct inaddrs addr; + + end = p_addr_set (value, &addr, PA_DEF_PORT | CPS_PORT_DEF); + ERR_RETURN (!end + || *end, -1, "Invalid server address '%s'.\n", value); + + num = (uint64_t) addr.ip_num * addr.port_num; + ERR_RETURN (num > CPS_SERVER_MAX - thread->server_num, -1, + "Too many server, max %d\n", CPS_SERVER_MAX); + + for (i = 0; i < addr.ip_num; ++i) + { + uint32_t ip = addr.ip + i; + uint16_t j; + for (j = 0; j < addr.port_num; ++j) + { + server->sin_family = AF_INET; + server->sin_addr.s_addr = htonl (ip); + server->sin_port = htons (addr.port + j); + server++; + } + } + thread->server_num += num; + break; + } + case CPSOPT_CLIENT: + case CPSOPT_C: + { + ERR_RETURN (thread->c_addr_num >= CPS_CLIENT_IAS_MAX, -1, + "Too many client set, max %d\n", CPS_CLIENT_IAS_MAX); + + end = p_addr_set (value, client, PA_NO_PORT); + ERR_RETURN (!end + || *end, -1, "Invalid client address '%s'.\n", value); + ERR_RETURN (client->ip_num > CPS_CLIENT_MAX - thread->client_num, + -1, "Too many client, max %d\n", CPS_CLIENT_MAX); + + client->port = 0; + client->port_num = 1; + thread->c_addr_num++; + thread->client_num += client->ip_num; + client++; + break; + } + case CPSOPT_RATE: + thread->rate = cps_p_rate (value); + ERR_RETURN (thread->rate > CPS_RATE_MAX, -1, + "Invalid thread rate '%s'\n", value); + break; + + case CPSOPT_CF: + thread->loop = CPS_LOOP_CF; + break; + case CPSOPT_SF: + thread->loop = CPS_LOOP_SF; + break; + + case CPSOPT_CORE: + thread->core = (int) p_int (value, cps.CPU_NUM - 1, &end); + ERR_RETURN (!end || *end + || thread->core <= 0, -1, "Invalid bind core '%s'.\n", + value); + break; + + default: + ERR_RETURN (1, -1, "Unknown thread option '%s'\n", value); + } + } + + ERR_RETURN (!thread->server_num, -1, "No server set for net %d\n", + thread->index); + + cps.server_num += thread->server_num; + cps.client_num += thread->client_num; + return 0; +} + +int +cps_args (int argc, char *argv[]) +{ + int opt, index; + + while (EOF != + (opt = getopt_long (argc, argv, CPS_OPTIONS, cps_options, &index))) + { + const char *end; + + switch (opt) + { + case 't': + if (cps_opts (optarg)) + return -1; + break; + + case 'c': + cps.client = 1; + break; + + case 'd': + cps.req_len = (int) p_int (optarg, CPS_DATA_MAX, &end); + ERR_RETURN (!end, -1, "Invalid data length '%s'\n", optarg); + if (*end == ':') + { + end++; + cps.res_len = (int) p_int (end, CPS_DATA_MAX, &end); + ERR_RETURN (!end, -1, "Invalid response data length '%s'\n", + optarg); + } + else + { + cps.res_len = cps.req_len; + } + ERR_RETURN (*end != 0, -1, "Invalid data length '%s'\n", optarg); + break; + + case 'i': + cps.interval = (int) p_int (optarg, CPS_INTERVAL_MAX, &end); + ERR_RETURN (!end || *end, -1, "Invalid interval '%s'\n", optarg); + break; + case 'e': + cps.evnum = (int) p_int (optarg, CPS_EVNUM_MAX, &end); + ERR_RETURN (!end + || *end, -1, "Invalid event number '%s'\n", optarg); + break; + case 'T': + cps.test_time = (int) p_int (optarg, CPS_TIME_MAX, &end); + ERR_RETURN (!end || *end, -1, "Invalid test time '%s'\n", optarg); + break; + case 'r': + cps.rate = cps_p_rate (optarg); + ERR_RETURN (cps.rate > CPS_RATE_MAX, -1, "Invalid rate '%s'\n", + optarg); + break; + case 'v': + cps.verbose = 1; + break; + case 'm': + cps.more = 1; + break; + +#ifdef DEBUG + case 'D': + enable_debug = 1; + break; +#endif + case 'h': + cps_usage (argv[0]); + exit (0); + case '?': + err ("Invalid arguments\n"); + return -1; + default: + err ("Unknown option '%c'.\n", opt); + return -1; + } + } + + return 0; +} + +#endif /* #ifndef EXEC_CPS_C_ */ + +int +main (int argc, char *argv[]) +{ + int ret; + + if (cps_init ()) + return 1; + + cps_args (argc, argv) || cps_start () || cps_loop (); + + cps_exit (); + return 0; +} diff --git a/thirdparty/apps/testapp/cps/cps.h b/thirdparty/apps/testapp/cps/cps.h new file mode 100644 index 0000000..9b949f3 --- /dev/null +++ b/thirdparty/apps/testapp/cps/cps.h @@ -0,0 +1,283 @@ +/* +* +* Copyright (c) 2018 Huawei Technologies Co.,Ltd. +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at: +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef _CPS_H_ +#define _CPS_H_ + +#define CPS_FRAG_NUM 64 +#define CPS_FRAG_NS (20 * 1000 * 1000) /* 20 ms */ +#define CPS_FRAG_LOOP (CPS_FRAG_NS * CPS_FRAG_NUM) /* 1.28 s */ + +#define CPS_TIMER_MS 100 +#define CPS_DELAY_MS 5 + +#define CPS_MAX_FD (16 * 1024 * 1024) /* 10M */ +#define CPS_CONN_MAX (256 * 1024) + +#define CPS_EPSIZE (1 * 1000) /* =1k */ +#define CPS_EPWAIT_MS 200 /* ms */ + +#define CPS_THREAD_MAX 128 +#define CPS_SERVER_MAX 32 +#define CPS_CLIENT_MAX 256 +#define CPS_CLIENT_IAS_MAX 32 + +#define CPS_PORT_DEF 58166 +#define CPS_EVNUM_DEF 256 +#define CPS_EVNUM_MAX 1024 +#define CPS_TIME_DEF 300 +#define CPS_TIME_MAX (60 * 60 * 24 * 7) /* 604800s = 1 week */ +#define CPS_RATE_DEF 10000 /* 1w */ +#define CPS_RATE_MAX (100 * 1000 * 1000) /* 100m */ +#define CPS_REQ_DEF 1 +#define CPS_RES_DEF 1 +#define CPS_DATA_MAX 4096 +#define CPS_INTERVAL_DEF 10 /* s */ +#define CPS_INTERVAL_MAX 3600 /* s */ + +#define CPS_ERR_NUM 256 + +#define CPS_CONN_SID (-1) +#define CPS_EV_DATA(sid, fd) (((uint64_t)(uint32_t)(sid) << 32) | (uint64_t)(uint32_t)(fd)) +#define CPS_EV_FD(u64) ((int)(uint32_t)(u64)) +#define CPS_EV_SID(u64) ((int)(uint32_t)((u64) >> 32)) + +enum +{ + CPS_LOOP_BOTH = 0, + CPS_LOOP_CF, + CPS_LOOP_SF, +}; + +enum +{ + CPS_ERROR = -2, + CPS_EXIT = -1, + + CPS_INIT = 0, + + CPS_RUNNING = 1, + CPS_CLOSING = 2, +}; + +enum +{ + CPS_CNT_CONN_MAX, + CPS_CNT_CONN_NUM, + + CPS_CNT_GTFD, + CPS_CNT_SOCKET_ERR, + CPS_CNT_BIND_ERR, + CPS_CNT_ACCEPT_ERR, + CPS_CNT_CONNECT_ERR, + CPS_CNT_REUSEADDR_ERR, + CPS_CNT_NODELAY_ERR, + CPS_CNT_NONBLOCK_ERR, + CPS_CNT_SEND_ERR, + CPS_CNT_RECV_ERR, + CPS_CNT_EPOLL_ERR, + CPS_CNT_ERR_EVENT, + CPS_CNT_FD_ERR, + + CPS_CNT_NUM +}; + +#define CPS_CNT_ITEM(thread, id) (cps.curr[(thread)->index].cnt[(id)]) + +#define CPS_CNT_INC(thread, id) (++CPS_CNT_ITEM((thread), (id))) +#define CPS_CNT_INC_E(thread, id, e) do { \ + struct cps_stat *_stat = cps.curr + (thread)->index; \ + ++_stat->cnt[(id)]; \ + if ((e) >= CPS_ERR_NUM) ++_stat->err[0];\ + else ++_stat->err[(e)]; \ +} while(0) + +enum +{ + CPS_REC_INIT, + + CPS_REC_CONN, + CPS_REC_CONN_TIME, + + CPS_REC_RECV, + CPS_REC_RECV_TIME, + + CPS_REC_SEND, + CPS_REC_SEND_TIME, + + CPS_REC_FAIL, + + CPS_REC_NUM +}; + +#define CPS_REC_INC(thread, id) (++cps.curr[(thread)->index].rec[(id)]) + +#define CPS_REC_TIMED_INC(thread, id, last) do { \ + struct timespec _time; \ + struct cps_stat *_stat = cps.curr + (thread)->index; \ + LB_TIME(_time); \ + _stat->rec[(id)]++; \ + _stat->rec[(id) + 1] += LB_SUB_NS(_time, (last)); \ + (last) = _time; \ +} while (0) + +struct cps_conn +{ + union + { + int size; + int sid; + }; + int next; + int *prev; + struct timespec last; + + struct timespec create_time; +}; + +struct cps_stat +{ + uint64_t cnt[CPS_CNT_NUM]; + uint64_t rec[CPS_REC_NUM]; + uint64_t err[CPS_ERR_NUM]; +}; + +struct cps_thread +{ + int epfd; + int index; + int core; + int loop; + + int server; + int conn; + int conn_num; + + int server_num; + int client_num; + int c_addr_num; + + uint64_t rate; + pthread_t tid; + + struct sockaddr_in s_addr[CPS_SERVER_MAX]; + struct inaddrs c_addr[CPS_CLIENT_MAX]; + struct epoll_event event[CPS_EVNUM_MAX]; +}; + +struct cps_var +{ + int run_state; + int CPU_NUM; + + int verbose; + int more; + int client; + int evnum; + + int req_len; + int res_len; + + int test_time; + int interval; + + int active_thread; + int thread_num; + int server_num; + int client_num; + + uint64_t rate; + + struct cps_stat *curr; + struct cps_stat *next; + + struct cps_conn *conn; + struct cps_thread *thread[CPS_THREAD_MAX]; + + struct cps_stat records[2][CPS_THREAD_MAX]; +}; + +extern struct cps_var cps; + +inline static struct cps_conn * +CPS_CONN (int fd) +{ + return cps.conn + fd; +} + +inline static void +cps_add_server (struct cps_thread *thread, int fd, int sid) +{ + struct cps_conn *conn = CPS_CONN (fd); + + conn->sid = sid; + conn->next = thread->server; + conn->prev = &thread->server; + LB_TIME (conn->last); + if (thread->server >= 0) + CPS_CONN (thread->server)->prev = &conn->next; + thread->server = fd; +} + +inline static void +cps_add_conn (struct cps_thread *thread, int fd, int size, + struct timespec *begin) +{ + struct cps_conn *conn = CPS_CONN (fd); + + conn->size = size; + conn->last = *begin; + conn->next = thread->conn; + conn->prev = &thread->conn; + if (thread->conn >= 0) + CPS_CONN (thread->conn)->prev = &conn->next; + thread->conn = fd; + + if (++thread->conn_num > CPS_CNT_ITEM (thread, CPS_CNT_CONN_MAX)) + CPS_CNT_ITEM (thread, CPS_CNT_CONN_MAX) = thread->conn_num; + CPS_REC_TIMED_INC (thread, CPS_REC_CONN, conn->last); +} + +inline static void +cps_rem_conn (struct cps_thread *thread, int fd, struct cps_conn *conn) +{ + --thread->conn_num; + + *conn->prev = conn->next; + if (conn->next >= 0) + CPS_CONN (conn->next)->prev = conn->prev; +} + +inline static struct cps_thread * +cps_new_thread () +{ + struct cps_thread *thread = calloc (1, sizeof (struct cps_thread)); + + if (thread) + { + thread->index = cps.thread_num; + thread->epfd = -1; + thread->core = -1; + thread->conn = -1; + thread->server = -1; + + cps.thread[cps.thread_num++] = thread; + } + + return thread; +} + +#endif /* #ifndef _CPS_H_ */ diff --git a/thirdparty/apps/testapp/cps/cps_c.c b/thirdparty/apps/testapp/cps/cps_c.c new file mode 100644 index 0000000..dfcc341 --- /dev/null +++ b/thirdparty/apps/testapp/cps/cps_c.c @@ -0,0 +1,394 @@ +/* +* +* Copyright (c) 2018 Huawei Technologies Co.,Ltd. +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at: +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "lb.h" +#include "cps.h" + +struct cps_frag +{ + struct timespec begin; + uint64_t count; +}; + +struct cps_run +{ + struct cps_frag frag[CPS_FRAG_NUM]; + uint64_t total; + uint32_t fid; + int sid, ci, ii; + struct sockaddr_in addr; +}; + +inline static void +cps_c_next (struct cps_thread *thread, struct cps_run *run) +{ + if (thread->loop == CPS_LOOP_CF) + { + if (++run->ii >= thread->c_addr[run->ci].ip_num) + { + run->ii = 0; + if (++run->ci == thread->client_num) + { + run->ci = 0; + if (++run->sid >= thread->server_num) + run->sid = 0; + } + } + run->addr.sin_addr.s_addr = + htonl (thread->c_addr[run->ci].ip + run->ii); + } + else if (thread->loop == CPS_LOOP_SF) + { + if (++run->sid >= thread->server_num) + { + run->sid = 0; + if (++run->ii >= thread->c_addr[run->ci].ip_num) + { + run->ii = 0; + if (++run->ci >= thread->client_num) + run->ci = 0; + } + run->addr.sin_addr.s_addr = + htonl (thread->c_addr[run->ci].ip + run->ii); + } + } + else + { + if (++run->sid == thread->server_num) + run->sid = 0; + if (++run->ii >= thread->c_addr[run->ci].ip_num) + { + run->ii = 0; + if (++run->ci == thread->client_num) + run->ci = 0; + } + run->addr.sin_addr.s_addr = + htonl (thread->c_addr[run->ci].ip + run->ii); + } +} + +inline static int +cps_c_trigger (struct cps_thread *thread, struct cps_run *run) +{ + uint64_t nsec, num; + struct timespec now; + struct cps_frag *frag = &run->frag[run->fid % CPS_FRAG_NUM]; + struct cps_frag *from = &run->frag[(run->fid + 1) % CPS_FRAG_NUM]; + + LB_TIME (now); + + if (LB_CMP_NS (now, frag->begin, CPS_FRAG_NS)) + { + /* move to next fragment */ + frag = from; + from = &run->frag[++run->fid % CPS_FRAG_NUM]; + run->total -= frag->count; + frag->count = 0; + frag->begin = now; + } + + nsec = LB_SUB_NS (now, from->begin); + num = thread->rate * nsec / NSOFS; + if (num >= run->total) + { + run->total++; + frag->count++; + return 1; + } + + return 0; +} + +int +cps_c_create (struct cps_thread *thread, struct cps_run *run) +{ + int fd, ret; + struct timespec begin; + struct epoll_event event; + + CPS_REC_INC (thread, CPS_REC_INIT); + LB_TIME (begin); + + fd = _socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (fd < 0) + { + CPS_CNT_INC_E (thread, CPS_CNT_SOCKET_ERR, errno); + CPS_REC_INC (thread, CPS_REC_FAIL); + DBG ("->socket(...)=%d:%d\n", fd, errno); + return -1; + } + + if (fd >= CPS_MAX_FD) + { + CPS_CNT_INC (thread, CPS_CNT_GTFD); + goto ERR; + } + + ret = set_reuseaddr (fd, 1); + if (ret) + CPS_CNT_INC_E (thread, CPS_CNT_REUSEADDR_ERR, errno); + + ret = _bind (fd, (struct sockaddr *) &run->addr, sizeof (run->addr)); + if (ret) + { + CPS_CNT_INC_E (thread, CPS_CNT_BIND_ERR, errno); + DBG ("->bind(%d, %s)=%d:%d\n", fd, f_inaddr (&run->addr), ret, errno); + goto ERR; + } + ret = set_nodelay (fd, 1); + if (ret) + CPS_CNT_INC_E (thread, CPS_CNT_NODELAY_ERR, errno); + + ret = set_nonblock (fd); + if (ret) + { + CPS_CNT_INC_E (thread, CPS_CNT_NONBLOCK_ERR, errno); + goto ERR; + } + + ret = + _connect (fd, (struct sockaddr *) &thread->s_addr[run->sid], + sizeof (thread->s_addr[run->sid])); + if (ret) + { + const int e = errno; + if (e != EINPROGRESS) + { + CPS_CNT_INC_E (thread, CPS_CNT_CONNECT_ERR, e); + DBG ("->connect(%d, %s)=%d:%d\n", fd, + f_inaddr (&thread->s_addr[run->sid]), ret, errno); + goto ERR; + } + } + + event.events = EPOLLIN | EPOLLOUT | EPOLLET; + event.data.u64 = CPS_EV_DATA (CPS_CONN_SID, fd); + ret = _epoll_ctl (thread->epfd, EPOLL_CTL_ADD, fd, &event); + if (ret) + { + CPS_CNT_INC_E (thread, CPS_CNT_EPOLL_ERR, errno); + DBG ("->epoll_ctl(%d, ADD, %d)=%d:%d\n", thread->epfd, fd, ret, errno); + goto ERR; + } + + cps_add_conn (thread, fd, 0, &begin); + + return 0; + +ERR: + _close (fd); + CPS_REC_INC (thread, CPS_REC_FAIL); + return -1; +} + +int +cps_c_io (struct cps_thread *thread, int fd, uint32_t events) +{ + int ret; + static char buf[CPS_DATA_MAX]; + +// struct cps_server *server = &thread->server[sid]; + struct cps_conn *conn = CPS_CONN (fd); + + if (events & EPOLLERR) + { + CPS_CNT_INC (thread, CPS_CNT_ERR_EVENT); + DBG ("(%d, %d, %x) EPOLLERR\n", thread->index, fd, events); + goto ERR; + } + + if (conn->size >= 0) + { + if (0 == (events & EPOLLOUT)) + return 0; + + while (1) + { + if (cps.run_state <= CPS_INIT) + return -1; + + ret = _send (fd, buf, cps.req_len - conn->size, 0); + if (ret > 0) + { + conn->size += ret; + if (conn->size >= cps.req_len) + { + struct epoll_event event; + event.events = EPOLLIN; + event.data.u64 = CPS_EV_DATA (CPS_CONN_SID, fd); + conn->size = -cps.res_len; + ret = _epoll_ctl (thread->epfd, EPOLL_CTL_MOD, fd, &event); + if (ret) + { + CPS_CNT_INC_E (thread, CPS_CNT_EPOLL_ERR, errno); + DBG ("->epoll_ctl(%d, MOD, %d)=%d:%d\n", thread->epfd, + fd, ret, errno); + goto ERR; + } + CPS_REC_TIMED_INC (thread, CPS_REC_SEND, conn->last); + break; + } + } + else + { + if (ret < 0) + { + const int e = errno; + if (e == EWOULDBLOCK || e == EAGAIN) + return 0; + if (e == EINTR) + continue; + CPS_CNT_INC_E (thread, CPS_CNT_SEND_ERR, e); + } + else + { + CPS_CNT_INC (thread, CPS_CNT_SEND_ERR); + } + DBG ("->send(%d,, %d)=%d:%d\n", fd, cps.req_len - conn->size, + ret, errno); + goto ERR; + } + } + } + + if (0 == (events & EPOLLIN)) + return 0; + + while (cps.run_state > CPS_INIT) + { + ret = _recv (fd, buf, -conn->size, 0); + if (ret > 0) + { + conn->size += ret; + if (conn->size >= 0) + { + /* receive success */ + _close (fd); + CPS_REC_TIMED_INC (thread, CPS_REC_RECV, conn->last); + cps_rem_conn (thread, fd, conn); + return 0; + } + } + else + { + if (ret < 0) + { + const int e = errno; + if (e == EWOULDBLOCK || e == EAGAIN) + return 0; /*wait event */ + if (e == EINTR) /* The receive was interrupted by delivery of a signal... */ + continue; /*recv again */ + CPS_CNT_INC_E (thread, CPS_CNT_RECV_ERR, e); + } + else + { + CPS_CNT_INC (thread, CPS_CNT_RECV_ERR); + } + DBG ("->recv(%d,, %d)=%d:%d\n", fd, -conn->size, ret, errno); + goto ERR; /* ret == 0 and not block meaning error */ + } + } + + DBG ("(%d, %d) cannot run there\n", thread->index, fd); + +ERR: + _close (fd); + CPS_REC_INC (thread, CPS_REC_FAIL); + cps_rem_conn (thread, fd, conn); + return -1; +} + +void * +cps_c_thread (void *arg) +{ + int i, num = 0; + struct cps_run run = { 0 }; + struct cps_thread *thread = (struct cps_thread *) arg; + struct epoll_event *event = thread->event; + + run.addr.sin_family = AF_INET; + run.addr.sin_port = htons (0); + + out + ("[%d] initialize thread %ld client:%d server:%d rate:%lu core:%d epfd:%d\n", + thread->index, pthread_self (), thread->client_num, thread->server_num, + thread->rate, thread->core, thread->epfd); + + futex_wait (&cps.run_state, CPS_INIT); + + LB_TIME (run.frag[0].begin); + for (i = 1; i < CPS_FRAG_NUM; ++i) + run.frag[i].begin = run.frag[0].begin; + + while (1) + { + /* open 1 connect */ + if (cps.run_state == CPS_RUNNING) + { + if (cps_c_trigger (thread, &run)) + { + cps_c_next (thread, &run); + cps_c_create (thread, &run); + } + } + else if (cps.run_state == CPS_CLOSING) + { + if (thread->conn_num <= 0) + break; + } + else + { + break; + } + + /* process 1 event */ + if (num > 0) + { + int fd = CPS_EV_FD (event->data.u64); + DBG ("epoll event:{sid:%d fd:%d e:%x}\n", + CPS_EV_SID (event->data.u64), fd, event->events); + + if ((uint32_t) fd >= CPS_MAX_FD) + { + CPS_CNT_INC (thread, CPS_CNT_FD_ERR); + } + else + { + (void) cps_c_io (thread, fd, event->events); + } + + num--; + event++; + } + + /* wait events */ + if (num <= 0) + { + event = thread->event; + num = _epoll_wait (thread->epfd, event, cps.evnum, 0); /* no wait */ + if (num < 0) + { + int e = errno; + if (e != EINTR) + CPS_CNT_INC_E (thread, CPS_CNT_EPOLL_ERR, e); + } + } + + } + + __sync_fetch_and_sub (&cps.active_thread, 1); + return NULL; +} diff --git a/thirdparty/apps/testapp/cps/cps_s.c b/thirdparty/apps/testapp/cps/cps_s.c new file mode 100644 index 0000000..57d41c7 --- /dev/null +++ b/thirdparty/apps/testapp/cps/cps_s.c @@ -0,0 +1,320 @@ +/* +* +* Copyright (c) 2018 Huawei Technologies Co.,Ltd. +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at: +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "lb.h" +#include "cps.h" + +inline static void +cps_s_close (struct cps_thread *thread) +{ + int fd; + struct epoll_event ev = { 0 }; + + for (fd = thread->server; fd >= 0; fd = CPS_CONN (fd)->next) + { + (void) _epoll_ctl (thread->epfd, EPOLL_CTL_DEL, fd, &ev); + _close (fd); + } + + thread->server = -1; +} + +int +cps_s_listen (struct cps_thread *thread) +{ + int i; + + for (i = 0; i < thread->server_num; ++i) + { + int fd, ret; + struct timespec dummy; + struct epoll_event event; + struct sockaddr_in *server = &thread->s_addr[i]; + + fd = _socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + ERR_RETURN (fd < 0, -1, "socket()=%d:%d\n", fd, errno); + + ret = _bind (fd, (struct sockaddr *) server, sizeof (*server)); + ERR_RETURN (ret, -1, "bind(%d)=%d:%d\n", fd, ret, errno); + + ret = set_nonblock (fd); + ERR_RETURN (ret, -1, "set_nonblock(%d)=%d:%d\n", fd, ret, errno); + + ret = _listen (fd, SOMAXCONN); + ERR_RETURN (ret, -1, "listen(%d)=%d:%d\n", fd, ret, errno); + + event.events = EPOLLIN | EPOLLET; + event.data.u64 = CPS_EV_DATA (i, fd); + ret = _epoll_ctl (thread->epfd, EPOLL_CTL_ADD, fd, &event); + ERR_RETURN (ret, -1, "epoll_ctl(%d, %d)=%d:%d\n", thread->epfd, fd, ret, + errno); + + out ("[%d.%d:%d] listen on %s\n", thread->index, i, fd, + f_inaddr (server)); + + cps_add_server (thread, fd, i); + } + + return 0; +} + +int +cps_s_accept (struct cps_thread *thread, int server_fd) +{ + while (cps.run_state == CPS_RUNNING) + { + int fd, ret; + struct timespec begin; + struct epoll_event event; +#if defined(DEBUG) + struct sockaddr_in addr; + socklen_t len = sizeof (addr); +#endif + + LB_TIME (begin); + +#if defined(DEBUG) && 0 + fd = + _accept4 (server_fd, (struct sockaddr *) &addr, &len, SOCK_NONBLOCK); +#else + fd = _accept4 (server_fd, NULL, NULL, SOCK_NONBLOCK); +#endif + if (fd < 0) + { + int e = errno; + if (e == EAGAIN || e == EWOULDBLOCK) + return 0; + DBG ("->accept4(%d)=%d:%d\n", server_fd, fd, e); + CPS_CNT_INC_E (thread, CPS_CNT_ACCEPT_ERR, errno); + return -1; + } + + CPS_REC_INC (thread, CPS_REC_INIT); +// DBG("(%d, %d) -> accepted(%d) %d: %s", thread->index, sid, server->fd, fd, f_inaddr(&addr)); + + if (fd >= CPS_MAX_FD) + { + _close (fd); + CPS_REC_INC (thread, CPS_REC_FAIL); + continue; + } + + ret = set_nodelay (fd, 1); + if (ret) + CPS_CNT_INC_E (thread, CPS_CNT_NODELAY_ERR, errno); + + event.events = EPOLLIN | EPOLLET; + event.data.u64 = CPS_EV_DATA (CPS_CONN_SID, fd); + ret = _epoll_ctl (thread->epfd, EPOLL_CTL_ADD, fd, &event); + if (ret) + { + _close (fd); + CPS_CNT_INC_E (thread, CPS_CNT_EPOLL_ERR, errno); + CPS_REC_INC (thread, CPS_REC_FAIL); + DBG ("epoll_ctl(%d, %d)=%d:%d\n", thread->epfd, fd, ret, errno); + continue; + } + + cps_add_conn (thread, fd, -cps.req_len, &begin); + } + + return 0; +} + +int +cps_s_io (struct cps_thread *thread, int fd, uint32_t events) +{ + static char buf[CPS_DATA_MAX]; + + int ret; + struct cps_conn *conn = CPS_CONN (fd); + + DBG ("(%d, %d, %x) conn:{size:%d next:%d prev:%p:%d}\n", + thread->index, fd, events, conn->size, conn->next, conn->prev, + *conn->prev); + + if (events & EPOLLERR) + { + CPS_CNT_INC (thread, CPS_CNT_ERR_EVENT); + DBG ("(%d, %d, 0x%x)\n", thread->index, fd, events); + goto ERR; + } + + if (0 == (events & EPOLLIN)) + return 0; + + while (1) + { + if (cps.run_state <= CPS_INIT) + goto ERR; + + ret = _recv (fd, buf, sizeof (buf), 0); + if (ret > 0) + { + conn->size += ret; + if (conn->size >= 0) + { + CPS_REC_TIMED_INC (thread, CPS_REC_RECV, conn->last); + break; /* receive success */ + } + } + else + { + if (ret < 0) + { + const int e = errno; + if (e == EWOULDBLOCK || e == EAGAIN) + return 0; + if (e == EINTR) + continue; + CPS_CNT_INC_E (thread, CPS_CNT_RECV_ERR, e); + } + else + { + CPS_CNT_INC (thread, CPS_CNT_RECV_ERR); + } + DBG ("->recv(%d,, %ld)=%d:%d\n", fd, sizeof (buf), ret, errno); + goto ERR; + } + } + + conn->size = 0; + + while (cps.run_state > CPS_INIT) + { + ret = _send (fd, buf, cps.res_len - conn->size, 0); + if (ret > 0) + { + conn->size += ret; + if (conn->size >= cps.res_len) + { + _close (fd); + CPS_REC_TIMED_INC (thread, CPS_REC_SEND, conn->last); + cps_rem_conn (thread, fd, conn); + return 0; + } + } + else + { + if (ret < 0) + { + const int e = errno; + if (e == EWOULDBLOCK || e == EAGAIN || e == EINTR) + continue; + CPS_CNT_INC_E (thread, CPS_CNT_SEND_ERR, e); + } + else + { + CPS_CNT_INC (thread, CPS_CNT_SEND_ERR); + } + DBG ("->send(%d,, %d)=%d:%d\n", fd, cps.res_len - conn->size, ret, + errno); + goto ERR; + } + } + + return 0; + +ERR: + _close (fd); + cps_rem_conn (thread, fd, conn); + CPS_REC_INC (thread, CPS_REC_FAIL); + return -1; +} + +void * +cps_s_thread (void *arg) +{ + int num = 0; + struct cps_thread *thread = (struct cps_thread *) arg; + struct epoll_event *event = thread->event; + + out ("[%d] initialize thread %ld server:%d core:%d epfd:%d\n", + thread->index, pthread_self (), thread->server_num, thread->core, + thread->epfd); + + if (cps_s_listen (thread)) + { + cps.run_state = CPS_ERROR; + return NULL; + } + + futex_wait (&cps.run_state, CPS_INIT); + + while (1) + { + + if (num > 0) + { + int sid = CPS_EV_SID (event->data.u64); + int fd = CPS_EV_FD (event->data.u64); + DBG ("epoll evnet{sid:%d fd:%d event:%x}\n", sid, fd, + event->events); + if (sid >= 0) + { + if (event->events & EPOLLIN) + { + (void) cps_s_accept (thread, fd); + } + if (event->events & EPOLLERR) + { + wrn ("Error event for server %d\n", fd); + } + } + else + { + if (fd >= CPS_MAX_FD) + { + err ("Error connection index %d\n", fd); + } + else + { + (void) cps_s_io (thread, fd, event->events); + } + } + + num--; + event++; + } + + if (num <= 0) + { + if (cps.run_state == CPS_CLOSING) + { + if (thread->server >= 0) + cps_s_close (thread); + if (thread->conn_num <= 0) + break; + } + else if (cps.run_state <= CPS_INIT) + { + break; + } + + event = thread->event; + num = _epoll_wait (thread->epfd, event, cps.evnum, CPS_EPWAIT_MS); + if (num < 0) + { + int e = errno; + if (e != EINTR && e != ETIMEDOUT) + CPS_CNT_INC_E (thread, CPS_CNT_EPOLL_ERR, e); + } + } + } + + __sync_fetch_and_sub (&cps.active_thread, 1); + return NULL; +} |