From e18a033b921d0d79fa8278f853548e6125b93e0c Mon Sep 17 00:00:00 2001 From: Konstantin Ananyev Date: Tue, 31 Oct 2017 12:40:17 +0000 Subject: Integrate TLDK with NGINX Created a clone of nginx (from https://github.com/nginx/nginx) to demonstrate and benchmark TLDK library integrated with real world application. A new nginx module is created and and BSD socket-like API is implemented on top of native TLDK API. Note, that right now only minimalistic subset of socket-like API is provided: - accept - close - readv - recv - writev so only limited nginx functionality is available for a moment. Change-Id: Ie1efe9349a0538da4348a48fb8306cbf636b5a92 Signed-off-by: Mohammad Abdul Awal Signed-off-by: Reshma Pattan Signed-off-by: Remy Horton Signed-off-by: Konstantin Ananyev --- app/nginx/src/tldk/tldk_sock.c | 549 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 549 insertions(+) create mode 100644 app/nginx/src/tldk/tldk_sock.c (limited to 'app/nginx/src/tldk/tldk_sock.c') diff --git a/app/nginx/src/tldk/tldk_sock.c b/app/nginx/src/tldk/tldk_sock.c new file mode 100644 index 0000000..7831fc3 --- /dev/null +++ b/app/nginx/src/tldk/tldk_sock.c @@ -0,0 +1,549 @@ +/* + * Copyright (c) 2017 Intel Corporation. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include +#include + +#include +#include + +struct tldk_sock_stat { + uint64_t nb_accept; + uint64_t nb_close; + uint64_t nb_readv; + uint64_t nb_recv; + uint64_t nb_setopts; + uint64_t nb_shutdown; + uint64_t nb_writev; +}; + +static struct tldk_sock_stat sock_stat; + +/* One socket/file table per worker */ +struct tldk_stbl stbl = { + .snum = 0, +}; + +static int (*real_accept4)(int, struct sockaddr *, socklen_t *, int); +static int (*real_close)(int); +static ssize_t (*real_readv)(int, const struct iovec *, int); +static ssize_t (*real_recv)(int, void *, size_t, int); +static int (*real_setsockopt)(int, int, int, const void *, socklen_t); +static int (*real_shutdown)(int, int); +static ssize_t (*real_writev)(int, const struct iovec *, int); + +static inline uint32_t +get_socks(struct tldk_sock_list *list, struct tldk_sock *rs[], + uint32_t num) +{ + struct tldk_sock *s; + uint32_t i, n; + + n = RTE_MIN(list->num, num); + for (i = 0, s = LIST_FIRST(&list->head); + i != n; + i++, s = LIST_NEXT(s, link)) { + rs[i] = s; + } + + /* we retrieved all free entries */ + if (s == NULL) + LIST_INIT(&list->head); + else + LIST_FIRST(&list->head) = s; + + list->num -= n; + return n; +} + +static inline struct tldk_sock * +get_sock(struct tldk_sock_list *list) +{ + struct tldk_sock *s; + + if (get_socks(list, &s, 1) != 1) + return NULL; + + return s; +} + +static inline void +put_socks(struct tldk_sock_list *list, struct tldk_sock *fs[], uint32_t num) +{ + uint32_t i; + + for (i = 0; i != num; i++) + LIST_INSERT_HEAD(&list->head, fs[i], link); + list->num += num; +} + +static inline void +put_sock(struct tldk_sock_list *list, struct tldk_sock *s) +{ + put_socks(list, &s, 1); +} + +static inline void +rem_sock(struct tldk_sock_list *list, struct tldk_sock *s) +{ + LIST_REMOVE(s, link); + list->num--; +} + +static void +term_sock(struct tldk_sock *ts) +{ + tle_event_idle(ts->erev); + tle_event_idle(ts->rxev); + tle_event_idle(ts->txev); + tle_tcp_stream_close(ts->s); + ts->s = NULL; + ts->posterr = 0; +} + +static int32_t +close_sock(struct tldk_sock *ts) +{ + if (ts->s == NULL) + return EBADF; + term_sock(ts); + rem_sock(&stbl.use, ts); + put_sock(&stbl.free, ts); + return 0; +} + +static void +dump_sock_stats(void) +{ + RTE_LOG(NOTICE, USER1, "%s(worker=%lu)={\n" + "nb_accept=%" PRIu64 ";\n" + "nb_close=%" PRIu64 ";\n" + "nb_readv=%" PRIu64 ";\n" + "nb_recv=%" PRIu64 ";\n" + "nb_setopts=%" PRIu64 ";\n" + "nb_shutdown=%" PRIu64 ";\n" + "nb_writev=%" PRIu64 ";\n" + "};\n", + __func__, + ngx_worker, + sock_stat.nb_accept, + sock_stat.nb_close, + sock_stat.nb_readv, + sock_stat.nb_recv, + sock_stat.nb_setopts, + sock_stat.nb_shutdown, + sock_stat.nb_writev); +} + +void +tldk_stbl_fini(void) +{ + dump_sock_stats(); + tldk_dump_event_stats(); + rte_free(stbl.sd); + tle_evq_destroy(stbl.txeq); + tle_evq_destroy(stbl.rxeq); + tle_evq_destroy(stbl.ereq); + tle_evq_destroy(stbl.syneq); +} + +#define INIT_FUNC(func) do { \ + real_##func = dlsym(RTLD_NEXT, #func); \ + RTE_ASSERT(real_##func); \ +} while (0) + +static void __attribute__((constructor)) +stub_init(void) +{ + INIT_FUNC(accept4); + INIT_FUNC(close); + INIT_FUNC(readv); + INIT_FUNC(recv); + INIT_FUNC(setsockopt); + INIT_FUNC(shutdown); + INIT_FUNC(writev); +} + +#undef INIT_FUNC + +int +tldk_stbl_init(const ngx_cycle_t *cycle, const struct tldk_ctx *tc) +{ + uint32_t i, lc, sid, sn; + size_t sz; + struct tle_evq_param eprm; + struct rlimit rlim; + + lc = tc->cf->lcore; + sn = tc->cf->nb_stream; + sid = rte_lcore_to_socket_id(lc); + + if (sn < cycle->listening.nelts + cycle->connection_n) + return -EINVAL; + + if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) + return -errno; + + stbl.nosd = rlim.rlim_max; + + /* allocate event queues */ + + memset(&eprm, 0, sizeof(eprm)); + eprm.socket_id = sid; + eprm.max_events = sn; + + stbl.syneq = tle_evq_create(&eprm); + stbl.ereq = tle_evq_create(&eprm); + stbl.rxeq = tle_evq_create(&eprm); + stbl.txeq = tle_evq_create(&eprm); + + RTE_LOG(NOTICE, USER1, "%s(lcore=%u, worker=%lu): " + "synevq=%p, erevq=%p, rxevq=%p, txevq=%p\n", + __func__, lc, ngx_worker, + stbl.syneq, stbl.ereq, stbl.rxeq, stbl.txeq); + if (stbl.syneq == NULL || stbl.ereq == NULL || stbl.rxeq == NULL || + stbl.txeq == NULL) + return -ENOMEM; + + LIST_INIT(&stbl.lstn.head); + LIST_INIT(&stbl.free.head); + LIST_INIT(&stbl.use.head); + + sz = sn * sizeof(*stbl.sd); + stbl.sd = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + rte_lcore_to_socket_id(lc)); + + if (stbl.sd == NULL) { + RTE_LOG(ERR, USER1, "%s(lcore=%u, worker=%lu): " + "failed to allocate %zu bytes\n", + __func__, lc, ngx_worker, sz); + return -ENOMEM; + } + + stbl.snum = sn; + + /* init listen socks */ + for (i = 0; i != cycle->listening.nelts; i++) { + stbl.sd[i].rxev = tle_event_alloc(stbl.syneq, stbl.sd + i); + stbl.sd[i].txev = tle_event_alloc(stbl.txeq, stbl.sd + i); + stbl.sd[i].erev = tle_event_alloc(stbl.ereq, stbl.sd + i); + put_sock(&stbl.lstn, stbl.sd + i); + } + + /* init worker connection socks */ + for (; i != sn; i++) { + stbl.sd[i].rxev = tle_event_alloc(stbl.rxeq, stbl.sd + i); + stbl.sd[i].txev = tle_event_alloc(stbl.txeq, stbl.sd + i); + stbl.sd[i].erev = tle_event_alloc(stbl.ereq, stbl.sd + i); + put_sock(&stbl.free, stbl.sd + i); + } + + return 0; +} + +int +tldk_open_bind_listen(struct tldk_ctx *tcx, int domain, int type, + const struct sockaddr *addr, socklen_t addrlen, int backlog) +{ + int32_t rc; + struct tldk_sock *ts; + struct tle_tcp_stream_param sprm; + + ts = get_sock(&stbl.lstn); + if (ts == NULL) { + errno = ENOBUFS; + return -1; + } + + tle_event_active(ts->erev, TLE_SEV_DOWN); + tle_event_active(ts->rxev, TLE_SEV_DOWN); + tle_event_active(ts->txev, TLE_SEV_DOWN); + + /* setup stream parameters */ + + memset(&sprm, 0, sizeof(sprm)); + + sprm.cfg.err_ev = ts->erev; + sprm.cfg.recv_ev = ts->rxev; + sprm.cfg.send_ev = ts->txev; + + memcpy(&sprm.addr.local, addr, addrlen); + sprm.addr.remote.ss_family = sprm.addr.local.ss_family; + + ts->s = tle_tcp_stream_open(tcx->ctx, &sprm); + if (ts->s != NULL) + rc = tle_tcp_stream_listen(ts->s); + else + rc = -rte_errno; + + if (rc != 0) { + term_sock(ts); + put_sock(&stbl.lstn, ts); + errno = -rc; + return -1; + } + + return SOCK_TO_SD(ts); +} + +/* + * socket API + */ + +int +close(int sd) +{ + int32_t rc; + struct tldk_sock *ts; + + FE_TRACE("worker#%lu: %s(%d);\n", + ngx_worker, __func__, sd); + + ts = sd_to_sock(sd); + if (ts == NULL) + return real_close(sd); + + sock_stat.nb_close++; + + rc = close_sock(ts); + if (rc != 0) { + errno =-rc; + return -1; + } + return 0; +} + +int +shutdown(int sd, int how) +{ + struct tldk_sock *ts; + + FE_TRACE("worker#%lu: %s(%d, %#x);\n", + ngx_worker, __func__, sd, how); + + ts = sd_to_sock(sd); + if (ts == NULL) + return real_shutdown(sd, how); + + sock_stat.nb_shutdown++; + + errno = ENOTSUP; + return -1; +} + + +int +accept4(int sd, struct sockaddr *addr, socklen_t *addrlen, int flags) +{ + uint32_t n, slen; + struct tle_stream *s; + struct tldk_sock *cs, *ts; + struct tle_tcp_stream_cfg prm; + struct tle_tcp_stream_addr sa; + + FE_TRACE("worker#%lu: %s(%d, %p, %p, %#x);\n", + ngx_worker, __func__, sd, addr, addrlen, flags); + + ts = sd_to_sock(sd); + if (ts == NULL) + return real_accept4(sd, addr, addrlen, flags); + else if (ts->s == NULL) { + errno = EBADF; + return -1; + } + + sock_stat.nb_accept++; + + n = ts->acpt.num; + if (n == 0) { + n = tle_tcp_stream_accept(ts->s, ts->acpt.buf, + RTE_DIM(ts->acpt.buf)); + if (n == 0) { + errno = EAGAIN; + return -1; + } + } + + s = ts->acpt.buf[n - 1]; + ts->acpt.num = n - 1; + + tle_event_raise(ts->rxev); + + cs = get_sock(&stbl.free); + if (cs == NULL) { + tle_tcp_stream_close(s); + errno = ENOBUFS; + return -1; + } + + cs->s = s; + put_sock(&stbl.use, cs); + + tle_event_active(cs->erev, TLE_SEV_DOWN); + tle_event_active(cs->rxev, TLE_SEV_DOWN); + tle_event_active(cs->txev, TLE_SEV_DOWN); + + memset(&prm, 0, sizeof(prm)); + prm.recv_ev = cs->rxev; + prm.send_ev = cs->txev; + prm.err_ev = cs->erev; + tle_tcp_stream_update_cfg(&s, &prm, 1); + + if (tle_tcp_stream_get_addr(s, &sa) == 0) { + + if (sa.remote.ss_family == AF_INET) + slen = sizeof(struct sockaddr_in); + else if (sa.remote.ss_family == AF_INET6) + slen = sizeof(struct sockaddr_in6); + else + slen = 0; + + slen = RTE_MIN(slen, *addrlen); + memcpy(addr, &sa.remote, slen); + *addrlen = slen; + } + + return SOCK_TO_SD(cs); +} + +ssize_t +recv(int sd, void *buf, size_t len, int flags) +{ + ssize_t sz; + struct tldk_sock *ts; + struct iovec iv; + + FE_TRACE("worker#%lu: %s(%d, %p, %zu, %#x);\n", + ngx_worker, __func__, sd, buf, len, flags); + + ts = sd_to_sock(sd); + if (ts == NULL) + return real_recv(sd, buf, len, flags); + else if (ts->s == NULL) { + errno = EBADF; + return -1; + } + + sock_stat.nb_recv++; + + iv.iov_base = buf; + iv.iov_len = len; + + sz = tle_tcp_stream_readv(ts->s, &iv, 1); + if (sz < 0) + errno = rte_errno; + else if (sz == 0 && ts->posterr == 0) { + errno = EAGAIN; + sz = -1; + } + + FE_TRACE("worker#%lu: %s(%d, %p, %zu, %#x) returns %zd;\n", + ngx_worker, __func__, sd, buf, len, flags, sz); + return sz; +} + +ssize_t +readv(int sd, const struct iovec *iov, int iovcnt) +{ + ssize_t sz; + struct tldk_sock *ts; + struct tldk_ctx *tcx; + + FE_TRACE("worker#%lu: %s(%d, %p, %d);\n", + ngx_worker, __func__, sd, iov, iovcnt); + + tcx = wrk2ctx + ngx_worker; + ts = sd_to_sock(sd); + if (ts == NULL) + return real_readv(sd, iov, iovcnt); + else if (ts->s == NULL || tcx == NULL) { + errno = EBADF; + return -1; + } + + sock_stat.nb_readv++; + + sz = tle_tcp_stream_readv(ts->s, iov, iovcnt); + if (sz < 0) + errno = rte_errno; + else if (sz == 0 && ts->posterr == 0) { + errno = EAGAIN; + sz = -1; + } + + FE_TRACE("worker#%lu: %s(%d, %p, %d) returns %zd;\n", + ngx_worker, __func__, sd, iov, iovcnt, sz); + return sz; +} + +ssize_t +writev(int sd, const struct iovec *iov, int iovcnt) +{ + ssize_t sz; + struct tldk_sock *ts; + struct tldk_ctx *tcx; + + FE_TRACE("worker#%lu: %s(%d, %p, %d);\n", + ngx_worker, __func__, sd, iov, iovcnt); + + tcx = wrk2ctx + ngx_worker; + ts = sd_to_sock(sd); + if (ts == NULL) + return real_writev(sd, iov, iovcnt); + else if (ts->s == NULL || tcx == NULL) { + errno = EBADF; + return -1; + } + + sock_stat.nb_writev++; + + sz = tle_tcp_stream_writev(ts->s, tcx->mpool, iov, iovcnt); + if (sz < 0) + errno = rte_errno; + + FE_TRACE("worker#%lu: %s(%d, %p, %d) returns %zd;\n", + ngx_worker, __func__, sd, iov, iovcnt, sz); + return sz; +} + +int +setsockopt(int sd, int level, int optname, const void *optval, socklen_t optlen) +{ + struct tldk_sock *ts; + + FE_TRACE("worker#%lu: %s(%d, %#x, %#x, %p, %d);\n", + ngx_worker, __func__, sd, level, optname, optval, optlen); + + ts = sd_to_sock(sd); + if (ts == NULL) + return real_setsockopt(sd, level, optname, optval, optlen); + else if (ts->s == NULL) { + errno = EBADF; + return -1; + } + + return 0; +} -- cgit 1.2.3-korg