diff options
Diffstat (limited to 'lib/libtle_glue/tcp.c')
-rw-r--r-- | lib/libtle_glue/tcp.c | 558 |
1 files changed, 558 insertions, 0 deletions
diff --git a/lib/libtle_glue/tcp.c b/lib/libtle_glue/tcp.c new file mode 100644 index 0000000..e5186c0 --- /dev/null +++ b/lib/libtle_glue/tcp.c @@ -0,0 +1,558 @@ +/* + * Copyright (c) 2018 Ant Financial Services Group. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <stdarg.h> +#include <unistd.h> +#include <fcntl.h> +#include <sys/ioctl.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <netinet/tcp.h> +#include <netinet/udp.h> + +#include <tle_tcp.h> + +#include "sym.h" +#include "fd.h" +#include "log.h" +#include "util.h" +#include "internal.h" +#include "sock.h" + +#define MAX_TCP_KEEPIDLE 32767 +#define MAX_TCP_KEEPINTVL 32767 +#define MAX_TCP_KEEPCNT 127 + +static inline void +foo_support(const char *msg) +{ + GLUE_LOG(WARNING, "%s, return ok without really supporting it", msg); +} + +static int +tcp_setsockopt(struct sock *sk, int optname, + const void *optval, socklen_t optlen) +{ + int val; + + val = 0; /* just to make compiler happy */ + if (optlen == sizeof(val)) + val = *(const int *)optval; + + /* man tcp(7) or see /usr/include/netinet/tcp.h */ + switch (optname) { + case TCP_NODELAY: /* antonym: TCP_CORK */ + if (val == 0) + sk->option.tcpnodelay = 0; + else + sk->option.tcpnodelay = 1; + if (sk->s != NULL) + sk->s->option.tcpnodelay = sk->option.tcpnodelay; + return 0; + case TCP_CORK: + if (val == 0) + sk->option.tcpcork = 0; + else + sk->option.tcpcork = 1; + if (sk->s != NULL) + sk->s->option.tcpcork = sk->option.tcpcork; + return 0; + case TCP_KEEPIDLE: + if (val <= 0 || val > MAX_TCP_KEEPIDLE) { + errno = EINVAL; + return -1; + } + sk->option.keepidle = val; + if (sk->s != NULL) { + sk->s->option.keepidle = sk->option.keepidle; + tle_tcp_stream_set_keepalive(sk->s); + } + return 0; + case TCP_KEEPINTVL: + if (val <= 0 || val > MAX_TCP_KEEPINTVL) { + errno = EINVAL; + return -1; + } + sk->option.keepintvl = val; + if (sk->s != NULL) { + sk->s->option.keepintvl = sk->option.keepintvl; + tle_tcp_stream_set_keepalive(sk->s); + } + return 0; + case TCP_KEEPCNT: + if (val <= 0 || val > MAX_TCP_KEEPCNT) { + errno = EINVAL; + return -1; + } + sk->option.keepcnt = val; + if (sk->s != NULL) + sk->s->option.keepcnt = sk->option.keepcnt; + return 0; + case TCP_USER_TIMEOUT: + foo_support("set TCP_USER_TIMEOUT"); + return 0; + case TCP_DEFER_ACCEPT: + if (val == 0) + return 0; + break; + case TCP_FASTOPEN: + case TCP_FASTOPEN_CONNECT: + if (val == 0) + return 0; + break; + case TCP_QUICKACK: + /* Based on below info, it's safe to just return 0: + * "This flag is not permanent, it only enables a + * switch to or from quickack mode. Subsequent + * operationof the TCP protocol will once again ..." + */ + if (val == 0) + sk->option.tcpquickack = 0; + else + sk->option.tcpquickack = 8; + if (sk->s != NULL) + sk->s->option.tcpquickack = sk->option.tcpquickack; + return 0; + case TCP_CONGESTION: + /* only support NewReno; but we return success for + * any kind of setting. + */ + foo_support("set TCP_CONGESTION"); + return 0; + default: + break; + } + + GLUE_LOG(WARNING, "setsockopt(%d) with level = SOL_TCP, optname = %d\n", + sock2fd(sk), optname); + errno = EOPNOTSUPP; + return -1; +} + +static int +tcp_getsockopt(struct sock *sk, int optname, + void *optval, socklen_t *optlen) +{ + int rc; + union { + int val; + uint64_t val64; + struct linger ling; + struct timeval tm; + } *p = optval; + + RTE_SET_USED(optlen); + + /* man tcp(7) or see /usr/include/netinet/tcp.h */ + switch (optname) { + case TCP_MAXSEG: + p->val = 64 * 1024; + return 0; + case TCP_FASTOPEN: + case TCP_FASTOPEN_CONNECT: + p->val = 0; + return 0; + case TCP_INFO: + /* needed by netperf */ + rc = tle_tcp_stream_get_info(sk->s, optval, optlen); + if (rc < 0) { + errno = -rc; + return -1; + } + return 0; + case TCP_CONGESTION: + strncpy(optval, "NewReno", *optlen); + ((char *)optval)[*optlen - 1] = '\0'; + return 0; + case TCP_CORK: + p->val = sk->option.tcpcork; + return 0; + case TCP_QUICKACK: + p->val = sk->option.tcpquickack != 0 ? 1 : 0; + return 0; + case TCP_NODELAY: + p->val = sk->option.tcpnodelay; + return 0; + case TCP_KEEPIDLE: + p->val = sk->option.keepidle; + return 0; + case TCP_KEEPINTVL: + p->val = sk->option.keepintvl; + return 0; + case TCP_KEEPCNT: + p->val = sk->option.keepcnt; + return 0; + default: + break; + } + + GLUE_LOG(WARNING, "getsockopt(%d) with level = SOL_TCP, optname = %d", + sock2fd(sk), optname); + errno = EOPNOTSUPP; + return -1; +} + +static int +tcp_getname(struct sock *sk, struct sockaddr *addr, int peer) +{ + int rc; + int addrlen; + struct tle_tcp_stream_addr a; + + rc = tle_tcp_stream_get_addr(sk->s, &a); + if (rc) { + errno = -rc; + return -1; + } + + if (a.local.ss_family == AF_INET) + addrlen = sizeof(struct sockaddr_in); + else + addrlen = sizeof(struct sockaddr_in6); + + if (peer) + memcpy(addr, &a.remote, addrlen); + else + memcpy(addr, &a.local, addrlen); + + addr->sa_family = a.local.ss_family; + + return 0; +} + +static int +tcp_bind(struct sock *sk, const struct sockaddr *addr) +{ + sk->s = open_bind(sk, addr, NULL); + if (sk->s == NULL) + return -1; + return 0; +} + +static int +tcp_listen(struct sock *sk, int backlog) +{ + int32_t rc; + + if (backlog < 0) { + errno = EINVAL; + return -1; + } + + /* + * if socket is unbind, should call open_bind to assign an ramdon addres + * before listening + */ + if (sk->s == NULL) { + sk->s = open_bind(sk, NULL, NULL); + if (sk->s == NULL) + return -1; + } + + rc = tle_tcp_stream_listen(sk->s); + if (rc) { + errno = -rc; + return -1; + } + + return 0; +} + +static int +tcp_connect(struct sock *sk, const struct sockaddr *addr) +{ + int rc; + int rx; + int ret; + struct epoll_event event; + struct sockaddr_storage laddr; + struct sockaddr_storage raddr; + struct sockaddr_in *addr4; + struct sockaddr_in6 *addr6; + struct sockaddr *local = NULL; + + /* TODO: For multi-thread case, we shall properly manage local + * L4 port so that packets coming back can be put into the same + * queue pair. + */ + if (sk->s) { + struct tle_tcp_stream *ts = TCP_STREAM(sk->s); + /* case 1: bind happens before connect; + * case 2: connect after a previous connect, failed + * or succeeded. + */ + if (ts->tcb.err != 0) { + errno = ts->tcb.err; + return -1; + } + + int state = ts->tcb.state; + + if (state >= TCP_ST_ESTABLISHED && sk->tcp_connected == 0) { + sk->tcp_connected = 1; + return 0; /* connect succeeds */ + } + + if (state == TCP_ST_CLOSED) { + if (tcp_getname(sk, (struct sockaddr *)&laddr, 0) == 0) + local = (struct sockaddr *)&laddr; + tle_tcp_stream_close(sk->s); + sk->s = NULL; + goto do_connect; /* case 1 */ + } else if (state >= TCP_ST_SYN_SENT && + state < TCP_ST_ESTABLISHED) + errno = EALREADY; + else if (state >= TCP_ST_ESTABLISHED) + errno = EISCONN; + else + errno = EINVAL; + return -1; + } + +do_connect: + sk->s = open_bind(sk, local, addr); + if (sk->s == NULL) /* errno is set */ + return -1; + + if (sk->domain == AF_INET) { + addr4 = (struct sockaddr_in*)&raddr; + addr4->sin_family = AF_INET; + addr4->sin_port = sk->s->port.src; + addr4->sin_addr.s_addr = sk->s->ipv4.addr.src; + } else { + addr6 = (struct sockaddr_in6*)&raddr; + addr6->sin6_family = AF_INET6; + addr6->sin6_port = sk->s->port.src; + rte_memcpy(&addr6->sin6_addr, &sk->s->ipv6.addr.src, + sizeof(struct in6_addr)); + } + rc = tle_tcp_stream_connect(sk->s, (const struct sockaddr*)&raddr); + if (rc < 0) { + errno = -rc; + return -1; + } + + if (is_nonblock(sk, 0)) { + be_tx_with_lock(CTX(sk)); + errno = EINPROGRESS; /* It could not be ready so fast */ + return -1; + } + + do { + be_process(CTX(sk)); + + if (tle_event_state(&sk->txev) == TLE_SEV_UP) { + sk->tcp_connected = 1; + tle_event_down(&sk->txev); + ret = 0; + break; + } + + if (tle_event_state(&sk->erev) == TLE_SEV_UP) { + tle_event_down(&sk->erev); + errno = ECONNREFUSED; + ret = -1; + break; + } + + /* fix me: timeout? */ + epoll_kernel_wait(CTX(sk), -1, &event, 1, 1, &rx); + } while (1); + + return ret; +} + +static void tcp_update_cfg(struct sock *sk); + +static int +tcp_accept(struct sock *sk, struct sockaddr *addr, + socklen_t *addrlen, int flags) +{ + int fd; + int rx; + struct sock *newsk; + struct tle_stream *rs; + struct epoll_event event; + struct tle_tcp_stream_addr a; + + if (sk->s == NULL) { + errno = EINVAL; + return -1; + } + + fd = get_unused_fd(); + if (fd < 0) { + errno = ENFILE; + return -1; + } + + newsk = fd2sock(fd); +again: + if (tle_tcp_stream_accept(sk->s, &rs, 1) == 0) { + if (rte_errno != EAGAIN) { + errno = rte_errno; + return -1; + } + + if (is_nonblock(sk, flags)) { + newsk->valid = 0; + put_free_fd(fd); + errno = EAGAIN; + return -1; + } + + epoll_kernel_wait(CTX(sk), -1, &event, 1, 1, &rx); + be_process(CTX(sk)); + goto again; + } + + newsk->s = rs; + newsk->cid = sk->cid; + newsk->domain = sk->domain; + newsk->proto = sk->proto; + newsk->option.raw = 0; + newsk->option.tcpquickack = 1; + newsk->option.mulloop = 1; + newsk->option.multtl = 1; + newsk->option.keepidle = 2 * 60 * 60; + newsk->option.keepintvl = 75; + newsk->option.keepcnt = 9; + newsk->s->option.raw = newsk->option.raw; + sock_alloc_events(newsk); + tcp_update_cfg(newsk); + + if (addr) { + /* We assume this function never fails */ + tle_tcp_stream_get_addr(rs, &a); + + *addrlen = sizeof(struct sockaddr_in); + memcpy(addr, &a.remote, *addrlen); + } + + GLUE_DEBUG("accept fd = %d", fd); + return fd; +} + +static ssize_t +tcp_send(struct sock *sk, struct rte_mbuf *pkt[], + uint16_t num, const struct sockaddr *dst_addr) +{ + uint16_t rc; + RTE_SET_USED(dst_addr); + + if (sk->s == NULL) { + errno = EPIPE; + return 0; + } + + rc = tle_tcp_stream_send(sk->s, pkt, num); + if (rc == 0) + errno = rte_errno; + return rc; +} + +static ssize_t +tcp_recv(struct tle_stream *s, struct rte_mbuf *pkt[], + uint16_t num, struct sockaddr *addr) +{ + uint16_t rc; + + RTE_SET_USED(addr); + + /* optimize me: merge multiple mbufs into one */ + rc = tle_tcp_stream_recv(s, pkt, num); + if (rc == 0) + errno = rte_errno; + + return rc; +} + +static ssize_t +tcp_readv(struct tle_stream *ts, struct msghdr *msg, int flags __rte_unused) +{ + ssize_t rc; + + rc = tle_tcp_stream_recvmsg(ts, msg); + if (rc < 0) + errno = rte_errno; + return rc; +} + +static ssize_t +tcp_writev(struct sock *sk, const struct iovec *iov, + int iovcnt, const struct sockaddr *dst_addr) +{ + ssize_t rc; + struct rte_mempool *mp = get_mempool_by_socket(0); /* fix me */ + + RTE_SET_USED(dst_addr); + + if (sk->s == NULL) { + errno = EPIPE; + return -1; + } + + rc = tle_tcp_stream_writev(sk->s, mp, iov, iovcnt); + if (rc < 0) + errno = rte_errno; + return rc; +} + +static int +tcp_shutdown(struct sock *sk, int how) +{ + int ret; + + /* Refer to linux/net/ipv4/tcp.c:tcp_shutdown() */ + if (how == SHUT_RD) + return 0; + + ret = tle_tcp_stream_shutdown(sk->s, how); + if (ret < 0) + errno = rte_errno; + else + be_tx_with_lock(CTX(sk)); /* Make sure fin is sent */ + return ret; + +} + +static void +tcp_update_cfg(struct sock *sk) +{ + struct tle_tcp_stream_cfg prm = {0}; + + prm.recv_ev = &sk->rxev; + prm.send_ev = &sk->txev; + prm.err_ev = &sk->erev; + tle_tcp_stream_update_cfg(&sk->s, &prm, 1); +} + +struct proto tcp_prot = { + .name = "TCP", + .setsockopt = tcp_setsockopt, + .getsockopt = tcp_getsockopt, + .getname = tcp_getname, + .bind = tcp_bind, + .listen = tcp_listen, + .connect = tcp_connect, + .accept = tcp_accept, + .recv = tcp_recv, + .send = tcp_send, + .readv = tcp_readv, + .writev = tcp_writev, + .shutdown = tcp_shutdown, + .close = tle_tcp_stream_close, + .update_cfg = tcp_update_cfg, +}; |