aboutsummaryrefslogtreecommitdiffstats
path: root/thirdparty/apps/testapp/lp/lpc.c
diff options
context:
space:
mode:
Diffstat (limited to 'thirdparty/apps/testapp/lp/lpc.c')
-rw-r--r--thirdparty/apps/testapp/lp/lpc.c669
1 files changed, 669 insertions, 0 deletions
diff --git a/thirdparty/apps/testapp/lp/lpc.c b/thirdparty/apps/testapp/lp/lpc.c
new file mode 100644
index 0000000..7da0414
--- /dev/null
+++ b/thirdparty/apps/testapp/lp/lpc.c
@@ -0,0 +1,669 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "lb.h"
+#include "lp.h"
+
+inline static int
+lp_io_finish (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ int ret;
+
+ sess->state = LP_S_IDLE;
+ --sess->io_num;
+
+ if (sess->io_num)
+ {
+ lp_add_rest (worker, fd, sess);
+ sess->time.tv_sec += lp.test[sess->test].period;
+ return 0;
+ }
+
+ if (lp.test[sess->test].close_after_io)
+ return -1;
+
+ if (lp_epdel (worker, fd))
+ LP_ERR (worker, LP_E_EPDEL, errno);
+
+ return 0;
+}
+
+int
+lp_recv_reply (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ int ret;
+ char *buf = worker->io_buf;
+
+ LP_ASSERT (sess->query == 0);
+
+ while (1)
+ {
+ ret = _recv (fd, buf, LP_IOBUF_SIZE, 0);
+ if (ret > 0)
+ {
+ LP_ADD (worker, LP_REPLY_BYTE, ret);
+ sess->reply -= ret;
+ }
+ else if (ret < 0)
+ {
+ int e = errno;
+ if (e == EWOULDBLOCK || e == EAGAIN)
+ {
+ if (sess->reply > 0)
+ return 1;
+ break;
+ }
+ if (e == EINTR)
+ continue;
+ LP_ERR (worker, LP_E_RECV, e);
+ return -1;
+ }
+ else
+ {
+ if (sess->reply)
+ LP_CNT (worker, LP_E_IOSHUT);
+ else
+ LP_CNT (worker, LP_REPLY_COMP);
+ return -1;
+ }
+ }
+
+ LP_CNT (worker, LP_REPLY_COMP);
+
+ if (sess->reply < 0)
+ {
+ LP_CNT (worker, LP_E_IOEXCEED);
+ sess->reply = 0;
+ }
+
+ return lp_io_finish (worker, fd, sess);
+}
+
+int
+lp_more_query (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ int ret;
+ char *buf = worker->io_buf;
+
+ LP_ASSERT (sess->io_num);
+
+ while (sess->query > 0)
+ {
+ const int LEN =
+ sess->query < LP_IOBUF_SIZE ? sess->query : LP_IOBUF_SIZE;
+
+ ret = _send (fd, buf, LEN, 0);
+ if (ret > 0)
+ {
+ LP_ADD (worker, LP_QUERY_BYTE, ret);
+ sess->query -= ret;
+ }
+ else if (ret < 0)
+ {
+ int e = errno;
+ if (e == EWOULDBLOCK || e == EAGAIN)
+ return lp_set_epout (worker, fd, sess);
+ if (e == EINTR)
+ continue;
+ LP_ERR (worker, LP_E_SEND, e);
+ return -1;
+ }
+ else
+ {
+ LP_CNT (worker, LP_E_IOSEND0);
+ }
+ }
+
+ LP_CNT (worker, LP_QUERY_COMP);
+
+ if (sess->epout)
+ {
+ if (lp_epmod (worker, fd, EPOLLIN))
+ {
+ LP_ERR (worker, LP_E_EPMOD, errno);
+ return -1;
+ }
+ sess->epout = 0;
+ }
+
+ if (!sess->reply)
+ {
+ return lp_io_finish (worker, fd, sess);
+ }
+
+ sess->state = LP_S_REPLY;
+ return 0;
+}
+
+int
+lp_new_query (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ int ret, len = 0;
+ char *buf = worker->io_buf;
+ struct lp_io *io = (struct lp_io *) buf;
+
+ LP_ASSERT (sess->state == LP_S_IDLE);
+ LP_ASSERT (sess->query == 0);
+ LP_ASSERT (sess->reply == 0);
+ LP_ASSERT (sess->io_num);
+
+ sess->state = LP_S_QUERY;
+ sess->query = lp.test[sess->test].query;
+ sess->reply = lp.test[sess->test].reply;
+
+ io->query = htonl (sess->query);
+ io->reply = htonl (sess->reply);
+
+ while (1)
+ {
+ ret = _send (fd, buf + len, sess->query - len, 0);
+ if (ret > 0)
+ {
+ LP_ADD (worker, LP_QUERY_BYTE, ret);
+ len += ret;
+ if (len >= sizeof (struct lp_io))
+ break;
+ LP_CNT (worker, LP_E_IOMORE);
+ }
+ else if (ret < 0)
+ {
+ int e = errno;
+ if (e == EWOULDBLOCK || e == EAGAIN)
+ {
+ LP_CNT (worker, LP_E_IOMORE);
+ continue;
+ }
+ if (e == EINTR)
+ continue;
+ LP_ERR (worker, LP_E_SEND, e);
+ return -1;
+ }
+ else
+ {
+ LP_CNT (worker, LP_E_IOSEND0);
+ }
+ }
+
+ sess->query -= len;
+
+ return lp_more_query (worker, fd, sess);
+}
+
+int
+lp_pre_connect (struct lp_worker *worker, int fd, struct sockaddr_in *c_addr)
+{
+ int ret;
+
+ if (fd >= LP_MAX_FD)
+ {
+ LP_CNT (worker, LP_FAILED);
+ err ("fd(%d) >= LP_MAX_FD(%d)\n", fd, LP_MAX_FD);
+ return -1;
+ }
+
+ ret = set_reuseaddr (fd, 1);
+ if (ret)
+ {
+ const int e = errno;
+ LP_ERR (worker, LP_E_REUSEADDR, e);
+ DBG ("set_reuseaddr(%d, 1)=%d:%d\n", fd, ret, e);
+ }
+
+ ret = set_reuseport (fd, 1);
+ if (ret)
+ {
+ const int e = errno;
+ LP_ERR (worker, LP_E_REUSEPORT, errno);
+ DBG ("set_reuseport(%d, 1)=%d:%d\n", fd, ret, e);
+ }
+
+ if (c_addr->sin_addr.s_addr != INADDR_ANY || c_addr->sin_port != 0)
+ {
+ LP_TIME_SET (bind_begin);
+ ret =
+ _bind (fd, (struct sockaddr *) c_addr, sizeof (struct sockaddr_in));
+ LP_TIME_END (worker, LP_W_BIND, bind_begin);
+ if (ret)
+ {
+ int e = errno;
+ if (e == EADDRINUSE)
+ return -1;
+ LP_ERR2 (worker, LP_FAILED, LP_E_BIND, errno);
+ DBG ("->bind(%d, %s)=%d:%d\n", fd, f_inaddr (c_addr), ret, errno);
+ return -1;
+ }
+ }
+
+ if (lp.nodelay)
+ {
+ ret = set_nodelay (fd, 1);
+ if (ret)
+ LP_ERR2 (worker, LP_FAILED, LP_E_NODELAY, errno);
+ }
+
+ if (!lp.block_connecting)
+ {
+ ret = set_nonblock (fd);
+ if (ret)
+ {
+ LP_ERR2 (worker, LP_FAILED, LP_E_NONBLOCK, errno);
+ return -1;
+ }
+#if 1
+ ret = lp_epadd (worker, fd, EPOLLOUT);
+ if (ret)
+ {
+ LP_ERR2 (worker, LP_FAILED, LP_E_EPADD, errno);
+ DBG ("->epoll_ctl(%d, ADD, %d)=%d:%d", worker->epfd, fd, ret,
+ errno);
+ return -1;
+ }
+ return 1;
+#endif
+ }
+
+ return 0;
+}
+
+int
+lp_connected (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ struct timespec now;
+ struct lp_test *test;
+
+ LB_TIME (now);
+ LP_TIME_FOR (worker, LP_W_CONNECTED, sess->time, now);
+ sess->time = now;
+
+ LP_APPEND (worker->sess, fd, sess, sess);
+ sess->state = LP_S_CONNECTED;
+ LP_CNT (worker, LP_CONNECTED);
+
+ test = &lp.test[sess->test];
+ if (test->query)
+ {
+ if (lp.block_connecting && set_nonblock (fd))
+ {
+ LP_ERR (worker, LP_E_NONBLOCK, errno);
+ return -1;
+ }
+ if (sess->epout)
+ {
+ if (lp_epmod (worker, fd, EPOLLIN))
+ {
+ LP_ERR (worker, LP_E_EPMOD, errno);
+ return -1;
+ }
+ sess->epout = 0;
+ }
+ else
+ {
+ if (lp_epadd (worker, fd, EPOLLIN))
+ {
+ LP_ERR (worker, LP_E_EPADD, errno);
+ return -1;
+ }
+ }
+ sess->io_num = test->times;
+
+ if (test->wait)
+ {
+ sess->time.tv_sec += test->wait;
+ lp_add_wait (worker, fd, sess);
+ return 0;
+ }
+
+ return lp_new_query (worker, fd, sess);
+ }
+
+ if (sess->epout)
+ {
+ if (lp_epdel (worker, fd))
+ LP_ERR (worker, LP_E_EPDEL, errno);
+ sess->epout = 0;
+ }
+
+ return 0;
+}
+
+int
+lp_connect (struct lp_worker *worker, int cid, int sid, int test_id)
+{
+ int ret, fd;
+ struct lp_sess *sess;
+ struct timespec connect_begin;
+
+ LP_TIME_SET (begin);
+ fd = _socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ if (fd < 0)
+ {
+ LP_ERR2 (worker, LP_FAILED, LP_E_SOCKET, errno);
+ DBG ("->socket(...)=%d:%d\n", fd, errno);
+ return -1;
+ }
+ LP_TIME_END (worker, LP_W_SOCKET, begin);
+
+ ret = lp_pre_connect (worker, fd, worker->client_addr + cid);
+ if (ret < 0)
+ {
+ _close (fd);
+ return -1;
+ }
+
+ LB_TIME (connect_begin);
+ ret =
+ _connect (fd, (struct sockaddr *) (worker->server_addr + sid),
+ sizeof (struct sockaddr_in));
+ LP_TIME_END (worker, LP_W_CONNECT, connect_begin);
+
+ if (ret)
+ {
+ const int e = errno;
+ if (lp.block_connecting || e != EINPROGRESS)
+ {
+ LP_ERR2 (worker, LP_FAILED, LP_E_CONNECT, e);
+ DBG ("->connect(%d, %s)=%d:%d\n", fd,
+ f_inaddr (worker->server_addr + sid), ret, e);
+ _close (fd);
+ return -1;
+ }
+ }
+
+ LP_TIME_END (worker, LP_W_CREATE, begin);
+ LP_CNT (worker, LP_CONNECT);
+
+ sess = lp_init_sess (worker, fd);
+ sess->test = test_id;
+ sess->time = connect_begin;
+ if (!lp.block_connecting)
+ sess->epout = 1;
+
+ if (ret)
+ {
+ /* nonblock connect inprogress */
+ lp_add_conn (worker, fd, sess);
+ return 1;
+ }
+
+ ret = lp_connected (worker, fd, sess);
+ if (ret < 0)
+ lp_del_sess (worker, fd);
+
+ return ret;
+}
+
+inline static int
+lp_handle_client (struct lp_worker *worker, int fd, uint32_t events)
+{
+ struct lp_sess *sess = LP_SESS (fd);
+
+ if (sess->state == LP_S_UNUSED)
+ {
+ LP_CNT (worker, LP_E_EPUNUSED);
+ return 0;
+ }
+
+ if (events & EPOLLRDHUP)
+ {
+ LP_CNT (worker, LP_E_EPHUP);
+ return -1;
+ }
+
+ if (events & EPOLLERR)
+ {
+ LP_CNT (worker, LP_E_EPERR);
+ DBG ("epoll event error, fd:%d event:0x%x\n", fd, events);
+ return -1;
+ }
+
+ if (events & EPOLLIN)
+ {
+ if (sess->state == LP_S_REPLY)
+ return lp_recv_reply (worker, fd, sess);
+ LP_CNT (worker, LP_E_EPERR);
+ return 0;
+ }
+
+ if (events & EPOLLOUT)
+ {
+ if (sess->state == LP_S_CONNECTING)
+ {
+ lp_out_conn (worker, fd, sess);
+ return lp_connected (worker, fd, sess);
+ }
+ if (sess->state == LP_S_QUERY)
+ return lp_more_query (worker, fd, sess);
+ LP_CNT (worker, LP_E_EPERR);
+ return 0;
+ }
+
+ LP_CNT (worker, LP_E_EPEVENT);
+ return -1;
+}
+
+inline static void
+lp_init_mode (const struct lp_worker *worker, int *cid, int *sid)
+{
+ if (worker->link_mode == LP_RAND)
+ {
+ *cid = LB_RAND (worker->client_num);
+ *sid = LB_RAND (worker->server_num);
+ }
+ else
+ {
+ *cid = 0;
+ *sid = 0;
+ }
+}
+
+inline static void
+lp_next_mode (const struct lp_worker *worker, int *cid, int *sid)
+{
+ if (worker->link_mode == LP_RAND)
+ {
+ *cid = LB_RAND (worker->client_num);
+ *sid = LB_RAND (worker->server_num);
+ }
+ else if (worker->link_mode == LP_SYNC)
+ {
+ if (++*sid >= worker->server_num)
+ *sid = 0;
+ if (++*cid >= worker->client_num)
+ *cid = 0;
+ }
+ else if (worker->link_mode == LP_CPS)
+ {
+ if (++*cid >= worker->client_num)
+ {
+ *cid = 0;
+ if (++*sid >= worker->server_num)
+ *sid = 0;
+ }
+ }
+ else if (worker->link_mode == LP_SPC)
+ {
+ if (++*sid >= worker->server_num)
+ {
+ *sid = 0;
+ if (++*cid >= worker->client_num)
+ *cid = 0;
+ }
+ }
+ else
+ {
+ err ("Error mode value:%d\n", worker->link_mode);
+ }
+}
+
+void
+lp_client (struct lp_worker *worker)
+{
+ int ret, num = 0, cid, sid, test_id = -1;
+ struct epoll_event *event;
+ struct lp_test *test = &lp.test[0];
+ struct lb_run *up = worker->up_run;
+ struct lb_run *down = worker->down_run;
+ const int epfd = worker->epfd;
+
+ lp_init_mode (worker, &cid, &sid);
+
+ while (lp.run_state == LP_EXEC)
+ {
+ int i;
+ struct timespec now;
+
+ if (test_id != lp.test_id)
+ {
+ DBG ("worker %d change test_id %d\n", worker->index, lp.test_id);
+ test = &lp.test[test_id = lp.test_id];
+
+ if (test->up)
+ run_init (up, (test->up + lp.worker_num - 1) / lp.worker_num,
+ LP_UP_SLOT, LP_UP_NSEC);
+ if (test->down)
+ run_init (down, (test->down + lp.worker_num - 1) / lp.worker_num,
+ LP_DOWN_SLOT, LP_DOWN_NSEC);
+ }
+
+ LB_TIME (now);
+
+ /* up process */
+ if (test->up)
+ {
+ if (run_test (up, &now) > 0)
+ {
+ ret = lp_connect (worker, cid, sid, test_id);
+ if (ret >= 0)
+ run_add (up, 1);
+ lp_next_mode (worker, &cid, &sid);
+ LB_TIME (now);
+ }
+ }
+
+ /* down process */
+ if (test->down && worker->sess.num)
+ {
+ if (run_test (down, &now) > 0)
+ {
+ lp_del_sess (worker, worker->sess.first);
+ run_add (down, 1);
+ LB_TIME (now);
+ }
+ }
+
+ /* query */
+ for (i = 0; i <= test_id; ++i)
+ {
+ int fd;
+ struct lp_sess *sess;
+
+ if (worker->rest[i].num)
+ {
+ fd = worker->rest[i].first;
+ sess = LP_SESS (fd);
+ if (LB_CMP (now, sess->time) >= 0)
+ {
+ LP_ASSERT (sess->state == LP_S_REST);
+ LP_ASSERT (sess->test == i);
+ lp_out_rest (worker, fd, sess);
+ if (lp_new_query (worker, fd, sess) < 0)
+ {
+ lp_del_sess (worker, fd);
+ if (test->down)
+ run_add (down, 1);
+ }
+ LB_TIME (now);
+ }
+ }
+ if (worker->wait[i].num)
+ {
+ fd = worker->wait[i].first;
+ sess = LP_SESS (fd);
+ if (LB_CMP (now, sess->time) >= 0)
+ {
+ LP_ASSERT (sess->state == LP_S_WAIT);
+ LP_ASSERT (sess->test == i);
+ lp_out_wait (worker, fd, sess);
+ if (lp_new_query (worker, fd, sess) < 0)
+ {
+ lp_del_sess (worker, fd);
+ if (test->down)
+ run_add (down, 1);
+ }
+ LB_TIME (now);
+ }
+ }
+ }
+
+ /* check connect timeout */
+ if (worker->conn.first >= 0)
+ {
+ }
+
+ /* epoll event process */
+ if (num > 0)
+ {
+ const uint64_t type = LP_EV_TYPE (event->data.u64);
+ const int fd = LP_EV_FD (event->data.u64);
+
+ if (type == LP_SESSION_TYPE)
+ {
+ if (lp_handle_client (worker, fd, event->events) < 0)
+ {
+ struct lp_sess *sess = LP_SESS (fd);
+ if (sess->state & LP_S_CONNECTED)
+ {
+ lp_del_sess (worker, fd);
+ if (test->down)
+ run_add (down, 1);
+ }
+ else if (sess->state == LP_S_CONNECTING)
+ {
+ lp_del_conn (worker, fd, sess);
+ }
+ else
+ {
+ }
+ }
+ }
+ else if (type == LP_CONTROL_TYPE)
+ {
+ break;
+ }
+ else
+ {
+ err ("epoll event error flag:%lx, fd:%d event:%x}\n",
+ LP_EV_FLAG (event->data.u64), fd, event->events);
+ }
+ num--;
+ event++;
+ }
+ else
+ {
+ num = _epoll_wait (epfd, worker->ev_buf, LP_EVENT_NUM, 0);
+ if (num > 0)
+ {
+ event = worker->ev_buf;
+ }
+ else if (num < 0)
+ {
+ int e = errno;
+ if (e != EINTR && e != ETIMEDOUT)
+ LP_ERR (worker, LP_E_EPWAIT, e);
+ }
+ }
+ }
+}