aboutsummaryrefslogtreecommitdiffstats
path: root/thirdparty/apps/testapp/lp/lps.c
diff options
context:
space:
mode:
Diffstat (limited to 'thirdparty/apps/testapp/lp/lps.c')
-rw-r--r--thirdparty/apps/testapp/lp/lps.c372
1 files changed, 372 insertions, 0 deletions
diff --git a/thirdparty/apps/testapp/lp/lps.c b/thirdparty/apps/testapp/lp/lps.c
new file mode 100644
index 0000000..8462c21
--- /dev/null
+++ b/thirdparty/apps/testapp/lp/lps.c
@@ -0,0 +1,372 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "lb.h"
+#include "lp.h"
+
+int
+lp_send_reply (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ int ret;
+ char *buf = worker->io_buf;
+
+ while (sess->reply > 0)
+ {
+ const int LEN =
+ sess->reply < LP_IOBUF_SIZE ? sess->reply : LP_IOBUF_SIZE;
+
+ ret = _send (fd, buf, LEN, 0);
+ if (ret > 0)
+ {
+ LP_ADD (worker, LP_REPLY_BYTE, ret);
+ sess->reply -= ret;
+ }
+ else if (ret < 0)
+ {
+ int e = errno;
+ if (e == EWOULDBLOCK || e == EAGAIN)
+ return lp_set_epout (worker, fd, sess);
+ if (e == EINTR)
+ continue;
+ LP_ERR (worker, LP_E_SEND, e);
+ return -1;
+ }
+ else
+ {
+ LP_CNT (worker, LP_E_IOSEND0);
+ }
+ }
+
+ LP_CNT (worker, LP_REPLY_COMP);
+
+ if (sess->epout)
+ {
+ if (lp_epmod (worker, fd, EPOLLIN))
+ {
+ LP_ERR (worker, LP_E_EPMOD, errno);
+ return -1;
+ }
+ sess->epout = 0;
+ }
+
+ return 0;
+}
+
+int
+lp_just_query (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ char *buf = worker->io_buf;
+ struct lp_io *io = (struct lp_io *) buf;
+ int len = 0;
+
+ while (1)
+ {
+ int ret;
+
+ ret = _recv (fd, buf + len, LP_IOBUF_SIZE - len, 0);
+
+ if (ret > 0)
+ {
+ LP_ADD (worker, LP_QUERY_BYTE, ret);
+ len += ret;
+ if (len >= sizeof (struct lp_io))
+ break;
+ LP_CNT (worker, LP_E_IOMORE);
+ }
+ else if (ret < 0)
+ {
+ int e = errno;
+ if (e == EWOULDBLOCK || e == EAGAIN)
+ {
+ LP_CNT (worker, LP_E_IOMORE);
+ continue;
+ }
+ if (e == EINTR)
+ continue;
+ LP_ERR (worker, LP_E_RECV, e);
+ return -1;
+ }
+ else
+ {
+ //LP_CNT(worker, LP_E_IOSHUT);
+ return -1;
+ }
+ }
+
+ sess->query = htonl (io->query);
+ sess->reply = htonl (io->reply);
+
+ if (sess->query < LP_QUERY_MIN || sess->reply < 0)
+ {
+ LP_CNT (worker, LP_E_IOSIZE);
+ return -1;
+ }
+
+ sess->query -= len;
+ return 0;
+}
+
+int
+lp_recv_query (struct lp_worker *worker, int fd, struct lp_sess *sess)
+{
+ int ret;
+ char *buf = worker->io_buf;
+
+ if (sess->query == 0)
+ {
+ ret = lp_just_query (worker, fd, sess);
+ if (ret)
+ return ret;
+ }
+
+ while (1)
+ {
+ ret = _recv (fd, buf, LP_IOBUF_SIZE, 0);
+ if (ret > 0)
+ {
+ LP_ADD (worker, LP_QUERY_BYTE, ret);
+ sess->query -= ret;
+ }
+ else if (ret < 0)
+ {
+ int e = errno;
+ if (e == EWOULDBLOCK || e == EAGAIN)
+ {
+ if (sess->query > 0)
+ return 1;
+ break;
+ }
+ if (e == EINTR)
+ continue;
+ LP_ERR (worker, LP_E_RECV, e);
+ return -1;
+ }
+ else
+ {
+ if (sess->query || sess->reply)
+ {
+ //LP_CNT(worker, LP_E_IOSHUT);
+ }
+ else
+ LP_CNT (worker, LP_QUERY_COMP);
+ return -1;
+ }
+ }
+
+ LP_CNT (worker, LP_QUERY_COMP);
+
+ if (sess->query < 0)
+ {
+ LP_CNT (worker, LP_E_IOEXCEED);
+ sess->query = 0;
+ }
+
+ if (sess->reply)
+ return lp_send_reply (worker, fd, sess);
+
+ return 0;
+}
+
+inline static int
+lp_service (struct lp_worker *worker, int fd, uint32_t events)
+{
+ struct lp_sess *sess = LP_SESS (fd);
+
+ if (sess->state == LP_S_UNUSED)
+ {
+ LP_CNT (worker, LP_E_EPUNUSED);
+ return 0;
+ }
+
+ if (events & EPOLLRDHUP)
+ return -1;
+
+ if (events & EPOLLERR)
+ {
+ LP_CNT (worker, LP_E_EPERR);
+ DBG ("epoll event error, fd:%d event:0x%x\n", fd, events);
+ return -1;
+ }
+
+ if ((events & (EPOLLIN | EPOLLOUT)) == (EPOLLIN | EPOLLOUT))
+ {
+ LP_CNT (worker, LP_E_EPINOUT);
+ return -1;
+ }
+
+ if (events & EPOLLIN)
+ return lp_recv_query (worker, fd, sess);
+
+ if (events & EPOLLOUT)
+ return lp_send_reply (worker, fd, sess);
+
+ LP_CNT (worker, LP_E_EPEVENT);
+ return -1;
+}
+
+void
+lp_accept (struct lp_worker *worker, int listen_fd)
+{
+ while (lp.run_state == LP_EXEC)
+ {
+ int fd, ret;
+ struct lp_sess *sess;
+
+ LP_TIME_SET (begin);
+
+ fd = _accept4 (listen_fd, NULL, NULL, SOCK_NONBLOCK);
+ if (fd < 0)
+ {
+ int e = errno;
+ if (e == EAGAIN || e == EWOULDBLOCK)
+ return;
+ if (e == EINTR)
+ continue;
+ LP_ERR (worker, LP_E_ACCEPT, e);
+ DBG ("->accept4(%d)=%d:%d\n", listen_fd, fd, e);
+ return;
+ }
+ else
+ {
+ LP_TIME_END (worker, LP_W_ACCEPT, begin);
+ }
+
+ if (fd >= LP_MAX_FD)
+ {
+ LP_CNT (worker, LP_FAILED);
+ _close (fd);
+ err ("accept fd(%d) >= LP_MAX_FD(%d)\n", fd, LP_MAX_FD);
+ continue;
+ }
+
+ if (lp.nodelay)
+ {
+ ret = set_nodelay (fd, 1);
+ if (ret)
+ LP_ERR (worker, LP_E_NODELAY, errno);
+ }
+
+ ret = lp_epadd (worker, fd, EPOLLIN);
+ if (ret)
+ {
+ int e = errno;
+ LP_ERR2 (worker, LP_FAILED, LP_E_EPADD, e);
+ _close (fd);
+ DBG ("epoll_ctl(%d, %d)=%d:%d\n\n", worker->epfd, fd, ret, e);
+ continue;
+ }
+
+ sess = lp_init_sess (worker, fd);
+ LP_APPEND (worker->sess, fd, sess, sess);
+ sess->state = LP_S_CONNECTED;
+ LP_CNT (worker, LP_CONNECTED);
+
+ LP_TIME_END (worker, LP_W_CREATE, begin);
+ }
+}
+
+void
+lp_server (struct lp_worker *worker)
+{
+ int num = 0;
+ struct epoll_event *event;
+ const int epfd = worker->epfd;
+
+ while (lp.run_state == LP_EXEC)
+ {
+ if (num > 0)
+ {
+ const uint64_t type = LP_EV_TYPE (event->data.u64);
+ const int fd = LP_EV_FD (event->data.u64);
+
+ if (type == LP_LISTEN_TYPE)
+ {
+ lp_accept (worker, fd);
+ }
+ else if (type == LP_SESSION_TYPE)
+ {
+ if (lp_service (worker, fd, event->events) < 0)
+ lp_del_sess (worker, fd);
+ }
+ else if (type == LP_CONTROL_TYPE)
+ {
+ break;
+ }
+ else
+ {
+ err ("epoll event error flag:%lx, fd:%d event:%x}\n",
+ LP_EV_FLAG (event->data.u64), fd, event->events);
+ }
+ num--;
+ event++;
+ }
+ else
+ {
+ num = _epoll_wait (epfd, worker->ev_buf, LP_EVENT_NUM, -1);
+ if (num > 0)
+ {
+ event = worker->ev_buf;
+ }
+ else if (num < 0)
+ {
+ int e = errno;
+ if (e != EINTR && e != ETIMEDOUT)
+ LP_ERR (worker, LP_E_EPWAIT, e);
+ }
+ }
+ }
+}
+
+int
+lp_listen (struct lp_worker *worker, int index)
+{
+ int fd, ret;
+ struct epoll_event event;
+
+ fd = _socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ ERR_RETURN (fd < 0, -1, "socket()=%d:%d\n", fd, errno);
+
+ if (fd >= LP_MAX_FD)
+ {
+ err ("socket()=%d >= LP_MAX_FD(%d)\n", fd, LP_MAX_FD);
+ (void) _close (fd);
+ return -1;
+ }
+
+ LP_APPEND (worker->server, fd, lp_init_sess (worker, fd), sess);
+
+ ret =
+ _bind (fd, (struct sockaddr *) &worker->server_addr[index],
+ sizeof (worker->server_addr[index]));
+ ERR_RETURN (ret, -1, "bind(%d, %s)=%d:%d\n", fd,
+ f_inaddr (&worker->server_addr[index]), ret, errno);
+
+ ret = set_nonblock (fd);
+ ERR_RETURN (ret, -1, "set_nonblock(%d)=%d:%d\n", fd, ret, errno);
+
+ ret = _listen (fd, SOMAXCONN);
+ ERR_RETURN (ret, -1, "listen(%d)=%d:%d\n", fd, ret, errno);
+
+ event.events = EPOLLIN | EPOLLET;
+ event.data.u64 = LP_EV_MK (LP_LISTEN_TYPE, fd);
+ ret = _epoll_ctl (worker->epfd, EPOLL_CTL_ADD, fd, &event);
+ ERR_RETURN (ret, -1, "epoll_ctl(%d, %d)=%d:%d\n", worker->epfd, fd, ret,
+ errno);
+
+ DBG ("worker %d server %d fd %d listen on %s\n", worker->index, index, fd,
+ f_inaddr (&worker->server_addr[index]));
+ return 0;
+}