diff options
Diffstat (limited to 'thirdparty/apps/testapp/lp')
-rw-r--r-- | thirdparty/apps/testapp/lp/lp.c | 1138 | ||||
-rw-r--r-- | thirdparty/apps/testapp/lp/lp.h | 615 | ||||
-rw-r--r-- | thirdparty/apps/testapp/lp/lpc.c | 669 | ||||
-rw-r--r-- | thirdparty/apps/testapp/lp/lps.c | 372 |
4 files changed, 2794 insertions, 0 deletions
diff --git a/thirdparty/apps/testapp/lp/lp.c b/thirdparty/apps/testapp/lp/lp.c new file mode 100644 index 0000000..a9b71d1 --- /dev/null +++ b/thirdparty/apps/testapp/lp/lp.c @@ -0,0 +1,1138 @@ +/* +* +* Copyright (c) 2018 Huawei Technologies Co.,Ltd. +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at: +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "lb.h" +#include "lp.h" + +void lp_exec (struct lp_worker *worker); + +#define LP_OPTIONS "s:c:bi:e:m:nwEC" DBGOPT "vh" + +static const struct option lp_options[] = { + {"block", 0, 0, 'b'}, + {"server", 1, 0, 's'}, + {"client", 1, 0, 'c'}, + {"nodelay", 0, 0, 'n'}, + {"interval", 1, 0, 'i'}, + {"no-error", 1, 0, 'e'}, + {"error-msg", 0, 0, 'E'}, + {"core", 1, 0, 'm'}, + DBGOPT_LONG {"watch", 0, 0, 'w'}, + {"no-color", 0, 0, 'C'}, + {"verbose", 0, 0, 'v'}, + {"help", 0, 0, 'h'}, + {0, 0, 0, 0} +}; + +static const char *MODES[] = { + [LP_RAND] = "*random link", + [LP_SYNC] = "=sync increment", + [LP_CPS] = "}client per server", + [LP_SPC] = "{server per client", +}; + +struct lp_var lp = { 0 }; + +void +lp_dump_sess (int fd) +{ + char buf[256]; + struct lp_sess *sess = LP_SESS (fd); + + co_init (buf, sizeof (buf)); + + co_append (buf, 80, + "fd:%d sess:%p state:0x%x test:%u epout:%u io_num:%u query:%d reply:%d\n", + fd, sess, sess->state, sess->test, sess->epout, sess->io_num, + sess->query, sess->reply); + co_append (buf, 50, " nest-sess:%d prev-sess:%p", sess->next_sess, + sess->prev_sess); + co_app_if (sess->prev_sess, buf, 10, "(%ld)", + CON_OF (sess->prev_sess, struct lp_sess, next_sess) - lp.sess); + co_append (buf, 50, " nest-rest:%d prev-rest:%p", sess->next_rest, + sess->prev_rest); + co_app_if (sess->prev_rest, buf, 10, "(%ld)", + CON_OF (sess->prev_rest, struct lp_sess, next_rest) - lp.sess); + co_append (buf, 30, " time:%ld.%09ld\n", sess->time.tv_sec, + sess->time.tv_nsec); + + co_flush (buf); +} + +void +lp_clean (struct lp_worker *worker) +{ + int i, fd; + + while (lp.run_state == LP_CLEAN) + { + fd = worker->conn.first; + if (fd < 0) + break; + lp_del_conn (worker, fd, LP_SESS (fd)); + } + LP_ASSERT (worker->conn.num == 0); + LP_ASSERT (worker->conn.first == -1); + LP_ASSERT (worker->conn.last = &worker->conn.first); + + while (lp.run_state == LP_CLEAN) + { + fd = worker->sess.first; + if (fd < 0) + break; + lp_del_sess (worker, fd); + } + LP_ASSERT (worker->sess.num == 0); + LP_ASSERT (worker->sess.first == -1); + LP_ASSERT (worker->sess.last = &worker->sess.first); + + for (i = 0; i < LP_MAX_TEST; ++i) + { + LP_ASSERT (worker->rest[i].num == 0); + LP_ASSERT (worker->rest[i].first == -1); + LP_ASSERT (worker->rest[i].last = &worker->rest[i].first); + LP_ASSERT (worker->wait[i].num == 0); + LP_ASSERT (worker->wait[i].first == -1); + LP_ASSERT (worker->wait[i].last = &worker->rest[i].first); + } +} + +void * +lp_thread (void *arg) +{ + const uint64_t one = 1; + struct lp_worker *worker = (struct lp_worker *) arg; + + (void) __sync_or_and_fetch (&lp.active_worker, one << worker->index); + + futex_wait (&lp.run_state, LP_INIT); + + if (lp.client_mode) + lp_client (worker); + else + lp_server (worker); + + lp_clean (worker); + + out ("%s! worker %d return%s\n", CR, worker->index, CC); + (void) __sync_and_and_fetch (&lp.active_worker, ~(one << worker->index)); + + return NULL; +} + +void +lp_round (int id) +{ + struct lp_test *test = &lp.test[id]; + char buf[128]; + + co_init (buf, sizeof (buf)); + + co_append (buf, 30, "%s! round %d:", CR, id); + co_append (buf, 30, " target:%s", + test->target == LP_TARGET_INF ? "*" : f_uint (test->target)); + co_append (buf, 30, " time:%s", + test->time == LP_TIME_INF ? "*" : f_uint (test->time)); + co_append (buf, 30, " up:%s", + test->up == LP_UP_INF ? "*" : f_uint (test->up)); + co_append (buf, 30, " down:%s", + test->down == LP_DOWN_INF ? "*" : f_uint (test->down)); + co_append (buf, 30, " query:%u", test->query); + co_append (buf, 30, " reply:%u", test->reply); + co_append (buf, 30, " times:%s", + test->times == LP_TIMES_INF ? "*" : f_uint (test->times)); + co_app_if (test->close_after_io, buf, 1, "-"); + co_append (buf, 30, " period:%u", test->period); + co_append (buf, 30, " wait:%u", test->wait); + co_append (buf, 30, " %s\n", CC); + + co_flush (buf); +} + +int +lp_start () +{ + int i; + cpu_set_t set; + + lp.sess = (struct lp_sess *) calloc (LP_MAX_FD, sizeof (struct lp_sess)); + ERR_RETURN (!lp.sess, -1, "Out of memory\n"); + + lp.run_state = LP_INIT; + + lp.curr = lp.stat[0]; + lp.next = lp.stat[1]; + + if (lp.interval == 0) + lp.interval = LP_INTERVAL_DEF; + + if (lp.core) + { + CPU_ZERO (&set); + for (i = 0; i < 64; ++i) + { + if (lp.core & (1 << i)) + CPU_SET (i, &set); + } + } + + for (i = 0; i < lp.worker_num; ++i) + { + int ret; + struct epoll_event event; + struct lp_worker *worker = LP_WORKER (i); + + out ("[%d] creating worker\n", i); + + worker->io_buf = malloc (LP_IOBUF_SIZE); + ERR_RETURN (!worker->io_buf, -1, "Out of memory for IO buffer\n"); + + worker->ev_buf = malloc (sizeof (struct epoll_event) * LP_EVENT_NUM); + ERR_RETURN (!worker->ev_buf, -1, "Out of memory for events buffer\n"); + + TP (epoll_create); + worker->epfd = _epoll_create (10); + ERR_RETURN (worker->epfd < 0, -1, "epoll_create()=%d:%d\n", + worker->epfd, errno); + worker->ctlfd = eventfd (0, 0); + ERR_RETURN (worker->ctlfd < 0, -1, "eventfd()=%d:%d\n", worker->epfd, + errno); + TQ (epoll_create); + + event.events = EPOLLIN; + event.data.u64 = LP_EV_MK (LP_CONTROL_TYPE, worker->ctlfd); + ret = _epoll_ctl (worker->epfd, EPOLL_CTL_ADD, worker->ctlfd, &event); + ERR_RETURN (ret, -1, "epoll_ctl(epfd:%d, add, evfd:%d,)=%d:%d\n", + worker->epfd, worker->ctlfd, ret, errno); + + worker->tid = + lb_thread (lp_thread, worker, "lp-%s-%d", + lp.client_mode ? "client" : "server", i); + ERR_RETURN (worker->tid <= 0, -1, "Create worker thread failed\n"); + + if (lp.core) + { + ret = pthread_setaffinity_np (worker->tid, sizeof (set), &set); + if (ret) + err ("Bind core failed!\n"); + } + + if (lp.client_mode) + { + size_t size; + + size = + sizeof (struct lb_run) + sizeof (struct lb_slot) * LP_UP_SLOT; + worker->up_run = (struct lb_run *) calloc (1, size); + ERR_RETURN (!worker->up_run, -1, "Out of memory for up run\n"); + + size = + sizeof (struct lb_run) + sizeof (struct lb_slot) * LP_DOWN_SLOT; + worker->down_run = (struct lb_run *) calloc (1, size); + ERR_RETURN (!worker->down_run, -1, "Out of memory for down run\n"); + + out + ("[%d] worker %ld created, client:%d server:%d epfd:%d ctlfd:%d\n", + worker->index, pthread_self (), worker->client_num, + worker->server_num, worker->epfd, worker->ctlfd); + } + else + { + int j; + TP (lp_listen); + for (j = 0; j < worker->server_num; ++j) + { + if (lp_listen (worker, j)) + return -1; + } + TQ (lp_listen); + out ("[%d] worker %ld created, server:%d epfd:%d ctlfd:%d\n", + worker->index, pthread_self (), worker->server_num, + worker->epfd, worker->ctlfd); + } + } + + out ("%s! running %s\n", CR, CC); + if (lp.client_mode) + { + lp_round (0); + } + + lp.run_state = LP_EXEC; + + futex_wake (&lp.run_state, lp.worker_num); + return 0; +} + +void +lp_stop (int state) +{ + int i; + + lp.run_state = state; + + for (i = 0; i < lp.worker_num; ++i) + { + struct lp_worker *worker = LP_WORKER (i); + if (worker->tid && worker->ctlfd >= 0) + { + (void) lp_post_cmd (worker->ctlfd, LP_CMD_STOP); + } + } +} + +#define _FREE(p) do { if (p) { free(p); p = NULL; } } while (0) + +void +lp_exit () +{ + int i; + + lp_stop (LP_EXIT); + + for (i = 0; i < lp.worker_num; ++i) + { + int fd; + struct lp_worker *worker = LP_WORKER (i); + + if (worker->tid) + { + const struct timespec wait = { tv_sec: 3, tv_nsec:0 }; + (void) pthread_timedjoin_np (worker->tid, NULL, &wait); + worker->tid = 0; + } + + for (fd = worker->server.first; fd >= 0; fd = LP_SESS (fd)->next_sess) + _close (fd); + + lp_clean (worker); + + if (worker->ctlfd >= 0) + { + _close (worker->ctlfd); + worker->ctlfd = -1; + } + if (worker->epfd >= 0) + { + _close (worker->epfd); + worker->epfd = -1; + } + + _FREE (worker->server_addr); + _FREE (worker->client_addr); + _FREE (worker->up_run); + _FREE (worker->down_run); + _FREE (worker->io_buf); + _FREE (worker->ev_buf); + } + + _FREE (lp.sess); +} + +inline static const char * +lp_errmsg (int e) +{ + static const char *errmsg[LP_ERRNO_NUM] = { 0 }; + if (NULL == errmsg[e]) + errmsg[e] = strerror (e); + return errmsg[e]; +} + +const static char *lp_cntmsg[LP_CNT_NUM] = { + [LP_E_SOCKET] = "socket", + [LP_E_BIND] = "bind", + [LP_E_ACCEPT] = "accept", + [LP_E_CONNECT] = "connect", + [LP_E_NODELAY] = "nodelay", + [LP_E_NONBLOCK] = "nonblock", + [LP_E_REUSEADDR] = "reuseaddr", + [LP_E_REUSEPORT] = "reuseport", + [LP_E_RECV] = "recv", + [LP_E_SEND] = "send", + + [LP_E_EPADD] = "ep-add", + [LP_E_EPMOD] = "ep-mod", + [LP_E_EPDEL] = "ep-del", + [LP_E_EPWAIT] = "ep-wait", + [LP_E_EPUNUSED] = "ep-unused", + [LP_E_EPHUP] = "ep-hup", + [LP_E_EPERR] = "ep-err", + [LP_E_EPINOUT] = "ep-inout", + [LP_E_EVIDLE] = "ep-idle", + [LP_E_EPEVENT] = "ep-event", + + [LP_E_IOSHUT] = "io-shut", + [LP_E_IOSIZE] = "io-size", + [LP_E_IOMORE] = "io-more", + [LP_E_IOEXCEED] = "io-exceed", + [LP_E_IOSEND0] = "io-send0", + + [LP_W_CREATE] = "cre", + [LP_W_SOCKET] = "soc", + [LP_W_BIND] = "bin", + [LP_W_CONNECT] = "con", + [LP_W_CONNECTED] = "est", + [LP_W_ACCEPT] = "acc", + [LP_W_CLOSE] = "clo", +}; + +void +lp_output (char buf[], const struct lp_stat *stat, uint64_t nsec, int mask) +{ + int i; + const uint64_t *cnt = stat->cnt; + static int w_num = 0, w_up = 0, w_down = 0, w_conn = 0; + static int w_q_mb = 0, w_q_cp = 0, w_r_mb = 0, w_r_cp = 0; + + co_append (buf, 5, " %s ", CH); + w_num = co_wr_uint (buf, cnt[LP_REC_NUM], w_num); + co_append (buf, 5, "%s", CC); + + co_append (buf, 3, " < "); + w_up = co_wr_uint (buf, lb_gdiv (cnt[LP_CONNECTED], nsec), w_up); + if (lp.client_mode && !lp.block_connecting) + { + co_append (buf, 1, " / "); + w_conn = co_wr_uint (buf, lb_gdiv (cnt[LP_CONNECT], nsec), w_conn); + } + co_append (buf, 3, " - "); + w_down = co_wr_uint (buf, lb_gdiv (cnt[LP_CLOSE], nsec), w_down); + co_append (buf, 3, " > "); + + co_append (buf, 3, " [ "); + w_q_mb = + co_wr_uint (buf, + lb_gdiv (cnt[LP_QUERY_BYTE] * 8, nsec /* * (1000000 / 8) */ ), + w_q_mb); + co_append (buf, 1, " "); + w_q_cp = co_wr_uint (buf, lb_gdiv (cnt[LP_QUERY_COMP], nsec /* * 1000 */ ), + w_q_cp); + co_append (buf, 3, " : "); + w_r_mb = + co_wr_uint (buf, + lb_gdiv (cnt[LP_REPLY_BYTE] * 8, nsec /* * (1000000 / 8) */ ), + w_r_mb); + co_append (buf, 1, " "); + w_r_cp = co_wr_uint (buf, lb_gdiv (cnt[LP_REPLY_COMP], nsec /* * 1000 */ ), + w_r_cp); + co_append (buf, 3, " ] "); + + co_app_if (cnt[LP_FAILED], buf, 40, " F:%s%s%s", + FR__, f_uint (lb_gdiv (cnt[LP_FAILED], nsec)), CC); + + if (mask & LP_W_SIGN) + { + co_append (buf, 8, " time{"); + for (i = LP_W_BEGIN; i < LP_W_END; i += 2) + co_app_if (cnt[i], buf, 60, " %s:%s", lp_cntmsg[i], + f_uint (cnt[i + 1] / cnt[i])); + co_append (buf, 4, " }"); + } + + if (mask & LP_E_SIGN) + { + co_append (buf, 8, " err{"); + for (i = LP_E_BEGIN; i < LP_E_END; ++i) + co_app_if (cnt[i], buf, 60, " %s:%s", lp_cntmsg[i], f_uint (cnt[i])); + co_append (buf, 4, " }"); + } + + if (!lp.err_msg && (mask & LP_ERR_SIGN)) + { + co_append (buf, 5, " E:{"); + for (i = 1; i < LP_ERRNO_NUM; ++i) + co_app_if (stat->err[i], buf, 40, " %d:%lu", i, stat->err[i]); + co_app_if (stat->err[0], buf, 30, " -:%lu", stat->err[0]); + co_append (buf, 5, " }"); + } + + co_append (buf, 4, "\n"); + + if (lp.err_msg && (mask & LP_ERR_SIGN)) + { + for (i = 1; i < LP_ERRNO_NUM; ++i) + co_app_if (stat->err[i], buf, 100, "<E%d:%lu> %s\n", i, stat->err[i], + lp_errmsg (i)); + co_app_if (stat->err[0], buf, 100, "<E-:%s> Other error\n", + f_uint (stat->err[0])); + } +} + +void +lp_timer (uint64_t nsec) +{ + const static struct timespec delay = {.tv_sec = 0,.tv_nsec = + LP_DELAY_MS * 1000 * 1000 + }; + static int line = -2; + static int base; + + int i, second, total = 0; + char buf[256]; + struct tm *lc; + struct lp_stat *curr, sum = { 0 }; + + curr = lp.curr; + lp.curr = lp.next; + lp.next = curr; + + { + time_t tv = time (NULL); + lc = localtime (&tv); + } + + co_init (buf, sizeof (buf)); + + (void) nanosleep (&delay, NULL) /* wait for cps.curr use */ ; + + for (i = 0; i < lp.worker_num; ++i) + { + int j, mask = 0; + uint64_t count = 0; + + curr->cnt[LP_REC_NUM] = LP_WORKER (i)->sess.num; + + for (count = 0, j = 0; j < LP_R_END; ++j) + { + sum.cnt[j] += curr->cnt[j]; + count += curr->cnt[j]; + } + if (count) + mask |= LP_R_SIGN; + + for (count = 0, j = LP_E_BEGIN; j < LP_E_END; ++j) + { + sum.cnt[j] += curr->cnt[j]; + count += curr->cnt[j]; + } + if (count) + mask |= LP_E_SIGN; + + if (lp.watch) + { + for (count = 0, j = LP_W_BEGIN; j < LP_W_END; j += 2) + { + sum.cnt[j] += curr->cnt[j]; + sum.cnt[j + 1] += curr->cnt[j + 1]; + count += curr->cnt[j]; + } + if (count) + mask |= LP_W_SIGN; + } + + for (count = 0, j = 0; j < LP_ERRNO_NUM; ++j) + { + sum.err[j] += curr->err[j]; + count += curr->err[j]; + } + if (count) + mask |= LP_ERR_SIGN; + + if (mask && lp.verbose) + { + co_append (buf, 10, " %4dw ", i); + lp_output (buf, curr + i, nsec, mask); + } + + total |= mask; + } + + if (total) + { + line = line < 0 ? 0 : line + 1; + if (line == 0) + { + base = lc->tm_hour * 3600 + lc->tm_min * 60 + lc->tm_sec; + } + } + else + { + line = line > 0 ? 0 : line - 1; + if (line == -1) + co_append (buf, 5, "\n"); + } + + if (line >= 0) + { + if (line) + { + second = (lc->tm_hour * 3600 + lc->tm_min * 60 + lc->tm_sec) - base; + while (second < 0) + second += (24 * 3600); + co_append (buf, 10, " %5d", second); + } + else + { + co_append (buf, 10, " %2d:%02d ", lc->tm_hour, lc->tm_min); + } + lp_output (buf, &sum, nsec, total); + } + + co_flush (buf); + + (void) memset (curr, 0, sizeof (struct lp_stat) * lp.worker_num); +} + +int +lp_loop () +{ + const static struct timespec timeout = {.tv_sec = 0,.tv_nsec = LP_LOOP_TIMER + }; + + struct timespec begin, from, last_begin; + time_t next_time = lp.interval; + + LB_TIME (begin); + from = begin; + last_begin = begin; + + while (lp.run_state > 0) + { + struct timespec now; + + (void) nanosleep (&timeout, NULL); + + LB_TIME (now); + + if (lp.run_state == LP_CLEAN) + { + if (lp.active_worker == 0) + break; + } + + if (lp.client_mode && lp.run_state == LP_EXEC) + { + struct lp_test *test = &lp.test[lp.test_id]; + uint64_t total = lp_total_sess (); + + if (LB_CMP_S (now, last_begin, test->time) + || (test->up >= test->down ? total >= test->target : total <= + test->target)) + { + if (lp.test_id >= lp.test_num - 1) + { + lp.run_state = LP_CLEAN; + out ("%s! cleanup%s\n", CR, CC); + } + else + { + lp.test_id++; /* run changed */ + last_begin = now; + lp_round (lp.test_id); + } + } + } + + if (!LB_CMP_S (now, begin, next_time)) + continue; + + lp_timer (LB_SUB_NS (now, from)); + + from = now; + next_time += lp.interval; + + if (!lp.client_mode) + { + const struct timespec rest = {.tv_sec = lp.interval - 1,.tv_nsec = + LP_LOOP_REST + }; + (void) nanosleep (&rest, NULL); + } + } + + return 0; +} + +#ifndef SIGNAL_LP_C_ +#define SIGNAL_LP_C_ + +void +lp_break (int s) +{ + DBG (" SIGNALED %d running:%d\n", s, lp.run_state); + out ("\n"); + + if (lp.run_state == LP_CLEAN) + { + out ("%s! safe exit%s\n", CR, CC); + lp_exit (); + } + else if (lp.run_state >= 0) + { + out ("%s! clean exit%s\n", CR, CC); + lp_stop (LP_CLEAN); + } + else + { + out ("%s! direct exit%s\n", CR, CC); + exit (1); + } +} + +void +lp_sigpipe (int s) +{ + DBG ("SIGPIPE\n"); +} + +int +lp_init () +{ + struct sigaction s = { 0 }; + + (void) sigemptyset (&s.sa_mask); + + s.sa_flags = SA_NODEFER; + s.sa_handler = (void *) lp_break; + (void) sigaction (SIGINT, &s, NULL); + (void) sigaction (SIGQUIT, &s, NULL); + + s.sa_handler = lp_sigpipe; + (void) sigaction (SIGPIPE, &s, NULL); + +// lb_sigsegv_setup(); + + lp.CPU_NUM = get_nprocs (); + if (lp.CPU_NUM <= 0) + lp.CPU_NUM = 1; + + { + struct timespec t; + LB_TIME (t); + srandom (getpid () + t.tv_sec + (t.tv_sec >> 32) + t.tv_nsec + + (t.tv_nsec >> 32)); + } + + return 0; +} + +#endif + +void +lp_usage (const char *name) +{ + out ("USAGE: %s [OPTIONS] TEST-SET... # %s version\n", name, VERSION_NAME); +} + +void +lp_help (const char *name) +{ + lp_usage (name); + out (" Options:\n"); + out (" -s, --server LIST set one server address list\n"); + out (" X.Y.Z.M-N:P1-P2,...\n"); + out (" -c, --client LIST set one client address list\n"); + out + (" CLIENT*SERVER: R.S.T.K-J:Pa-Pb,...*X.Y.Z.M-N:P1-P2,...\n"); + out (" A,B,C,D*1,2 random link\n"); + out (" A,B,C,D=1,2 A1B2C1D2\n"); + out (" A,B,C,D}1,2 A1B1C1D1 A2B2C2D2\n"); + out + (" A,B,C,D{1,2 A1A2 B1B2 C1C2 D1D2\n"); + out + (" -b, --block set block mode for connecting(client only)\n"); + out (" -n, --nodelay set nodelay\n"); + out (" -i, --interval # report time(default:%ds max:%ds)\n", + LP_INTERVAL_DEF, LP_INTERVAL_MAX); + out + (" -m, --core #HEX set bind cpu core mask(hex mode)\n"); +#ifdef DEBUG + out (" -D, --debug show debug information\n"); +#endif + out + (" -w, --watch show watch time statistic\n"); + out (" -e, --no-error #-# skip error\n"); + out (" -E, --error-msg show error message\n"); + out (" -C, --no-color no color\n"); + out (" -v, --verbose show worker statistics\n"); + out (" -h, --help help\n"); + out (" TEST-SET for client\n"); + out + (" TARGET@TIME+UP-DOWN=QUERY:REPLY*TIMES-/PERIOD%%WAIT (client only)\n"); + out (" TARGET max connection(default: INFINITE)\n"); + out (" @TIME max time(0 or default:INFINITE)\n"); + out + (" +UP connect rate(default: 0 no connnect; *: INFINITE)\n"); + out + (" -DOWN close rate(default: 0 no close; *: INFINITE)\n"); + out (" =... IO set(default: no IO)\n"); + out (" QUERY send query data len(%u-%u)\n", + LP_QUERY_MIN, LP_QUERY_MAX); + out + (" :REPLY receive response data len(0-%d; default: same with QUERY)\n", + LP_REPLY_MAX); + out + (" *TIMES- IO times(0 or default: INFINITE; suffix-: IO then close)\n"); + out + (" /PERIOD IO period time(0-%us; default: one by one)\n", + LP_PERIOD_MAX); + out + (" %%WAIT first IO wait time(0-%us; default: 0 no wait)\n", + LP_WAIT_MAX); + out (" UNITS:\n"); + out (" k=1000 m=1000k g=1000m w=10000 K=1024 M=1024K G=1024M\n"); + out (" s=Seconds m=Minutes h=Hours\n"); +} + +int +lp_args_test (const char *arg) +{ + const char *p; + struct lp_test *test = &lp.test[lp.test_num]; + + ERR_RETURN (lp.test_num >= LP_MAX_TEST, -1, "Too many test set, max:%d\n", + LP_MAX_TEST); + (void) memset (test, 0, sizeof (struct lp_test)); + + if (*arg >= '0' && *arg <= '9') + { + test->target = p_value (arg, LP_TARGET_MAX, UB_1kmgwKMG, &p); + ERR_RETURN (!p, -1, "Invalid test TARGET set: '%s'\n", arg); + } + else + { + test->target = LP_TARGET_DEF; + p = arg; + } + + if (*p == '@') + { + test->time = p_value (p + 1, LP_TIME_MAX, UB_hms1, &p); + ERR_RETURN (!p, -1, "Invalid test TIME set: '%s'\n", arg); + } + else + { + test->time = LP_TIME_DEF; + } + + if (*p == '+') + { + if (p[1] == '*') + { + test->up = LP_UP_INF; + p += 2; + } + else + { + test->up = p_value (p + 1, LP_UP_MAX, UB_1kmgwKMG, &p); + ERR_RETURN (!p, -1, "Invalid test UP-RATE set: '%s'\n", arg); + } + } + else + { + test->up = LP_UP_DEF; + } + + if (*p == '-') + { + if (p[1] == '*') + { + test->down = LP_DOWN_INF; + p += 2; + } + else + { + test->down = p_value (p + 1, LP_DOWN_MAX, UB_1kmgwKMG, &p); + ERR_RETURN (!p, -1, "Invalid test DOWN-RATE set: '%s'\n", arg); + } + } + else + { + test->down = LP_DOWN_DEF; + } + + if (*p == '=') + { + test->query = p_value (p + 1, LP_QUERY_MAX, UB_1kmgwKMG, &p); + ERR_RETURN (!p + || test->query < LP_QUERY_MIN, -1, + "Invalid test QUERY '%s'\n", arg); + + if (*p == ':') + { + test->reply = p_value (p + 1, LP_REPLY_MAX, UB_1kmgwKMG, &p); + ERR_RETURN (!p, -1, "Invalid test REPLY set: '%s'\n", arg); + } + else + { + test->reply = test->query; + } + + if (*p == '*') + { + test->times = p_uint (p + 1, LP_TIMES_MAX, &p); + ERR_RETURN (!p, -1, "Invalid test TIMES set: '%s'\n", arg); + if (test->times == 0) + test->times = LP_TIMES_INF; + if (*p == '-') + { + test->close_after_io = 1; + p++; + } + } + else + { + test->times = LP_TIMES_DEF; + } + + if (*p == '/') + { + test->period = p_value (p + 1, LP_PERIOD_MAX, UB_hms1, &p); + ERR_RETURN (!p, -1, "Invalid test PRIOD set: '%s'\n", arg); + if (*p == '%') + { + test->wait = p_value (p + 1, LP_WAIT_MAX, UB_hms1, &p); + ERR_RETURN (!p, -1, "Invalid test WAIT set: '%s'\n", arg); + } + else + { + test->wait = LP_WAIT_DEF; + } + } + else + { + test->period = LP_PERIOD_DEF; + test->wait = 0; + } + } + else + { + test->query = 0; + test->reply = 0; + test->times = 0; + test->period = 0; + test->wait = 0; + } + + ERR_RETURN (*p, -1, "Invalid test set: '%s'\n", arg); + + if (test->up < test->down) + test->down_mode = 1; + lp.test_num++; + return 0; +} + +inline static void +lp_noerr (int b, int e) +{ + for (; b <= e; ++b) + lp.no_err[b / 64] |= (1 << (b % 64)); +} + +/* -e M-N */ +int +lp_args_noerr (const char *arg) +{ + int b, e; + + b = (int) p_uint (arg, LP_ERRNO_NUM, &arg); + if (!arg) + return -1; + if (*arg == '-') + { + e = (int) p_uint (arg + 1, LP_ERRNO_NUM, &arg); + if (!arg || e < b) + return -1; + } + else + { + e = b; + } + + if (*arg != 0) + return -1; + + lp_noerr (b, e); + + return 0; +} + +struct lp_worker * +lp_init_worker () +{ + int i; + struct lp_worker *worker = LP_WORKER (lp.worker_num); + + ERR_RETURN (lp.worker_num >= LP_MAX_WORKER, NULL, + "Too many workers, limit:%d\n", LP_MAX_WORKER); + + (void) memset (worker, 0, sizeof (*worker)); + + worker->index = lp.worker_num; + worker->epfd = -1; + worker->ctlfd = -1; + + lp_init_head (&worker->server); + lp_init_head (&worker->sess); + lp_init_head (&worker->conn); + for (i = 0; i < LP_MAX_TEST; ++i) + { + lp_init_head (&worker->rest[i]); + lp_init_head (&worker->wait[i]); + } + + lp.worker_num++; + + return worker; +} + +int +lp_args (int argc, char *argv[]) +{ + int i, opt, index, ret; + struct lp_worker *worker; + const char *arg; + int server_mode = 0; + + while (EOF != + (opt = getopt_long (argc, argv, LP_OPTIONS, lp_options, &index))) + { + const char *end; + + switch (opt) + { + case 'c': + ERR_RETURN (server_mode, -1, "Only server or client\n"); + lp.client_mode = 1; + worker = lp_init_worker (); + if (!worker) + return -1; + worker->client_num = p_addrin_list (optarg, &worker->client_addr, + LP_CLIENT_MAX, + PA_DEF_PORT | PAL_NO_SPACE, + &arg); + ERR_RETURN (worker->client_num <= 0, -1, + "Bad client for address list '%s'\n", optarg); + for (i = 0; i < CNT_OF (MODES); ++i) + { + if (*arg == MODES[i][0]) + break; + } + ERR_RETURN (i >= CNT_OF (MODES), -1, + "Bad mode for address list '%s'\n", optarg); + worker->link_mode = i; + arg++; + worker->server_num = p_addrin_list (arg, &worker->server_addr, + LP_SERVER_MAX, + PA_MUST_PORT | PAL_NO_SPACE, + NULL); + ERR_RETURN (worker->server_num <= 0, -1, + "Bad server for address list '%s'\n", arg); + break; + case 's': + ERR_RETURN (lp.client_mode, -1, "Only server or client\n"); + server_mode = 1; + worker = lp_init_worker (); + if (!worker) + return -1; + worker->server_num = + p_addrin_list (optarg, &worker->server_addr, LP_SERVER_MAX, + PA_MUST_PORT | PAL_NO_SPACE, NULL); + ERR_RETURN (worker->server_num <= 0, -1, + "Bad server for address list '%s'\n", optarg); + break; + + case 'i': + lp.interval = (int) p_int (optarg, LP_INTERVAL_MAX, &end); + ERR_RETURN (!end || *end, -1, "Invalid interval '%s'\n", optarg); + break; + case 'e': + ret = lp_args_noerr (optarg); + ERR_RETURN (ret, ret, "Invalid no-error set '%s'\n", optarg); + break; + case 'm': + lp.core = p_hex (optarg, &end); + ERR_RETURN (!end + || lp.core >= (1 << lp.CPU_NUM), -1, + "Invalid bind core set\n"); + break; + case 'C': + lb_set_color (LB_NO_COLOR); + break; + + case 'b': + lp.block_connecting = 1; + break; + case 'n': + lp.nodelay = 1; + break; + case 'w': + lp.watch = 1; + break; + case 'E': + lp.err_msg = 1; + break; + case 'v': + lp.verbose = 1; + break; + +#ifdef DEBUG + case 'D': + enable_debug = 1; + break; +#endif + case 'h': + lp_help (argv[0]); + exit (0); + case '?': + err ("Invalid arguments\n"); + return -1; + default: + err ("Unknown option '%c'.\n", opt); + return -1; + } + } + + ERR_RETURN (lp.worker_num <= 0, -1, + "Please set server or client address\n"); + + if (lp.client_mode) + { + for (index = optind; index < argc; ++index) + { + ret = lp_args_test (argv[index]); + if (ret) + return ret; + } + ERR_RETURN (lp.test_num <= 0, -1, "Please set test\n"); + } + else + { + ERR_RETURN (optind < argc, -1, "Unknown option '%s'\n", argv[optind]); + } + + return 0; +} + +int +main (int argc, char *argv[]) +{ + if (argc <= 1) + { + lp_usage (argv[0]); + return 0; + } + + if (lp_init ()) + return 1; + + if (lp_args (argc, argv) == 0 && lp_start () == 0) + lp_loop (); + + lp_exit (); + + return 0; +} diff --git a/thirdparty/apps/testapp/lp/lp.h b/thirdparty/apps/testapp/lp/lp.h new file mode 100644 index 0000000..4a2a09a --- /dev/null +++ b/thirdparty/apps/testapp/lp/lp.h @@ -0,0 +1,615 @@ +/* +* +* Copyright (c) 2018 Huawei Technologies Co.,Ltd. +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at: +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef _LP_H_ +#define _LP_H_ 1 + +#define LP_DBG 1 + +#if LP_DBG +#define LP_IF_DBG(line) line +#else +#define LP_IF_DBG(line) ((void)0) +#endif + +#define LP_ASSERT(cond) LP_IF_DBG(assert(cond)) + +#define LP_DELAY_MS 5 +#define LP_LOOP_TIMER (49 * 1000 * 1000) +#define LP_LOOP_REST (900 * 1000 * 1000) + +#define LP_UP_SLOT 1024 +#define LP_UP_NSEC (1000 * 1000) + +#define LP_DOWN_SLOT 1024 +#define LP_DOWN_NSEC (1000 * 1000) + +#define LP_MAX_FD (16 * 1024 * 1024) +#define LP_MAX_WORKER 64 +#define LP_MAX_TEST 10 + +#define LP_IOBUF_SIZE (128 * 1024) + +#define LP_EVENT_NUM 256 + +#define LP_SERVER_MAX 10000 +#define LP_CLIENT_MAX 100000 + +#define LP_INFINITE (0xFFFFffff) +#define LP_UNLIMITED LP_INFINITE +#define LP_VALUE_MAX (LP_INFINITE - 1) + +#define LP_INTERVAL_MIN 1 +#define LP_INTERVAL_MAX 60 +#define LP_INTERVAL_DEF 1 + +#define LP_TARGET_INF LP_INFINITE +#define LP_TARGET_MAX (16 * 1000 * 1000) /* 16m max */ +#define LP_TARGET_DEF LP_INFINITE + +#define LP_TIME_INF LP_INFINITE +#define LP_TIME_MAX (24 * 60 * 60) +#define LP_TIME_DEF LP_TIME_INF + +#define LP_UP_MAX ( 1 * 1000 * 1000) +#define LP_UP_INF LP_INFINITE +#define LP_UP_DEF 0 + +#define LP_DOWN_MAX (1 * 1000 * 1000) +#define LP_DOWN_INF LP_INFINITE +#define LP_DOWN_DEF 0 + +#define LP_QUERY_MIN 8 /* 2 unsigned int */ +#define LP_QUERY_MAX 65536 + +#define LP_REPLY_MAX (10 * 1024 * 1024) /* 10M */ + +#define LP_TIMES_INF LP_INFINITE +#define LP_TIMES_MAX LP_VALUE_MAX +#define LP_TIMES_DEF LP_TIMES_INF + +#define LP_PERIOD_MIN 1 +#define LP_PERIOD_MAX (60 * 60) +#define LP_PERIOD_DEF 1 + +#define LP_WAIT_MAX (60 * 60) +#define LP_WAIT_DEF 0 + +#define LP_FLAG_MASK (0xffffFFFFull << 32) +#define LP_TYPE_MASK (0x3ull << 32) +#define LP_CONTROL_TYPE (0x1ull << 32) +#define LP_LISTEN_TYPE (0x2ull << 32) +#define LP_SESSION_TYPE (0x3ull << 32) + +inline static uint64_t +LP_EV_MK (uint64_t flag, int fd) +{ + return flag | (uint64_t) (uint32_t) fd; +} + +inline static int +LP_EV_FD (uint64_t data_u64) +{ + return (int) (uint32_t) data_u64; +} + +inline static uint64_t +LP_EV_TYPE (uint64_t data_u64) +{ + return data_u64 & LP_TYPE_MASK; +} + +inline static uint64_t +LP_EV_FLAG (uint64_t data_u64) +{ + return data_u64 & LP_FLAG_MASK; +} + +#define LP_R_SIGN 1 +#define LP_E_SIGN 2 +#define LP_W_SIGN 4 +#define LP_ERR_SIGN 8 + +enum +{ + LP_CONNECTED, + LP_CONNECT, + LP_CLOSE, + LP_FAILED, + + LP_QUERY_COMP, + LP_QUERY_BYTE, + LP_REPLY_COMP, + LP_REPLY_BYTE, + + LP_REC_NUM, + + LP_R_END, + LP_E_BEGIN = LP_R_END, + + LP_E_SOCKET = LP_E_BEGIN, + LP_E_BIND, + LP_E_ACCEPT, + LP_E_CONNECT, + LP_E_NODELAY, + LP_E_NONBLOCK, + LP_E_REUSEADDR, + LP_E_REUSEPORT, + LP_E_RECV, + LP_E_SEND, + + LP_E_EPADD, + LP_E_EPMOD, + LP_E_EPDEL, + LP_E_EPWAIT, + LP_E_EPUNUSED, + LP_E_EPHUP, + LP_E_EPERR, + LP_E_EPINOUT, + LP_E_EVIDLE, + LP_E_EPEVENT, + + LP_E_IOSHUT, + LP_E_IOSIZE, + LP_E_IOMORE, + LP_E_IOEXCEED, + LP_E_IOSEND0, + + LP_E_END, + LP_W_BEGIN = LP_E_END, + + LP_W_CREATE = LP_W_BEGIN, LP_T_CREATE, + LP_W_SOCKET, LP_T_SOCKET, + LP_W_BIND, LP_T_BIND, + LP_W_CONNECT, LP_T_CONNECT, + LP_W_CONNECTED, LP_T_CONNECTED, + LP_W_ACCEPT, LP_T_ACCEPT, + LP_W_CLOSE, LP_T_CLOSE, + + LP_W_END, + + LP_CNT_NUM = LP_W_END +}; + +#define LP_ERRNO_NUM 256 +#define LP_NOERR_NUM ((LP_ERRNO_NUM + 63) / 64) + +struct lp_stat +{ + uint64_t cnt[LP_CNT_NUM]; + uint64_t err[LP_ERRNO_NUM]; +}; + +#define LP_STAT(worker) (lp.curr + (worker)->index) +#define LP_ADD(worker, id, num) (LP_STAT(worker)->cnt[(id)] += (num)) +#define LP_CNT(worker, id) (++LP_STAT(worker)->cnt[(id)]) +#define LP_ERR(worker, id, e) do { \ + unsigned int _e = (unsigned int)(e); \ + if (_e >= LP_ERRNO_NUM) \ + _e = 0; \ + if (0 == (lp.no_err[_e / 64] & (1 << (_e % 64)))) { \ + struct lp_stat *_stat = LP_STAT(worker); \ + _stat->cnt[(id)]++; \ + _stat->err[_e]++; \ + } \ +} while (0) +#define LP_CNT2(worker, id1, id2) do { \ + struct lp_stat *_stat = LP_STAT(worker); \ + _stat->cnt[(id1)]++; \ + _stat->cnt[(id2)]++; \ +} while (0) +#define LP_ERR2(worker, id1, id2, e) do { \ + unsigned int _e = (unsigned int)(e); \ + if (_e >= LP_ERRNO_NUM) \ + _e = 0; \ + if (0 == (lp.no_err[_e / 64] & (1 << (_e % 64)))) { \ + struct lp_stat *_stat = LP_STAT(worker); \ + _stat->cnt[(id1)]++; \ + _stat->cnt[(id2)]++; \ + _stat->err[_e]++; \ + } \ +} while (0) + +#define LP_TIME_SET(begin) struct timespec begin; \ + do { \ + if (lp.watch) \ + LB_TIME(begin); \ + } while (0) +#define LP_TIME_REG(worker, id, nsec) do { \ + if (lp.watch) { \ + uint64_t *_w = &(LP_STAT(worker)->cnt[(id)]); \ + _w[0] ++; \ + _w[1] += nsec; \ + } \ +} while (0) +#define LP_TIME_FOR(worker, id, begin, end) LP_TIME_REG((worker), (id), LB_SUB_NS((end), (begin))) +#define LP_TIME_END(worker, id, begin) do { \ + LP_TIME_SET(_end); \ + LP_TIME_FOR((worker), (id), (begin), _end); \ +} while (0) + +struct lp_io +{ + int query; + int reply; +} __attribute__ ((__packed__)); + +struct lp_head +{ + uint32_t num; + int first; + int *last; +}; + +enum +{ + LP_S_UNUSED = 0, + LP_S_PREPARE = 0x20, + LP_S_CONNECTING = 0x80, + + LP_S_CONNECTED = 0x10, + + LP_S_IDLE = LP_S_CONNECTED | 0, + LP_S_QUERY = LP_S_CONNECTED | 1, + LP_S_REPLY = LP_S_CONNECTED | 2, + LP_S_REST = LP_S_CONNECTED | 4, + LP_S_WAIT = LP_S_CONNECTED | 8, +}; + +struct lp_sess +{ + uint8_t state; + uint8_t test; + uint8_t epout; + uint32_t io_num; + int query; + int reply; + + int *prev_sess; + int next_sess; + int next_rest; + int *prev_rest; + struct timespec time; + +#if LP_DBG + uint16_t work; + uint16_t anum; + uint16_t fnum; +#endif +}; + +struct lp_test +{ + uint32_t target; + uint32_t time; + + uint32_t up; + uint32_t down; + + int query; + int reply; + + uint32_t times; + uint32_t period; + uint32_t wait; + + uint8_t down_mode; + uint8_t close_after_io; + uint8_t _pad[2]; +}; + +enum +{ + LP_SYNC, + LP_RAND, + LP_CPS, + LP_SPC, +}; + +struct lp_worker +{ + int index; + int epfd; + int ctlfd; + int server_num; + int client_num; + int link_mode; + + pthread_t tid; + + struct lp_head server; + struct lp_head sess; /* ESTABLISHED */ + struct lp_head conn; /* connecting(for nonblock of client) */ + struct lp_head rest[LP_MAX_TEST]; /* io wait queue */ + struct lp_head wait[LP_MAX_TEST]; /* io wait queue */ + + struct sockaddr_in *server_addr; + struct sockaddr_in *client_addr; + + struct lb_run *up_run; + struct lb_run *down_run; + + struct epoll_event *ev_buf; + + void *io_buf; +}; + +enum lp_state +{ + LP_EXIT = -1, + LP_INIT = 0, + LP_EXEC = 1, + LP_CLEAN = 2, +}; + +struct lp_var +{ + int CPU_NUM; + volatile int run_state; + int worker_num; + + uint8_t verbose; + uint8_t watch; + uint8_t err_msg; + uint8_t _pad; + + int client_mode; + int block_connecting; + + int interval; + int nodelay; + + int test_num; + volatile int test_id; + + uint64_t core; + uint64_t active_worker; + + struct lp_sess *sess; + struct lp_stat *volatile curr; + struct lp_stat *next; + struct lp_stat stat[2][LP_MAX_WORKER]; + struct lp_test test[LP_MAX_TEST]; + struct lp_worker worker[LP_MAX_WORKER]; + uint64_t no_err[LP_NOERR_NUM]; +}; + +extern struct lp_var lp; + +inline static struct lp_worker * +LP_WORKER (int index) +{ + return &lp.worker[index]; +} + +inline static struct lp_sess * +LP_SESS (int fd) +{ + return lp.sess + fd; +} + +inline static uint64_t +lp_total_sess () +{ + int i; + uint64_t total = 0; + for (i = 0; i < lp.worker_num; ++i) + total += lp.worker[i].sess.num; + return total; +} + +#define LP_APPEND(head, fd, sess, name) do { \ + struct lp_sess *_s = (sess); \ + LP_ASSERT(_s->next_##name == fd); \ + LP_ASSERT(_s->prev_##name == NULL); \ + _s->next_##name = -1; \ + _s->prev_##name = (head).last; \ + *(head).last = fd; \ + (head).last = &_s->next_##name; \ + (head).num++; \ +} while (0) + +#define LP_REMOVE(head, fd, sess, name) do { \ + struct lp_sess *_s = (sess); \ + LP_ASSERT((head).num); \ + if ((*_s->prev_##name = _s->next_##name) >= 0) \ + LP_SESS(_s->next_##name)->prev_##name = _s->prev_##name; \ + else \ + (head).last = _s->prev_##name; \ + (head).num--; \ + LP_IF_DBG(_s->next_##name = (fd)); \ + LP_IF_DBG(_s->prev_##name = NULL); \ +} while (0) + +inline static void +lp_init_head (struct lp_head *head) +{ + head->first = -1; + head->num = 0; + head->last = &head->first; +} + +inline static void +lp_dest_sess (struct lp_worker *worker, int fd, struct lp_sess *sess) +{ + LP_ASSERT (sess->next_rest == fd); + LP_ASSERT (sess->next_sess == fd); + LP_ASSERT (sess->prev_rest == NULL); + LP_ASSERT (sess->prev_sess == NULL); + LP_ASSERT (sess->work == worker->index); +#if LP_DBG + sess->fnum++; +#endif + LP_ASSERT (sess->fnum == sess->anum); + + sess->state = LP_S_UNUSED; +} + +inline static struct lp_sess * +lp_init_sess (struct lp_worker *worker, int fd) +{ + struct lp_sess *sess = LP_SESS (fd); + + if (sess->state != LP_S_UNUSED) + { + void lp_dump_sess (int fd); + wrn ("Invalid session fd:%d\n", fd); +#if LP_DBG + lp_dump_sess (fd); +#endif + LP_ASSERT (0); + } + LP_ASSERT (sess->fnum == sess->anum); + + sess->state = LP_S_PREPARE; + sess->epout = 0; + sess->io_num = 0; + sess->query = 0; + sess->reply = 0; + +#if LP_DBG + sess->work = worker->index; + sess->anum++; + sess->next_rest = sess->next_sess = fd; + sess->prev_rest = sess->prev_sess = NULL; +#endif + + return sess; +} + +inline static void +lp_add_conn (struct lp_worker *worker, int fd, struct lp_sess *sess) +{ + LP_ASSERT (sess->state == LP_S_PREPARE); + + LP_APPEND (worker->conn, fd, sess, sess); + sess->state = LP_S_CONNECTING; +} + +inline static void +lp_out_conn (struct lp_worker *worker, int fd, struct lp_sess *sess) +{ + LP_ASSERT (worker->conn.num); + LP_ASSERT (sess->state == LP_S_CONNECTING); + + LP_REMOVE (worker->conn, fd, sess, sess); +} + +inline static void +lp_del_conn (struct lp_worker *worker, int fd, struct lp_sess *sess) +{ + lp_out_conn (worker, fd, sess); + lp_dest_sess (worker, fd, sess); + _close (fd); +} + +inline static void +lp_add_rest (struct lp_worker *worker, int fd, struct lp_sess *sess) +{ + LP_APPEND (worker->rest[sess->test], fd, sess, rest); + sess->state = LP_S_REST; +} + +inline static void +lp_out_rest (struct lp_worker *worker, int fd, struct lp_sess *sess) +{ + LP_ASSERT (worker->rest[sess->test].num); + + LP_REMOVE (worker->rest[sess->test], fd, sess, rest); + sess->state = LP_S_IDLE; +} + +inline static void +lp_add_wait (struct lp_worker *worker, int fd, struct lp_sess *sess) +{ + LP_APPEND (worker->wait[sess->test], fd, sess, rest); + sess->state = LP_S_WAIT; +} + +inline static void +lp_out_wait (struct lp_worker *worker, int fd, struct lp_sess *sess) +{ + LP_ASSERT (worker->wait[sess->test].num); + + LP_REMOVE (worker->wait[sess->test], fd, sess, rest); + sess->state = LP_S_IDLE; +} + +inline static void +lp_del_sess (struct lp_worker *worker, int fd) +{ + struct lp_sess *sess = LP_SESS (fd); + + LP_ASSERT (sess->state & LP_S_CONNECTED); + + if (sess->state == LP_S_REST) + lp_out_rest (worker, fd, sess); + else if (sess->state == LP_S_WAIT) + lp_out_wait (worker, fd, sess); + LP_REMOVE (worker->sess, fd, sess, sess); + + lp_dest_sess (worker, fd, sess); + LP_TIME_SET (begin); + _close (fd); + LP_TIME_END (worker, LP_W_CLOSE, begin); + + LP_CNT (worker, LP_CLOSE); +} + +inline static int +lp_epctl (const struct lp_worker *worker, int op, int fd, uint32_t io) +{ + struct epoll_event event; + event.events = io | EPOLLET | EPOLLRDHUP | EPOLLHUP; + event.data.u64 = LP_EV_MK (LP_SESSION_TYPE, fd); + return _epoll_ctl (worker->epfd, op, fd, &event); +} + +#define lp_epadd(worker, fd, io) lp_epctl((worker), EPOLL_CTL_ADD, (fd), (io)) +#define lp_epmod(worker, fd, io) lp_epctl((worker), EPOLL_CTL_MOD, (fd), (io)) +#define lp_epdel(worker, fd) _epoll_ctl((worker->epfd), EPOLL_CTL_DEL, (fd), NULL) + +inline static int +lp_set_epout (struct lp_worker *worker, int fd, struct lp_sess *sess) +{ + if (sess->epout) + return 1; + + if (lp_epmod (worker, fd, EPOLLOUT)) + { + LP_ERR (worker, LP_E_EPMOD, errno); + return -1; + } + + sess->epout = 1; + return 1; +} + +#define LP_CMD_STOP 1 + +inline static int +lp_post_cmd (int fd, long long int cmd) +{ + ssize_t ret = _write (fd, (void *) &cmd, sizeof (cmd)); + + return ret - sizeof (cmd); +} + +void lp_client (struct lp_worker *worker); +void lp_server (struct lp_worker *worker); +int lp_listen (struct lp_worker *worker, int index); + +#endif /* #ifndef _LP_H_ */ diff --git a/thirdparty/apps/testapp/lp/lpc.c b/thirdparty/apps/testapp/lp/lpc.c new file mode 100644 index 0000000..7da0414 --- /dev/null +++ b/thirdparty/apps/testapp/lp/lpc.c @@ -0,0 +1,669 @@ +/* +* +* Copyright (c) 2018 Huawei Technologies Co.,Ltd. +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at: +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "lb.h" +#include "lp.h" + +inline static int +lp_io_finish (struct lp_worker *worker, int fd, struct lp_sess *sess) +{ + int ret; + + sess->state = LP_S_IDLE; + --sess->io_num; + + if (sess->io_num) + { + lp_add_rest (worker, fd, sess); + sess->time.tv_sec += lp.test[sess->test].period; + return 0; + } + + if (lp.test[sess->test].close_after_io) + return -1; + + if (lp_epdel (worker, fd)) + LP_ERR (worker, LP_E_EPDEL, errno); + + return 0; +} + +int +lp_recv_reply (struct lp_worker *worker, int fd, struct lp_sess *sess) +{ + int ret; + char *buf = worker->io_buf; + + LP_ASSERT (sess->query == 0); + + while (1) + { + ret = _recv (fd, buf, LP_IOBUF_SIZE, 0); + if (ret > 0) + { + LP_ADD (worker, LP_REPLY_BYTE, ret); + sess->reply -= ret; + } + else if (ret < 0) + { + int e = errno; + if (e == EWOULDBLOCK || e == EAGAIN) + { + if (sess->reply > 0) + return 1; + break; + } + if (e == EINTR) + continue; + LP_ERR (worker, LP_E_RECV, e); + return -1; + } + else + { + if (sess->reply) + LP_CNT (worker, LP_E_IOSHUT); + else + LP_CNT (worker, LP_REPLY_COMP); + return -1; + } + } + + LP_CNT (worker, LP_REPLY_COMP); + + if (sess->reply < 0) + { + LP_CNT (worker, LP_E_IOEXCEED); + sess->reply = 0; + } + + return lp_io_finish (worker, fd, sess); +} + +int +lp_more_query (struct lp_worker *worker, int fd, struct lp_sess *sess) +{ + int ret; + char *buf = worker->io_buf; + + LP_ASSERT (sess->io_num); + + while (sess->query > 0) + { + const int LEN = + sess->query < LP_IOBUF_SIZE ? sess->query : LP_IOBUF_SIZE; + + ret = _send (fd, buf, LEN, 0); + if (ret > 0) + { + LP_ADD (worker, LP_QUERY_BYTE, ret); + sess->query -= ret; + } + else if (ret < 0) + { + int e = errno; + if (e == EWOULDBLOCK || e == EAGAIN) + return lp_set_epout (worker, fd, sess); + if (e == EINTR) + continue; + LP_ERR (worker, LP_E_SEND, e); + return -1; + } + else + { + LP_CNT (worker, LP_E_IOSEND0); + } + } + + LP_CNT (worker, LP_QUERY_COMP); + + if (sess->epout) + { + if (lp_epmod (worker, fd, EPOLLIN)) + { + LP_ERR (worker, LP_E_EPMOD, errno); + return -1; + } + sess->epout = 0; + } + + if (!sess->reply) + { + return lp_io_finish (worker, fd, sess); + } + + sess->state = LP_S_REPLY; + return 0; +} + +int +lp_new_query (struct lp_worker *worker, int fd, struct lp_sess *sess) +{ + int ret, len = 0; + char *buf = worker->io_buf; + struct lp_io *io = (struct lp_io *) buf; + + LP_ASSERT (sess->state == LP_S_IDLE); + LP_ASSERT (sess->query == 0); + LP_ASSERT (sess->reply == 0); + LP_ASSERT (sess->io_num); + + sess->state = LP_S_QUERY; + sess->query = lp.test[sess->test].query; + sess->reply = lp.test[sess->test].reply; + + io->query = htonl (sess->query); + io->reply = htonl (sess->reply); + + while (1) + { + ret = _send (fd, buf + len, sess->query - len, 0); + if (ret > 0) + { + LP_ADD (worker, LP_QUERY_BYTE, ret); + len += ret; + if (len >= sizeof (struct lp_io)) + break; + LP_CNT (worker, LP_E_IOMORE); + } + else if (ret < 0) + { + int e = errno; + if (e == EWOULDBLOCK || e == EAGAIN) + { + LP_CNT (worker, LP_E_IOMORE); + continue; + } + if (e == EINTR) + continue; + LP_ERR (worker, LP_E_SEND, e); + return -1; + } + else + { + LP_CNT (worker, LP_E_IOSEND0); + } + } + + sess->query -= len; + + return lp_more_query (worker, fd, sess); +} + +int +lp_pre_connect (struct lp_worker *worker, int fd, struct sockaddr_in *c_addr) +{ + int ret; + + if (fd >= LP_MAX_FD) + { + LP_CNT (worker, LP_FAILED); + err ("fd(%d) >= LP_MAX_FD(%d)\n", fd, LP_MAX_FD); + return -1; + } + + ret = set_reuseaddr (fd, 1); + if (ret) + { + const int e = errno; + LP_ERR (worker, LP_E_REUSEADDR, e); + DBG ("set_reuseaddr(%d, 1)=%d:%d\n", fd, ret, e); + } + + ret = set_reuseport (fd, 1); + if (ret) + { + const int e = errno; + LP_ERR (worker, LP_E_REUSEPORT, errno); + DBG ("set_reuseport(%d, 1)=%d:%d\n", fd, ret, e); + } + + if (c_addr->sin_addr.s_addr != INADDR_ANY || c_addr->sin_port != 0) + { + LP_TIME_SET (bind_begin); + ret = + _bind (fd, (struct sockaddr *) c_addr, sizeof (struct sockaddr_in)); + LP_TIME_END (worker, LP_W_BIND, bind_begin); + if (ret) + { + int e = errno; + if (e == EADDRINUSE) + return -1; + LP_ERR2 (worker, LP_FAILED, LP_E_BIND, errno); + DBG ("->bind(%d, %s)=%d:%d\n", fd, f_inaddr (c_addr), ret, errno); + return -1; + } + } + + if (lp.nodelay) + { + ret = set_nodelay (fd, 1); + if (ret) + LP_ERR2 (worker, LP_FAILED, LP_E_NODELAY, errno); + } + + if (!lp.block_connecting) + { + ret = set_nonblock (fd); + if (ret) + { + LP_ERR2 (worker, LP_FAILED, LP_E_NONBLOCK, errno); + return -1; + } +#if 1 + ret = lp_epadd (worker, fd, EPOLLOUT); + if (ret) + { + LP_ERR2 (worker, LP_FAILED, LP_E_EPADD, errno); + DBG ("->epoll_ctl(%d, ADD, %d)=%d:%d", worker->epfd, fd, ret, + errno); + return -1; + } + return 1; +#endif + } + + return 0; +} + +int +lp_connected (struct lp_worker *worker, int fd, struct lp_sess *sess) +{ + struct timespec now; + struct lp_test *test; + + LB_TIME (now); + LP_TIME_FOR (worker, LP_W_CONNECTED, sess->time, now); + sess->time = now; + + LP_APPEND (worker->sess, fd, sess, sess); + sess->state = LP_S_CONNECTED; + LP_CNT (worker, LP_CONNECTED); + + test = &lp.test[sess->test]; + if (test->query) + { + if (lp.block_connecting && set_nonblock (fd)) + { + LP_ERR (worker, LP_E_NONBLOCK, errno); + return -1; + } + if (sess->epout) + { + if (lp_epmod (worker, fd, EPOLLIN)) + { + LP_ERR (worker, LP_E_EPMOD, errno); + return -1; + } + sess->epout = 0; + } + else + { + if (lp_epadd (worker, fd, EPOLLIN)) + { + LP_ERR (worker, LP_E_EPADD, errno); + return -1; + } + } + sess->io_num = test->times; + + if (test->wait) + { + sess->time.tv_sec += test->wait; + lp_add_wait (worker, fd, sess); + return 0; + } + + return lp_new_query (worker, fd, sess); + } + + if (sess->epout) + { + if (lp_epdel (worker, fd)) + LP_ERR (worker, LP_E_EPDEL, errno); + sess->epout = 0; + } + + return 0; +} + +int +lp_connect (struct lp_worker *worker, int cid, int sid, int test_id) +{ + int ret, fd; + struct lp_sess *sess; + struct timespec connect_begin; + + LP_TIME_SET (begin); + fd = _socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (fd < 0) + { + LP_ERR2 (worker, LP_FAILED, LP_E_SOCKET, errno); + DBG ("->socket(...)=%d:%d\n", fd, errno); + return -1; + } + LP_TIME_END (worker, LP_W_SOCKET, begin); + + ret = lp_pre_connect (worker, fd, worker->client_addr + cid); + if (ret < 0) + { + _close (fd); + return -1; + } + + LB_TIME (connect_begin); + ret = + _connect (fd, (struct sockaddr *) (worker->server_addr + sid), + sizeof (struct sockaddr_in)); + LP_TIME_END (worker, LP_W_CONNECT, connect_begin); + + if (ret) + { + const int e = errno; + if (lp.block_connecting || e != EINPROGRESS) + { + LP_ERR2 (worker, LP_FAILED, LP_E_CONNECT, e); + DBG ("->connect(%d, %s)=%d:%d\n", fd, + f_inaddr (worker->server_addr + sid), ret, e); + _close (fd); + return -1; + } + } + + LP_TIME_END (worker, LP_W_CREATE, begin); + LP_CNT (worker, LP_CONNECT); + + sess = lp_init_sess (worker, fd); + sess->test = test_id; + sess->time = connect_begin; + if (!lp.block_connecting) + sess->epout = 1; + + if (ret) + { + /* nonblock connect inprogress */ + lp_add_conn (worker, fd, sess); + return 1; + } + + ret = lp_connected (worker, fd, sess); + if (ret < 0) + lp_del_sess (worker, fd); + + return ret; +} + +inline static int +lp_handle_client (struct lp_worker *worker, int fd, uint32_t events) +{ + struct lp_sess *sess = LP_SESS (fd); + + if (sess->state == LP_S_UNUSED) + { + LP_CNT (worker, LP_E_EPUNUSED); + return 0; + } + + if (events & EPOLLRDHUP) + { + LP_CNT (worker, LP_E_EPHUP); + return -1; + } + + if (events & EPOLLERR) + { + LP_CNT (worker, LP_E_EPERR); + DBG ("epoll event error, fd:%d event:0x%x\n", fd, events); + return -1; + } + + if (events & EPOLLIN) + { + if (sess->state == LP_S_REPLY) + return lp_recv_reply (worker, fd, sess); + LP_CNT (worker, LP_E_EPERR); + return 0; + } + + if (events & EPOLLOUT) + { + if (sess->state == LP_S_CONNECTING) + { + lp_out_conn (worker, fd, sess); + return lp_connected (worker, fd, sess); + } + if (sess->state == LP_S_QUERY) + return lp_more_query (worker, fd, sess); + LP_CNT (worker, LP_E_EPERR); + return 0; + } + + LP_CNT (worker, LP_E_EPEVENT); + return -1; +} + +inline static void +lp_init_mode (const struct lp_worker *worker, int *cid, int *sid) +{ + if (worker->link_mode == LP_RAND) + { + *cid = LB_RAND (worker->client_num); + *sid = LB_RAND (worker->server_num); + } + else + { + *cid = 0; + *sid = 0; + } +} + +inline static void +lp_next_mode (const struct lp_worker *worker, int *cid, int *sid) +{ + if (worker->link_mode == LP_RAND) + { + *cid = LB_RAND (worker->client_num); + *sid = LB_RAND (worker->server_num); + } + else if (worker->link_mode == LP_SYNC) + { + if (++*sid >= worker->server_num) + *sid = 0; + if (++*cid >= worker->client_num) + *cid = 0; + } + else if (worker->link_mode == LP_CPS) + { + if (++*cid >= worker->client_num) + { + *cid = 0; + if (++*sid >= worker->server_num) + *sid = 0; + } + } + else if (worker->link_mode == LP_SPC) + { + if (++*sid >= worker->server_num) + { + *sid = 0; + if (++*cid >= worker->client_num) + *cid = 0; + } + } + else + { + err ("Error mode value:%d\n", worker->link_mode); + } +} + +void +lp_client (struct lp_worker *worker) +{ + int ret, num = 0, cid, sid, test_id = -1; + struct epoll_event *event; + struct lp_test *test = &lp.test[0]; + struct lb_run *up = worker->up_run; + struct lb_run *down = worker->down_run; + const int epfd = worker->epfd; + + lp_init_mode (worker, &cid, &sid); + + while (lp.run_state == LP_EXEC) + { + int i; + struct timespec now; + + if (test_id != lp.test_id) + { + DBG ("worker %d change test_id %d\n", worker->index, lp.test_id); + test = &lp.test[test_id = lp.test_id]; + + if (test->up) + run_init (up, (test->up + lp.worker_num - 1) / lp.worker_num, + LP_UP_SLOT, LP_UP_NSEC); + if (test->down) + run_init (down, (test->down + lp.worker_num - 1) / lp.worker_num, + LP_DOWN_SLOT, LP_DOWN_NSEC); + } + + LB_TIME (now); + + /* up process */ + if (test->up) + { + if (run_test (up, &now) > 0) + { + ret = lp_connect (worker, cid, sid, test_id); + if (ret >= 0) + run_add (up, 1); + lp_next_mode (worker, &cid, &sid); + LB_TIME (now); + } + } + + /* down process */ + if (test->down && worker->sess.num) + { + if (run_test (down, &now) > 0) + { + lp_del_sess (worker, worker->sess.first); + run_add (down, 1); + LB_TIME (now); + } + } + + /* query */ + for (i = 0; i <= test_id; ++i) + { + int fd; + struct lp_sess *sess; + + if (worker->rest[i].num) + { + fd = worker->rest[i].first; + sess = LP_SESS (fd); + if (LB_CMP (now, sess->time) >= 0) + { + LP_ASSERT (sess->state == LP_S_REST); + LP_ASSERT (sess->test == i); + lp_out_rest (worker, fd, sess); + if (lp_new_query (worker, fd, sess) < 0) + { + lp_del_sess (worker, fd); + if (test->down) + run_add (down, 1); + } + LB_TIME (now); + } + } + if (worker->wait[i].num) + { + fd = worker->wait[i].first; + sess = LP_SESS (fd); + if (LB_CMP (now, sess->time) >= 0) + { + LP_ASSERT (sess->state == LP_S_WAIT); + LP_ASSERT (sess->test == i); + lp_out_wait (worker, fd, sess); + if (lp_new_query (worker, fd, sess) < 0) + { + lp_del_sess (worker, fd); + if (test->down) + run_add (down, 1); + } + LB_TIME (now); + } + } + } + + /* check connect timeout */ + if (worker->conn.first >= 0) + { + } + + /* epoll event process */ + if (num > 0) + { + const uint64_t type = LP_EV_TYPE (event->data.u64); + const int fd = LP_EV_FD (event->data.u64); + + if (type == LP_SESSION_TYPE) + { + if (lp_handle_client (worker, fd, event->events) < 0) + { + struct lp_sess *sess = LP_SESS (fd); + if (sess->state & LP_S_CONNECTED) + { + lp_del_sess (worker, fd); + if (test->down) + run_add (down, 1); + } + else if (sess->state == LP_S_CONNECTING) + { + lp_del_conn (worker, fd, sess); + } + else + { + } + } + } + else if (type == LP_CONTROL_TYPE) + { + break; + } + else + { + err ("epoll event error flag:%lx, fd:%d event:%x}\n", + LP_EV_FLAG (event->data.u64), fd, event->events); + } + num--; + event++; + } + else + { + num = _epoll_wait (epfd, worker->ev_buf, LP_EVENT_NUM, 0); + if (num > 0) + { + event = worker->ev_buf; + } + else if (num < 0) + { + int e = errno; + if (e != EINTR && e != ETIMEDOUT) + LP_ERR (worker, LP_E_EPWAIT, e); + } + } + } +} diff --git a/thirdparty/apps/testapp/lp/lps.c b/thirdparty/apps/testapp/lp/lps.c new file mode 100644 index 0000000..8462c21 --- /dev/null +++ b/thirdparty/apps/testapp/lp/lps.c @@ -0,0 +1,372 @@ +/* +* +* Copyright (c) 2018 Huawei Technologies Co.,Ltd. +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at: +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "lb.h" +#include "lp.h" + +int +lp_send_reply (struct lp_worker *worker, int fd, struct lp_sess *sess) +{ + int ret; + char *buf = worker->io_buf; + + while (sess->reply > 0) + { + const int LEN = + sess->reply < LP_IOBUF_SIZE ? sess->reply : LP_IOBUF_SIZE; + + ret = _send (fd, buf, LEN, 0); + if (ret > 0) + { + LP_ADD (worker, LP_REPLY_BYTE, ret); + sess->reply -= ret; + } + else if (ret < 0) + { + int e = errno; + if (e == EWOULDBLOCK || e == EAGAIN) + return lp_set_epout (worker, fd, sess); + if (e == EINTR) + continue; + LP_ERR (worker, LP_E_SEND, e); + return -1; + } + else + { + LP_CNT (worker, LP_E_IOSEND0); + } + } + + LP_CNT (worker, LP_REPLY_COMP); + + if (sess->epout) + { + if (lp_epmod (worker, fd, EPOLLIN)) + { + LP_ERR (worker, LP_E_EPMOD, errno); + return -1; + } + sess->epout = 0; + } + + return 0; +} + +int +lp_just_query (struct lp_worker *worker, int fd, struct lp_sess *sess) +{ + char *buf = worker->io_buf; + struct lp_io *io = (struct lp_io *) buf; + int len = 0; + + while (1) + { + int ret; + + ret = _recv (fd, buf + len, LP_IOBUF_SIZE - len, 0); + + if (ret > 0) + { + LP_ADD (worker, LP_QUERY_BYTE, ret); + len += ret; + if (len >= sizeof (struct lp_io)) + break; + LP_CNT (worker, LP_E_IOMORE); + } + else if (ret < 0) + { + int e = errno; + if (e == EWOULDBLOCK || e == EAGAIN) + { + LP_CNT (worker, LP_E_IOMORE); + continue; + } + if (e == EINTR) + continue; + LP_ERR (worker, LP_E_RECV, e); + return -1; + } + else + { + //LP_CNT(worker, LP_E_IOSHUT); + return -1; + } + } + + sess->query = htonl (io->query); + sess->reply = htonl (io->reply); + + if (sess->query < LP_QUERY_MIN || sess->reply < 0) + { + LP_CNT (worker, LP_E_IOSIZE); + return -1; + } + + sess->query -= len; + return 0; +} + +int +lp_recv_query (struct lp_worker *worker, int fd, struct lp_sess *sess) +{ + int ret; + char *buf = worker->io_buf; + + if (sess->query == 0) + { + ret = lp_just_query (worker, fd, sess); + if (ret) + return ret; + } + + while (1) + { + ret = _recv (fd, buf, LP_IOBUF_SIZE, 0); + if (ret > 0) + { + LP_ADD (worker, LP_QUERY_BYTE, ret); + sess->query -= ret; + } + else if (ret < 0) + { + int e = errno; + if (e == EWOULDBLOCK || e == EAGAIN) + { + if (sess->query > 0) + return 1; + break; + } + if (e == EINTR) + continue; + LP_ERR (worker, LP_E_RECV, e); + return -1; + } + else + { + if (sess->query || sess->reply) + { + //LP_CNT(worker, LP_E_IOSHUT); + } + else + LP_CNT (worker, LP_QUERY_COMP); + return -1; + } + } + + LP_CNT (worker, LP_QUERY_COMP); + + if (sess->query < 0) + { + LP_CNT (worker, LP_E_IOEXCEED); + sess->query = 0; + } + + if (sess->reply) + return lp_send_reply (worker, fd, sess); + + return 0; +} + +inline static int +lp_service (struct lp_worker *worker, int fd, uint32_t events) +{ + struct lp_sess *sess = LP_SESS (fd); + + if (sess->state == LP_S_UNUSED) + { + LP_CNT (worker, LP_E_EPUNUSED); + return 0; + } + + if (events & EPOLLRDHUP) + return -1; + + if (events & EPOLLERR) + { + LP_CNT (worker, LP_E_EPERR); + DBG ("epoll event error, fd:%d event:0x%x\n", fd, events); + return -1; + } + + if ((events & (EPOLLIN | EPOLLOUT)) == (EPOLLIN | EPOLLOUT)) + { + LP_CNT (worker, LP_E_EPINOUT); + return -1; + } + + if (events & EPOLLIN) + return lp_recv_query (worker, fd, sess); + + if (events & EPOLLOUT) + return lp_send_reply (worker, fd, sess); + + LP_CNT (worker, LP_E_EPEVENT); + return -1; +} + +void +lp_accept (struct lp_worker *worker, int listen_fd) +{ + while (lp.run_state == LP_EXEC) + { + int fd, ret; + struct lp_sess *sess; + + LP_TIME_SET (begin); + + fd = _accept4 (listen_fd, NULL, NULL, SOCK_NONBLOCK); + if (fd < 0) + { + int e = errno; + if (e == EAGAIN || e == EWOULDBLOCK) + return; + if (e == EINTR) + continue; + LP_ERR (worker, LP_E_ACCEPT, e); + DBG ("->accept4(%d)=%d:%d\n", listen_fd, fd, e); + return; + } + else + { + LP_TIME_END (worker, LP_W_ACCEPT, begin); + } + + if (fd >= LP_MAX_FD) + { + LP_CNT (worker, LP_FAILED); + _close (fd); + err ("accept fd(%d) >= LP_MAX_FD(%d)\n", fd, LP_MAX_FD); + continue; + } + + if (lp.nodelay) + { + ret = set_nodelay (fd, 1); + if (ret) + LP_ERR (worker, LP_E_NODELAY, errno); + } + + ret = lp_epadd (worker, fd, EPOLLIN); + if (ret) + { + int e = errno; + LP_ERR2 (worker, LP_FAILED, LP_E_EPADD, e); + _close (fd); + DBG ("epoll_ctl(%d, %d)=%d:%d\n\n", worker->epfd, fd, ret, e); + continue; + } + + sess = lp_init_sess (worker, fd); + LP_APPEND (worker->sess, fd, sess, sess); + sess->state = LP_S_CONNECTED; + LP_CNT (worker, LP_CONNECTED); + + LP_TIME_END (worker, LP_W_CREATE, begin); + } +} + +void +lp_server (struct lp_worker *worker) +{ + int num = 0; + struct epoll_event *event; + const int epfd = worker->epfd; + + while (lp.run_state == LP_EXEC) + { + if (num > 0) + { + const uint64_t type = LP_EV_TYPE (event->data.u64); + const int fd = LP_EV_FD (event->data.u64); + + if (type == LP_LISTEN_TYPE) + { + lp_accept (worker, fd); + } + else if (type == LP_SESSION_TYPE) + { + if (lp_service (worker, fd, event->events) < 0) + lp_del_sess (worker, fd); + } + else if (type == LP_CONTROL_TYPE) + { + break; + } + else + { + err ("epoll event error flag:%lx, fd:%d event:%x}\n", + LP_EV_FLAG (event->data.u64), fd, event->events); + } + num--; + event++; + } + else + { + num = _epoll_wait (epfd, worker->ev_buf, LP_EVENT_NUM, -1); + if (num > 0) + { + event = worker->ev_buf; + } + else if (num < 0) + { + int e = errno; + if (e != EINTR && e != ETIMEDOUT) + LP_ERR (worker, LP_E_EPWAIT, e); + } + } + } +} + +int +lp_listen (struct lp_worker *worker, int index) +{ + int fd, ret; + struct epoll_event event; + + fd = _socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + ERR_RETURN (fd < 0, -1, "socket()=%d:%d\n", fd, errno); + + if (fd >= LP_MAX_FD) + { + err ("socket()=%d >= LP_MAX_FD(%d)\n", fd, LP_MAX_FD); + (void) _close (fd); + return -1; + } + + LP_APPEND (worker->server, fd, lp_init_sess (worker, fd), sess); + + ret = + _bind (fd, (struct sockaddr *) &worker->server_addr[index], + sizeof (worker->server_addr[index])); + ERR_RETURN (ret, -1, "bind(%d, %s)=%d:%d\n", fd, + f_inaddr (&worker->server_addr[index]), ret, errno); + + ret = set_nonblock (fd); + ERR_RETURN (ret, -1, "set_nonblock(%d)=%d:%d\n", fd, ret, errno); + + ret = _listen (fd, SOMAXCONN); + ERR_RETURN (ret, -1, "listen(%d)=%d:%d\n", fd, ret, errno); + + event.events = EPOLLIN | EPOLLET; + event.data.u64 = LP_EV_MK (LP_LISTEN_TYPE, fd); + ret = _epoll_ctl (worker->epfd, EPOLL_CTL_ADD, fd, &event); + ERR_RETURN (ret, -1, "epoll_ctl(%d, %d)=%d:%d\n", worker->epfd, fd, ret, + errno); + + DBG ("worker %d server %d fd %d listen on %s\n", worker->index, index, fd, + f_inaddr (&worker->server_addr[index])); + return 0; +} |