aboutsummaryrefslogtreecommitdiffstats
path: root/thirdparty/apps/testapp/lp
diff options
context:
space:
mode:
Diffstat (limited to 'thirdparty/apps/testapp/lp')
-rw-r--r--thirdparty/apps/testapp/lp/lp.c1138
-rw-r--r--thirdparty/apps/testapp/lp/lp.h615
-rw-r--r--thirdparty/apps/testapp/lp/lpc.c669
-rw-r--r--thirdparty/apps/testapp/lp/lps.c372
4 files changed, 2794 insertions, 0 deletions
diff --git a/thirdparty/apps/testapp/lp/lp.c b/thirdparty/apps/testapp/lp/lp.c
new file mode 100644
index 0000000..a9b71d1
--- /dev/null
+++ b/thirdparty/apps/testapp/lp/lp.c
@@ -0,0 +1,1138 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "lb.h"
+#include "lp.h"
+
+void lp_exec (struct lp_worker *worker);
+
+#define LP_OPTIONS "s:c:bi:e:m:nwEC" DBGOPT "vh"
+
+static const struct option lp_options[] = {
+ {"block", 0, 0, 'b'},
+ {"server", 1, 0, 's'},
+ {"client", 1, 0, 'c'},
+ {"nodelay", 0, 0, 'n'},
+ {"interval", 1, 0, 'i'},
+ {"no-error", 1, 0, 'e'},
+ {"error-msg", 0, 0, 'E'},
+ {"core", 1, 0, 'm'},
+ DBGOPT_LONG {"watch", 0, 0, 'w'},
+ {"no-color", 0, 0, 'C'},
+ {"verbose", 0, 0, 'v'},
+ {"help", 0, 0, 'h'},
+ {0, 0, 0, 0}
+};
+
+static const char *MODES[] = {
+ [LP_RAND] = "*random link",
+ [LP_SYNC] = "=sync increment",
+ [LP_CPS] = "}client per server",
+ [LP_SPC] = "{server per client",
+};
+
+struct lp_var lp = { 0 };
+
+void
+lp_dump_sess (int fd)
+{
+ char buf[256];
+ struct lp_sess *sess = LP_SESS (fd);
+
+ co_init (buf, sizeof (buf));
+
+ co_append (buf, 80,
+ "fd:%d sess:%p state:0x%x test:%u epout:%u io_num:%u query:%d reply:%d\n",
+ fd, sess, sess->state, sess->test, sess->epout, sess->io_num,
+ sess->query, sess->reply);
+ co_append (buf, 50, " nest-sess:%d prev-sess:%p", sess->next_sess,
+ sess->prev_sess);
+ co_app_if (sess->prev_sess, buf, 10, "(%ld)",
+ CON_OF (sess->prev_sess, struct lp_sess, next_sess) - lp.sess);
+ co_append (buf, 50, " nest-rest:%d prev-rest:%p", sess->next_rest,
+ sess->prev_rest);
+ co_app_if (sess->prev_rest, buf, 10, "(%ld)",
+ CON_OF (sess->prev_rest, struct lp_sess, next_rest) - lp.sess);
+ co_append (buf, 30, " time:%ld.%09ld\n", sess->time.tv_sec,
+ sess->time.tv_nsec);
+
+ co_flush (buf);
+}
+
+void
+lp_clean (struct lp_worker *worker)
+{
+ int i, fd;
+
+ while (lp.run_state == LP_CLEAN)
+ {
+ fd = worker->conn.first;
+ if (fd < 0)
+ break;
+ lp_del_conn (worker, fd, LP_SESS (fd));
+ }
+ LP_ASSERT (worker->conn.num == 0);
+ LP_ASSERT (worker->conn.first == -1);
+ LP_ASSERT (worker->conn.last = &worker->conn.first);
+
+ while (lp.run_state == LP_CLEAN)
+ {
+ fd = worker->sess.first;
+ if (fd < 0)
+ break;
+ lp_del_sess (worker, fd);
+ }
+ LP_ASSERT (worker->sess.num == 0);
+ LP_ASSERT (worker->sess.first == -1);
+ LP_ASSERT (worker->sess.last = &worker->sess.first);
+
+ for (i = 0; i < LP_MAX_TEST; ++i)
+ {
+ LP_ASSERT (worker->rest[i].num == 0);
+ LP_ASSERT (worker->rest[i].first == -1);
+ LP_ASSERT (worker->rest[i].last = &worker->rest[i].first);
+ LP_ASSERT (worker->wait[i].num == 0);
+ LP_ASSERT (worker->wait[i].first == -1);
+ LP_ASSERT (worker->wait[i].last = &worker->rest[i].first);
+ }
+}
+
+void *
+lp_thread (void *arg)
+{
+ const uint64_t one = 1;
+ struct lp_worker *worker = (struct lp_worker *) arg;
+
+ (void) __sync_or_and_fetch (&lp.active_worker, one << worker->index);
+
+ futex_wait (&lp.run_state, LP_INIT);
+
+ if (lp.client_mode)
+ lp_client (worker);
+ else
+ lp_server (worker);
+
+ lp_clean (worker);
+
+ out ("%s! worker %d return%s\n", CR, worker->index, CC);
+ (void) __sync_and_and_fetch (&lp.active_worker, ~(one << worker->index));
+
+ return NULL;
+}
+
+void
+lp_round (int id)
+{
+ struct lp_test *test = &lp.test[id];
+ char buf[128];
+
+ co_init (buf, sizeof (buf));
+
+ co_append (buf, 30, "%s! round %d:", CR, id);
+ co_append (buf, 30, " target:%s",
+ test->target == LP_TARGET_INF ? "*" : f_uint (test->target));
+ co_append (buf, 30, " time:%s",
+ test->time == LP_TIME_INF ? "*" : f_uint (test->time));
+ co_append (buf, 30, " up:%s",
+ test->up == LP_UP_INF ? "*" : f_uint (test->up));
+ co_append (buf, 30, " down:%s",
+ test->down == LP_DOWN_INF ? "*" : f_uint (test->down));
+ co_append (buf, 30, " query:%u", test->query);
+ co_append (buf, 30, " reply:%u", test->reply);
+ co_append (buf, 30, " times:%s",
+ test->times == LP_TIMES_INF ? "*" : f_uint (test->times));
+ co_app_if (test->close_after_io, buf, 1, "-");
+ co_append (buf, 30, " period:%u", test->period);
+ co_append (buf, 30, " wait:%u", test->wait);
+ co_append (buf, 30, " %s\n", CC);
+
+ co_flush (buf);
+}
+
+int
+lp_start ()
+{
+ int i;
+ cpu_set_t set;
+
+ lp.sess = (struct lp_sess *) calloc (LP_MAX_FD, sizeof (struct lp_sess));
+ ERR_RETURN (!lp.sess, -1, "Out of memory\n");
+
+ lp.run_state = LP_INIT;
+
+ lp.curr = lp.stat[0];
+ lp.next = lp.stat[1];
+
+ if (lp.interval == 0)
+ lp.interval = LP_INTERVAL_DEF;
+
+ if (lp.core)
+ {
+ CPU_ZERO (&set);
+ for (i = 0; i < 64; ++i)
+ {
+ if (lp.core & (1 << i))
+ CPU_SET (i, &set);
+ }
+ }
+
+ for (i = 0; i < lp.worker_num; ++i)
+ {
+ int ret;
+ struct epoll_event event;
+ struct lp_worker *worker = LP_WORKER (i);
+
+ out ("[%d] creating worker\n", i);
+
+ worker->io_buf = malloc (LP_IOBUF_SIZE);
+ ERR_RETURN (!worker->io_buf, -1, "Out of memory for IO buffer\n");
+
+ worker->ev_buf = malloc (sizeof (struct epoll_event) * LP_EVENT_NUM);
+ ERR_RETURN (!worker->ev_buf, -1, "Out of memory for events buffer\n");
+
+ TP (epoll_create);
+ worker->epfd = _epoll_create (10);
+ ERR_RETURN (worker->epfd < 0, -1, "epoll_create()=%d:%d\n",
+ worker->epfd, errno);
+ worker->ctlfd = eventfd (0, 0);
+ ERR_RETURN (worker->ctlfd < 0, -1, "eventfd()=%d:%d\n", worker->epfd,
+ errno);
+ TQ (epoll_create);
+
+ event.events = EPOLLIN;
+ event.data.u64 = LP_EV_MK (LP_CONTROL_TYPE, worker->ctlfd);
+ ret = _epoll_ctl (worker->epfd, EPOLL_CTL_ADD, worker->ctlfd, &event);
+ ERR_RETURN (ret, -1, "epoll_ctl(epfd:%d, add, evfd:%d,)=%d:%d\n",
+ worker->epfd, worker->ctlfd, ret, errno);
+
+ worker->tid =
+ lb_thread (lp_thread, worker, "lp-%s-%d",
+ lp.client_mode ? "client" : "server", i);
+ ERR_RETURN (worker->tid <= 0, -1, "Create worker thread failed\n");
+
+ if (lp.core)
+ {
+ ret = pthread_setaffinity_np (worker->tid, sizeof (set), &set);
+ if (ret)
+ err ("Bind core failed!\n");
+ }
+
+ if (lp.client_mode)
+ {
+ size_t size;
+
+ size =
+ sizeof (struct lb_run) + sizeof (struct lb_slot) * LP_UP_SLOT;
+ worker->up_run = (struct lb_run *) calloc (1, size);
+ ERR_RETURN (!worker->up_run, -1, "Out of memory for up run\n");
+
+ size =
+ sizeof (struct lb_run) + sizeof (struct lb_slot) * LP_DOWN_SLOT;
+ worker->down_run = (struct lb_run *) calloc (1, size);
+ ERR_RETURN (!worker->down_run, -1, "Out of memory for down run\n");
+
+ out
+ ("[%d] worker %ld created, client:%d server:%d epfd:%d ctlfd:%d\n",
+ worker->index, pthread_self (), worker->client_num,
+ worker->server_num, worker->epfd, worker->ctlfd);
+ }
+ else
+ {
+ int j;
+ TP (lp_listen);
+ for (j = 0; j < worker->server_num; ++j)
+ {
+ if (lp_listen (worker, j))
+ return -1;
+ }
+ TQ (lp_listen);
+ out ("[%d] worker %ld created, server:%d epfd:%d ctlfd:%d\n",
+ worker->index, pthread_self (), worker->server_num,
+ worker->epfd, worker->ctlfd);
+ }
+ }
+
+ out ("%s! running %s\n", CR, CC);
+ if (lp.client_mode)
+ {
+ lp_round (0);
+ }
+
+ lp.run_state = LP_EXEC;
+
+ futex_wake (&lp.run_state, lp.worker_num);
+ return 0;
+}
+
+void
+lp_stop (int state)
+{
+ int i;
+
+ lp.run_state = state;
+
+ for (i = 0; i < lp.worker_num; ++i)
+ {
+ struct lp_worker *worker = LP_WORKER (i);
+ if (worker->tid && worker->ctlfd >= 0)
+ {
+ (void) lp_post_cmd (worker->ctlfd, LP_CMD_STOP);
+ }
+ }
+}
+
+#define _FREE(p) do { if (p) { free(p); p = NULL; } } while (0)
+
+void
+lp_exit ()
+{
+ int i;
+
+ lp_stop (LP_EXIT);
+
+ for (i = 0; i < lp.worker_num; ++i)
+ {
+ int fd;
+ struct lp_worker *worker = LP_WORKER (i);
+
+ if (worker->tid)
+ {
+ const struct timespec wait = { tv_sec: 3, tv_nsec:0 };
+ (void) pthread_timedjoin_np (worker->tid, NULL, &wait);
+ worker->tid = 0;
+ }
+
+ for (fd = worker->server.first; fd >= 0; fd = LP_SESS (fd)->next_sess)
+ _close (fd);
+
+ lp_clean (worker);
+
+ if (worker->ctlfd >= 0)
+ {
+ _close (worker->ctlfd);
+ worker->ctlfd = -1;
+ }
+ if (worker->epfd >= 0)
+ {
+ _close (worker->epfd);
+ worker->epfd = -1;
+ }
+
+ _FREE (worker->server_addr);
+ _FREE (worker->client_addr);
+ _FREE (worker->up_run);
+ _FREE (worker->down_run);
+ _FREE (worker->io_buf);
+ _FREE (worker->ev_buf);
+ }
+
+ _FREE (lp.sess);
+}
+
+inline static const char *
+lp_errmsg (int e)
+{
+ static const char *errmsg[LP_ERRNO_NUM] = { 0 };
+ if (NULL == errmsg[e])
+ errmsg[e] = strerror (e);
+ return errmsg[e];
+}
+
+const static char *lp_cntmsg[LP_CNT_NUM] = {
+ [LP_E_SOCKET] = "socket",
+ [LP_E_BIND] = "bind",
+ [LP_E_ACCEPT] = "accept",
+ [LP_E_CONNECT] = "connect",
+ [LP_E_NODELAY] = "nodelay",
+ [LP_E_NONBLOCK] = "nonblock",
+ [LP_E_REUSEADDR] = "reuseaddr",
+ [LP_E_REUSEPORT] = "reuseport",
+ [LP_E_RECV] = "recv",
+ [LP_E_SEND] = "send",
+
+ [LP_E_EPADD] = "ep-add",
+ [LP_E_EPMOD] = "ep-mod",
+ [LP_E_EPDEL] = "ep-del",
+ [LP_E_EPWAIT] = "ep-wait",
+ [LP_E_EPUNUSED] = "ep-unused",
+ [LP_E_EPHUP] = "ep-hup",
+ [LP_E_EPERR] = "ep-err",
+ [LP_E_EPINOUT] = "ep-inout",
+ [LP_E_EVIDLE] = "ep-idle",
+ [LP_E_EPEVENT] = "ep-event",
+
+ [LP_E_IOSHUT] = "io-shut",
+ [LP_E_IOSIZE] = "io-size",
+ [LP_E_IOMORE] = "io-more",
+ [LP_E_IOEXCEED] = "io-exceed",
+ [LP_E_IOSEND0] = "io-send0",
+
+ [LP_W_CREATE] = "cre",
+ [LP_W_SOCKET] = "soc",
+ [LP_W_BIND] = "bin",
+ [LP_W_CONNECT] = "con",
+ [LP_W_CONNECTED] = "est",
+ [LP_W_ACCEPT] = "acc",
+ [LP_W_CLOSE] = "clo",
+};
+
+void
+lp_output (char buf[], const struct lp_stat *stat, uint64_t nsec, int mask)
+{
+ int i;
+ const uint64_t *cnt = stat->cnt;
+ static int w_num = 0, w_up = 0, w_down = 0, w_conn = 0;
+ static int w_q_mb = 0, w_q_cp = 0, w_r_mb = 0, w_r_cp = 0;
+
+ co_append (buf, 5, " %s ", CH);
+ w_num = co_wr_uint (buf, cnt[LP_REC_NUM], w_num);
+ co_append (buf, 5, "%s", CC);
+
+ co_append (buf, 3, " < ");
+ w_up = co_wr_uint (buf, lb_gdiv (cnt[LP_CONNECTED], nsec), w_up);
+ if (lp.client_mode && !lp.block_connecting)
+ {
+ co_append (buf, 1, " / ");
+ w_conn = co_wr_uint (buf, lb_gdiv (cnt[LP_CONNECT], nsec), w_conn);
+ }
+ co_append (buf, 3, " - ");
+ w_down = co_wr_uint (buf, lb_gdiv (cnt[LP_CLOSE], nsec), w_down);
+ co_append (buf, 3, " > ");
+
+ co_append (buf, 3, " [ ");
+ w_q_mb =
+ co_wr_uint (buf,
+ lb_gdiv (cnt[LP_QUERY_BYTE] * 8, nsec /* * (1000000 / 8) */ ),
+ w_q_mb);
+ co_append (buf, 1, " ");
+ w_q_cp = co_wr_uint (buf, lb_gdiv (cnt[LP_QUERY_COMP], nsec /* * 1000 */ ),
+ w_q_cp);
+ co_append (buf, 3, " : ");
+ w_r_mb =
+ co_wr_uint (buf,
+ lb_gdiv (cnt[LP_REPLY_BYTE] * 8, nsec /* * (1000000 / 8) */ ),
+ w_r_mb);
+ co_append (buf, 1, " ");
+ w_r_cp = co_wr_uint (buf, lb_gdiv (cnt[LP_REPLY_COMP], nsec /* * 1000 */ ),
+ w_r_cp);
+ co_append (buf, 3, " ] ");
+
+ co_app_if (cnt[LP_FAILED], buf, 40, " F:%s%s%s",
+ FR__, f_uint (lb_gdiv (cnt[LP_FAILED], nsec)), CC);
+
+ if (mask & LP_W_SIGN)
+ {
+ co_append (buf, 8, " time{");
+ for (i = LP_W_BEGIN; i < LP_W_END; i += 2)
+ co_app_if (cnt[i], buf, 60, " %s:%s", lp_cntmsg[i],
+ f_uint (cnt[i + 1] / cnt[i]));
+ co_append (buf, 4, " }");
+ }
+
+ if (mask & LP_E_SIGN)
+ {
+ co_append (buf, 8, " err{");
+ for (i = LP_E_BEGIN; i < LP_E_END; ++i)
+ co_app_if (cnt[i], buf, 60, " %s:%s", lp_cntmsg[i], f_uint (cnt[i]));
+ co_append (buf, 4, " }");
+ }
+
+ if (!lp.err_msg && (mask & LP_ERR_SIGN))
+ {
+ co_append (buf, 5, " E:{");
+ for (i = 1; i < LP_ERRNO_NUM; ++i)
+ co_app_if (stat->err[i], buf, 40, " %d:%lu", i, stat->err[i]);
+ co_app_if (stat->err[0], buf, 30, " -:%lu", stat->err[0]);
+ co_append (buf, 5, " }");
+ }
+
+ co_append (buf, 4, "\n");
+
+ if (lp.err_msg && (mask & LP_ERR_SIGN))
+ {
+ for (i = 1; i < LP_ERRNO_NUM; ++i)
+ co_app_if (stat->err[i], buf, 100, "<E%d:%lu> %s\n", i, stat->err[i],
+ lp_errmsg (i));
+ co_app_if (stat->err[0], buf, 100, "<E-:%s> Other error\n",
+ f_uint (stat->err[0]));
+ }
+}
+
+void
+lp_timer (uint64_t nsec)
+{
+ const static struct timespec delay = {.tv_sec = 0,.tv_nsec =
+ LP_DELAY_MS * 1000 * 1000
+ };
+ static int line = -2;
+ static int base;
+
+ int i, second, total = 0;
+ char buf[256];
+ struct tm *lc;
+ struct lp_stat *curr, sum = { 0 };
+
+ curr = lp.curr;
+ lp.curr = lp.next;
+ lp.next = curr;
+
+ {
+ time_t tv = time (NULL);
+ lc = localtime (&tv);
+ }
+
+ co_init (buf, sizeof (buf));
+
+ (void) nanosleep (&delay, NULL) /* wait for cps.curr use */ ;
+
+ for (i = 0; i < lp.worker_num; ++i)
+ {
+ int j, mask = 0;
+ uint64_t count = 0;
+
+ curr->cnt[LP_REC_NUM] = LP_WORKER (i)->sess.num;
+
+ for (count = 0, j = 0; j < LP_R_END; ++j)
+ {
+ sum.cnt[j] += curr->cnt[j];
+ count += curr->cnt[j];
+ }
+ if (count)
+ mask |= LP_R_SIGN;
+
+ for (count = 0, j = LP_E_BEGIN; j < LP_E_END; ++j)
+ {
+ sum.cnt[j] += curr->cnt[j];
+ count += curr->cnt[j];
+ }
+ if (count)
+ mask |= LP_E_SIGN;
+
+ if (lp.watch)
+ {
+ for (count = 0, j = LP_W_BEGIN; j < LP_W_END; j += 2)
+ {
+ sum.cnt[j] += curr->cnt[j];
+ sum.cnt[j + 1] += curr->cnt[j + 1];
+ count += curr->cnt[j];
+ }
+ if (count)
+ mask |= LP_W_SIGN;
+ }
+
+ for (count = 0, j = 0; j < LP_ERRNO_NUM; ++j)
+ {
+ sum.err[j] += curr->err[j];
+ count += curr->err[j];
+ }
+ if (count)
+ mask |= LP_ERR_SIGN;
+
+ if (mask && lp.verbose)
+ {
+ co_append (buf, 10, " %4dw ", i);
+ lp_output (buf, curr + i, nsec, mask);
+ }
+
+ total |= mask;
+ }
+
+ if (total)
+ {
+ line = line < 0 ? 0 : line + 1;
+ if (line == 0)
+ {
+ base = lc->tm_hour * 3600 + lc->tm_min * 60 + lc->tm_sec;
+ }
+ }
+ else
+ {
+ line = line > 0 ? 0 : line - 1;
+ if (line == -1)
+ co_append (buf, 5, "\n");
+ }
+
+ if (line >= 0)
+ {
+ if (line)
+ {
+ second = (lc->tm_hour * 3600 + lc->tm_min * 60 + lc->tm_sec) - base;
+ while (second < 0)
+ second += (24 * 3600);
+ co_append (buf, 10, " %5d", second);
+ }
+ else
+ {
+ co_append (buf, 10, " %2d:%02d ", lc->tm_hour, lc->tm_min);
+ }
+ lp_output (buf, &sum, nsec, total);
+ }
+
+ co_flush (buf);
+
+ (void) memset (curr, 0, sizeof (struct lp_stat) * lp.worker_num);
+}
+
+int
+lp_loop ()
+{
+ const static struct timespec timeout = {.tv_sec = 0,.tv_nsec = LP_LOOP_TIMER
+ };
+
+ struct timespec begin, from, last_begin;
+ time_t next_time = lp.interval;
+
+ LB_TIME (begin);
+ from = begin;
+ last_begin = begin;
+
+ while (lp.run_state > 0)
+ {
+ struct timespec now;
+
+ (void) nanosleep (&timeout, NULL);
+
+ LB_TIME (now);
+
+ if (lp.run_state == LP_CLEAN)
+ {
+ if (lp.active_worker == 0)
+ break;
+ }
+
+ if (lp.client_mode && lp.run_state == LP_EXEC)
+ {
+ struct lp_test *test = &lp.test[lp.test_id];
+ uint64_t total = lp_total_sess ();
+
+ if (LB_CMP_S (now, last_begin, test->time)
+ || (test->up >= test->down ? total >= test->target : total <=
+ test->target))
+ {
+ if (lp.test_id >= lp.test_num - 1)
+ {
+ lp.run_state = LP_CLEAN;
+ out ("%s! cleanup%s\n", CR, CC);
+ }
+ else
+ {
+ lp.test_id++; /* run changed */
+ last_begin = now;
+ lp_round (lp.test_id);
+ }
+ }
+ }
+
+ if (!LB_CMP_S (now, begin, next_time))
+ continue;
+
+ lp_timer (LB_SUB_NS (now, from));
+
+ from = now;
+ next_time += lp.interval;
+
+ if (!lp.client_mode)
+ {
+ const struct timespec rest = {.tv_sec = lp.interval - 1,.tv_nsec =
+ LP_LOOP_REST
+ };
+ (void) nanosleep (&rest, NULL);
+ }
+ }
+
+ return 0;
+}
+
+#ifndef SIGNAL_LP_C_
+#define SIGNAL_LP_C_
+
+void
+lp_break (int s)
+{
+ DBG (" SIGNALED %d running:%d\n", s, lp.run_state);
+ out ("\n");
+
+ if (lp.run_state == LP_CLEAN)
+ {
+ out ("%s! safe exit%s\n", CR, CC);
+ lp_exit ();
+ }
+ else if (lp.run_state >= 0)
+ {
+ out ("%s! clean exit%s\n", CR, CC);
+ lp_stop (LP_CLEAN);
+ }
+ else
+ {
+ out ("%s! direct exit%s\n", CR, CC);
+ exit (1);
+ }
+}
+
+void
+lp_sigpipe (int s)
+{
+ DBG ("SIGPIPE\n");
+}
+
+int
+lp_init ()
+{
+ struct sigaction s = { 0 };
+
+ (void) sigemptyset (&s.sa_mask);
+
+ s.sa_flags = SA_NODEFER;
+ s.sa_handler = (void *) lp_break;
+ (void) sigaction (SIGINT, &s, NULL);
+ (void) sigaction (SIGQUIT, &s, NULL);
+
+ s.sa_handler = lp_sigpipe;
+ (void) sigaction (SIGPIPE, &s, NULL);
+
+// lb_sigsegv_setup();
+
+ lp.CPU_NUM = get_nprocs ();
+ if (lp.CPU_NUM <= 0)
+ lp.CPU_NUM = 1;
+
+ {
+ struct timespec t;
+ LB_TIME (t);
+ srandom (getpid () + t.tv_sec + (t.tv_sec >> 32) + t.tv_nsec +
+ (t.tv_nsec >> 32));
+ }
+
+ return 0;
+}
+
+#endif
+
+void
+lp_usage (const char *name)
+{
+ out ("USAGE: %s [OPTIONS] TEST-SET... # %s version\n", name, VERSION_NAME);
+}
+
+void
+lp_help (const char *name)
+{
+ lp_usage (name);
+ out (" Options:\n");
+ out (" -s, --server LIST set one server address list\n");
+ out (" X.Y.Z.M-N:P1-P2,...\n");
+ out (" -c, --client LIST set one client address list\n");
+ out
+ (" CLIENT*SERVER: R.S.T.K-J:Pa-Pb,...*X.Y.Z.M-N:P1-P2,...\n");
+ out (" A,B,C,D*1,2 random link\n");
+ out (" A,B,C,D=1,2 A1B2C1D2\n");
+ out (" A,B,C,D}1,2 A1B1C1D1 A2B2C2D2\n");
+ out
+ (" A,B,C,D{1,2 A1A2 B1B2 C1C2 D1D2\n");
+ out
+ (" -b, --block set block mode for connecting(client only)\n");
+ out (" -n, --nodelay set nodelay\n");
+ out (" -i, --interval # report time(default:%ds max:%ds)\n",
+ LP_INTERVAL_DEF, LP_INTERVAL_MAX);
+ out
+ (" -m, --core #HEX set bind cpu core mask(hex mode)\n");
+#ifdef DEBUG
+ out (" -D, --debug show debug information\n");
+#endif
+ out
+ (" -w, --watch show watch time statistic\n");
+ out (" -e, --no-error #-# skip error\n");
+ out (" -E, --error-msg show error message\n");
+ out (" -C, --no-color no color\n");
+ out (" -v, --verbose show worker statistics\n");
+ out (" -h, --help help\n");
+ out (" TEST-SET for client\n");
+ out
+ (" TARGET@TIME+UP-DOWN=QUERY:REPLY*TIMES-/PERIOD%%WAIT (client only)\n");
+ out (" TARGET max connection(default: INFINITE)\n");
+ out (" @TIME max time(0 or default:INFINITE)\n");
+ out
+ (" +UP connect rate(default: 0 no connnect; *: INFINITE)\n");
+ out
+ (" -DOWN close rate(default: 0 no close; *: INFINITE)\n");
+ out (" =... IO set(default: no IO)\n");
+ out (" QUERY send query data len(%u-%u)\n",
+ LP_QUERY_MIN, LP_QUERY_MAX);
+ out
+ (" :REPLY receive response data len(0-%d; default: same with QUERY)\n",
+ LP_REPLY_MAX);
+ out
+ (" *TIMES- IO times(0 or default: INFINITE; suffix-: IO then close)\n");
+ out
+ (" /PERIOD IO period time(0-%us; default: one by one)\n",
+ LP_PERIOD_MAX);
+ out
+ (" %%WAIT first IO wait time(0-%us; default: 0 no wait)\n",
+ LP_WAIT_MAX);
+ out (" UNITS:\n");
+ out (" k=1000 m=1000k g=1000m w=10000 K=1024 M=1024K G=1024M\n");
+ out (" s=Seconds m=Minutes h=Hours\n");
+}
+
+int
+lp_args_test (const char *arg)
+{
+ const char *p;
+ struct lp_test *test = &lp.test[lp.test_num];
+
+ ERR_RETURN (lp.test_num >= LP_MAX_TEST, -1, "Too many test set, max:%d\n",
+ LP_MAX_TEST);
+ (void) memset (test, 0, sizeof (struct lp_test));
+
+ if (*arg >= '0' && *arg <= '9')
+ {
+ test->target = p_value (arg, LP_TARGET_MAX, UB_1kmgwKMG, &p);
+ ERR_RETURN (!p, -1, "Invalid test TARGET set: '%s'\n", arg);
+ }
+ else
+ {
+ test->target = LP_TARGET_DEF;
+ p = arg;
+ }
+
+ if (*p == '@')
+ {
+ test->time = p_value (p + 1, LP_TIME_MAX, UB_hms1, &p);
+ ERR_RETURN (!p, -1, "Invalid test TIME set: '%s'\n", arg);
+ }
+ else
+ {
+ test->time = LP_TIME_DEF;
+ }
+
+ if (*p == '+')
+ {
+ if (p[1] == '*')
+ {
+ test->up = LP_UP_INF;
+ p += 2;
+ }
+ else
+ {
+ test->up = p_value (p + 1, LP_UP_MAX, UB_1kmgwKMG, &p);
+ ERR_RETURN (!p, -1, "Invalid test UP-RATE set: '%s'\n", arg);
+ }
+ }
+ else
+ {
+ test->up = LP_UP_DEF;
+ }
+
+ if (*p == '-')
+ {
+ if (p[1] == '*')
+ {
+ test->down = LP_DOWN_INF;
+ p += 2;
+ }
+ else
+ {
+ test->down = p_value (p + 1, LP_DOWN_MAX, UB_1kmgwKMG, &p);
+ ERR_RETURN (!p, -1, "Invalid test DOWN-RATE set: '%s'\n", arg);
+ }
+ }
+ else
+ {
+ test->down = LP_DOWN_DEF;
+ }
+
+ if (*p == '=')
+ {
+ test->query = p_value (p + 1, LP_QUERY_MAX, UB_1kmgwKMG, &p);
+ ERR_RETURN (!p
+ || test->query < LP_QUERY_MIN, -1,
+ "Invalid test QUERY '%s'\n", arg);
+
+ if (*p == ':')
+ {
+ test->reply = p_value (p + 1, LP_REPLY_MAX, UB_1kmgwKMG, &p);
+ ERR_RETURN (!p, -1, "Invalid test REPLY set: '%s'\n", arg);
+ }
+ else
+ {
+ test->reply = test->query;
+ }
+
+ if (*p == '*')
+ {
+ test->times = p_uint (p + 1, LP_TIMES_MAX, &p);
+ ERR_RETURN (!p, -1, "Invalid test TIMES set: '%s'\n", arg);
+ if (test->times == 0)
+ test->times = LP_TIMES_INF;
+ if (*p == '-')
+ {
+ test->close_after_io = 1;
+ p++;
+ }
+ }
+ else
+ {
+ test->times = LP_TIMES_DEF;
+ }
+
+ if (*p == '/')
+ {
+ test->period = p_value (p + 1, LP_PERIOD_MAX, UB_hms1, &p);
+ ERR_RETURN (!p, -1, "Invalid test PRIOD set: '%s'\n", arg);
+ if (*p == '%')
+ {
+ test->wait = p_value (p + 1, LP_WAIT_MAX, UB_hms1, &p);
+ ERR_RETURN (!p, -1, "Invalid test WAIT set: '%s'\n", arg);
+ }
+ else
+ {
+ test->wait = LP_WAIT_DEF;
+ }
+ }
+ else
+ {
+ test->period = LP_PERIOD_DEF;
+ test->wait = 0;
+ }
+ }
+ else
+ {
+ test->query = 0;
+ test->reply = 0;
+ test->times = 0;
+ test->period = 0;
+ test->wait = 0;
+ }
+
+ ERR_RETURN (*p, -1, "Invalid test set: '%s'\n", arg);
+
+ if (test->up < test->down)
+ test->down_mode = 1;
+ lp.test_num++;
+ return 0;
+}
+
+inline static void
+lp_noerr (int b, int e)
+{
+ for (; b <= e; ++b)
+ lp.no_err[b / 64] |= (1 << (b % 64));
+}
+
+/* -e M-N */
+int
+lp_args_noerr (const char *arg)
+{
+ int b, e;
+
+ b = (int) p_uint (arg, LP_ERRNO_NUM, &arg);
+ if (!arg)
+ return -1;
+ if (*arg == '-')
+ {
+ e = (int) p_uint (arg + 1, LP_ERRNO_NUM, &arg);
+ if (!arg || e < b)
+ return -1;
+ }
+ else
+ {
+ e = b;
+ }
+
+ if (*arg != 0)
+ return -1;
+
+ lp_noerr (b, e);
+
+ return 0;
+}
+
+struct lp_worker *
+lp_init_worker ()
+{
+ int i;
+ struct lp_worker *worker = LP_WORKER (lp.worker_num);
+
+ ERR_RETURN (lp.worker_num >= LP_MAX_WORKER, NULL,
+ "Too many workers, limit:%d\n", LP_MAX_WORKER);
+
+ (void) memset (worker, 0, sizeof (*worker));
+
+ worker->index = lp.worker_num;
+ worker->epfd = -1;
+ worker->ctlfd = -1;
+
+ lp_init_head (&worker->server);
+ lp_init_head (&worker->sess);
+ lp_init_head (&worker->conn);
+ for (i = 0; i < LP_MAX_TEST; ++i)
+ {
+ lp_init_head (&worker->rest[i]);
+ lp_init_head (&worker->wait[i]);
+ }
+
+ lp.worker_num++;
+
+ return worker;
+}
+
+int
+lp_args (int argc, char *argv[])
+{
+ int i, opt, index, ret;
+ struct lp_worker *worker;
+ const char *arg;
+ int server_mode = 0;
+
+ while (EOF !=
+ (opt = getopt_long (argc, argv, LP_OPTIONS, lp_options, &index)))
+ {
+ const char *end;
+
+ switch (opt)
+ {
+ case 'c':
+ ERR_RETURN (server_mode, -1, "Only server or client\n");
+ lp.client_mode = 1;
+ worker = lp_init_worker ();
+ if (!worker)
+ return -1;
+ worker->client_num = p_addrin_list (optarg, &worker->client_addr,
+ LP_CLIENT_MAX,
+ PA_DEF_PORT | PAL_NO_SPACE,
+ &arg);
+ ERR_RETURN (worker->client_num <= 0, -1,
+ "Bad client for address list '%s'\n", optarg);
+ for (i = 0; i < CNT_OF (MODES); ++i)
+ {
+ if (*arg == MODES[i][0])
+ break;
+ }
+ ERR_RETURN (i >= CNT_OF (MODES), -1,
+ "Bad mode for address list '%s'\n", optarg);
+ worker->link_mode = i;
+ arg++;
+ worker->server_num = p_addrin_list (arg, &worker->server_addr,
+ LP_SERVER_MAX,
+ PA_MUST_PORT | PAL_NO_SPACE,
+ NULL);
+ ERR_RETURN (worker->server_num <= 0, -1,
+ "Bad server for address list '%s'\n", arg);
+ break;
+ case 's':
+ ERR_RETURN (lp.client_mode, -1, "Only server or client\n");
+ server_mode = 1;
+ worker = lp_init_worker ();
+ if (!worker)
+ return -1;
+ worker->server_num =
+ p_addrin_list (optarg, &worker->server_addr, LP_SERVER_MAX,
+ PA_MUST_PORT | PAL_NO_SPACE, NULL);
+ ERR_RETURN (worker->server_num <= 0, -1,
+ "Bad server for address list '%s'\n", optarg);
+ break;
+
+ case 'i':
+ lp.interval = (int) p_int (optarg, LP_INTERVAL_MAX, &end);
+ ERR_RETURN (!end || *end, -1, "Invalid interval '%s'\n", optarg);
+ break;
+ case 'e':
+ ret = lp_args_noerr (optarg);
+ ERR_RETURN (ret, ret, "Invalid no-error set '%s'\n", optarg);
+ break;
+ case 'm':
+ lp.core = p_hex (optarg, &end);
+ ERR_RETURN (!end
+ || lp.core >= (1 << lp.CPU_NUM), -1,
+ "Invalid bind core set\n");
+ break;
+ case 'C':
+ lb_set_color (LB_NO_COLOR);
+ break;
+
+ case 'b':
+ lp.block_connecting = 1;
+ break;
+ case 'n':
+ lp.nodelay = 1;
+ break;
+ case 'w':
+ lp.watch = 1;
+ break;
+ case 'E':
+ lp.err_msg = 1;
+ break;
+ case 'v':
+ lp.verbose = 1;
+ break;
+
+#ifdef DEBUG
+ case 'D':
+ enable_debug = 1;
+ break;
+#endif
+ case 'h':
+ lp_help (argv[0]);
+ exit (0);
+ case '?':
+ err ("Invalid arguments\n");
+ return -1;
+ default:
+ err ("Unknown option '%c'.\n", opt);
+ return -1;
+ }
+ }
+
+ ERR_RETURN (lp.worker_num <= 0, -1,
+ "Please set server or client address\n");
+
+ if (lp.client_mode)
+ {
+ for (index = optind; index < argc; ++index)
+ {
+ ret = lp_args_test (argv[index]);
+ if (ret)
+ return ret;
+ }
+ ERR_RETURN (lp.test_num <= 0, -1, "Please set test\n");
+ }
+ else
+ {
+ ERR_RETURN (optind < argc, -1, "Unknown option '%s'\n", argv[optind]);
+ }
+
+ return 0;
+}
+
+int
+main (int argc, char *argv[])
+{
+ if (argc <= 1)
+ {
+ lp_usage (argv[0]);
+ return 0;
+ }
+
+ if (lp_init ())
+ return 1;
+
+ if (lp_args (argc, argv) == 0 && lp_start () == 0)
+ lp_loop ();
+
+ lp_exit ();
+
+ return 0;
+}
diff --git a/thirdparty/apps/testapp/lp/lp.h b/thirdparty/apps/testapp/lp/lp.h
new file mode 100644
index 0000000..4a2a09a
--- /dev/null
+++ b/thirdparty/apps/testapp/lp/lp.h
@@ -0,0 +1,615 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef _LP_H_
+#define _LP_H_ 1
+
+#define LP_DBG 1
+
+#if LP_DBG
+#define LP_IF_DBG(line) line
+#else
+#define LP_IF_DBG(line) ((void)0)
+#endif
+
+#define LP_ASSERT(cond) LP_IF_DBG(assert(cond))
+
+#define LP_DELAY_MS 5
+#define LP_LOOP_TIMER (49 * 1000 * 1000)
+#define LP_LOOP_REST (900 * 1000 * 1000)
+
+#define LP_UP_SLOT 1024
+#define LP_UP_NSEC (1000 * 1000)
+
+#define LP_DOWN_SLOT 1024
+#define LP_DOWN_NSEC (1000 * 1000)
+
+#define LP_MAX_FD (16 * 1024 * 1024)
+#define LP_MAX_WORKER 64
+#define LP_MAX_TEST 10
+
+#define LP_IOBUF_SIZE (128 * 1024)
+
+#define LP_EVENT_NUM 256
+
+#define LP_SERVER_MAX 10000
+#define LP_CLIENT_MAX 100000
+
+#define LP_INFINITE (0xFFFFffff)
+#define LP_UNLIMITED LP_INFINITE
+#define LP_VALUE_MAX (LP_INFINITE - 1)
+
+#define LP_INTERVAL_MIN 1
+#define LP_INTERVAL_MAX 60
+#define LP_INTERVAL_DEF 1
+
+#define LP_TARGET_INF LP_INFINITE
+#define LP_TARGET_MAX (16 * 1000 * 1000) /* 16m max */
+#define LP_TARGET_DEF LP_INFINITE
+
+#define LP_TIME_INF LP_INFINITE
+#define LP_TIME_MAX (24 * 60 * 60)
+#define LP_TIME_DEF LP_TIME_INF
+
+#define LP_UP_MAX ( 1 * 1000 * 1000)
+#define LP_UP_INF LP_INFINITE
+#define LP_UP_DEF 0
+
+#define LP_DOWN_MAX (1 * 1000 * 1000)
+#define LP_DOWN_INF LP_INFINITE
+#define LP_DOWN_DEF 0
+
+#define LP_QUERY_MIN 8 /* 2 unsigned int */
+#define LP_QUERY_MAX 65536
+
+#define LP_REPLY_MAX (10 * 1024 * 1024) /* 10M */
+
+#define LP_TIMES_INF LP_INFINITE
+#define LP_TIMES_MAX LP_VALUE_MAX
+#define LP_TIMES_DEF LP_TIMES_INF
+
+#define LP_PERIOD_MIN 1
+#define LP_PERIOD_MAX (60 * 60)
+#define LP_PERIOD_DEF 1
+
+#define LP_WAIT_MAX (60 * 60)
+#define LP_WAIT_DEF 0
+
+#define LP_FLAG_MASK (0xffffFFFFull << 32)
+#define LP_TYPE_MASK (0x3ull << 32)
+#define LP_CONTROL_TYPE (0x1ull << 32)
+#define LP_LISTEN_TYPE (0x2ull << 32)
+#define LP_SESSION_TYPE (0x3ull << 32)
+
+inline static uint64_t
+LP_EV_MK (uint64_t flag, int fd)
+{
+ return flag | (uint64_t) (uint32_t) fd;
+}
+
+inline static int
+LP_EV_FD (uint64_t data_u64)
+{
+ return (int) (uint32_t) data_u64;
+}
+
+inline static uint64_t
+LP_EV_TYPE (uint64_t data_u64)
+{
+ return data_u64 & LP_TYPE_MASK;
+}
+
+inline static uint64_t
+LP_EV_FLAG (uint64_t data_u64)
+{
+ return data_u64 & LP_FLAG_MASK;
+}
+
+#define LP_R_SIGN 1
+#define LP_E_SIGN 2
+#define LP_W_SIGN 4
+#define LP_ERR_SIGN 8
+
+enum
+{
+ LP_CONNECTED,
+ LP_CONNECT,
+ LP_CLOSE,
+ LP_FAILED,
+
+ LP_QUERY_COMP,
+ LP_QUERY_BYTE,
+ LP_REPLY_COMP,
+ LP_REPLY_BYTE,
+
+ LP_REC_NUM,
+
+ LP_R_END,
+ LP_E_BEGIN = LP_R_END,
+
+ LP_E_SOCKET = LP_E_BEGIN,
+ LP_E_BIND,
+ LP_E_ACCEPT,
+ LP_E_CONNECT,
+ LP_E_NODELAY,
+ LP_E_NONBLOCK,
+ LP_E_REUSEADDR,
+ LP_E_REUSEPORT,
+ LP_E_RECV,
+ LP_E_SEND,
+
+ LP_E_EPADD,
+ LP_E_EPMOD,
+ LP_E_EPDEL,
+ LP_E_EPWAIT,
+ LP_E_EPUNUSED,
+ LP_E_EPHUP,
+ LP_E_EPERR,
+ LP_E_EPINOUT,
+ LP_E_EVIDLE,
+ LP_E_EPEVENT,
+
+ LP_E_IOSHUT,
+ LP_E_IOSIZE,
+ LP_E_IOMORE,
+ LP_E_IOEXCEED,
+ LP_E_IOSEND0,
+
+ LP_E_END,
+ LP_W_BEGIN = LP_E_END,
+
+ LP_W_CREATE = LP_W_BEGIN, LP_T_CREATE,
+ LP_W_SOCKET, LP_T_SOCKET,
+ LP_W_BIND, LP_T_BIND,
+ LP_W_CONNECT, LP_T_CONNECT,
+ LP_W_CONNECTED, LP_T_CONNECTED,
+ LP_W_ACCEPT, LP_T_ACCEPT,
+ LP_W_CLOSE, LP_T_CLOSE,
+
+ LP_W_END,
+
+ LP_CNT_NUM = LP_W_END
+};
+
+#define LP_ERRNO_NUM 256
+#define LP_NOERR_NUM ((LP_ERRNO_NUM + 63) / 64)
+
+struct lp_stat
+{
+ uint64_t cnt[LP_CNT_NUM];
+ uint64_t err[LP_ERRNO_NUM];
+};
+
+#define LP_STAT(worker) (lp.curr + (worker)->index)
+#define LP_ADD(worker, id, num) (LP_STAT(worker)->cnt[(id)] += (num))
+#define LP_CNT(worker, id) (++LP_STAT(worker)->cnt[(id)])
+#define LP_ERR(worker, id, e) do { \
+ unsigned int _e = (unsigned int)(e); \
+ if (_e >= LP_ERRNO_NUM) \
+ _e = 0; \
+ if (0 == (lp.no_err[_e / 64] & (1 << (_e % 64)))) { \
+ struct lp_stat *_stat = LP_STAT(worker); \
+ _stat->cnt[(id)]++; \
+ _stat->err[_e]++; \
+ } \
+} while (0)
+#define LP_CNT2(worker, id1, id2) do { \
+ struct lp_stat *_stat = LP_STAT(worker); \
+ _stat->cnt[(id1)]++; \
+ _stat->cnt[(id2)]++; \
+} while (0)
+#define LP_ERR2(worker, id1, id2, e) do { \
+ unsigned int _e = (unsigned int)(e); \
+ if (_e >= LP_ERRNO_NUM) \
+ _e = 0; \
+ if (0 == (lp.no_err[_e / 64] & (1 << (_e % 64)))) { \
+ struct lp_stat *_stat = LP_STAT(worker); \
+ _stat->cnt[(id1)]++; \
+ _stat->cnt[(id2)]++; \
+ _stat->err[_e]++; \
+ } \
+} while (0)
+
+#define LP_TIME_SET(begin) struct timespec begin; \
+ do { \
+ if (lp.watch) \
+ LB_TIME(begin); \
+ } while (0)
+#define LP_TIME_REG(worker, id, nsec) do { \
+ if (lp.watch) { \
+ uint64_t *_w = &(LP_STAT(worker)->cnt[(id)]); \
+ _w[0] ++; \
+ _w[1] += nsec; \
+ } \
+} while (0)
+#define LP_TIME_FOR(worker, id, begin, end) LP_TIME_REG((worker), (id), LB_SUB_NS((end), (begin)))
+#define LP_TIME_END(worker, id, begin) do { \
+ LP_TIME_SET(_end); \
+ LP_TIME_FOR((worker), (id), (begin), _end); \
+} while (0)
+
+struct lp_io
+{
+ int query;
+ int reply;
+} __attribute__ ((__packed__));
+
+struct lp_head
+{
+ uint32_t num;
+ int first;
+ int *last;
+};
+
+enum
+{
+ LP_S_UNUSED = 0,
+ LP_S_PREPARE = 0x20,
+ LP_S_CONNECTING = 0x80,
+
+ LP_S_CONNECTED = 0x10,
+
+ LP_S_IDLE = LP_S_CONNECTED | 0,
+ LP_S_QUERY = LP_S_CONNECTED | 1,
+ LP_S_REPLY = LP_S_CONNECTED | 2,
+ LP_S_REST = LP_S_CONNECTED | 4,
+ LP_S_WAIT = LP_S_CONNECTED | 8,
+};
+
+struct lp_sess
+{
+ uint8_t state;
+ uint8_t test;
+ uint8_t epout;
+ uint32_t io_num;
+ int query;
+ int reply;
+
+ int *prev_sess;
+ int next_sess;
+ int next_rest;
+ int *prev_rest;
+ struct timespec time;
+
+#if LP_DBG
+ uint16_t work;
+ uint16_t anum;
+ uint16_t fnum;
+#endif
+};
+
+struct lp_test
+{
+ uint32_t target;
+ uint32_t time;
+
+ uint32_t up;
+ uint32_t down;
+
+ int query;
+ int reply;
+
+ uint32_t times;
+ uint32_t period;
+ uint32_t wait;
+
+ uint8_t down_mode;
+ uint8_t close_after_io;
+ uint8_t _pad[2];
+};
+
+enum
+{
+ LP_SYNC,
+ LP_RAND,
+ LP_CPS,
+ LP_SPC,
+};
+
+struct lp_worker
+{
+ int index;
+ int epfd;
+ int ctlfd;
+ int server_num;
+ int client_num;
+ int link_mode;
+
+ pthread_t tid;
+
+ struct lp_head server;
+ struct lp_head sess; /* ESTABLISHED */
+ struct lp_head conn; /* connecting(for nonblock of client) */
+ struct lp_head rest[LP_MAX_TEST]; /* io wait queue */
+ struct lp_head wait[LP_MAX_TEST]; /* io wait queue */
+
+ struct sockaddr_in *server_addr;
+ struct sockaddr_in *client_addr;
+
+ struct lb_run *up_run;
+ struct lb_run *down_run;
+
+ struct epoll_event *ev_buf;
+
+ void *io_buf;
+};
+
+enum lp_state
+{
+ LP_EXIT = -1,
+ LP_INIT = 0,
+ LP_EXEC = 1,
+ LP_CLEAN = 2,
+};
+
+struct lp_var
+{
+ int CPU_NUM;
+ volatile int run_state;
+ int worker_num;
+
+ uint8_t verbose;
+ uint8_t watch;
+ uint8_t err_msg;
+ uint8_t _pad;
+
+ int client_mode;
+ int block_connecting;
+
+ int interval;
+ int nodelay;
+
+ int test_num;
+ volatile int test_id;
+
+ uint64_t core;
+ uint64_t active_worker;
+
+ struct lp_sess *sess;
+ struct lp_stat *volatile curr;
+ struct lp_stat *next;
+ struct lp_stat stat[2][LP_MAX_WORKER];
+ struct lp_test test[LP_MAX_TEST];
+ struct lp_worker worker[LP_MAX_WORKER];
+ uint64_t no_err[LP_NOERR_NUM];
+};
+
+extern struct lp_var lp;
+
+inline static struct lp_worker *
+LP_WORKER (int index)
+{
+ return &lp.worker[index];
+}
+
+inline static struct lp_sess *
+LP_SESS (int fd)
+{
+ return lp.sess + fd;
+}
+
+inline static uint64_t
+lp_total_sess ()
+{
+ int i;
+ uint64_t total = 0;
+ for (i = 0; i < lp.worker_num; ++i)
+ total += lp.worker[i].sess.num;
+ return total;
+}
+
+#define LP_APPEND(head, fd, sess, name) do { \
+ struct lp_sess *_s = (sess); \
+ LP_ASSERT(_s->next_##name == fd); \
+ LP_ASSERT(_s->prev_##name == NULL); \
+ _s->next_##name = -1; \
+ _s->prev_##name = (head).last; \
+ *(head).last = fd; \
+ (head).last = &_s->next_##name; \
+ (head).num++; \
+} while (0)
+
+#define LP_REMOVE(head, fd, sess, name) do { \
+ struct lp_sess *_s = (sess); \
+ LP_ASSERT((head).num); \
+ if ((*_s->prev_##name = _s->next_##name) >= 0) \
+ LP_SESS(_s->next_##name)->prev_##name = _s->prev_##name; \
+ else \
+ (head).last = _s->prev_##name; \
+ (head).num--; \
+ LP_IF_DBG(_s->next_##name = (fd)); \
+ LP_IF_DBG(_s->prev_##name = NULL); \
+} while (0)
+
+inline static void
+lp_init_head (struct lp_head *head)
+{
+ head->first = -1;
+ head->num = 0;
+ head->last = &head->first;
+}
+
+inline static void
+lp_dest_sess (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ LP_ASSERT (sess->next_rest == fd);
+ LP_ASSERT (sess->next_sess == fd);
+ LP_ASSERT (sess->prev_rest == NULL);
+ LP_ASSERT (sess->prev_sess == NULL);
+ LP_ASSERT (sess->work == worker->index);
+#if LP_DBG
+ sess->fnum++;
+#endif
+ LP_ASSERT (sess->fnum == sess->anum);
+
+ sess->state = LP_S_UNUSED;
+}
+
+inline static struct lp_sess *
+lp_init_sess (struct lp_worker *worker, int fd)
+{
+ struct lp_sess *sess = LP_SESS (fd);
+
+ if (sess->state != LP_S_UNUSED)
+ {
+ void lp_dump_sess (int fd);
+ wrn ("Invalid session fd:%d\n", fd);
+#if LP_DBG
+ lp_dump_sess (fd);
+#endif
+ LP_ASSERT (0);
+ }
+ LP_ASSERT (sess->fnum == sess->anum);
+
+ sess->state = LP_S_PREPARE;
+ sess->epout = 0;
+ sess->io_num = 0;
+ sess->query = 0;
+ sess->reply = 0;
+
+#if LP_DBG
+ sess->work = worker->index;
+ sess->anum++;
+ sess->next_rest = sess->next_sess = fd;
+ sess->prev_rest = sess->prev_sess = NULL;
+#endif
+
+ return sess;
+}
+
+inline static void
+lp_add_conn (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ LP_ASSERT (sess->state == LP_S_PREPARE);
+
+ LP_APPEND (worker->conn, fd, sess, sess);
+ sess->state = LP_S_CONNECTING;
+}
+
+inline static void
+lp_out_conn (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ LP_ASSERT (worker->conn.num);
+ LP_ASSERT (sess->state == LP_S_CONNECTING);
+
+ LP_REMOVE (worker->conn, fd, sess, sess);
+}
+
+inline static void
+lp_del_conn (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ lp_out_conn (worker, fd, sess);
+ lp_dest_sess (worker, fd, sess);
+ _close (fd);
+}
+
+inline static void
+lp_add_rest (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ LP_APPEND (worker->rest[sess->test], fd, sess, rest);
+ sess->state = LP_S_REST;
+}
+
+inline static void
+lp_out_rest (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ LP_ASSERT (worker->rest[sess->test].num);
+
+ LP_REMOVE (worker->rest[sess->test], fd, sess, rest);
+ sess->state = LP_S_IDLE;
+}
+
+inline static void
+lp_add_wait (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ LP_APPEND (worker->wait[sess->test], fd, sess, rest);
+ sess->state = LP_S_WAIT;
+}
+
+inline static void
+lp_out_wait (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ LP_ASSERT (worker->wait[sess->test].num);
+
+ LP_REMOVE (worker->wait[sess->test], fd, sess, rest);
+ sess->state = LP_S_IDLE;
+}
+
+inline static void
+lp_del_sess (struct lp_worker *worker, int fd)
+{
+ struct lp_sess *sess = LP_SESS (fd);
+
+ LP_ASSERT (sess->state & LP_S_CONNECTED);
+
+ if (sess->state == LP_S_REST)
+ lp_out_rest (worker, fd, sess);
+ else if (sess->state == LP_S_WAIT)
+ lp_out_wait (worker, fd, sess);
+ LP_REMOVE (worker->sess, fd, sess, sess);
+
+ lp_dest_sess (worker, fd, sess);
+ LP_TIME_SET (begin);
+ _close (fd);
+ LP_TIME_END (worker, LP_W_CLOSE, begin);
+
+ LP_CNT (worker, LP_CLOSE);
+}
+
+inline static int
+lp_epctl (const struct lp_worker *worker, int op, int fd, uint32_t io)
+{
+ struct epoll_event event;
+ event.events = io | EPOLLET | EPOLLRDHUP | EPOLLHUP;
+ event.data.u64 = LP_EV_MK (LP_SESSION_TYPE, fd);
+ return _epoll_ctl (worker->epfd, op, fd, &event);
+}
+
+#define lp_epadd(worker, fd, io) lp_epctl((worker), EPOLL_CTL_ADD, (fd), (io))
+#define lp_epmod(worker, fd, io) lp_epctl((worker), EPOLL_CTL_MOD, (fd), (io))
+#define lp_epdel(worker, fd) _epoll_ctl((worker->epfd), EPOLL_CTL_DEL, (fd), NULL)
+
+inline static int
+lp_set_epout (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ if (sess->epout)
+ return 1;
+
+ if (lp_epmod (worker, fd, EPOLLOUT))
+ {
+ LP_ERR (worker, LP_E_EPMOD, errno);
+ return -1;
+ }
+
+ sess->epout = 1;
+ return 1;
+}
+
+#define LP_CMD_STOP 1
+
+inline static int
+lp_post_cmd (int fd, long long int cmd)
+{
+ ssize_t ret = _write (fd, (void *) &cmd, sizeof (cmd));
+
+ return ret - sizeof (cmd);
+}
+
+void lp_client (struct lp_worker *worker);
+void lp_server (struct lp_worker *worker);
+int lp_listen (struct lp_worker *worker, int index);
+
+#endif /* #ifndef _LP_H_ */
diff --git a/thirdparty/apps/testapp/lp/lpc.c b/thirdparty/apps/testapp/lp/lpc.c
new file mode 100644
index 0000000..7da0414
--- /dev/null
+++ b/thirdparty/apps/testapp/lp/lpc.c
@@ -0,0 +1,669 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "lb.h"
+#include "lp.h"
+
+inline static int
+lp_io_finish (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ int ret;
+
+ sess->state = LP_S_IDLE;
+ --sess->io_num;
+
+ if (sess->io_num)
+ {
+ lp_add_rest (worker, fd, sess);
+ sess->time.tv_sec += lp.test[sess->test].period;
+ return 0;
+ }
+
+ if (lp.test[sess->test].close_after_io)
+ return -1;
+
+ if (lp_epdel (worker, fd))
+ LP_ERR (worker, LP_E_EPDEL, errno);
+
+ return 0;
+}
+
+int
+lp_recv_reply (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ int ret;
+ char *buf = worker->io_buf;
+
+ LP_ASSERT (sess->query == 0);
+
+ while (1)
+ {
+ ret = _recv (fd, buf, LP_IOBUF_SIZE, 0);
+ if (ret > 0)
+ {
+ LP_ADD (worker, LP_REPLY_BYTE, ret);
+ sess->reply -= ret;
+ }
+ else if (ret < 0)
+ {
+ int e = errno;
+ if (e == EWOULDBLOCK || e == EAGAIN)
+ {
+ if (sess->reply > 0)
+ return 1;
+ break;
+ }
+ if (e == EINTR)
+ continue;
+ LP_ERR (worker, LP_E_RECV, e);
+ return -1;
+ }
+ else
+ {
+ if (sess->reply)
+ LP_CNT (worker, LP_E_IOSHUT);
+ else
+ LP_CNT (worker, LP_REPLY_COMP);
+ return -1;
+ }
+ }
+
+ LP_CNT (worker, LP_REPLY_COMP);
+
+ if (sess->reply < 0)
+ {
+ LP_CNT (worker, LP_E_IOEXCEED);
+ sess->reply = 0;
+ }
+
+ return lp_io_finish (worker, fd, sess);
+}
+
+int
+lp_more_query (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ int ret;
+ char *buf = worker->io_buf;
+
+ LP_ASSERT (sess->io_num);
+
+ while (sess->query > 0)
+ {
+ const int LEN =
+ sess->query < LP_IOBUF_SIZE ? sess->query : LP_IOBUF_SIZE;
+
+ ret = _send (fd, buf, LEN, 0);
+ if (ret > 0)
+ {
+ LP_ADD (worker, LP_QUERY_BYTE, ret);
+ sess->query -= ret;
+ }
+ else if (ret < 0)
+ {
+ int e = errno;
+ if (e == EWOULDBLOCK || e == EAGAIN)
+ return lp_set_epout (worker, fd, sess);
+ if (e == EINTR)
+ continue;
+ LP_ERR (worker, LP_E_SEND, e);
+ return -1;
+ }
+ else
+ {
+ LP_CNT (worker, LP_E_IOSEND0);
+ }
+ }
+
+ LP_CNT (worker, LP_QUERY_COMP);
+
+ if (sess->epout)
+ {
+ if (lp_epmod (worker, fd, EPOLLIN))
+ {
+ LP_ERR (worker, LP_E_EPMOD, errno);
+ return -1;
+ }
+ sess->epout = 0;
+ }
+
+ if (!sess->reply)
+ {
+ return lp_io_finish (worker, fd, sess);
+ }
+
+ sess->state = LP_S_REPLY;
+ return 0;
+}
+
+int
+lp_new_query (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ int ret, len = 0;
+ char *buf = worker->io_buf;
+ struct lp_io *io = (struct lp_io *) buf;
+
+ LP_ASSERT (sess->state == LP_S_IDLE);
+ LP_ASSERT (sess->query == 0);
+ LP_ASSERT (sess->reply == 0);
+ LP_ASSERT (sess->io_num);
+
+ sess->state = LP_S_QUERY;
+ sess->query = lp.test[sess->test].query;
+ sess->reply = lp.test[sess->test].reply;
+
+ io->query = htonl (sess->query);
+ io->reply = htonl (sess->reply);
+
+ while (1)
+ {
+ ret = _send (fd, buf + len, sess->query - len, 0);
+ if (ret > 0)
+ {
+ LP_ADD (worker, LP_QUERY_BYTE, ret);
+ len += ret;
+ if (len >= sizeof (struct lp_io))
+ break;
+ LP_CNT (worker, LP_E_IOMORE);
+ }
+ else if (ret < 0)
+ {
+ int e = errno;
+ if (e == EWOULDBLOCK || e == EAGAIN)
+ {
+ LP_CNT (worker, LP_E_IOMORE);
+ continue;
+ }
+ if (e == EINTR)
+ continue;
+ LP_ERR (worker, LP_E_SEND, e);
+ return -1;
+ }
+ else
+ {
+ LP_CNT (worker, LP_E_IOSEND0);
+ }
+ }
+
+ sess->query -= len;
+
+ return lp_more_query (worker, fd, sess);
+}
+
+int
+lp_pre_connect (struct lp_worker *worker, int fd, struct sockaddr_in *c_addr)
+{
+ int ret;
+
+ if (fd >= LP_MAX_FD)
+ {
+ LP_CNT (worker, LP_FAILED);
+ err ("fd(%d) >= LP_MAX_FD(%d)\n", fd, LP_MAX_FD);
+ return -1;
+ }
+
+ ret = set_reuseaddr (fd, 1);
+ if (ret)
+ {
+ const int e = errno;
+ LP_ERR (worker, LP_E_REUSEADDR, e);
+ DBG ("set_reuseaddr(%d, 1)=%d:%d\n", fd, ret, e);
+ }
+
+ ret = set_reuseport (fd, 1);
+ if (ret)
+ {
+ const int e = errno;
+ LP_ERR (worker, LP_E_REUSEPORT, errno);
+ DBG ("set_reuseport(%d, 1)=%d:%d\n", fd, ret, e);
+ }
+
+ if (c_addr->sin_addr.s_addr != INADDR_ANY || c_addr->sin_port != 0)
+ {
+ LP_TIME_SET (bind_begin);
+ ret =
+ _bind (fd, (struct sockaddr *) c_addr, sizeof (struct sockaddr_in));
+ LP_TIME_END (worker, LP_W_BIND, bind_begin);
+ if (ret)
+ {
+ int e = errno;
+ if (e == EADDRINUSE)
+ return -1;
+ LP_ERR2 (worker, LP_FAILED, LP_E_BIND, errno);
+ DBG ("->bind(%d, %s)=%d:%d\n", fd, f_inaddr (c_addr), ret, errno);
+ return -1;
+ }
+ }
+
+ if (lp.nodelay)
+ {
+ ret = set_nodelay (fd, 1);
+ if (ret)
+ LP_ERR2 (worker, LP_FAILED, LP_E_NODELAY, errno);
+ }
+
+ if (!lp.block_connecting)
+ {
+ ret = set_nonblock (fd);
+ if (ret)
+ {
+ LP_ERR2 (worker, LP_FAILED, LP_E_NONBLOCK, errno);
+ return -1;
+ }
+#if 1
+ ret = lp_epadd (worker, fd, EPOLLOUT);
+ if (ret)
+ {
+ LP_ERR2 (worker, LP_FAILED, LP_E_EPADD, errno);
+ DBG ("->epoll_ctl(%d, ADD, %d)=%d:%d", worker->epfd, fd, ret,
+ errno);
+ return -1;
+ }
+ return 1;
+#endif
+ }
+
+ return 0;
+}
+
+int
+lp_connected (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ struct timespec now;
+ struct lp_test *test;
+
+ LB_TIME (now);
+ LP_TIME_FOR (worker, LP_W_CONNECTED, sess->time, now);
+ sess->time = now;
+
+ LP_APPEND (worker->sess, fd, sess, sess);
+ sess->state = LP_S_CONNECTED;
+ LP_CNT (worker, LP_CONNECTED);
+
+ test = &lp.test[sess->test];
+ if (test->query)
+ {
+ if (lp.block_connecting && set_nonblock (fd))
+ {
+ LP_ERR (worker, LP_E_NONBLOCK, errno);
+ return -1;
+ }
+ if (sess->epout)
+ {
+ if (lp_epmod (worker, fd, EPOLLIN))
+ {
+ LP_ERR (worker, LP_E_EPMOD, errno);
+ return -1;
+ }
+ sess->epout = 0;
+ }
+ else
+ {
+ if (lp_epadd (worker, fd, EPOLLIN))
+ {
+ LP_ERR (worker, LP_E_EPADD, errno);
+ return -1;
+ }
+ }
+ sess->io_num = test->times;
+
+ if (test->wait)
+ {
+ sess->time.tv_sec += test->wait;
+ lp_add_wait (worker, fd, sess);
+ return 0;
+ }
+
+ return lp_new_query (worker, fd, sess);
+ }
+
+ if (sess->epout)
+ {
+ if (lp_epdel (worker, fd))
+ LP_ERR (worker, LP_E_EPDEL, errno);
+ sess->epout = 0;
+ }
+
+ return 0;
+}
+
+int
+lp_connect (struct lp_worker *worker, int cid, int sid, int test_id)
+{
+ int ret, fd;
+ struct lp_sess *sess;
+ struct timespec connect_begin;
+
+ LP_TIME_SET (begin);
+ fd = _socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ if (fd < 0)
+ {
+ LP_ERR2 (worker, LP_FAILED, LP_E_SOCKET, errno);
+ DBG ("->socket(...)=%d:%d\n", fd, errno);
+ return -1;
+ }
+ LP_TIME_END (worker, LP_W_SOCKET, begin);
+
+ ret = lp_pre_connect (worker, fd, worker->client_addr + cid);
+ if (ret < 0)
+ {
+ _close (fd);
+ return -1;
+ }
+
+ LB_TIME (connect_begin);
+ ret =
+ _connect (fd, (struct sockaddr *) (worker->server_addr + sid),
+ sizeof (struct sockaddr_in));
+ LP_TIME_END (worker, LP_W_CONNECT, connect_begin);
+
+ if (ret)
+ {
+ const int e = errno;
+ if (lp.block_connecting || e != EINPROGRESS)
+ {
+ LP_ERR2 (worker, LP_FAILED, LP_E_CONNECT, e);
+ DBG ("->connect(%d, %s)=%d:%d\n", fd,
+ f_inaddr (worker->server_addr + sid), ret, e);
+ _close (fd);
+ return -1;
+ }
+ }
+
+ LP_TIME_END (worker, LP_W_CREATE, begin);
+ LP_CNT (worker, LP_CONNECT);
+
+ sess = lp_init_sess (worker, fd);
+ sess->test = test_id;
+ sess->time = connect_begin;
+ if (!lp.block_connecting)
+ sess->epout = 1;
+
+ if (ret)
+ {
+ /* nonblock connect inprogress */
+ lp_add_conn (worker, fd, sess);
+ return 1;
+ }
+
+ ret = lp_connected (worker, fd, sess);
+ if (ret < 0)
+ lp_del_sess (worker, fd);
+
+ return ret;
+}
+
+inline static int
+lp_handle_client (struct lp_worker *worker, int fd, uint32_t events)
+{
+ struct lp_sess *sess = LP_SESS (fd);
+
+ if (sess->state == LP_S_UNUSED)
+ {
+ LP_CNT (worker, LP_E_EPUNUSED);
+ return 0;
+ }
+
+ if (events & EPOLLRDHUP)
+ {
+ LP_CNT (worker, LP_E_EPHUP);
+ return -1;
+ }
+
+ if (events & EPOLLERR)
+ {
+ LP_CNT (worker, LP_E_EPERR);
+ DBG ("epoll event error, fd:%d event:0x%x\n", fd, events);
+ return -1;
+ }
+
+ if (events & EPOLLIN)
+ {
+ if (sess->state == LP_S_REPLY)
+ return lp_recv_reply (worker, fd, sess);
+ LP_CNT (worker, LP_E_EPERR);
+ return 0;
+ }
+
+ if (events & EPOLLOUT)
+ {
+ if (sess->state == LP_S_CONNECTING)
+ {
+ lp_out_conn (worker, fd, sess);
+ return lp_connected (worker, fd, sess);
+ }
+ if (sess->state == LP_S_QUERY)
+ return lp_more_query (worker, fd, sess);
+ LP_CNT (worker, LP_E_EPERR);
+ return 0;
+ }
+
+ LP_CNT (worker, LP_E_EPEVENT);
+ return -1;
+}
+
+inline static void
+lp_init_mode (const struct lp_worker *worker, int *cid, int *sid)
+{
+ if (worker->link_mode == LP_RAND)
+ {
+ *cid = LB_RAND (worker->client_num);
+ *sid = LB_RAND (worker->server_num);
+ }
+ else
+ {
+ *cid = 0;
+ *sid = 0;
+ }
+}
+
+inline static void
+lp_next_mode (const struct lp_worker *worker, int *cid, int *sid)
+{
+ if (worker->link_mode == LP_RAND)
+ {
+ *cid = LB_RAND (worker->client_num);
+ *sid = LB_RAND (worker->server_num);
+ }
+ else if (worker->link_mode == LP_SYNC)
+ {
+ if (++*sid >= worker->server_num)
+ *sid = 0;
+ if (++*cid >= worker->client_num)
+ *cid = 0;
+ }
+ else if (worker->link_mode == LP_CPS)
+ {
+ if (++*cid >= worker->client_num)
+ {
+ *cid = 0;
+ if (++*sid >= worker->server_num)
+ *sid = 0;
+ }
+ }
+ else if (worker->link_mode == LP_SPC)
+ {
+ if (++*sid >= worker->server_num)
+ {
+ *sid = 0;
+ if (++*cid >= worker->client_num)
+ *cid = 0;
+ }
+ }
+ else
+ {
+ err ("Error mode value:%d\n", worker->link_mode);
+ }
+}
+
+void
+lp_client (struct lp_worker *worker)
+{
+ int ret, num = 0, cid, sid, test_id = -1;
+ struct epoll_event *event;
+ struct lp_test *test = &lp.test[0];
+ struct lb_run *up = worker->up_run;
+ struct lb_run *down = worker->down_run;
+ const int epfd = worker->epfd;
+
+ lp_init_mode (worker, &cid, &sid);
+
+ while (lp.run_state == LP_EXEC)
+ {
+ int i;
+ struct timespec now;
+
+ if (test_id != lp.test_id)
+ {
+ DBG ("worker %d change test_id %d\n", worker->index, lp.test_id);
+ test = &lp.test[test_id = lp.test_id];
+
+ if (test->up)
+ run_init (up, (test->up + lp.worker_num - 1) / lp.worker_num,
+ LP_UP_SLOT, LP_UP_NSEC);
+ if (test->down)
+ run_init (down, (test->down + lp.worker_num - 1) / lp.worker_num,
+ LP_DOWN_SLOT, LP_DOWN_NSEC);
+ }
+
+ LB_TIME (now);
+
+ /* up process */
+ if (test->up)
+ {
+ if (run_test (up, &now) > 0)
+ {
+ ret = lp_connect (worker, cid, sid, test_id);
+ if (ret >= 0)
+ run_add (up, 1);
+ lp_next_mode (worker, &cid, &sid);
+ LB_TIME (now);
+ }
+ }
+
+ /* down process */
+ if (test->down && worker->sess.num)
+ {
+ if (run_test (down, &now) > 0)
+ {
+ lp_del_sess (worker, worker->sess.first);
+ run_add (down, 1);
+ LB_TIME (now);
+ }
+ }
+
+ /* query */
+ for (i = 0; i <= test_id; ++i)
+ {
+ int fd;
+ struct lp_sess *sess;
+
+ if (worker->rest[i].num)
+ {
+ fd = worker->rest[i].first;
+ sess = LP_SESS (fd);
+ if (LB_CMP (now, sess->time) >= 0)
+ {
+ LP_ASSERT (sess->state == LP_S_REST);
+ LP_ASSERT (sess->test == i);
+ lp_out_rest (worker, fd, sess);
+ if (lp_new_query (worker, fd, sess) < 0)
+ {
+ lp_del_sess (worker, fd);
+ if (test->down)
+ run_add (down, 1);
+ }
+ LB_TIME (now);
+ }
+ }
+ if (worker->wait[i].num)
+ {
+ fd = worker->wait[i].first;
+ sess = LP_SESS (fd);
+ if (LB_CMP (now, sess->time) >= 0)
+ {
+ LP_ASSERT (sess->state == LP_S_WAIT);
+ LP_ASSERT (sess->test == i);
+ lp_out_wait (worker, fd, sess);
+ if (lp_new_query (worker, fd, sess) < 0)
+ {
+ lp_del_sess (worker, fd);
+ if (test->down)
+ run_add (down, 1);
+ }
+ LB_TIME (now);
+ }
+ }
+ }
+
+ /* check connect timeout */
+ if (worker->conn.first >= 0)
+ {
+ }
+
+ /* epoll event process */
+ if (num > 0)
+ {
+ const uint64_t type = LP_EV_TYPE (event->data.u64);
+ const int fd = LP_EV_FD (event->data.u64);
+
+ if (type == LP_SESSION_TYPE)
+ {
+ if (lp_handle_client (worker, fd, event->events) < 0)
+ {
+ struct lp_sess *sess = LP_SESS (fd);
+ if (sess->state & LP_S_CONNECTED)
+ {
+ lp_del_sess (worker, fd);
+ if (test->down)
+ run_add (down, 1);
+ }
+ else if (sess->state == LP_S_CONNECTING)
+ {
+ lp_del_conn (worker, fd, sess);
+ }
+ else
+ {
+ }
+ }
+ }
+ else if (type == LP_CONTROL_TYPE)
+ {
+ break;
+ }
+ else
+ {
+ err ("epoll event error flag:%lx, fd:%d event:%x}\n",
+ LP_EV_FLAG (event->data.u64), fd, event->events);
+ }
+ num--;
+ event++;
+ }
+ else
+ {
+ num = _epoll_wait (epfd, worker->ev_buf, LP_EVENT_NUM, 0);
+ if (num > 0)
+ {
+ event = worker->ev_buf;
+ }
+ else if (num < 0)
+ {
+ int e = errno;
+ if (e != EINTR && e != ETIMEDOUT)
+ LP_ERR (worker, LP_E_EPWAIT, e);
+ }
+ }
+ }
+}
diff --git a/thirdparty/apps/testapp/lp/lps.c b/thirdparty/apps/testapp/lp/lps.c
new file mode 100644
index 0000000..8462c21
--- /dev/null
+++ b/thirdparty/apps/testapp/lp/lps.c
@@ -0,0 +1,372 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "lb.h"
+#include "lp.h"
+
+int
+lp_send_reply (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ int ret;
+ char *buf = worker->io_buf;
+
+ while (sess->reply > 0)
+ {
+ const int LEN =
+ sess->reply < LP_IOBUF_SIZE ? sess->reply : LP_IOBUF_SIZE;
+
+ ret = _send (fd, buf, LEN, 0);
+ if (ret > 0)
+ {
+ LP_ADD (worker, LP_REPLY_BYTE, ret);
+ sess->reply -= ret;
+ }
+ else if (ret < 0)
+ {
+ int e = errno;
+ if (e == EWOULDBLOCK || e == EAGAIN)
+ return lp_set_epout (worker, fd, sess);
+ if (e == EINTR)
+ continue;
+ LP_ERR (worker, LP_E_SEND, e);
+ return -1;
+ }
+ else
+ {
+ LP_CNT (worker, LP_E_IOSEND0);
+ }
+ }
+
+ LP_CNT (worker, LP_REPLY_COMP);
+
+ if (sess->epout)
+ {
+ if (lp_epmod (worker, fd, EPOLLIN))
+ {
+ LP_ERR (worker, LP_E_EPMOD, errno);
+ return -1;
+ }
+ sess->epout = 0;
+ }
+
+ return 0;
+}
+
+int
+lp_just_query (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ char *buf = worker->io_buf;
+ struct lp_io *io = (struct lp_io *) buf;
+ int len = 0;
+
+ while (1)
+ {
+ int ret;
+
+ ret = _recv (fd, buf + len, LP_IOBUF_SIZE - len, 0);
+
+ if (ret > 0)
+ {
+ LP_ADD (worker, LP_QUERY_BYTE, ret);
+ len += ret;
+ if (len >= sizeof (struct lp_io))
+ break;
+ LP_CNT (worker, LP_E_IOMORE);
+ }
+ else if (ret < 0)
+ {
+ int e = errno;
+ if (e == EWOULDBLOCK || e == EAGAIN)
+ {
+ LP_CNT (worker, LP_E_IOMORE);
+ continue;
+ }
+ if (e == EINTR)
+ continue;
+ LP_ERR (worker, LP_E_RECV, e);
+ return -1;
+ }
+ else
+ {
+ //LP_CNT(worker, LP_E_IOSHUT);
+ return -1;
+ }
+ }
+
+ sess->query = htonl (io->query);
+ sess->reply = htonl (io->reply);
+
+ if (sess->query < LP_QUERY_MIN || sess->reply < 0)
+ {
+ LP_CNT (worker, LP_E_IOSIZE);
+ return -1;
+ }
+
+ sess->query -= len;
+ return 0;
+}
+
+int
+lp_recv_query (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ int ret;
+ char *buf = worker->io_buf;
+
+ if (sess->query == 0)
+ {
+ ret = lp_just_query (worker, fd, sess);
+ if (ret)
+ return ret;
+ }
+
+ while (1)
+ {
+ ret = _recv (fd, buf, LP_IOBUF_SIZE, 0);
+ if (ret > 0)
+ {
+ LP_ADD (worker, LP_QUERY_BYTE, ret);
+ sess->query -= ret;
+ }
+ else if (ret < 0)
+ {
+ int e = errno;
+ if (e == EWOULDBLOCK || e == EAGAIN)
+ {
+ if (sess->query > 0)
+ return 1;
+ break;
+ }
+ if (e == EINTR)
+ continue;
+ LP_ERR (worker, LP_E_RECV, e);
+ return -1;
+ }
+ else
+ {
+ if (sess->query || sess->reply)
+ {
+ //LP_CNT(worker, LP_E_IOSHUT);
+ }
+ else
+ LP_CNT (worker, LP_QUERY_COMP);
+ return -1;
+ }
+ }
+
+ LP_CNT (worker, LP_QUERY_COMP);
+
+ if (sess->query < 0)
+ {
+ LP_CNT (worker, LP_E_IOEXCEED);
+ sess->query = 0;
+ }
+
+ if (sess->reply)
+ return lp_send_reply (worker, fd, sess);
+
+ return 0;
+}
+
+inline static int
+lp_service (struct lp_worker *worker, int fd, uint32_t events)
+{
+ struct lp_sess *sess = LP_SESS (fd);
+
+ if (sess->state == LP_S_UNUSED)
+ {
+ LP_CNT (worker, LP_E_EPUNUSED);
+ return 0;
+ }
+
+ if (events & EPOLLRDHUP)
+ return -1;
+
+ if (events & EPOLLERR)
+ {
+ LP_CNT (worker, LP_E_EPERR);
+ DBG ("epoll event error, fd:%d event:0x%x\n", fd, events);
+ return -1;
+ }
+
+ if ((events & (EPOLLIN | EPOLLOUT)) == (EPOLLIN | EPOLLOUT))
+ {
+ LP_CNT (worker, LP_E_EPINOUT);
+ return -1;
+ }
+
+ if (events & EPOLLIN)
+ return lp_recv_query (worker, fd, sess);
+
+ if (events & EPOLLOUT)
+ return lp_send_reply (worker, fd, sess);
+
+ LP_CNT (worker, LP_E_EPEVENT);
+ return -1;
+}
+
+void
+lp_accept (struct lp_worker *worker, int listen_fd)
+{
+ while (lp.run_state == LP_EXEC)
+ {
+ int fd, ret;
+ struct lp_sess *sess;
+
+ LP_TIME_SET (begin);
+
+ fd = _accept4 (listen_fd, NULL, NULL, SOCK_NONBLOCK);
+ if (fd < 0)
+ {
+ int e = errno;
+ if (e == EAGAIN || e == EWOULDBLOCK)
+ return;
+ if (e == EINTR)
+ continue;
+ LP_ERR (worker, LP_E_ACCEPT, e);
+ DBG ("->accept4(%d)=%d:%d\n", listen_fd, fd, e);
+ return;
+ }
+ else
+ {
+ LP_TIME_END (worker, LP_W_ACCEPT, begin);
+ }
+
+ if (fd >= LP_MAX_FD)
+ {
+ LP_CNT (worker, LP_FAILED);
+ _close (fd);
+ err ("accept fd(%d) >= LP_MAX_FD(%d)\n", fd, LP_MAX_FD);
+ continue;
+ }
+
+ if (lp.nodelay)
+ {
+ ret = set_nodelay (fd, 1);
+ if (ret)
+ LP_ERR (worker, LP_E_NODELAY, errno);
+ }
+
+ ret = lp_epadd (worker, fd, EPOLLIN);
+ if (ret)
+ {
+ int e = errno;
+ LP_ERR2 (worker, LP_FAILED, LP_E_EPADD, e);
+ _close (fd);
+ DBG ("epoll_ctl(%d, %d)=%d:%d\n\n", worker->epfd, fd, ret, e);
+ continue;
+ }
+
+ sess = lp_init_sess (worker, fd);
+ LP_APPEND (worker->sess, fd, sess, sess);
+ sess->state = LP_S_CONNECTED;
+ LP_CNT (worker, LP_CONNECTED);
+
+ LP_TIME_END (worker, LP_W_CREATE, begin);
+ }
+}
+
+void
+lp_server (struct lp_worker *worker)
+{
+ int num = 0;
+ struct epoll_event *event;
+ const int epfd = worker->epfd;
+
+ while (lp.run_state == LP_EXEC)
+ {
+ if (num > 0)
+ {
+ const uint64_t type = LP_EV_TYPE (event->data.u64);
+ const int fd = LP_EV_FD (event->data.u64);
+
+ if (type == LP_LISTEN_TYPE)
+ {
+ lp_accept (worker, fd);
+ }
+ else if (type == LP_SESSION_TYPE)
+ {
+ if (lp_service (worker, fd, event->events) < 0)
+ lp_del_sess (worker, fd);
+ }
+ else if (type == LP_CONTROL_TYPE)
+ {
+ break;
+ }
+ else
+ {
+ err ("epoll event error flag:%lx, fd:%d event:%x}\n",
+ LP_EV_FLAG (event->data.u64), fd, event->events);
+ }
+ num--;
+ event++;
+ }
+ else
+ {
+ num = _epoll_wait (epfd, worker->ev_buf, LP_EVENT_NUM, -1);
+ if (num > 0)
+ {
+ event = worker->ev_buf;
+ }
+ else if (num < 0)
+ {
+ int e = errno;
+ if (e != EINTR && e != ETIMEDOUT)
+ LP_ERR (worker, LP_E_EPWAIT, e);
+ }
+ }
+ }
+}
+
+int
+lp_listen (struct lp_worker *worker, int index)
+{
+ int fd, ret;
+ struct epoll_event event;
+
+ fd = _socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ ERR_RETURN (fd < 0, -1, "socket()=%d:%d\n", fd, errno);
+
+ if (fd >= LP_MAX_FD)
+ {
+ err ("socket()=%d >= LP_MAX_FD(%d)\n", fd, LP_MAX_FD);
+ (void) _close (fd);
+ return -1;
+ }
+
+ LP_APPEND (worker->server, fd, lp_init_sess (worker, fd), sess);
+
+ ret =
+ _bind (fd, (struct sockaddr *) &worker->server_addr[index],
+ sizeof (worker->server_addr[index]));
+ ERR_RETURN (ret, -1, "bind(%d, %s)=%d:%d\n", fd,
+ f_inaddr (&worker->server_addr[index]), ret, errno);
+
+ ret = set_nonblock (fd);
+ ERR_RETURN (ret, -1, "set_nonblock(%d)=%d:%d\n", fd, ret, errno);
+
+ ret = _listen (fd, SOMAXCONN);
+ ERR_RETURN (ret, -1, "listen(%d)=%d:%d\n", fd, ret, errno);
+
+ event.events = EPOLLIN | EPOLLET;
+ event.data.u64 = LP_EV_MK (LP_LISTEN_TYPE, fd);
+ ret = _epoll_ctl (worker->epfd, EPOLL_CTL_ADD, fd, &event);
+ ERR_RETURN (ret, -1, "epoll_ctl(%d, %d)=%d:%d\n", worker->epfd, fd, ret,
+ errno);
+
+ DBG ("worker %d server %d fd %d listen on %s\n", worker->index, index, fd,
+ f_inaddr (&worker->server_addr[index]));
+ return 0;
+}