diff options
Diffstat (limited to 'lib/libtle_glue')
32 files changed, 8983 insertions, 0 deletions
diff --git a/lib/libtle_glue/Makefile b/lib/libtle_glue/Makefile new file mode 100644 index 0000000..13ceb82 --- /dev/null +++ b/lib/libtle_glue/Makefile @@ -0,0 +1,62 @@ +# Copyright (c) 2018 Ant Financial Services Group. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +ifeq ($(RTE_SDK),) +$(error "Please define RTE_SDK environment variable") +endif + +# Default target, can be overwritten by command line or environment +RTE_TARGET ?= x86_64-native-linuxapp-gcc + +include $(RTE_SDK)/mk/rte.vars.mk + +# library name +LIB = libtle_glue.a + +CFLAGS += -O3 +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) + +EXPORT_MAP := tle_glue_version.map + +LIBABIVER := 1 + +# source files +SRCS-y += fd.c +SRCS-y += ctx.c +SRCS-y += arp.c +SRCS-y += icmp.c +SRCS-y += rxcb.c +SRCS-y += port.c +SRCS-y += sym.c +SRCS-y += init.c +SRCS-y += be.c +SRCS-y += epoll.c +SRCS-y += socket.c +SRCS-y += rxtx.c +SRCS-y += poll.c +SRCS-y += util.c +SRCS-y += tcp.c +SRCS-y += udp.c +SRCS-y += select.c + +ifeq ($(PACKETDRILL),y) +SRCS-y += packetdrill.c +endif + +# install this header file +SYMLINK-y-include += tle_glue.h + +# this lib dependencies +DEPDIRS-y += lib/libtle_l4p + +include $(TLDK_ROOT)/mk/tle.lib.mk diff --git a/lib/libtle_glue/arp.c b/lib/libtle_glue/arp.c new file mode 100644 index 0000000..9b13d9e --- /dev/null +++ b/lib/libtle_glue/arp.c @@ -0,0 +1,935 @@ +/* + * Copyright (c) 2019 Ant Financial Services Group. 
+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <sys/socket.h> +#include <netinet/in.h> +#include <netinet/icmp6.h> + +#include <rte_ethdev.h> +#include <rte_arp.h> +#include <rte_ip.h> +#include <rte_hash.h> +#include <rte_byteorder.h> + +#include "log.h" +#include "ctx.h" +#include "internal.h" +#include "tle_timer.h" +#include "util.h" +#include "ndp.h" +#include "gateway.h" + +#define IPV6_MULTI_MASK_LEN 13 + +const struct in6_addr ipv6_all_multi = {{{ + 0xff, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01 +}}}; + +const struct in6_addr ipv6_multi_mask = {{{ + 0xff, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 +}}}; + +static inline void +set_multicast_mac_v6(struct ether_addr *addr, const struct in6_addr *ip6_addr) +{ + unaligned_uint16_t *ea_words = (unaligned_uint16_t *)addr; + + ea_words[0] = 0x3333; + ea_words[1] = ip6_addr->__in6_u.__u6_addr16[6]; + ea_words[2] = ip6_addr->__in6_u.__u6_addr16[7]; +} + +static inline void +set_multicast_ipv6(uint8_t ipv6[16]) +{ + rte_memcpy(ipv6, &ipv6_multi_mask, IPV6_MULTI_MASK_LEN); +} + +static inline void +set_broadcast_addr(struct ether_addr *addr) +{ + unaligned_uint16_t *ea_words = (unaligned_uint16_t *)addr; + + ea_words[0] = 0xFFFF; + ea_words[1] = 0xFFFF; + ea_words[2] = 0xFFFF; +} + +static inline bool +match_addr(struct glue_ctx *ctx, struct rte_mbuf *pkt, const struct in_addr *addr) +{ + struct 
ipv4_hdr *ip4h; + const struct in_addr *gw; + + ip4h = rte_pktmbuf_mtod_offset(pkt, struct ipv4_hdr *, pkt->l2_len); + if ((ip4h->version_ihl >> 4) != 4) + return false; + + gw = ipv4_gateway_lookup(ctx, (struct in_addr *)&ip4h->dst_addr); + if (gw->s_addr != addr->s_addr) + return false; + + return true; +} + +static inline bool +match_addr6(struct glue_ctx *ctx, struct rte_mbuf *pkt, + const struct in6_addr *addr) +{ + struct ipv6_hdr *ip6h; + const struct in6_addr *gw; + + ip6h = rte_pktmbuf_mtod_offset(pkt, struct ipv6_hdr *, pkt->l2_len); + if (((ip6h->vtc_flow & 0xffffff00) >> 4) != 6) + return false; + + gw = ipv6_gateway_lookup(ctx, (struct in6_addr *)&ip6h->dst_addr); + if (memcmp(gw, addr, sizeof(struct in6_addr)) != 0) + return false; + + return true; +} + +static inline void +send_pkts(struct glue_ctx *ctx, struct rte_mbuf **pkts, uint16_t nb, + const char *prefix) +{ + uint16_t i, sent; + + sent = rte_eth_tx_burst(ctx->port_id, ctx->queue_id, pkts, nb); + for (i = sent; i < nb; i++) + rte_pktmbuf_free(pkts[i]); + + RTE_SET_USED(prefix); + TRACE("%s, send %u/%u pkts", prefix, sent, nb); +} + +static void +flush_arp_wait(int af, struct glue_ctx *ctx, const void *addr, + struct ether_addr *e_addr) +{ + struct rte_mbuf *pkt, *pre, *pkts[MAX_PKTS_BURST]; + struct ether_hdr *eth; + uint32_t nb_pkts; + + pre = NULL; + nb_pkts = 0; + for (pkt = ctx->arp_wait; pkt; pkt = pkt->next_pkt) { + if ((af == AF_INET && + !match_addr(ctx, pkt, (const struct in_addr *)addr)) || + (af == AF_INET6 && + !match_addr6(ctx, pkt, (const struct in6_addr *)addr))) { + pre = pkt; + continue; + } + + if (pre == NULL) + ctx->arp_wait = pkt->next_pkt; + else + pre->next_pkt = pkt->next_pkt; + eth = rte_pktmbuf_mtod(pkt, struct ether_hdr *); + ether_addr_copy(e_addr, ð->d_addr); + pkts[nb_pkts++] = pkt; + if (nb_pkts == MAX_PKTS_BURST) { + send_pkts(ctx, pkts, nb_pkts, "ARP learned"); + nb_pkts = 0; + } + } + if (nb_pkts) + send_pkts(ctx, pkts, nb_pkts, "ARP learned"); +} + +static 
inline void +ipv4_dst_set(struct glue_ctx *ctx, struct tle_dest *dst, + const struct in_addr *addr, struct ether_addr *e_addr) +{ + struct ether_hdr *eth; + struct ipv4_hdr *ip4h; + + if (is_ipv4_loopback_addr(addr->s_addr, ctx)) + dst->mtu = MTU_LOOPBACK; + else + dst->mtu = MTU_NORMAL; + dst->l2_len = sizeof(*eth); + dst->head_mp = get_mempool_by_socket(0); /* fix me */ + + eth = (struct ether_hdr *)dst->hdr; + ether_addr_copy(&ctx->mac, ð->s_addr); + if (e_addr == NULL) + set_broadcast_addr(ð->d_addr); + else + ether_addr_copy(e_addr, ð->d_addr); + eth->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv4); + + dst->l3_len = sizeof(*ip4h); + ip4h = (struct ipv4_hdr *)(eth + 1); + ip4h->dst_addr = addr->s_addr; + ip4h->version_ihl = 4 << 4 | sizeof(*ip4h) / IPV4_IHL_MULTIPLIER; + ip4h->time_to_live = 64; + ip4h->next_proto_id = IPPROTO_TCP; +} + +static inline void +ipv6_dst_set(struct glue_ctx *ctx, struct tle_dest *dst, + const struct in6_addr *addr, struct ether_addr *e_addr) +{ + struct ether_hdr *eth; + struct ipv6_hdr *ip6h; + + if (is_ipv6_loopback_addr(addr, ctx)) + dst->mtu = MTU_LOOPBACK; + else + dst->mtu = MTU_NORMAL; + dst->l2_len = sizeof(*eth); + dst->head_mp = get_mempool_by_socket(0); /* fix me */ + + eth = (struct ether_hdr *)dst->hdr; + ether_addr_copy(&ctx->mac, ð->s_addr); + if (e_addr == NULL) + set_broadcast_addr(ð->d_addr); + else + ether_addr_copy(e_addr, ð->d_addr); + eth->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv6); + + dst->l3_len = sizeof(*ip6h); + ip6h = (struct ipv6_hdr *)(eth + 1); + rte_memcpy(ip6h->dst_addr, addr, sizeof(struct in6_addr)); + ip6h->vtc_flow = 6 << 4; + ip6h->hop_limits = 255; + ip6h->proto = IPPROTO_TCP; +} + +#define arp_timer(ctx, entry, interval) \ + tle_timer_start(ctx->arp_tmw, entry, interval) + +void +ipv4_dst_add(struct glue_ctx *ctx, const struct in_addr *addr, + struct ether_addr *e_addr) +{ + struct arp_entry *entry; + struct tle_dest *dst; + struct ether_hdr *eth; + uint64_t idx; + bool check_wait; + int 
rc; + + rc = rte_hash_lookup_data(ctx->arp_hash, addr, (void**)&idx); + if (rc >= 0) { + entry = &ctx->arp4[idx]; + dst = &entry->dst; + eth = (struct ether_hdr *)dst->hdr; + check_wait = is_broadcast_ether_addr(ð->d_addr); + + /* update arp entry, reset timer */ + ether_addr_copy(e_addr, ð->d_addr); + print_arp(AF_INET, addr, ð->d_addr, "UPDATE"); + if(entry->timer != NULL) + tle_timer_stop(ctx->arp_tmw, entry->timer); + entry->timer = arp_timer(ctx, entry, ARP_ENTRY_EXPIRE); + entry->inuse = 0; + entry->req_time = 0; + + if(check_wait) + flush_arp_wait(AF_INET, ctx, addr, e_addr); + + return; + } + + idx = ctx->arp4_num; + entry = &ctx->arp4[idx]; + dst = &entry->dst; + + ipv4_dst_set(ctx, dst, addr, e_addr); + if (e_addr == NULL) { + entry->timer = arp_timer(ctx, entry, ARP_REQUEST_EXPIRE); + entry->req_time = 1; + } else { + entry->timer = arp_timer(ctx, entry, ARP_ENTRY_EXPIRE); + entry->inuse = 0; + } + + rc = rte_hash_add_key_data(ctx->arp_hash, addr, (void *)idx); + if (rc < 0) + rte_panic("Failed to add ARP entry"); + + ctx->arp4_num++; + eth = (struct ether_hdr *)dst->hdr; + print_arp(AF_INET, addr, ð->d_addr, "ADD"); +} + +void +ipv6_dst_add(struct glue_ctx *ctx, const struct in6_addr *addr, + struct ether_addr *e_addr) +{ + struct arp_entry* entry; + struct tle_dest *dst; + struct ether_hdr *eth; + uint64_t idx; + bool check_wait; + int rc; + + rc = rte_hash_lookup_data(ctx->arp6_hash, addr, (void**)&idx); + if (rc >= 0) { + entry = &ctx->arp6[idx]; + dst = &entry->dst; + eth = (struct ether_hdr *)dst->hdr; + check_wait = is_broadcast_ether_addr(ð->d_addr); + + /* update arp entry, reset timer */ + ether_addr_copy(e_addr, ð->d_addr); + print_arp(AF_INET6, addr, ð->d_addr, "UPDATE"); + if(entry->timer != NULL) + tle_timer_stop(ctx->arp_tmw, entry->timer); + entry->timer = arp_timer(ctx, entry, ARP_ENTRY_EXPIRE); + entry->inuse = 0; + entry->req_time = 0; + + if(check_wait) + flush_arp_wait(AF_INET6, ctx, addr, e_addr); + + return; + } + + idx = 
ctx->arp6_num; + entry = &ctx->arp6[idx]; + dst = &entry->dst; + + ipv6_dst_set(ctx, dst, addr, e_addr); + if (e_addr == NULL) { + entry->timer = arp_timer(ctx, entry, ARP_REQUEST_EXPIRE); + entry->req_time = 1; + } else { + entry->timer = arp_timer(ctx, entry, ARP_ENTRY_EXPIRE); + entry->inuse = 0; + } + + rc = rte_hash_add_key_data(ctx->arp6_hash, addr, (void *)idx); + if (rc < 0) + rte_panic("Failed to add ARP6 entry"); + + eth = (struct ether_hdr *)dst->hdr; + print_arp(AF_INET6, addr, ð->d_addr, "ADD"); + ctx->arp6_num++; +} + +static inline int +arp_ip_exist(const struct rte_hash *h, const void *ip) +{ + return rte_hash_lookup(h, ip) >= 0; +} + +struct rte_mbuf * +ndp_recv(struct glue_ctx *ctx, struct rte_mbuf *m, + uint32_t l2len, uint32_t l3len) +{ + struct ether_hdr *eth_h; + struct ipv6_hdr *ipv6_h; + struct nd_neighbor_solicit *ns_h; + struct nd_opt_hdr *opth; + + eth_h = rte_pktmbuf_mtod(m, struct ether_hdr *); + ipv6_h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr *, l2len); + ns_h = rte_pktmbuf_mtod_offset(m, struct nd_neighbor_solicit *, + l2len + l3len); + + if (ipv6_h->payload_len < sizeof(struct nd_neighbor_solicit)) + goto drop; + + /* We only learn mac when: + * 1. Normal NS for my ip, whose TargetAddr is me + * 2. Normal NA to my ip, whose DstIpv6 is me + * 3. 
Unsolicited NA, and we already have an entry for that IP + */ + + /* NS message */ + if (ns_h->nd_ns_hdr.icmp6_type == ND_NEIGHBOR_SOLICIT) { + /* not support Duplicate Address Detect NS yet */ + if (IN6_IS_ADDR_UNSPECIFIED(ipv6_h->src_addr)) + goto drop; + + if (memcmp(&ns_h->nd_ns_target, &ctx->ipv6, sizeof(ctx->ipv6))) + goto drop; + + /* NS message, target is my ipv6 addr */ + opth = (struct nd_opt_hdr*)(ns_h + 1); + ipv6_dst_add(ctx, (struct in6_addr *)ipv6_h->src_addr, + (struct ether_addr *)(opth + 1)); + + /* response NA message */ + ether_addr_copy(&ctx->mac, ð_h->s_addr); + ether_addr_copy((struct ether_addr*)(opth + 1), + ð_h->d_addr); + + rte_memcpy(ipv6_h->dst_addr, ipv6_h->src_addr, + sizeof(struct in6_addr)); + rte_memcpy(ipv6_h->src_addr, &ctx->ipv6, + sizeof(struct in6_addr)); + + ns_h->nd_ns_hdr.icmp6_type = ND_NEIGHBOR_ADVERT; + ns_h->nd_ns_hdr.icmp6_dataun.icmp6_un_data8[0] = 0x60; + ns_h->nd_ns_hdr.icmp6_cksum = 0; + + opth->nd_opt_type = ND_OPT_TARGET_LINKLAYER_ADDR; + ether_addr_copy(&ctx->mac, (struct ether_addr*)(opth + 1)); + + ns_h->nd_ns_hdr.icmp6_cksum = rte_ipv6_udptcp_cksum(ipv6_h, ns_h); + + if (m->pkt_len < ETHER_MIN_LEN) + rte_pktmbuf_append(m, ETHER_MIN_LEN - m->pkt_len); + + send_pkts(ctx, &m, 1, "NDP NA reply"); + return NULL; + } + + /* NA message */ + if (memcmp(ipv6_h->dst_addr, &ctx->ipv6, sizeof(ctx->ipv6)) == 0 || + (memcmp(ipv6_h->dst_addr, &ipv6_all_multi, sizeof(ctx->ipv6)) == 0 && + arp_ip_exist(ctx->arp6_hash, &ns_h->nd_ns_target))) { + opth = (struct nd_opt_hdr *)(ns_h + 1); + ipv6_dst_add(ctx, &ns_h->nd_ns_target, + (struct ether_addr *)(opth + 1)); + } + +drop: + rte_pktmbuf_free(m); + return NULL; +} + +struct rte_mbuf * +arp_recv(struct glue_ctx *ctx, struct rte_mbuf *m, uint32_t l2len) +{ + struct ether_hdr *eth; + struct arp_hdr *ahdr; + struct arp_ipv4 *adata; + uint32_t tip; + + eth = rte_pktmbuf_mtod(m, struct ether_hdr *); + ahdr = rte_pktmbuf_mtod_offset(m, struct arp_hdr *, l2len); + + if (ahdr->arp_hrd 
!= rte_be_to_cpu_16(ARP_HRD_ETHER) || + ahdr->arp_pro != rte_be_to_cpu_16(ETHER_TYPE_IPv4)) + goto drop; + + adata = &ahdr->arp_data; + tip = adata->arp_tip; + + /* We only learn mac when: + * 1. tip is me, or + * 2. this is a RARP, and we already have an entry for that IP + */ + if (tip == ctx->ipv4 || + (tip == INADDR_ANY && arp_ip_exist(ctx->arp_hash, &adata->arp_sip))) + ipv4_dst_add(ctx, (struct in_addr *)&adata->arp_sip, + &adata->arp_sha); + + /* We only do ARP reply when: + * 1. tip is me. + */ + if (ahdr->arp_op == rte_be_to_cpu_16(ARP_OP_REQUEST) && + tip == ctx->ipv4) { + eth->d_addr = eth->s_addr; + eth->s_addr = ctx->mac; + ahdr->arp_op = rte_cpu_to_be_16(ARP_OP_REPLY); + + adata->arp_tip = adata->arp_sip; + adata->arp_sip = tip; + + adata->arp_tha = adata->arp_sha; + adata->arp_sha = ctx->mac; + if (m->pkt_len < ETHER_MIN_LEN) + rte_pktmbuf_append(m, ETHER_MIN_LEN - m->pkt_len); + send_pkts(ctx, &m, 1, "ARP reply"); + return NULL; + } +drop: + rte_pktmbuf_free(m); + return NULL; +} + +static void +arp6_send_request(struct glue_ctx *ctx, const struct in6_addr *addr) +{ + struct rte_mempool *mp = get_mempool_by_socket(0); /* fix me */ + struct ether_hdr *eth; + struct ipv6_hdr *ip6h; + struct nd_neighbor_solicit *nsh; + struct nd_opt_hdr *opth; + struct ether_addr *sll_addr; + struct rte_mbuf *m; +#ifdef ENABLE_TRACE + char str_ip[64]; +#endif + + m = rte_pktmbuf_alloc(mp); + if (m == NULL) + rte_panic("Failed to alloc mbuf for ndp ns request"); + + eth = (struct ether_hdr *)rte_pktmbuf_append(m, sizeof(*eth)); + ether_addr_copy(&ctx->mac, ð->s_addr); + set_multicast_mac_v6(ð->d_addr, addr); + eth->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv6); + + ip6h = (struct ipv6_hdr*)rte_pktmbuf_append(m, sizeof(struct ipv6_hdr)); + ip6h->vtc_flow = 6 << 4; + ip6h->payload_len = sizeof(struct nd_neighbor_solicit) + + sizeof(struct nd_opt_hdr) + + sizeof(struct ether_addr); + ip6h->proto = IPPROTO_ICMPV6; + ip6h->hop_limits = 255; + rte_memcpy(ip6h->src_addr, 
&ctx->ipv6, sizeof(struct in6_addr)); + rte_memcpy(ip6h->dst_addr, addr, sizeof(struct in6_addr)); + set_multicast_ipv6(ip6h->dst_addr); + + nsh = (struct nd_neighbor_solicit *)rte_pktmbuf_append(m, sizeof(*nsh)); + nsh->nd_ns_hdr.icmp6_type = ND_NEIGHBOR_SOLICIT; + nsh->nd_ns_hdr.icmp6_code = 0; + nsh->nd_ns_hdr.icmp6_cksum = 0; + nsh->nd_ns_hdr.icmp6_dataun.icmp6_un_data32[0] = 0; + rte_memcpy(&nsh->nd_ns_target, addr, sizeof(struct in6_addr)); + + opth = (struct nd_opt_hdr *)rte_pktmbuf_append(m, sizeof(*opth)); + opth->nd_opt_type = ND_OPT_SOURCE_LINKLAYER_ADDR; + opth->nd_opt_len = 1; + + sll_addr = (struct ether_addr *)rte_pktmbuf_append(m, sizeof(*sll_addr)); + ether_addr_copy(&ctx->mac, sll_addr); + + nsh->nd_ns_hdr.icmp6_cksum = rte_ipv6_udptcp_cksum(ip6h, nsh); + + send_pkts(ctx, &m, 1, "ARP6 request"); +} + +static void +arp_send_request(struct glue_ctx *ctx, const struct in_addr *addr) +{ + struct rte_mempool *mp = get_mempool_by_socket(0); /* fix me */ + struct ether_hdr *eth; + struct arp_hdr *ahdr; + struct arp_ipv4 *adata; + struct rte_mbuf *m; + uint16_t pad_len, i; + char *pad; + + m = rte_pktmbuf_alloc(mp); + if (m == NULL) + rte_panic("Failed to alloc mbuf for arp request"); + + eth = (struct ether_hdr *)rte_pktmbuf_append(m, sizeof(*eth)); + ether_addr_copy(&ctx->mac, ð->s_addr); + set_broadcast_addr(ð->d_addr); + eth->ether_type = rte_cpu_to_be_16(ETHER_TYPE_ARP); + + ahdr = (struct arp_hdr *)rte_pktmbuf_append(m, sizeof(*ahdr)); + ahdr->arp_hrd = rte_be_to_cpu_16(ARP_HRD_ETHER); + ahdr->arp_pro = rte_be_to_cpu_16(ETHER_TYPE_IPv4); + ahdr->arp_hln = sizeof(struct ether_addr); + ahdr->arp_pln = sizeof(*addr); + ahdr->arp_op = rte_be_to_cpu_16(ARP_OP_REQUEST); + adata = &ahdr->arp_data; + ether_addr_copy(&ctx->mac, &adata->arp_sha); + adata->arp_sip = ctx->ipv4; + set_broadcast_addr(&adata->arp_tha); + adata->arp_tip = addr->s_addr; + + pad_len = ETHER_MIN_LEN - sizeof(*eth) - sizeof(*ahdr); + pad = rte_pktmbuf_append(m, pad_len); + for (i = 0; 
i < pad_len; ++i) + pad[i] = 0; + + send_pkts(ctx, &m, 1, "ARP request"); +} + +#define addr2ipv4(addr) (&((const struct sockaddr_in *)addr)->sin_addr) +#define addr2ipv6(addr) (&((const struct sockaddr_in6 *)addr)->sin6_addr) +void +mac_check(struct glue_ctx *ctx, const struct sockaddr *addr) +{ + int rc; + const struct in_addr *addr4 = NULL; + const struct in6_addr *addr6 = NULL; + + if(addr->sa_family == AF_INET) { + addr4 = ipv4_gateway_lookup(ctx, addr2ipv4(addr)); + rc = rte_hash_lookup(ctx->arp_hash, addr4); + } else { + addr6 = ipv6_gateway_lookup(ctx, addr2ipv6(addr)); + rc = rte_hash_lookup(ctx->arp6_hash, addr6); + } + if (rc >= 0) + return; + + if(addr->sa_family == AF_INET) + arp_send_request(ctx, addr4); + else + arp6_send_request(ctx, addr6); +} + +static int +arp_inherit(struct glue_ctx *ctx, const struct in_addr *addr) +{ + struct glue_ctx *next; + struct tle_dest *dst; + struct ether_hdr *eth; + uint64_t idx; + uint16_t i; + int rc; + + for (i = 0; i < nb_ctx; i++) { + next = &ctx_array[i++]; + if (next == NULL || next == ctx) + continue; + + rc = rte_hash_lookup_data(next->arp_hash, addr, (void **)&idx); + if (rc < 0) + continue; + + dst = &next->arp4[idx].dst; + eth = (struct ether_hdr *)dst->hdr; + ipv4_dst_add(ctx, addr, ð->d_addr); + return 0; + } + + return -1; +} + +static int +arp6_inherit(struct glue_ctx *ctx, const struct in6_addr *addr) +{ + struct glue_ctx *next; + struct ether_hdr *eth; + struct tle_dest *dst; + uint64_t idx; + uint16_t i; + int rc; + + for (i = 0; i < nb_ctx; i++) { + next = &ctx_array[i++]; + if (next == NULL || next == ctx) + continue; + + rc = rte_hash_lookup_data(next->arp6_hash, addr, (void **)&idx); + if (rc < 0) + continue; + + dst = &next->arp6[idx].dst; + eth = (struct ether_hdr *)dst->hdr; + ipv6_dst_add(ctx, addr, ð->d_addr); + return 0; + } + + return -1; +} + +#define len_dest(dst) \ + (offsetof(struct tle_dest, hdr) + dst->l2_len + dst->l3_len) + +int +arp_ipv6_dst_lookup(void *data, const struct 
in6_addr *addr, + struct tle_dest *res, int proto) +{ + int32_t rc; + uint64_t idx; + struct tle_dest *dst; + struct ipv6_hdr *ip6h; + struct glue_ctx *ctx = data; + + if (is_ipv6_loopback_addr(addr, ctx)) { + dst = &ctx->lb_dst_v6; + rte_memcpy(res, dst, len_dest(dst)); + if (proto == IPPROTO_TCP) + res->dev = ctx->lb_tcp_dev; + else + res->dev = ctx->lb_udp_dev; + rc = 0; + goto set_proto; + } + + rc = rte_hash_lookup_data(ctx->arp6_hash, addr, (void **)&idx); + if (rc >= 0) { + if (!ctx->arp6[idx].inuse) + ctx->arp6[idx].inuse = 1; + dst = &ctx->arp6[idx].dst; + rte_memcpy(res, dst, len_dest(dst)); + } else { + memset(res, 0, sizeof(*res)); + ipv6_dst_set(ctx, res, addr, NULL); + rc = 0; + } + + if (proto == IPPROTO_TCP) + res->dev = ctx->tcp_dev; + else + res->dev = ctx->udp_dev; + +set_proto: + ip6h = (struct ipv6_hdr *)&res->hdr[res->l2_len]; + ip6h->proto = proto; + return rc; +} + +int +arp_ipv4_dst_lookup(void *data, const struct in_addr *addr, + struct tle_dest *res, int proto) +{ + int32_t rc; + uint64_t idx; + struct tle_dest *dst; + struct ipv4_hdr *ip4h; + struct glue_ctx *ctx = data; + + if (is_ipv4_loopback_addr(addr->s_addr, ctx)) { + dst = &ctx->lb_dst; + rte_memcpy(res, dst, len_dest(dst)); + if (proto == IPPROTO_TCP) + res->dev = ctx->lb_tcp_dev; + else + res->dev = ctx->lb_udp_dev; + rc = 0; + goto set_proto; + } + + rc = rte_hash_lookup_data(ctx->arp_hash, addr, (void **)&idx); + if (rc >= 0) { + if (!ctx->arp4[idx].inuse) + ctx->arp4[idx].inuse = 1; + dst = &ctx->arp4[idx].dst; + rte_memcpy(res, dst, len_dest(dst)); + } else { + memset(res, 0, sizeof(*res)); + ipv4_dst_set(ctx, res, addr, NULL); + rc = 0; + } + + if (proto == IPPROTO_TCP) + res->dev = ctx->tcp_dev; + else + res->dev = ctx->udp_dev; + +set_proto: + ip4h = (struct ipv4_hdr *)&res->hdr[res->l2_len]; + ip4h->next_proto_id = proto; + return rc; +} + +int +mac_fill(struct glue_ctx *ctx, struct rte_mbuf *m) +{ + int32_t rc; + uint64_t idx; + uint8_t ipver; + struct arp_entry* entry; 
+ struct ether_addr *dst, *dst1; + struct ipv4_hdr *ipv4_hdr; + struct ipv6_hdr *ipv6_hdr; + const struct in_addr *addr4 = NULL; + const struct in6_addr *addr6 = NULL; + + dst = rte_pktmbuf_mtod(m, struct ether_addr *); + if (!is_broadcast_ether_addr(dst)) + return 0; + + ipv4_hdr = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *, m->l2_len); + ipv6_hdr = (struct ipv6_hdr*)ipv4_hdr; + ipver = ipv4_hdr->version_ihl >> 4; + +retry: + if (ipver == 4) { + addr4 = (struct in_addr *)&ipv4_hdr->dst_addr; + addr4 = ipv4_gateway_lookup(ctx, addr4); + rc = rte_hash_lookup_data(ctx->arp_hash, addr4, (void **)&idx); + if (rc >= 0) + entry = &ctx->arp4[idx]; + } else { + addr6 = (struct in6_addr *)ipv6_hdr->dst_addr; + addr6 = ipv6_gateway_lookup(ctx, addr6); + rc = rte_hash_lookup_data(ctx->arp6_hash, addr6, (void **)&idx); + if (rc >= 0) + entry = &ctx->arp6[idx]; + } + + if (rc >= 0) { + dst1 = (struct ether_addr *)entry->dst.hdr; + if (!is_broadcast_ether_addr(dst1)) { + ether_addr_copy(dst1 , dst); + return 0; + } + + if (ipver == 4) + arp_send_request(ctx, addr4); + else + arp6_send_request(ctx, addr6); + entry->req_time++; + if (entry->timer != NULL) + tle_timer_stop(ctx->arp_tmw, entry->timer); + entry->timer = arp_timer(ctx, entry, ARP_REQUEST_EXPIRE); + } else { + if (ipver == 4) { + if (arp_inherit(ctx, addr4) == 0) + goto retry; + ipv4_dst_add(ctx, addr4, NULL); + arp_send_request(ctx, addr4); + } else { + if (arp6_inherit(ctx, addr6) == 0) + goto retry; + ipv6_dst_add(ctx, addr6, NULL); + arp6_send_request(ctx, addr6); + } + } + + return -1; +} + +static inline const struct in_addr * +get_addr_from_entry(struct arp_entry *e) +{ + const struct ipv4_hdr *ipv4; + const struct in_addr *addr; + + ipv4 = (struct ipv4_hdr *)(e->dst.hdr + e->dst.l2_len); + addr = (const struct in_addr *)&ipv4->dst_addr; + return addr; +} + +static inline const struct in6_addr * +get_addr6_from_entry(struct arp_entry *e) +{ + const struct ipv6_hdr *ipv6; + const struct in6_addr *addr; + + ipv6 
= (struct ipv6_hdr *)(e->dst.hdr + e->dst.l2_len); + addr = (const struct in6_addr *)ipv6->dst_addr; + return addr; +} + +static void +drop_arp_wait(int af, struct glue_ctx *ctx, const void *addr) +{ + struct rte_mbuf *pkt, *pre; + + for (pre = NULL, pkt = ctx->arp_wait; pkt; pkt = pkt->next_pkt) { + if ((af == AF_INET && + !match_addr(ctx, pkt, (const struct in_addr *)addr)) || + (af == AF_INET6 && + !match_addr6(ctx, pkt, (const struct in6_addr *)addr))) { + pre = pkt; + continue; + } + + if (pre == NULL) + ctx->arp_wait = pkt->next_pkt; + else + pre->next_pkt = pkt->next_pkt; + + rte_pktmbuf_free(pkt); + } +} + +static void +arp_entry_del(struct glue_ctx *ctx, int af, struct arp_entry *e) +{ + const void *addr; + struct arp_entry *t; + uint32_t idx, last_idx; + const struct rte_hash *h; + + if (af == AF_INET) { + addr = get_addr_from_entry(e); + t = ctx->arp4; + h = ctx->arp_hash; + last_idx = ctx->arp4_num - 1; + } else { + addr = get_addr6_from_entry(e); + t = ctx->arp6; + h = ctx->arp6_hash; + last_idx = ctx->arp6_num - 1; + } + + idx = e - t; + if (idx > last_idx) /* entry has been moved */ + return; + + print_arp(af, addr, (struct ether_addr *)e->dst.hdr, "DELETE"); + + if (e->req_time > ARP_MAX_REQ_TIMES) + drop_arp_wait(af, ctx, addr); + + rte_hash_del_key(h, addr); + + if (idx < last_idx) { + /* replace current entry with last entry */ + rte_memcpy(e, t + last_idx, sizeof(*e)); + rte_hash_add_key_data(h, addr, (void *)(uintptr_t)idx); + tle_timer_stop(ctx->arp_tmw, t[last_idx].timer); + if (e->req_time > 0) + e->timer = arp_timer(ctx, e, ARP_REQUEST_EXPIRE); + else { + e->timer = arp_timer(ctx, e, ARP_ENTRY_EXPIRE); + e->inuse = 0; + } + } + + /* we always delete the last entry to keep it contiguous */ + t[last_idx].timer = NULL; + t[last_idx].inuse = 0; + t[last_idx].req_time = 0; + if (af == AF_INET) + ctx->arp4_num--; + else + ctx->arp6_num--; +} + +void +mac_timeout(struct glue_ctx *ctx) +{ +#define ARP_PROCESS_MAX 32 + struct arp_entry 
*entry[ARP_PROCESS_MAX], *e; + struct tle_timer_wheel *tw; + const struct in_addr *addr4; + const struct in6_addr *addr6; + uint32_t i, cnt; + uint8_t *l3h; + + tw = ctx->arp_tmw; + tle_timer_expire(tw, rte_get_tsc_cycles() >> ctx->cycles_ms_shift); + cnt = tle_timer_get_expired_bulk(tw, (void**)entry, ARP_PROCESS_MAX); + if (cnt == 0) + return; + + for(i = 0; i < cnt; i++) { + e = entry[i]; + e->timer = NULL; + l3h = e->dst.hdr + e->dst.l2_len; + if (e->inuse || + (e->req_time > 0 && e->req_time <= ARP_MAX_REQ_TIMES)) { + if (((struct ipv4_hdr *)l3h)->version_ihl >> 4 == 4) { + addr4 = get_addr_from_entry(e); + arp_send_request(ctx, addr4); + } else { + addr6 = get_addr6_from_entry(e); + arp6_send_request(ctx, addr6); + } + + e->timer = arp_timer(ctx, e, ARP_REQUEST_EXPIRE); + e->inuse = 0; + e->req_time++; + } else { + if (((struct ipv4_hdr *)l3h)->version_ihl >> 4 == 4) + arp_entry_del(ctx, AF_INET, e); + else + arp_entry_del(ctx, AF_INET6, e); + } + } +} diff --git a/lib/libtle_glue/be.c b/lib/libtle_glue/be.c new file mode 100644 index 0000000..7e2227e --- /dev/null +++ b/lib/libtle_glue/be.c @@ -0,0 +1,256 @@ +/* + * Copyright (c) 2018 Ant Financial Services Group. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <rte_ethdev.h> +#include <rte_ip.h> + +#include <tle_tcp.h> +#include <tle_udp.h> + +#include "config.h" +#include "log.h" +#include "util.h" +#include "internal.h" + +static inline void +rte_pktmbuf_copy_seg(struct rte_mbuf *dst, struct rte_mbuf* src) +{ + size_t offset = offsetof(struct rte_mbuf, data_off); + rte_memcpy((char*)dst + offset, (char*)src + offset, + sizeof(struct rte_mbuf) - offset); + rte_mbuf_refcnt_set(dst, 1); + dst->ol_flags &= ~IND_ATTACHED_MBUF; + rte_memcpy(rte_pktmbuf_mtod(dst, void*), rte_pktmbuf_mtod(src, void*), + src->data_len); +} + +static inline struct rte_mbuf* +rte_pktmbuf_copy(struct rte_mbuf *md, struct rte_mempool* mp) +{ + struct rte_mbuf *mc, *mi, **prev; + uint32_t pktlen; + uint16_t nseg; + + if (unlikely ((mc = rte_pktmbuf_alloc(mp)) == NULL)) + return NULL; + + mi = mc; + prev = &mi->next; + pktlen = md->pkt_len; + nseg = 0; + + do { + nseg++; + rte_pktmbuf_copy_seg(mi, md); + *prev = mi; + prev = &mi->next; + } while ((md = md->next) != NULL && + (mi = rte_pktmbuf_alloc(mp)) != NULL); + + *prev = NULL; + mc->nb_segs = nseg; + mc->pkt_len = pktlen; + + /* Allocation of new indirect segment failed */ + if (unlikely(mi == NULL)) { + rte_pktmbuf_free(mc); + return NULL; + } + + __rte_mbuf_sanity_check(mc, 1); + return mc; +} + +static inline int +process_rx_pkts(struct glue_ctx *ctx, struct rte_mbuf *pkts[], + uint32_t n, uint8_t from_loopback) +{ + uint32_t i, j, k, jt, ju, jd; + struct rte_mbuf *tcp[MAX_PKTS_BURST]; + struct rte_mbuf *udp[MAX_PKTS_BURST]; + struct rte_mbuf *drop[MAX_PKTS_BURST]; + int32_t rc[MAX_PKTS_BURST]; + struct tle_dev *tcp_dev, *udp_dev; + struct rte_mempool *mp; + struct rte_mbuf *tmp; + uint64_t ts; + + if (n == 0) + return 0; + + if (unlikely(from_loopback)) { + tcp_dev = ctx->lb_tcp_dev; + udp_dev = ctx->lb_udp_dev; + mp = pkts[0]->pool; + for (i = 0; i < n; i++) { + tmp = rte_pktmbuf_copy(pkts[i], mp); + if (tmp != NULL) { + rte_pktmbuf_free(pkts[i]); + pkts[i] = tmp; + 
pkts[i]->ol_flags |= PKT_RX_IP_CKSUM_GOOD; + pkts[i]->ol_flags |= PKT_RX_L4_CKSUM_GOOD; + } else { + k = i; + for (; i < n; i++) { + rte_pktmbuf_free(pkts[i]); + } + n = k; + } + } + } else { + tcp_dev = ctx->tcp_dev; + udp_dev = ctx->udp_dev; + } + + ts = rte_get_tsc_cycles() >> (ctx->cycles_ms_shift - 10); + + for (j = 0, jt = 0, ju = 0, jd = 0; j < n; j++) { + pkts[j]->timestamp = ts; + switch (pkts[j]->packet_type & RTE_PTYPE_L4_MASK) { + case RTE_PTYPE_L4_TCP: + tcp[jt++] = pkts[j]; + break; + case RTE_PTYPE_L4_UDP: + udp[ju++] = pkts[j]; + break; + case RTE_PTYPE_L4_ICMP: + /* TODO */ + case RTE_PTYPE_L4_FRAG: + /* TODO */ + default: + drop[jd++] = pkts[j]; + } + } + + if (jt > 0) { + k = tle_tcp_rx_bulk(tcp_dev, tcp, drop + jd, rc, jt); + jd += jt - k; + + TRACE("(port=%u, queue=%u), %u/%u (TCP) pkts are received", + port_id, queue_id, k, n); + } + + if (ju > 0) { + k = tle_udp_rx_bulk(udp_dev, udp, drop + jd, rc, ju); + jd += ju - k; + + TRACE("(port=%u, queue=%u), %u/%u (UDP) pkts are received", + port_id, queue_id, k, n); + } + + for (j = 0; j < jd; j++) + rte_pktmbuf_free(drop[j]); + + return jt + ju - jd; +} + +static inline int +be_rx(struct glue_ctx *ctx) +{ + int ret; + uint32_t n; + struct rte_mbuf *pkts[MAX_PKTS_BURST]; + uint16_t port_id = ctx->port_id; + uint16_t queue_id = ctx->queue_id; + + n = rte_eth_rx_burst(port_id, queue_id, pkts, RTE_DIM(pkts)); + ret = process_rx_pkts(ctx, pkts, n, 0); + + return ret; +} + +int +be_tx(struct glue_ctx *ctx) +{ + uint32_t n, j, k, s, ret; + const uint16_t max_pkts = MAX_PKTS_BURST; + struct rte_mbuf *pkts[max_pkts]; + struct rte_mbuf *_pkts[max_pkts]; + uint16_t port_id = ctx->port_id; + uint16_t queue_id = ctx->queue_id; + + ret = 0; + tle_tcp_process(ctx->tcp_ctx, TCP_MAX_PROCESS); + + n = tle_tcp_tx_bulk(ctx->lb_tcp_dev, pkts, max_pkts); + n += tle_udp_tx_bulk(ctx->lb_udp_dev, pkts + n, max_pkts - n); + if (n > 0) { + ret += n; + rte_eth_tx_burst(ctx->lb_port_id, 0, pkts, n); + /* loopback device could 
receive after transmit immediately */ + n = rte_eth_rx_burst(ctx->lb_port_id, 0, pkts, RTE_DIM(pkts)); + process_rx_pkts(ctx, pkts, n, 1); + + /* wake up look-aside backend */ + wake_lookaside_backend(ctx); + } + + n = tle_tcp_tx_bulk(ctx->tcp_dev, pkts, max_pkts); + n += tle_udp_tx_bulk(ctx->udp_dev, pkts + n, max_pkts - n); + if (n == 0) + return 0; + + ret += n; + s = 0; + for (j = 0; j != n; j++) { + if (mac_fill(ctx, pkts[j]) == 0) { + PKT_DUMP(pkts[j]); + _pkts[s++] = pkts[j]; + continue; + } + + pkts[j]->next_pkt = ctx->arp_wait; + ctx->arp_wait = pkts[j]; + } + + /* For virtio-user/vhost-kernel test case, it's normal that vhost + * kthread cannot catch up with packets generation speed in stack. + * Shall we drop those packets immdiately or retry some times to + * keep those packets? We find dropping packets here is not a good + * idea, which leads to lots of retrans and inefficiency of vhost + * kthread. Even below code does not work well: + * + * for (k = 0, retry = 0; k < s && retry < 10000; retry++) + * k += rte_eth_tx_burst(port_id, queue_id, _pkts + k, s - k); + * + * So we choose to blockingly send out packes. + */ + k = 0; + while (k < s) + k += rte_eth_tx_burst(port_id, queue_id, _pkts + k, s - k); + + for (j = k; j != s; j++) + rte_pktmbuf_free(_pkts[j]); + + TRACE("(port=%u, queue=%u), %u/%u pkts are sent", + port_id, queue_id, k, s); + + return ret; +} + +int +be_process(struct glue_ctx *ctx) +{ + int ret; + + if (unlikely(stopped)) + return 0; + + ret = be_rx(ctx); + mac_timeout(ctx); + ret += be_tx(ctx); + + return ret; +} diff --git a/lib/libtle_glue/config.h b/lib/libtle_glue/config.h new file mode 100644 index 0000000..976495e --- /dev/null +++ b/lib/libtle_glue/config.h @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2018 Ant Financial Services Group. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
#ifndef _TLE_GLUE_CONFIG_H_
#define _TLE_GLUE_CONFIG_H_

#ifdef __cplusplus
extern "C" {
#endif

/* Stream (connection) pool sizing per glue context.
 * Parenthesized so the macro is safe inside surrounding arithmetic
 * (e.g. `x / MAX_STREAMS_PER_CORE` no longer expands to `x / 64 * 1024`).
 */
#define MAX_STREAMS_PER_CORE (64 * 1024)
#define MIN_STREAMS_PER_CORE 16
#define DELTA_STREAMS 64

/* IP reassembly table dimensions. */
#define FRAG_BUCKET 8
#define FRAG_ENTRIES_PER_BUCKET 8

/* Maximum ARP/NDP cache entries per context. */
#define MAX_ARP_ENTRY (1 << 10)

/* RCV buffer & SND buffer
 * This is not a real rcv/snd buffer implementation. Below number means
 * the slots to store mbufs of sent or received data. Each slot could
 * contain a single mbuf with size of (1500B or 2048B) or a chained
 * mbuf with size <= 64KB.
 *
 * TODO: add real snd/rcv buffer
 */
#define MAX_RECV_BUFS_PER_STREAM 256
#define MAX_SEND_BUFS_PER_STREAM 256

#ifdef LOOK_ASIDE_BACKEND
#define MAX_NB_CTX 1
#else
#define MAX_NB_CTX 16
#endif

#define MAX_MBUFS 0x80000
/* should be calculated by:
 * MAX_NB_CTX * MAX_STREAMS_PER_CORE *
 *	(MAX_RECV_BUFS_PER_STREAM + MAX_SEND_BUFS_PER_STREAM)
 */

#define MBUF_DYNAMIC_SIZE 0x800

#define MBUF_PERCORE_CACHE 32

#define MAX_PKTS_BURST 0x20

#define TCP_MAX_PROCESS 32

/* ARP cache aging and request retransmit parameters. */
#define ARP_ENTRY_EXPIRE 60000U
#define ARP_REQUEST_EXPIRE 1000U /* ms */
#define ARP_MAX_REQ_TIMES 5

#define MTU_NORMAL 1500
#define MTU_LOOPBACK 65535

#ifdef __cplusplus
}
#endif

#endif /*_TLE_GLUE_CONFIG_H_ */
+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <stdlib.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> + +#include <rte_malloc.h> +#include <rte_random.h> +#include <rte_cycles.h> +#include <rte_ethdev.h> +#include <rte_hash.h> +#include <rte_spinlock.h> + +#include "config.h" +#include "ctx.h" +#include "log.h" +#include "util.h" +#include "internal.h" +#include "gateway.h" +#include "tle_timer.h" + +RTE_DEFINE_PER_LCORE(struct glue_ctx *, glue_ctx); + +int nb_ctx; +struct glue_ctx ctx_array[MAX_NB_CTX]; +struct glue_ctx *default_ctx = &ctx_array[0]; + +static int +ipv4_dst_lookup_tcp(void *data, const struct in_addr *addr, + struct tle_dest *res) +{ + addr = ipv4_gateway_lookup(data, addr); + return arp_ipv4_dst_lookup(data, addr, res, IPPROTO_TCP); +} + +static int +ipv4_dst_lookup_udp(void *data, const struct in_addr *addr, + struct tle_dest *res) +{ + addr = ipv4_gateway_lookup(data, addr); + return arp_ipv4_dst_lookup(data, addr, res, IPPROTO_UDP); +} + +static int +ipv6_dst_lookup_tcp(void *data, const struct in6_addr *addr, + struct tle_dest *res) +{ + addr = ipv6_gateway_lookup(data, addr); + return arp_ipv6_dst_lookup(data, addr, res, IPPROTO_TCP); +} + +static int +ipv6_dst_lookup_udp(void *data, const struct in6_addr *addr, + struct tle_dest *res) +{ + addr = ipv6_gateway_lookup(data, addr); + return arp_ipv6_dst_lookup(data, addr, res, IPPROTO_UDP); +} + +static struct tle_ctx * 
+proto_ctx_create(uint32_t socket_id, uint32_t proto, void *data) +{ + struct tle_ctx_param cprm; + + if (proto != TLE_PROTO_TCP && proto != TLE_PROTO_UDP) + rte_panic("Invalid proto [%u]\n", proto); + + cprm.socket_id = socket_id; + cprm.proto = proto; + cprm.max_streams = MAX_STREAMS_PER_CORE; + cprm.min_streams = MIN_STREAMS_PER_CORE; + cprm.delta_streams = DELTA_STREAMS; + cprm.max_stream_rbufs = MAX_RECV_BUFS_PER_STREAM; + cprm.max_stream_sbufs = MAX_SEND_BUFS_PER_STREAM; + if (proto == TLE_PROTO_TCP) { + cprm.lookup4 = ipv4_dst_lookup_tcp; + cprm.lookup6 = ipv6_dst_lookup_tcp; + } else { + cprm.lookup4 = ipv4_dst_lookup_udp; + cprm.lookup6 = ipv6_dst_lookup_udp; + } + cprm.lookup4_data = data; + cprm.lookup6_data = data; +#ifdef LOOK_ASIDE_BACKEND + cprm.flags = 0; +#else + cprm.flags = TLE_CTX_FLAG_ST; /* ctx will be used by single thread*/ +#endif + cprm.send_bulk_size = 0; /* 32 if 0 */ + cprm.hash_alg = TLE_SIPHASH; + cprm.secret_key.u64[0] = rte_rand(); + cprm.secret_key.u64[1] = rte_rand(); + cprm.icw = 0; /**< congestion window, default is 2*MSS if 0. 
/* Create the three per-context event queues (error, rx, tx).
 * Events are allocated on demand (max_events = 0).  Panics on failure;
 * always returns 0.
 */
static int
evq_init(struct glue_ctx *ctx, uint32_t socket_id)
{
	struct tle_evq_param eprm = {
		.socket_id = socket_id,
		.max_events = 0, /* We don't pre-allocate any event */
	};

	ctx->ereq = tle_evq_create(&eprm);
	if (ctx->ereq == NULL)
		rte_panic("Cannot create ereq");

	ctx->rxeq = tle_evq_create(&eprm);
	if (ctx->rxeq == NULL)
		rte_panic("Cannot create rxeq");

	ctx->txeq = tle_evq_create(&eprm);
	if (ctx->txeq == NULL)
		rte_panic("Cannot create txeq");

	return 0;
}

/* Build the TLE protocol contexts and devices for one glue context:
 * a TCP ctx and a UDP ctx, each with a device bound to the physical
 * port's offload capabilities and local addresses.  The default ctx
 * additionally gets loopback TCP/UDP devices with full (unmasked)
 * offload flags.  Panics on any failure.
 */
static void
tle_ctx_init(struct glue_ctx *ctx, uint32_t socket_id)
{
	struct tle_dev_param dprm;
	struct rte_eth_dev_info dev_info;
	uint16_t port_id = 0; /* currently only use one port */

	ctx->tcp_ctx = proto_ctx_create(socket_id, TLE_PROTO_TCP, ctx);
	if (ctx->tcp_ctx == NULL)
		rte_panic("Cannot create tle_ctx for tcp");

	ctx->udp_ctx = proto_ctx_create(socket_id, TLE_PROTO_UDP, ctx);
	if (ctx->udp_ctx == NULL)
		rte_panic("Cannot create tle_ctx for udp");

	memset(&dprm, 0, sizeof(dprm));

	/* offloading check and set: intersect what we want with what the
	 * device can actually do.
	 */
	rte_eth_dev_info_get(port_id, &dev_info);
	dprm.rx_offload = dev_info.rx_offload_capa & rx_offload;
	dprm.tx_offload = dev_info.tx_offload_capa & tx_offload;

	dprm.local_addr4.s_addr = ctx->ipv4;
	rte_memcpy(&dprm.local_addr6, &ctx->ipv6, sizeof(struct in6_addr));
	dprm.bl4.nb_port = 0;
	dprm.bl4.port = NULL;
	dprm.bl6.nb_port = 0;
	dprm.bl6.port = NULL;

	ctx->tcp_dev = tle_add_dev(ctx->tcp_ctx, &dprm);
	if (ctx->tcp_dev == NULL)
		rte_panic("add tle_dev for tcp failed: %u", rte_errno);

	ctx->udp_dev = tle_add_dev(ctx->udp_ctx, &dprm);
	if (ctx->udp_dev == NULL)
		rte_panic("add tle_dev for udp failed: %u", rte_errno);

	if (ctx == default_ctx) {
		/* loopback never hits real hardware, so claim all
		 * requested offloads unconditionally.
		 */
		dprm.rx_offload = rx_offload;
		dprm.tx_offload = tx_offload;
		dprm.local_addr4.s_addr = htonl(INADDR_LOOPBACK);
		rte_memcpy(&dprm.local_addr6, &in6addr_loopback,
			   sizeof(struct in6_addr));

		ctx->lb_tcp_dev = tle_add_dev(ctx->tcp_ctx, &dprm);
		if (ctx->lb_tcp_dev == NULL)
			rte_panic("failed to add loopback tcp dev: %u\n",
				  rte_errno);

		ctx->lb_udp_dev = tle_add_dev(ctx->udp_ctx, &dprm);
		if (ctx->lb_udp_dev == NULL)
			rte_panic("failed to add loopback udp dev: %u\n",
				  rte_errno);
	}

	evq_init(ctx, socket_id);
}
sizeof(struct in6_addr)); + + ctx->lb_tcp_dev = tle_add_dev(ctx->tcp_ctx, &dprm); + if (ctx->lb_tcp_dev == NULL) + rte_panic("failed to add loopback tcp dev: %u\n", + rte_errno); + + ctx->lb_udp_dev = tle_add_dev(ctx->udp_ctx, &dprm); + if (ctx->lb_udp_dev == NULL) + rte_panic("failed to add loopback udp dev: %u\n", + rte_errno); + } + + evq_init(ctx, socket_id); +} + +static uint32_t +get_ip(void) +{ + struct in_addr addr; + const char *ip_str = getenv(DPDK_IP); + + if (ip_str == NULL) { + ip_str = DPDK_IP_DEF; + GLUE_LOG(INFO, "will use the default IP %s", DPDK_IP_DEF); + } else + GLUE_LOG(INFO, "will use the IP %s", ip_str); + + if (inet_aton(ip_str, &addr) == 0) + rte_panic("Invalid addr from env DPDK_IP: %s", ip_str); + + return addr.s_addr; +} + +static uint8_t +get_ip_mask(void) +{ + const char *mask_str = getenv(DPDK_IP_MASK); + + if (mask_str == NULL) { + mask_str = DPDK_IP_MASK_DEF; + GLUE_LOG(INFO, "will use the default IP Mask %s", DPDK_IP_MASK_DEF); + } else + GLUE_LOG(INFO, "will use the IP Mask %s", mask_str); + + return (uint8_t)atoi(mask_str); +} + +static uint32_t +get_ip_gate(void) +{ + struct in_addr addr; + const char *ip_str = getenv(DPDK_IP_GATEWAY); + + if (ip_str == NULL) { + ip_str = DPDK_IP_GATEWAY_DEF; + GLUE_LOG(INFO, "will use the default IP gateway %s", + DPDK_IP_GATEWAY_DEF); + } else + GLUE_LOG(INFO, "will use the IP gateway %s", ip_str); + + if (inet_aton(ip_str, &addr) == 0) + rte_panic("Invalid addr from env DPDK_IP_GATEWAY: %s", ip_str); + + return addr.s_addr; +} + +static struct in6_addr* +get_ipv6(void) +{ + static struct in6_addr addr; + const char *ip_str = getenv(DPDK_IPV6); + + if (ip_str == NULL) { + ip_str = DPDK_IPV6_DEF; + GLUE_LOG(INFO, "will use the default IP(V6) %s", DPDK_IPV6_DEF); + } else + GLUE_LOG(INFO, "will use the IP(V6) %s", ip_str); + + if (inet_pton(AF_INET6, ip_str, &addr) == 0) + rte_panic("Invalid addr from env DPDK_IPV6: %s", ip_str); + + return &addr; +} + +static uint8_t +get_ipv6_mask(void) +{ + 
const char *mask_str = getenv(DPDK_IPV6_MASK); + + if (mask_str == NULL) { + mask_str = DPDK_IPV6_MASK_DEF; + GLUE_LOG(INFO, "will use the default IPV6 Mask %s", + DPDK_IPV6_MASK_DEF); + } else + GLUE_LOG(INFO, "will use the IPV6 Mask %s", mask_str); + + return (uint8_t)atoi(mask_str); +} + +static struct in6_addr* +get_ipv6_gate(void) +{ + static struct in6_addr addr; + const char *ip_str = getenv(DPDK_IPV6_GATEWAY); + + if (ip_str == NULL) { + ip_str = DPDK_IPV6_GATEWAY_DEF; + GLUE_LOG(INFO, "will use the default IP(V6) gateway %s", + DPDK_IPV6_GATEWAY_DEF); + } else + GLUE_LOG(INFO, "will use the IP(V6) gateway %s", ip_str); + + if (inet_pton(AF_INET6, ip_str, &addr) == 0) + rte_panic("Invalid addr from env DPDK_IPV6_GATEWAY: %s", ip_str); + + return &addr; +} + +static bool +lo4_enabled(void) +{ + const char *str = getenv("DPDK_LO4_ENABLED"); + if (str != NULL && strcmp(str, "0") == 0) + return false; + return true; +} + +static bool +lo6_enabled(void) +{ + const char *str = getenv("DPDK_LO6_ENABLED"); + if (str == NULL || strcmp(str, "1") != 0) + return false; + return true; +} + +static void +loopback_dst_init(struct glue_ctx *ctx) +{ + struct tle_dest *dst; + struct ether_hdr *eth; + struct ipv4_hdr *ip4h; + struct ipv6_hdr *ip6h; + + /* init ipv4 dst */ + dst = &ctx->lb_dst; + dst->mtu = 65535; + + dst->l2_len = sizeof(*eth); + dst->head_mp = get_mempool_by_socket(0); /* fix me */ + eth = (struct ether_hdr *)dst->hdr; + memset(eth, 0, 2 * sizeof(eth->d_addr)); + eth->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv4); + + dst->l3_len = sizeof(*ip4h); + ip4h = (struct ipv4_hdr *)(eth + 1); + ip4h->dst_addr = htonl(INADDR_LOOPBACK); + ip4h->version_ihl = 4 << 4 | sizeof(*ip4h) / IPV4_IHL_MULTIPLIER; + ip4h->time_to_live = 64; + ip4h->next_proto_id = IPPROTO_TCP; + + /* init ipv6 dst */ + dst = &ctx->lb_dst_v6; + dst->mtu = 65535; + + dst->l2_len = sizeof(*eth); + dst->head_mp = get_mempool_by_socket(0); /* fix me */ + eth = (struct ether_hdr *)dst->hdr; + 
/* Pre-build the fixed IPv4 and IPv6 loopback destinations (lb_dst /
 * lb_dst_v6): zeroed MACs, loopback destination address, and protocol
 * headers templated for TCP.  The loopback path never changes these.
 */
static void
loopback_dst_init(struct glue_ctx *ctx)
{
	struct tle_dest *dst;
	struct ether_hdr *eth;
	struct ipv4_hdr *ip4h;
	struct ipv6_hdr *ip6h;

	/* init ipv4 dst */
	dst = &ctx->lb_dst;
	dst->mtu = 65535;

	dst->l2_len = sizeof(*eth);
	dst->head_mp = get_mempool_by_socket(0); /* fix me */
	eth = (struct ether_hdr *)dst->hdr;
	/* zero both dst and src MAC (they are adjacent in the header) */
	memset(eth, 0, 2 * sizeof(eth->d_addr));
	eth->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv4);

	dst->l3_len = sizeof(*ip4h);
	ip4h = (struct ipv4_hdr *)(eth + 1);
	ip4h->dst_addr = htonl(INADDR_LOOPBACK);
	ip4h->version_ihl = 4 << 4 | sizeof(*ip4h) / IPV4_IHL_MULTIPLIER;
	ip4h->time_to_live = 64;
	ip4h->next_proto_id = IPPROTO_TCP;

	/* init ipv6 dst */
	dst = &ctx->lb_dst_v6;
	dst->mtu = 65535;

	dst->l2_len = sizeof(*eth);
	dst->head_mp = get_mempool_by_socket(0); /* fix me */
	eth = (struct ether_hdr *)dst->hdr;
	memset(eth, 0, 2 * sizeof(eth->d_addr));
	eth->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv6);

	dst->l3_len = sizeof(*ip6h);
	ip6h = (struct ipv6_hdr *)(eth + 1);
	rte_memcpy(ip6h->dst_addr, &in6addr_loopback, sizeof(struct in6_addr));
	/* NOTE(review): the IPv6 version field lives in the top 4 bits of
	 * the 32-bit (big-endian) vtc_flow word; `6 << 4` only sets bits
	 * in the low byte.  Harmless for the internal loopback device, but
	 * confirm whether rte_cpu_to_be_32(6 << 28) was intended.
	 */
	ip6h->vtc_flow = 6 << 4;
	ip6h->hop_limits = 255;
	ip6h->proto = IPPROTO_TCP;
}

/* Create the per-context IPv4 and IPv6 ARP hash tables, sized at twice
 * MAX_ARP_ENTRY and keyed by in_addr / in6_addr.  Panics on failure.
 */
static void
arp_hash_init(struct glue_ctx *ctx, unsigned socket_id)
{
	char str[RTE_HASH_NAMESIZE];
	struct rte_hash_parameters hprm;

	/* init ipv4 arp hash */
	snprintf(str, sizeof(str), "arp_hash_4@ctx%u", ctx->queue_id);
	memset(&hprm, 0, sizeof(hprm));
	hprm.name = str;
	hprm.entries = MAX_ARP_ENTRY * 2;
	hprm.socket_id = socket_id;
	hprm.key_len = sizeof(struct in_addr);
	ctx->arp_hash = rte_hash_create(&hprm);
	if (ctx->arp_hash == NULL) {
		rte_panic("Failed to init hashtable for ARP");
	}

	/* init ipv6 arp hash */
	snprintf(str, sizeof(str), "arp_hash_6@ctx%u", ctx->queue_id);
	memset(&hprm, 0, sizeof(hprm));
	hprm.name = str;
	hprm.entries = MAX_ARP_ENTRY * 2;
	hprm.socket_id = socket_id;
	hprm.key_len = sizeof(struct in6_addr);
	ctx->arp6_hash = rte_hash_create(&hprm);
	if (ctx->arp6_hash == NULL) {
		rte_panic("Failed to init hashtable for ARP6");
	}
}

/* get current timestamp in ms, see tcp_get_tms() */
static inline uint64_t
arp_get_tms(uint32_t mshift)
{
	uint64_t ts;

	ts = rte_get_tsc_cycles() >> mshift;
	return ts;
}

/* Create the ARP timer wheel (1 s tick, one slot per possible ARP entry
 * plus headroom), seeded with the current ms timestamp.  Panics on failure.
 */
static void
arp_timer_init(struct glue_ctx *ctx, unsigned socket_id)
{
	struct tle_timer_wheel_args twprm;

	twprm.tick_size = 1000U;
	twprm.max_timer = MAX_ARP_ENTRY + 8;
	twprm.socket_id = socket_id;
	ctx->arp_tmw = tle_timer_create(&twprm,
					arp_get_tms(ctx->cycles_ms_shift));
	if (ctx->arp_tmw == NULL)
		rte_panic("Failed to init timer wheel for ARP");
}
/* Initialize one glue context: ARP tables and timers, addresses taken
 * from the environment, cycles->ms conversion shift, and the IP defrag
 * table.  Must run before tle_ctx_init() (which needs the addresses) and
 * with nb_ctx already incremented (queue_id is derived from it).
 * Panics on any allocation failure.
 */
static void
glue_ctx_init(struct glue_ctx *ctx, uint32_t socket_id)
{
	uint64_t ms;

	ctx->arp4 = rte_zmalloc_socket(NULL,
			sizeof(struct arp_entry) * MAX_ARP_ENTRY,
			RTE_CACHE_LINE_SIZE, socket_id);
	ctx->arp6 = rte_zmalloc_socket(NULL,
			sizeof(struct arp_entry) * MAX_ARP_ENTRY,
			RTE_CACHE_LINE_SIZE, socket_id);
	if (!ctx->arp4 || !ctx->arp6)
		rte_panic("Failed to allocate arp table");

	ctx->port_id = 0;
	/* one RX/TX queue pair per context on the single physical port */
	ctx->queue_id = nb_ctx - 1;
	ctx->ipv4 = get_ip();
	ctx->ipv4_ml = get_ip_mask();
	ctx->ipv4_gw.s_addr = get_ip_gate();
	ctx->lo4_enabled = lo4_enabled();
	rte_memcpy(&ctx->ipv6, get_ipv6(), sizeof(struct in6_addr));
	ctx->ipv6_ml = get_ipv6_mask();
	rte_memcpy(&ctx->ipv6_gw, get_ipv6_gate(), sizeof(struct in6_addr));
	ctx->lo6_enabled = lo6_enabled();

	/* calculate closest shift to convert from cycles to ms (approximate) */
	ms = (rte_get_tsc_hz() + MS_PER_S - 1) / MS_PER_S;
	ctx->cycles_ms_shift = sizeof(ms) * CHAR_BIT - __builtin_clzll(ms) - 1;

	arp_hash_init(ctx, socket_id);
	arp_timer_init(ctx, socket_id);
	ctx->arp_wait = NULL;

	ctx->frag_tbl = rte_ip_frag_table_create(FRAG_BUCKET,
				FRAG_ENTRIES_PER_BUCKET,
				FRAG_BUCKET * FRAG_ENTRIES_PER_BUCKET,
				rte_get_tsc_hz(),
				socket_id);
	if (ctx->frag_tbl == NULL)
		rte_panic("Failed to create ip defrag table");

	PERCPU_MIB = &ctx->mib;
}

/* Allocation state machine shared by glue_ctx_alloc()/glue_ctx_free():
 * 0 = constructor not yet run, 1 = default ctx created, 2 = default ctx
 * handed out to the first epoll_create()/poll() caller.
 */
static int ctx_seq;
static rte_spinlock_t ctx_lock = RTE_SPINLOCK_INITIALIZER;
/* Allocate (or hand out) a glue context and return its index in
 * ctx_array.
 *
 * The first call (from the library constructor, ctx_seq == 0) creates
 * the default ctx; the second call (first epoll_create()/poll(),
 * ctx_seq == 1) re-uses that default ctx instead of creating a new one;
 * later calls create fresh contexts and reconfigure the physical port
 * for the new queue count.  Panics when MAX_NB_CTX is exceeded.
 */
uint8_t
glue_ctx_alloc(void)
{
	uint32_t socket_id;
	struct glue_ctx *ctx;

	/* fix me: we need a finer-grained lock */
	rte_spinlock_lock(&ctx_lock);

	GLUE_LOG(INFO, "allocate ctx: %d", ctx_seq);
	if (ctx_seq == 0)
		/* Called from constructor init() */
		ctx_seq = 1;
	else if (ctx_seq == 1) {
		/* Called from first epoll_create() or poll() */
		ctx_seq = 2;
		ctx = default_ctx;
		goto unlock;
	}

	if (nb_ctx >= MAX_NB_CTX)
		rte_panic("Exceed the max number of ctx");

	ctx = &ctx_array[nb_ctx++];
	GLUE_LOG(INFO, "%u ctx allocated, and will init", nb_ctx);

	socket_id = get_socket_id();

	glue_ctx_init(ctx, socket_id);

	/* reconfigure the "physical" port whenever # of ctx changes */
	port_reconfig();

	if (ctx == default_ctx) {
		loopback_dst_init(ctx);

		ctx->lb_port_id = create_loopback(socket_id);
		GLUE_LOG(INFO, "loopback port_id: %u", ctx->lb_port_id);
	}

	rte_eth_macaddr_get(ctx->port_id, &ctx->mac);

	tle_ctx_init(ctx, socket_id);

unlock:
	rte_spinlock_unlock(&ctx_lock);
	return ctx - ctx_array;
}

/* Release a context.  Only the "default ctx handed out once" case is
 * supported (reverts ctx_seq to 1); tearing down a live extra ctx panics.
 */
void
glue_ctx_free(struct glue_ctx *ctx __rte_unused)
{
	if (nb_ctx == 1 && ctx_seq == 2) {
		GLUE_LOG(INFO, "free ctx");
		ctx_seq = 1;
		return;
	}

	rte_panic("close epoll fd on running is not supported\n");
}

/* Map a (port, queue) pair back to its glue context.
 * port_id 1 is the loopback port and always maps to the default ctx.
 * Returns NULL when no context owns the pair.
 */
struct glue_ctx *
glue_ctx_lookup(uint16_t port_id, uint16_t queue_id)
{
	int i;

	if (port_id == 1) /* loopback */
		return default_ctx;

	for (i = 0; i < nb_ctx; i++) {
		if (ctx_array[i].port_id == port_id &&
		    ctx_array[i].queue_id == queue_id)
			return &ctx_array[i];
	}

	return NULL;
}
#ifndef _TLE_GLUE_SOCK_H_
#define _TLE_GLUE_SOCK_H_

#include <stdbool.h>
#include <pthread.h>

#include <rte_memzone.h>
#include <rte_mempool.h>
#include <rte_ether.h>
#include <rte_ip_frag.h>

#include <tle_ctx.h>
#include <tle_event.h>
#include <tle_stats.h>

#include <sys/queue.h>

#include "config.h"

#ifdef __cplusplus
extern "C" {
#endif

/* Environment variable names (and their built-in defaults) consumed by
 * ctx.c when configuring a context's addresses and routes.
 */
#define DPDK_IP		 "DPDK_IP"
#define DPDK_IP_DEF	 "0.0.0.0"
#define DPDK_IP_MASK	 "DPDK_IP_MASK"
#define DPDK_IP_MASK_DEF "16"
#define DPDK_IP_GATEWAY	 "DPDK_IP_GATEWAY"
#define DPDK_IP_GATEWAY_DEF "0.0.0.0"
#define DPDK_IPV6	 "DPDK_IPV6"
#define DPDK_IPV6_DEF	 "::"
#define DPDK_IPV6_MASK	 "DPDK_IPV6_MASK"
#define DPDK_IPV6_MASK_DEF "64"
#define DPDK_IPV6_GATEWAY "DPDK_IPV6_GATEWAY"
#define DPDK_IPV6_GATEWAY_DEF "::"

/* One ARP/NDP cache entry: resolved destination plus request-retry
 * bookkeeping (timer handle from the context's timer wheel).
 */
struct arp_entry {
	struct tle_dest dst;
	uint8_t inuse;		/* slot occupied */
	uint8_t req_time;	/* number of requests already sent */
	void* timer;		/* tle_timer handle, NULL when idle */
};

/* Per-queue stack instance: TLE contexts/devices, event queues, ARP
 * caches, defrag table, and the addresses this instance answers for.
 * One glue_ctx owns one RX/TX queue pair of the physical port.
 */
struct glue_ctx {
	struct tle_ctx *tcp_ctx;
	struct tle_dev *tcp_dev;
	struct tle_dev *lb_tcp_dev;	/* loopback dev, default ctx only */
	struct tle_ctx *udp_ctx;
	struct tle_dev *udp_dev;
	struct tle_dev *lb_udp_dev;	/* loopback dev, default ctx only */

	/* error / rx / tx event queues drained by poll_common() */
	struct tle_evq *ereq;
	struct tle_evq *rxeq;
	struct tle_evq *txeq;

	uint16_t port_id;
	uint16_t queue_id;
	uint16_t lb_port_id;

	struct {
		uint8_t ipv4_ml;	/* IPv4 prefix length */
		uint8_t ipv6_ml;	/* IPv6 prefix length */
	};

	struct ether_addr mac;
	struct rte_mbuf *arp_wait;	/* mbufs parked awaiting ARP reply */
	struct tle_timer_wheel *arp_tmw;
	uint32_t cycles_ms_shift; /* to convert from cycles to ms */

	struct {
		uint32_t ipv4;		/* local address, network order */
		struct in_addr ipv4_gw;
		bool lo4_enabled;

		uint32_t arp4_num;
		struct arp_entry *arp4;
		struct rte_hash *arp_hash;
	};

	struct {
		struct in6_addr ipv6;
		struct in6_addr ipv6_gw;
		bool lo6_enabled;

		uint32_t arp6_num;
		struct arp_entry *arp6;
		struct rte_hash *arp6_hash;
	};

	/* IP reassembly state */
	struct {
		rte_spinlock_t frag_lock;
		struct rte_ip_frag_tbl *frag_tbl;
		struct rte_ip_frag_death_row frag_dr;
	};

	/* pre-built loopback destinations, see loopback_dst_init() */
	struct tle_dest lb_dst;
	struct tle_dest lb_dst_v6;

	struct tle_mib mib;	/* per-context SNMP counters */
} __rte_cache_aligned;
extern int nb_ctx;
extern struct glue_ctx *default_ctx;
extern struct glue_ctx ctx_array[MAX_NB_CTX];

RTE_DECLARE_PER_LCORE(struct glue_ctx *, glue_ctx);

/* Return the context bound to the calling thread, falling back to the
 * default context when the thread has not been bound yet (binding
 * happens on the thread's first epoll_wait()).
 */
static inline struct glue_ctx *
get_ctx(void)
{
	if (RTE_PER_LCORE(glue_ctx))
		return RTE_PER_LCORE(glue_ctx);
	return default_ctx;
}

/* Index of the calling thread's context within ctx_array. */
static inline uint8_t
get_cid(void)
{
	return get_ctx() - ctx_array;
}

uint8_t glue_ctx_alloc(void);

struct glue_ctx * glue_ctx_lookup(uint16_t port_id, uint16_t queue_id);

void glue_ctx_free(struct glue_ctx *ctx);

#ifdef __cplusplus
}
#endif

#endif /* _TLE_GLUE_SOCK_H_ */
#include <errno.h>

#include <rte_common.h>
#include <rte_spinlock.h>
#include <rte_malloc.h>
#include <rte_ethdev.h>
#include <rte_atomic.h>
#include <rte_eal_interrupts.h>

#include "fd.h"
#include "ctx.h"
#include "sym.h"
#include "log.h"
#include "util.h"
#include "sock.h"
#include "internal.h"
#include "tle_glue.h"
#include "../libtle_l4p/udp_stream.h"
#include "../libtle_l4p/tcp_stream.h"

/* Sentinel stored in epoll_event.data to tag the NIC RX interrupt fd so
 * epoll_kernel_wait() can filter it out of the events handed to the app.
 */
#define EPOLL_DATA_SPECIAL 0xFFFFFFFFFFFFFF01

/* We don't use rte_eth_dev_rx_intr_ctl_q as it has its
 * own way to specify event.data
 */
/* Register/unregister the RX-queue interrupt eventfd of (port, queue)
 * with kernel epoll instance `efd`, tagging it with EPOLL_DATA_SPECIAL.
 * When `rx` is set the eventfd counter is drained first so a stale
 * notification does not immediately re-trigger.
 * Returns 0 on success or a negative errno-style code.
 */
static int
dev_rx_intr_ctl_q(uint16_t port_id, uint16_t queue_id, int efd, int op, int rx)
{
	int fd, ret;
	uint32_t vec, efd_idx;
	struct rte_eth_dev *dev;
	struct rte_intr_handle *intr_handle;
	static struct epoll_event ev = {
		.events = EPOLLIN | EPOLLPRI | EPOLLET,
		.data = {
			.u64 = EPOLL_DATA_SPECIAL,
		},
	};
	char buf[32];

	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV);

	dev = &rte_eth_devices[port_id];
	if (queue_id >= dev->data->nb_rx_queues)
		return -EINVAL;

	if (!dev->intr_handle)
		return -ENOTSUP;

	intr_handle = dev->intr_handle;
	if (!intr_handle->intr_vec)
		return -EPERM;

	/* map the queue's interrupt vector to its eventfd slot */
	vec = intr_handle->intr_vec[queue_id];

	efd_idx = (vec >= RTE_INTR_VEC_RXTX_OFFSET) ?
		(vec - RTE_INTR_VEC_RXTX_OFFSET) : vec;

	fd = intr_handle->efds[efd_idx];

	if (rx) {
		/* almost all devices use eventfd, we shall read out */
		ret = read(fd, buf, sizeof(uint64_t));
		RTE_SET_USED(ret);
	}

	return k_epoll_ctl(efd, op, fd, &ev);
}
/* Userspace replacement for epoll_create(2).
 *
 * Before the fd table exists, falls through to the kernel.  Otherwise
 * allocates a userspace fd backed by a sock, binds it to a glue context,
 * and creates a shadow kernel epoll fd that watches the NIC RX interrupt
 * eventfd (and any kernel fds later added via epoll_ctl).
 * Returns the new epoll fd, or -1 with errno = EMFILE when the fd table
 * is exhausted.  Panics if the shadow fd cannot be set up.
 */
int
PRE(epoll_create)(int size)
{
	int epfd;
	struct sock *so;

	if (!fd_table_initialized)
		return k_epoll_create(size);

	epfd = get_unused_fd();
	if (epfd == -1) {
		errno = EMFILE;
		return -1;
	}

	so = fd2sock(epfd);
	so->cid = glue_ctx_alloc();

	so->shadow_efd = k_epoll_create(1);
	if (so->shadow_efd < 0)
		rte_panic("Failed to create shadow efd");

	if (dev_rx_intr_ctl_q(CTX(so)->port_id, CTX(so)->queue_id,
			      so->shadow_efd, RTE_INTR_EVENT_ADD, 0) < 0)
		rte_panic("Failed to epoll_ctl rxq interrupt fd");

	so->epoll = 1;

	return epfd;
}

/* epoll_create1(2) wrapper; flags (e.g. EPOLL_CLOEXEC) are ignored for
 * userspace epoll fds.
 */
int
PRE(epoll_create1)(int flags __rte_unused)
{
	return PRE(epoll_create)(1);
}
/* Userspace replacement for epoll_ctl(2).
 *
 * Kernel epfd + kernel fd goes straight to the kernel; a kernel fd under
 * a userspace epfd is delegated to the shadow kernel epoll fd; userspace
 * fds record the interest set in so->event and (with LOOK_ASIDE_BACKEND)
 * arm/idle the stream's rx/tx events accordingly.
 * Returns 0, or -1 with errno EEXIST/ENOENT/EINVAL following epoll_ctl
 * semantics.  Panics on cross-context or kernel-epfd/userspace-fd mixes.
 */
int
PRE(epoll_ctl)(int epfd, int op, int fd, struct epoll_event *event)
{
	struct sock *so_ep;
	struct sock *so;

	if (is_kernel_fd(epfd)) {
		if (!is_kernel_fd(fd))
			rte_panic("kernel epoll (%d) on an userspace fd: %d",
				  epfd, fd);

		return k_epoll_ctl(epfd, op, fd, event);
	}

	so_ep = fd2sock(epfd);

	if (is_kernel_fd(fd)) {
		/* Use a shadow epoll fd for possible kernel I/O events. */
		return k_epoll_ctl(so_ep->shadow_efd, op, fd, event);
	}

	so = fd2sock(fd);

	if (unlikely(so->cid != so_ep->cid))
		rte_panic("Different ctx %d and %d for epoll fd and socket fd",
			  so_ep->cid, so->cid);

	GLUE_DEBUG("epoll_ctl: op = %x, fd = %d, event = %x",
		   op, fd, event->events);
	switch (op) {
	case EPOLL_CTL_ADD:
		/* a non-zero interest set means the fd is already added */
		if (so->event.events) {
			errno = EEXIST;
			return -1;
		}

#ifdef LOOK_ASIDE_BACKEND
		if (event->events & EPOLLIN)
			tle_event_active(&so->rxev, TLE_SEV_DOWN);
		if (event->events & EPOLLOUT)
			tle_event_active(&so->txev, TLE_SEV_DOWN);
#endif
		so->event = *event;

		break;
	case EPOLL_CTL_MOD:
		if (so->event.events == 0) {
			errno = ENOENT;
			return -1;
		}

#ifdef LOOK_ASIDE_BACKEND
		if (event->events & EPOLLIN)
			tle_event_active(&so->rxev, TLE_SEV_DOWN);
		else
			tle_event_idle(&so->rxev);
		if (event->events & EPOLLOUT)
			tle_event_active(&so->txev, TLE_SEV_DOWN);
		else
			tle_event_idle(&so->txev);
#endif
		so->event = *event;
		break;
	case EPOLL_CTL_DEL:
		if (so->event.events == 0) {
			errno = ENOENT;
			return -1;
		}

#ifdef LOOK_ASIDE_BACKEND
		if (so->event.events & EPOLLIN)
			tle_event_idle(&so->rxev);
		if (so->event.events & EPOLLOUT)
			tle_event_idle(&so->txev);
#endif
		so->event.events = 0;
		break;
	default:
		errno = EINVAL;
		return -1;
	}

	return 0;
}

/* Pop up to `num` armed events from `evq` whose owning socket has a
 * registered interest matching `event`, storing each event's data
 * (the sock pointer) into evd[].  EPOLLHUP is always reported for any
 * non-empty interest set, per epoll_ctl(2).
 * Returns the number of entries written to evd[].
 */
static inline int32_t
tle_evq_fetch(struct tle_evq *evq, const void *evd[],
	      uint32_t num, uint32_t event)
{
	uint32_t i, k;
	uint32_t polled;
	struct tle_event *ev;
	struct tle_event *next;

	/* racy fast-path check; re-validated under the lock below */
	if (evq->nb_armed == 0)
		return 0;

	rte_compiler_barrier();

	rte_spinlock_lock(&evq->lock);
	ev = TAILQ_FIRST(&evq->armed);
	for (i = 0, k = 0; i != evq->nb_armed; i++) {
		next = TAILQ_NEXT(ev, ql);
		polled = ((const struct sock *)ev->data)->event.events;
		/* Always report EPOLLHUP, see man epoll_ctl(2) */
		if (polled && ((polled | EPOLLHUP) & event)) {
			evd[k++] = ev->data;
			TAILQ_REMOVE(&evq->armed, ev, ql);
			/* don't down erev; and assign NULL to data means this
			 * ev is already removed from the queue, refer to
			 * tle_event_idle_err().
			 */
			if (event != EPOLLHUP)
				ev->state = TLE_SEV_DOWN;
			else
				ev->data = NULL;
		}
		if (k == num)
			break;
		ev = next;
	}
	evq->nb_armed -= k;
	rte_spinlock_unlock(&evq->lock);
	return k;
}
/* Drain up to `maxevents` events of type `event` from queue `q` into the
 * caller's epoll_event array, merging in extra event types per socket so
 * a socket yields at most one epoll_event per wait (see comments below).
 * Returns the number of events written.
 */
static int
evq_drain(struct tle_evq *q, uint32_t event,
	  struct epoll_event *events, int maxevents)
{
	uint32_t i, n;
	struct sock *socks[maxevents];

	n = tle_evq_fetch(q, (const void **)(uintptr_t)socks, maxevents, event);
	for (i = 0; i < n; ++i) {
		events[i].events = event;
		events[i].data = socks[i]->event.data;

		/* when EPOLLHUP happens, also return EPOLLIN and EPOLLOUT
		 * if they are registered. So as to emulate behaviour of linux
		 * kernel.
		 * Some applications (e.g. redis) need these events to determine
		 * following works.
		 */
		if (event & EPOLLHUP)
			events[i].events |= (socks[i]->event.events &
					     (EPOLLIN | EPOLLOUT));

		/* if multiple events of single socket are triggered,
		 * return single event with multiple event types rather than
		 * multiple events.
		 *
		 * we drain evq in order of EPOLLOUT -> EPOLLIN -> EPOLLHUP,
		 * so only need to check event in evq that has not been drained.
		 */
		switch (event) {
		case EPOLLOUT:
			if ((socks[i]->event.events & EPOLLIN) &&
			    tle_event_state(&socks[i]->rxev) == TLE_SEV_UP) {
				tle_event_down(&socks[i]->rxev);
				events[i].events |= EPOLLIN;
			}
			/* fallthrough */
		case EPOLLIN:
			if (tle_event_state(&socks[i]->erev) == TLE_SEV_UP) {
				/* manually dequeue the armed error event,
				 * mirroring tle_evq_fetch()'s EPOLLHUP path
				 */
				rte_spinlock_lock(&socks[i]->erev.head->lock);
				if (socks[i]->erev.data != NULL &&
				    tle_event_state(&socks[i]->erev) == TLE_SEV_UP) {
					TAILQ_REMOVE(&socks[i]->erev.head->armed,
						     &socks[i]->erev, ql);
					socks[i]->erev.head->nb_armed--;
					socks[i]->erev.data = NULL;
				}
				rte_spinlock_unlock(&socks[i]->erev.head->lock);
				events[i].events |= EPOLLHUP;
			}
		}

		GLUE_DEBUG("event for fd = %d, event = %x",
			   socks[i]->event.data.fd, event);
	}
	return n;
}

#ifdef LOOK_ASIDE_BACKEND
rte_atomic32_t flag_sleep;

/* Look-aside variant: enable the RX interrupt, run one backend iteration
 * (or sleep on `efd` until woken/timeout), then disable the interrupt.
 * Kernel events are not reported in this mode; always returns 0.
 */
int
epoll_kernel_wait(struct glue_ctx *ctx, int efd,
		  struct epoll_event *events,
		  int maxevents, int timeout, int *rx)
{
	struct epoll_event event;
	uint16_t port_id = ctx->port_id;
	uint16_t queue_id = ctx->queue_id;

	RTE_SET_USED(events);
	RTE_SET_USED(maxevents);
	RTE_SET_USED(rx);

	rte_eth_dev_rx_intr_enable(port_id, queue_id);

	/* TODO: timeout shall be limited by the latest tcp timer */

	if (be_process(ctx) > 0) /* use this way to avoid concurrency */ {
		/* Do nothing */
	} else
		sleep_with_lock(efd, &event, 1, timeout);

	rte_eth_dev_rx_intr_disable(port_id, queue_id);
	/* We don't have kernel events for report, so just return zero */
	return 0;
}
#else
/* Default variant: arm the NIC RX interrupt on a kernel epoll fd
 * (creating a temporary one when efd == -1), sleep until kernel I/O,
 * RX interrupt, or timeout, then strip the EPOLL_DATA_SPECIAL entry.
 * On return *rx is set when RX traffic (or fast-path progress) occurred;
 * the return value is the number of genuine kernel events.
 */
int
epoll_kernel_wait(struct glue_ctx *ctx, int efd,
		  struct epoll_event *events,
		  int maxevents, int timeout, int *rx)
{
	int i, j, rc;
	int flag_tmp = 0;
	uint16_t port_id = ctx->port_id;
	uint16_t queue_id = ctx->queue_id;
#define LEAST_EVENTS 8
	struct epoll_event s_events[LEAST_EVENTS];
	struct epoll_event *r_events;
	int r_maxevents;
	int fastpath = 0;

	*rx = 0;

	if (efd == -1) {
		flag_tmp = 1;
		efd = k_epoll_create(1);
		if (efd < 0)
			rte_panic("Failed to create tmp efd");
	}

	if (stopped) {
		rc = k_epoll_pwait(efd, events, maxevents, timeout, NULL);
		goto check;
	}

	/* reserve one extra slot so the special RX entry never crowds out
	 * a real kernel event when the caller asked for few events
	 */
	if (maxevents < LEAST_EVENTS) {
		r_events = s_events;
		r_maxevents = maxevents + 1;
	} else {
		r_events = events;
		r_maxevents = maxevents;
	}

	if (flag_tmp &&
	    dev_rx_intr_ctl_q(port_id, queue_id, efd, RTE_INTR_EVENT_ADD, 0) < 0)
		/* TODO: fall back to busy polling */
		rte_panic("Failed to enable rxq interrupt");

	rte_eth_dev_rx_intr_enable(port_id, queue_id);

	/* TODO: timeout shall be limited by the latest tcp timer */

	if (timeout != 0 && be_process(ctx) > 0) {
		/* use this way to avoid concurrency */
		rc = 0;
		fastpath = 1;
	} else
		rc = sleep_with_lock(efd, r_events, r_maxevents, timeout);

	rte_eth_dev_rx_intr_disable(port_id, queue_id);

	/* filter out rxq event */
	for (i = 0, j = 0; i < rc; ++i) {
		if (r_events[i].data.u64 == EPOLL_DATA_SPECIAL) {
			*rx = true;
			if (i + 1 < rc) {
				memcpy(&r_events[j], &r_events[i+1],
				       (rc-i-1) * sizeof(*events));
			}
			rc -= 1;
			break;
		} else {
			if (i != j)
				r_events[j] = r_events[i];
			j++;
		}
	}

	if (rc > 0 && maxevents < LEAST_EVENTS)
		memcpy(events, r_events, rc * sizeof(*events));

	if (flag_tmp)
		dev_rx_intr_ctl_q(port_id, queue_id, efd,
				  RTE_INTR_EVENT_DEL, *rx);

	if (fastpath)
		*rx = true;
check:
	if (flag_tmp)
		close(efd);

	return rc;
}
#endif

/* If only there are some packets to process, we don't sleep; we will poll
 * for some number of iterations to check packets.
 *
 * TODO: change to wait for a period of time?
 */
#define IDLE_ITERATIONS 5
/* Core wait loop shared by epoll_wait() and poll().
 *
 * Repeatedly drains the tx/rx/err event queues (in that order), running
 * backend iterations while work remains; once idle, falls back to
 * epoll_kernel_wait() for kernel events / RX interrupt / timeout, then
 * resumes draining.  Returns the number of epoll_events written, 0 on
 * timeout-with-no-events, or a kernel wait error.
 */
int
poll_common(struct glue_ctx *ctx, struct epoll_event *events,
	    int maxevents, int timeout, int shadow_efd)
{
	int rx;
	int total = 0;
	int idle = IDLE_ITERATIONS;

again:
	/* We will start with send, then recv, and last err queue, as we want
	 * to serve exiting connections firstly, then new connections, and
	 * lastly, the wrong connections.
	 */

	/* 0. send evq */
	total += evq_drain(ctx->txeq, EPOLLOUT,
			   events + total, maxevents-total);
	if (total == maxevents)
		return total;

	/* 1. recv evq */
	total += evq_drain(ctx->rxeq, EPOLLIN,
			   events + total, maxevents-total);
	if (total == maxevents)
		return total;

	/* 2. err evq */
	total += evq_drain(ctx->ereq, EPOLLHUP,
			   events + total, maxevents-total);

	if (total > 0)
		return total;

	/* nothing pending: keep polling for a few idle iterations before
	 * committing to a sleep
	 */
	if (idle > 0) {
		if (be_process(ctx) == 0)
			idle--;
		else
			idle = IDLE_ITERATIONS;
		goto again;
	}

	if (timeout == 0)
		return 0;

	/* Setup rxq interrupt mode, and check kernel I/O events */
	total = epoll_kernel_wait(ctx, shadow_efd, events,
				  maxevents, timeout, &rx);

	/* Kernel I/O events are available (total > 0) or
	 * timeout (total < 0) or something bad happens.
	 */
	if (total != 0)
		return total;

	/* Check userspace I/O events */
	idle = IDLE_ITERATIONS;
	be_process(ctx);
	goto again;
}

/* Userspace replacement for epoll_wait(2).  Kernel epfds pass through;
 * userspace epfds bind the calling thread to the epfd's glue context on
 * first use, then enter poll_common().
 */
int
PRE(epoll_wait)(int epfd, struct epoll_event *events,
		int maxevents, int timeout)
{
	struct sock *so;

	if (is_kernel_fd(epfd))
		return k_epoll_pwait(epfd, events, maxevents, timeout, NULL);

	so = fd2sock(epfd);

	/* thread <> context binding happens here */
	if (RTE_PER_LCORE(glue_ctx) == NULL)
		RTE_PER_LCORE(glue_ctx) = CTX(so);

	return poll_common(CTX(so), events, maxevents, timeout, so->shadow_efd);
}

/* epoll_pwait(2) wrapper; signal masks are not supported (panics).
 * NOTE(review): delegates to the unprefixed epoll_wait symbol — confirm
 * this is resolved to PRE(epoll_wait) via symbol interposition.
 */
int
PRE(epoll_pwait)(int epfd, struct epoll_event *events,
		 int maxevents, int timeout, const sigset_t *sigmask)
{
	if (sigmask != NULL) {
		rte_panic("epoll_pwait with signal is not supported");
	}

	return epoll_wait(epfd, events, maxevents, timeout);
}
/* Level-triggered readiness check for a userspace fd, used by poll()/
 * select() emulation.  `events` is the requested interest set; the
 * return value is the subset currently ready, possibly with EPOLLHUP
 * and/or EPOLLERR added.
 */
int
fd_ready(int fd, int events)
{
	int ret = 0;
	struct sock *so = fd2sock(fd);

	if (unlikely(!so->s)) {
		if (tle_event_state(&so->erev) == TLE_SEV_UP)
			/* socket has been shutdown */
			return events | EPOLLHUP;
		else /* socket is not set up yet */
			return 0;
	}

	if (unlikely(IS_TCP(so) &&
		     TCP_STREAM(so->s)->tcb.state == TCP_ST_CLOSED)) {
		return events | EPOLLHUP | EPOLLERR;
	}

	if (tle_event_state(&so->erev) == TLE_SEV_UP)
		ret |= EPOLLHUP;

	/* readable: leftover partially-read mbuf or a non-empty rx ring */
	if (events & EPOLLIN) {
		if (so->rx_left ||
		    (IS_TCP(so) && rte_ring_count(TCP_STREAM(so->s)->rx.q) > 0) ||
		    (IS_UDP(so) && rte_ring_count(UDP_STREAM(so->s)->rx.q) > 0))
			ret |= EPOLLIN;
	}

	/* writable: established TCP with free tx slots, or UDP with
	 * available tx descriptors
	 */
	if (events & EPOLLOUT) {
		if ((IS_TCP(so) &&
		     TCP_STREAM(so->s)->tcb.state >= TCP_ST_ESTABLISHED &&
		     rte_ring_free_count(TCP_STREAM(so->s)->tx.q) > 0) ||
		    (IS_UDP(so) &&
		     rte_ring_count(UDP_STREAM(so->s)->tx.drb.r) > 0))
			ret |= EPOLLOUT;
	}

	return ret;
}

/* Aggregate SNMP-style TCP/UDP MIB counters: start from the default MIB
 * and add each context's per-context counters (TCP mibs first, then UDP
 * mibs appended after them in the same array).
 */
void
v_get_stats_snmp(unsigned long mibs[])
{
	int i, j, k;

	memcpy(mibs, &default_mib, sizeof(default_mib));

	for (i = 0; i < nb_ctx; ++i) {
		for (j = 0; j < TCP_MIB_MAX; ++j)
			mibs[j] += ctx_array[i].mib.tcp.mibs[j];

		for (k = 0; k < UDP_MIB_MAX; ++k)
			mibs[j+k] += ctx_array[i].mib.udp.mibs[k];
	}
}
+ */ + +#include <sys/time.h> +#include <sys/resource.h> +#include <string.h> + +#include "fd.h" +#include "log.h" +#include "util.h" +#include "config.h" + +bool fd_table_initialized; + +struct fd_table fd_table = { .fd_base = INT_MAX, }; + +static int +get_ulimit_nofile(void) +{ + struct rlimit rlim; + +#define GLUE_BASE_FD 1024 + if (getrlimit(RLIMIT_NOFILE, &rlim) < 0) + return GLUE_BASE_FD; + + return rlim.rlim_cur; /* soft limit, rlim_max is the hard limit */ +} + +static void +fd_num_set(int *fd_base, int *fd_num) +{ + int limit = get_ulimit_nofile(); + + /* fix me: alignment of power of two */ + /* fix me: use dup2 to occupy these fds */ + *fd_num = limit / 2; + *fd_num = RTE_MIN(MAX_STREAMS_PER_CORE * 2 * MAX_NB_CTX, *fd_num); + + *fd_base = limit - *fd_num; + GLUE_LOG(INFO, "fd_base = %d, fd_num = %d", *fd_base, *fd_num); +} + +static void +add_fd(struct rte_mempool *mp __rte_unused, void *opaque __rte_unused, + void *obj, unsigned obj_idx) +{ + ((struct sock *)obj)->fd = obj_idx + fd_table.fd_base; + fd_table.socks[obj_idx] = obj; +} + +void +fd_init(void) +{ + int ret; + size_t sz; + uint32_t socket_id; + int fd_base, fd_num; + struct rte_mempool *mp = NULL; + char name[RTE_MEMPOOL_NAMESIZE]; + + socket_id = get_socket_id(); + + fd_num_set(&fd_base, &fd_num); + + sz = sizeof(fd_table.socks[0]) * fd_num; + fd_table.socks = rte_zmalloc_socket("fdtable", sz, + RTE_CACHE_LINE_SIZE, socket_id); + if (fd_table.socks == NULL) { + GLUE_LOG(ERR, "Failed to malloc fd table"); + goto err; + } + + snprintf(name, RTE_MEMPOOL_NAMESIZE, "mp_fd_%d_%d", fd_base, fd_num); + mp = rte_mempool_create_empty(name, fd_num - 1, sizeof(struct sock), + 32, 0, socket_id, MEMPOOL_F_DYNAMIC); + if (mp == NULL) { + GLUE_LOG(ERR, "Failed to create mp for fd table"); + goto err; + } + + GLUE_LOG(INFO, "sizeof(struct sock): %lu, elt_size of fd table = %u", + sizeof(struct sock), mp->elt_size); + + ret = rte_mempool_set_ops_byname(mp, "ring_mp_mc", NULL); + if (ret != 0) { + 
GLUE_LOG(ERR, "Failed to set mp ops: %d", ret); + goto err; + } + + rte_mempool_set_dynamic_size(mp, 1024); + rte_mempool_set_dynamic_cb(mp, add_fd); + + fd_table.mp = mp; + fd_table.fd_base = fd_base; + fd_table.fd_num = fd_num; + + /* should populate after fd_table is set */ + ret = rte_mempool_populate_default(mp); + if (ret < 0) { + GLUE_LOG(ERR, "Failed to populate mp: %d", ret); + goto err; + } + + fd_table_initialized = true; + + return; +err: + rte_mempool_free(mp); + rte_panic("Failed to init fd_table"); +} diff --git a/lib/libtle_glue/fd.h b/lib/libtle_glue/fd.h new file mode 100644 index 0000000..d0ac4fe --- /dev/null +++ b/lib/libtle_glue/fd.h @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2018 Ant Financial Services Group. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

#ifndef _TLE_GLUE_FD_H_
#define _TLE_GLUE_FD_H_

#include <stdbool.h>
#include <sys/epoll.h>
#include <fcntl.h>

#include <rte_mempool.h>
#include <rte_malloc.h>

#include <tle_event.h>
#include <tle_ctx.h>
#include <tle_tcp.h>

#include "log.h"
#include "sock.h"

#ifdef __cplusplus
extern "C" {
#endif

/* Global mapping between glue fds and sock objects.
 * fds in [fd_base, fd_base + fd_num) belong to the glue stack; anything
 * below fd_base is a plain kernel fd.
 */
struct fd_table {
	int fd_base;           /* The minimum fd, 64 aligned */
	int fd_num;            /* The number of fds, 64 aligned */
	struct rte_mempool *mp; /* O(1) get and put */
	struct sock **socks;   /* index = fd - fd_base */
};

extern bool fd_table_initialized;
extern struct fd_table fd_table;

/* fd -> sock, O(1); caller must pass a glue fd (>= fd_base). */
static inline struct sock *
fd2sock(int fd)
{
	return fd_table.socks[fd - fd_table.fd_base];
}

/* sock -> fd; the fd was assigned at mempool-populate time. */
static inline int
sock2fd(struct sock *so)
{
	return so->fd;
}

/* Allocate a fresh sock from the pool and return its fd.
 * Returns -1 when the fd space is exhausted.
 */
static inline int
get_unused_fd(void)
{
	struct sock *so;

	if (unlikely(rte_mempool_get(fd_table.mp, (void **)&so) < 0)) {
		GLUE_LOG(ERR, "FDs have been exhausted");
		return -1;
	}

	so->valid = 1;
	return sock2fd(so);
}

/* Force an event back to IDLE, de-arming it if it is currently armed.
 * Like tle_event_idle(), but also removes a raised (TLE_SEV_UP) event
 * from the armed list under the evq lock.
 * NOTE(review): reaches into tle_event/tle_evq internals; keep in sync
 * with libtle_l4p.
 */
static inline void
tle_event_idle_err(struct tle_event *ev)
{
	struct tle_evq *q;

	if (ev->state == TLE_SEV_IDLE)
		return;

	q = ev->head;
	rte_compiler_barrier();

	rte_spinlock_lock(&q->lock);
	if (ev->state == TLE_SEV_UP && ev->data) {
		TAILQ_REMOVE(&q->armed, ev, ql);
		q->nb_armed--;
	}
	ev->state = TLE_SEV_IDLE;
	rte_spinlock_unlock(&q->lock);
}

/* Return a sock (and its fd) to the pool for reuse. */
static inline void
put_free_fd(int fd)
{
	struct sock *so = fd2sock(fd);

	rte_mempool_put(fd_table.mp, so);
}

/* True if fd belongs to the kernel, not the glue stack. */
static inline bool
is_kernel_fd(int fd)
{
	return fd < fd_table.fd_base;
}

void fd_init(void);

#ifdef __cplusplus
}
#endif

#endif /* _TLE_GLUE_FD_H_ */
diff --git a/lib/libtle_glue/gateway.h b/lib/libtle_glue/gateway.h
new file mode 100644
index 0000000..29de6b1
--- /dev/null
+++ b/lib/libtle_glue/gateway.h
@@ -0,0 +1,96 @@
/*
 * Copyright (c) 2019 Ant Financial Services Group.
+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _TLE_GATEWAY_H_ +#define _TLE_GATEWAY_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +static inline bool +is_ipv4_loopback_addr(in_addr_t addr, struct glue_ctx *ctx) +{ + if (addr == ctx->ipv4 || addr == htonl(INADDR_LOOPBACK)) + return true; + else + return false; +} + +static inline bool +is_ipv6_loopback_addr(const struct in6_addr *addr, struct glue_ctx *ctx) +{ + if (memcmp(addr, &ctx->ipv6, sizeof(struct in6_addr)) == 0 || + IN6_IS_ADDR_LOOPBACK(addr) || + (IN6_IS_ADDR_V4COMPAT(addr) && + addr->__in6_u.__u6_addr32[3] == htonl(INADDR_LOOPBACK)) || + (IN6_IS_ADDR_V4MAPPED(addr) && + addr->__in6_u.__u6_addr32[3] == htonl(INADDR_LOOPBACK))) + return true; + else + return false; +} + +static inline const struct in_addr * +ipv4_gateway_lookup(void *data, const struct in_addr *addr) +{ + uint8_t ls; + struct glue_ctx *ctx = data; + + if (is_ipv4_loopback_addr(addr->s_addr, ctx)) + return addr; + + ls = 32 - ctx->ipv4_ml; + if ((addr->s_addr << ls) == (ctx->ipv4 << ls)) + return addr; + + if (ctx->ipv4_gw.s_addr != 0) + return &ctx->ipv4_gw; + + return addr; +} + +static inline const struct in6_addr * +ipv6_gateway_lookup(void *data, const struct in6_addr *addr) +{ + uint8_t ls; + struct glue_ctx *ctx = data; + + if (is_ipv6_loopback_addr(addr, ctx)) + return addr; + + if (ctx->ipv6_ml <= 64) { + ls = 64 - ctx->ipv6_ml; + if ((*(const uint64_t*)addr << ls) == + (*(const uint64_t*)&ctx->ipv6 << ls)) 
+ return addr; + } else if (*(const uint64_t*)addr == *(const uint64_t*)&ctx->ipv6) { + ls = 128 - ctx->ipv6_ml; + if ((*((const uint64_t*)addr + 1) << ls) == + (*((const uint64_t*)&ctx->ipv6 + 1) << ls)) + return addr; + } + + if (!IN6_IS_ADDR_UNSPECIFIED(&ctx->ipv6_gw)) + return &ctx->ipv6_gw; + + return addr; +} + +#ifdef __cplusplus +} +#endif + +#endif /* _TLE_GATEWAY_H_ */ diff --git a/lib/libtle_glue/icmp.c b/lib/libtle_glue/icmp.c new file mode 100644 index 0000000..aba1c4b --- /dev/null +++ b/lib/libtle_glue/icmp.c @@ -0,0 +1,297 @@ +/* + * Copyright (c) 2018 Ant Financial Services Group. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <time.h> +#include <netinet/icmp6.h> + +#include <rte_common.h> +#include <rte_byteorder.h> +#include <rte_ethdev.h> +#include <rte_icmp.h> +#include <rte_ip.h> + +#include "log.h" +#include "ctx.h" +#include "internal.h" + +#define ICMP_ECHOREPLY 0 /* Echo Reply */ +#define ICMP_ECHO 8 /* Echo Request */ +#define ICMP_TIMESTAMP 13 /* Timestamp Request */ +#define ICMP_TIMESTAMPREPLY 14 /* Timestamp Reply */ + +/* Codes for TIME_EXCEEDED. 
 */
#define ICMP_EXC_TTL		0	/* TTL count exceeded */
#define ICMP_EXC_FRAGTIME	1	/* Fragment Reass time exceeded */

/* Parameters used to convert the timespec values */
#define SECONDS_PER_DAY		86400L
#define MSEC_PER_SEC		1000L
#define USEC_PER_MSEC		1000L
#define NSEC_PER_USEC		1000L
#define NSEC_PER_MSEC		(NSEC_PER_USEC * USEC_PER_MSEC)

#define IS_IPV4_BCAST(x)	((x) == (uint32_t)0xFFFFFFFF)

/* ICMP header followed by the three 32-bit timestamps of a
 * timestamp request/reply (originate, receive, transmit).
 */
struct icmp_pkt {
	struct icmp_hdr icmp_h;
	uint32_t times[3];
};

/* Return remainder for ``dividend / divisor`` */
static inline uint32_t
div_uint64_rem(uint64_t dividend, uint32_t divisor)
{
	return dividend % divisor;
}

/* Return milliseconds since midnight (UTC) in network byte order. */
static uint32_t
current_timestamp(void)
{
	struct timespec ts;
	uint32_t msecs;
	uint32_t secs;

	(void)clock_gettime(CLOCK_REALTIME, &ts);

	/* Get secs since midnight. */
	secs = div_uint64_rem(ts.tv_sec, SECONDS_PER_DAY);
	/* Convert to msecs. */
	msecs = secs * MSEC_PER_SEC;
	/* Convert nsec to msec. */
	msecs += (uint32_t)ts.tv_nsec / NSEC_PER_MSEC;

	/* Convert to network byte order. */
	return rte_cpu_to_be_32(msecs);
}

/*
 * Process the checksum of an ICMP packet. The checksum field must be set
 * to 0 by the caller.
 */
static uint16_t
icmp_cksum(const struct icmp_hdr *icmp, uint32_t data_len)
{
	uint16_t cksum;

	/* NOTE(review): when the raw sum is 0xffff the complemented value 0
	 * is deliberately not used and 0xffff is returned instead - confirm
	 * this matches the intended RFC 792 behavior.
	 */
	cksum = rte_raw_cksum(icmp, sizeof(struct icmp_hdr) + data_len);
	return (cksum == 0xffff) ? cksum : ~cksum;
}

/**
 * Receive and handle an ICMP packet.
 *
 * @param ctx
 *   The pointer to the glue context.
 * @param pkt
 *   The pointer to the raw packet data.
 * @param l2_len
 *   The size of the l2 header.
 * @param l3_len
 *   The size of the l3 header.
 * @return
 *   MUST return NULL now.
:-) + */ +struct rte_mbuf * +icmp_recv(struct glue_ctx *ctx, struct rte_mbuf *pkt, + uint32_t l2_len, uint32_t l3_len) +{ + struct ether_addr eth_addr; + struct icmp_pkt *icmp_pkt; + struct ether_hdr *eth_h; + struct icmp_hdr *icmp_h; + struct ipv4_hdr *ip_h; + uint32_t ip_addr; + uint32_t cksum; + + eth_h = rte_pktmbuf_mtod(pkt, struct ether_hdr *); + ip_h = (struct ipv4_hdr *) ((char *)eth_h + l2_len); + + icmp_h = (struct icmp_hdr *)((char *)ip_h + l3_len); + if (icmp_h->icmp_type != IP_ICMP_ECHO_REQUEST && + icmp_h->icmp_type != ICMP_TIMESTAMP) + goto drop_pkt; + + icmp_pkt = (struct icmp_pkt *)icmp_h; + + ether_addr_copy(ð_h->s_addr, ð_addr); + ether_addr_copy(ð_h->d_addr, ð_h->s_addr); + ether_addr_copy(ð_addr, ð_h->d_addr); + + /* + * Similar to Linux implementation, we silently drop the broadcast or + * multicast ICMP pakcets. + * + * RFC 1122: 3.2.2.6 An ICMP_ECHO to broadcast MAY be + * silently ignored. + * RFC 1122: 3.2.2.8 An ICMP_TIMESTAMP MAY be silently + * discarded if to broadcast/multicast. + */ + ip_addr = rte_be_to_cpu_32(ip_h->dst_addr); + if (IS_IPV4_MCAST(ip_addr) || IS_IPV4_BCAST(ip_addr)) + goto drop_pkt; + + ip_addr = ip_h->src_addr; + ip_h->src_addr = ip_h->dst_addr; + ip_h->dst_addr = ip_addr; + + if (icmp_h->icmp_type == IP_ICMP_ECHO_REQUEST && + icmp_h->icmp_code == 0) { + + /* Must clear checksum field before calling the helper. */ + ip_h->hdr_checksum = 0; + ip_h->hdr_checksum = rte_ipv4_cksum(ip_h); + + icmp_h->icmp_type = IP_ICMP_ECHO_REPLY; + icmp_h->icmp_code = 0; + + /* + * Fix me: the data part of an ICMP echo request/reply + * message is implementation specific, we don't know + * how to verify or calculate the checksum. + * + * Need to see BSD or LINUX implementation. 
+ */ + cksum = ~icmp_h->icmp_cksum & 0xffff; + cksum += ~rte_cpu_to_be_16(IP_ICMP_ECHO_REQUEST << 8) & 0xffff; + cksum += rte_cpu_to_be_16(IP_ICMP_ECHO_REPLY << 8); + cksum = (cksum & 0xffff) + (cksum >> 16); + cksum = (cksum & 0xffff) + (cksum >> 16); + icmp_h->icmp_cksum = ~cksum; + + } else if (icmp_h->icmp_type == ICMP_TIMESTAMP && + icmp_h->icmp_code == 0) { + + /* + * RFC 1122: 3.2.2.8 MAY implement ICMP timestamp requests. + * SHOULD be in the kernel for minimum random latency. + * MUST be accurate to a few minutes. + * MUST be updated at least at 15Hz. + */ + icmp_h->icmp_type = ICMP_TIMESTAMPREPLY; + icmp_h->icmp_code = 0; + icmp_pkt->times[1] = current_timestamp(); + icmp_pkt->times[2] = icmp_pkt->times[1]; + + icmp_h->icmp_cksum = 0; + /* the data part of an ICMP timestamp reply is 12 bytes. */ + icmp_h->icmp_cksum = icmp_cksum(icmp_h, 12); + } else + goto drop_pkt; + + if (pkt->pkt_len < ETHER_MIN_LEN) + rte_pktmbuf_append(pkt, ETHER_MIN_LEN - pkt->pkt_len); + + if (rte_eth_tx_burst(ctx->port_id, ctx->queue_id, &pkt, 1)) + GLUE_LOG(DEBUG, "Send ICMP echo reply OK"); + + return NULL; + +drop_pkt: + rte_pktmbuf_free(pkt); + return NULL; +} + +/** + * Receive and handle an ICMPv6 packet. + * + * @param ctx + * The pointer to the glue context. + * @param pkt + * The pointer to the raw packet data. + * @param l2_len + * The the size of the l2 header. + * @return + * MUST return NULL now. 
:-) + */ +struct rte_mbuf * +icmp6_recv(struct glue_ctx *ctx, struct rte_mbuf *pkt, + uint32_t l2_len, uint32_t l3_len) +{ + struct ether_addr eth_addr; + struct ether_hdr *eth_h; + struct icmp6_hdr *icmp6_h; + struct ipv6_hdr *ipv6_h; + struct in6_addr ipv6_addr; + uint32_t cksum; + + eth_h = rte_pktmbuf_mtod(pkt, struct ether_hdr *); + ipv6_h = (struct ipv6_hdr *) ((char *)eth_h + l2_len); + + icmp6_h = (struct icmp6_hdr *)((char *)ipv6_h + l3_len); + + /* NDP pkt */ + if ((icmp6_h->icmp6_type == ND_NEIGHBOR_SOLICIT || + icmp6_h->icmp6_type == ND_NEIGHBOR_ADVERT) && + icmp6_h->icmp6_code == 0) + return ndp_recv(ctx, pkt, l2_len, l3_len); + + /* only support ECHO now, other types of pkts are dropped */ + if ((icmp6_h->icmp6_type != ICMP6_ECHO_REQUEST && + icmp6_h->icmp6_type != ICMP6_ECHO_REPLY) || + icmp6_h->icmp6_code != 0) + goto drop_pkt; + + ether_addr_copy(ð_h->s_addr, ð_addr); + ether_addr_copy(ð_h->d_addr, ð_h->s_addr); + ether_addr_copy(ð_addr, ð_h->d_addr); + + /* + * Now, we silently drop the anycast or multicast ICMP pakcets. + * But it does not conform to RFC 4443. Maybe fix it latter. + * + * RFC 4443: 4.2 An Echo Reply SHOULD be sent in response to an + * Echo Request message sent to an IPv6 multicast or anycast address. + * In this case, thesource address of the reply MUST be a unicast + * address belonging to the interface on which the Echo Request + * message was received. 
+ */ + switch (icmp6_h->icmp6_type) { + case ICMP6_ECHO_REQUEST: + if (memcmp(ipv6_h->dst_addr, &ctx->ipv6, + sizeof(struct in6_addr)) != 0) + goto drop_pkt; + + rte_memcpy(&ipv6_addr, ipv6_h->src_addr, + sizeof(struct in6_addr)); + rte_memcpy(ipv6_h->src_addr, ipv6_h->dst_addr, + sizeof(struct in6_addr)); + rte_memcpy(ipv6_h->dst_addr, &ipv6_addr, + sizeof(struct in6_addr)); + + icmp6_h->icmp6_type = ICMP6_ECHO_REPLY; + + cksum = ~icmp6_h->icmp6_cksum & 0xffff; + cksum += ~rte_cpu_to_be_16(ICMP6_ECHO_REQUEST << 8) & 0xffff; + cksum += rte_cpu_to_be_16(ICMP6_ECHO_REPLY << 8); + cksum = (cksum & 0xffff) + (cksum >> 16); + cksum = (cksum & 0xffff) + (cksum >> 16); + icmp6_h->icmp6_cksum = ~cksum; + + break; + default: + goto drop_pkt; + } + + if (pkt->pkt_len < ETHER_MIN_LEN) + rte_pktmbuf_append(pkt, ETHER_MIN_LEN - pkt->pkt_len); + + if (rte_eth_tx_burst(ctx->port_id, ctx->queue_id, &pkt, 1)) + GLUE_LOG(DEBUG, "Send ICMP echo reply OK"); + + return NULL; + +drop_pkt: + rte_pktmbuf_free(pkt); + return NULL; +} diff --git a/lib/libtle_glue/init.c b/lib/libtle_glue/init.c new file mode 100644 index 0000000..d845ef8 --- /dev/null +++ b/lib/libtle_glue/init.c @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2018 Ant Financial Services Group. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include <sched.h> +#include <pthread.h> +#include <stdlib.h> + +#include <rte_compat.h> +#include <rte_common.h> +#include <rte_debug.h> +#include <rte_eal.h> + +#include "util.h" +#include "fd.h" +#include "ctx.h" +#include "sym.h" +#include "log.h" +#include "internal.h" +#include "tle_glue.h" + +void +glue_init1(int argc, char **argv) +{ + GLUE_LOG(INFO, "init: DPDK and fd table..."); + + if (rte_eal_init(argc, argv) < 0) + rte_panic("Failed to init DPDK"); + + fd_init(); +} + +static void __attribute__((constructor(1000))) +glue_init(void) +{ + char *p; + int i, err, argc = 0; + char **argv = NULL, **argv_to_release = NULL; + char *vnic, *params, *no_huge; + cpu_set_t cpuset; + pthread_t tid = pthread_self(); + + symbol_init(); + +#define DPDK_PARAMS "DPDK_PARAMS" + params = getenv(DPDK_PARAMS); +#define DPDK_NO_HUGE "DPDK_NO_HUGE" + no_huge = getenv(DPDK_NO_HUGE); +#define DPDK_VNIC "DPDK_VNIC" + vnic = getenv(DPDK_VNIC); + + if (params == NULL && no_huge == NULL && vnic == NULL) + return; + + argv = grow_argv(argv, argc, 1); + argv[argc++] = xstrdup("userspace-stack"); + + /* Get the main thread affinity */ + CPU_ZERO(&cpuset); + err = pthread_getaffinity_np(tid, sizeof(cpu_set_t), &cpuset); + if (!err) { + for (i = 0; i < CPU_SETSIZE; i++) { + if (CPU_ISSET(i, &cpuset)) { + argv = grow_argv(argv, argc, 2); + argv[argc++] = xstrdup("-l"); + argv[argc++] = xasprintf("%d", i); + i = CPU_SETSIZE; + } + } + } else { + argv = grow_argv(argv, argc, 2); + argv[argc++] = xstrdup("-l"); + argv[argc++] = xasprintf("0"); + } + + if (params) + p = strtok(params, " "); + else + p = NULL; + while (p != NULL) { + argv = grow_argv(argv, argc, 1); + argv[argc++] = xstrdup(p); + p = strtok(NULL, " "); + } + + if (no_huge) { + argv = grow_argv(argv, argc, 3); + argv[argc++] = xstrdup("-m"); + argv[argc++] = xstrdup("2048"); + argv[argc++] = xstrdup("--no-huge"); + } + + if (vnic) { + argv = grow_argv(argv, argc, 2); + 
argv[argc++] = xstrdup(vnic); + argv[argc++] = xstrdup("--no-pci"); + } + + argv = grow_argv(argv, argc, 1); + argv[argc++] = xstrdup("--"); + + argv_to_release = grow_argv(argv_to_release, 0, argc); + for (i = 0; i < argc; ++i) + argv_to_release[i] = argv[i]; + + glue_init1(argc, argv); + + /* Alloc and setup this default ctx for any sockets operations before + * thread/ctx binding which happens when epoll_wait. + */ + glue_ctx_alloc(); + + release_argv(argc, argv_to_release, argv); + + /* Set back the affinity */ + err = pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cpuset); + if (err) + GLUE_LOG(ERR, "Failed to set back affinity"); +} + +static void __attribute__((destructor)) +glue_uninit(void) +{ + struct sock *so; + struct glue_ctx *ctx; + int i, max = fd_table.fd_base + fd_table.fd_num; + + /* TODO: lets optimize it */ + for (i = fd_table.fd_base; i < max; i++) { + so = fd2sock(i); + if (!so || !so->valid) + continue; + if (IS_TCP(so)) + tle_tcp_stream_kill(so->s); + } + + for (i = 0; i < nb_ctx; ++i) { + ctx = glue_ctx_lookup(0, i); + while (be_process(ctx)) { /* empty */ }; + } +} diff --git a/lib/libtle_glue/internal.h b/lib/libtle_glue/internal.h new file mode 100644 index 0000000..91fe784 --- /dev/null +++ b/lib/libtle_glue/internal.h @@ -0,0 +1,152 @@ +/* + * Copyright (c) 2018 Ant Financial Services Group. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

#ifndef _TLE_GLUE_INTERNAL_H_
#define _TLE_GLUE_INTERNAL_H_

#include <rte_mbuf.h>
#include <rte_atomic.h>

#include <tle_ctx.h>

#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>

#include "ctx.h"
#include "sym.h"
#include <rte_mempool.h>

#ifdef __cplusplus
extern "C" {
#endif

extern int stopped;

extern uint64_t rx_offload;
extern uint64_t tx_offload;

void port_reconfig(void);

uint16_t create_loopback(uint32_t socket_id);

struct rte_mempool * get_mempool_by_socket(int32_t socket_id);

/* One backend iteration for ctx; returns non-zero while work remains. */
int be_process(struct glue_ctx *ctx);

int be_tx(struct glue_ctx *ctx);

struct rte_mbuf * arp_recv(struct glue_ctx *ctx,
			   struct rte_mbuf *m, uint32_t l2len);

struct rte_mbuf * ndp_recv(struct glue_ctx *ctx,
			   struct rte_mbuf *m, uint32_t l2len, uint32_t l3len);


void mac_check(struct glue_ctx *ctx, const struct sockaddr* addr);

int arp_ipv4_dst_lookup(void *data, const struct in_addr *addr,
			struct tle_dest *res, int proto);

int arp_ipv6_dst_lookup(void *data, const struct in6_addr *addr,
			struct tle_dest *res, int proto);

int mac_fill(struct glue_ctx *ctx, struct rte_mbuf *m);

void mac_timeout(struct glue_ctx *ctx);

int setup_rx_cb(uint16_t port_id, uint16_t qid);

int epoll_kernel_wait(struct glue_ctx *ctx, int efd,
		      struct epoll_event *events,
		      int maxevents, int timeout, int *rx);

int poll_common(struct glue_ctx *ctx, struct epoll_event *events,
		int maxevents, int timeout, int shadow_efd);

int dev_rxq_wakeup(uint16_t port_id);

struct rte_mbuf * icmp_recv(struct glue_ctx *ctx, struct rte_mbuf *pkt,
			    uint32_t l2len, uint32_t l3len);

struct rte_mbuf * icmp6_recv(struct glue_ctx *ctx, struct rte_mbuf *pkt,
			     uint32_t l2len, uint32_t l3len);

uint16_t typen_rx_callback(uint16_t port, uint16_t queue,
			   struct rte_mbuf *pkt[], uint16_t nb_pkts,
			   uint16_t max_pkts, void *user_param);

void ipv4_dst_add(struct glue_ctx *ctx, const struct in_addr *addr,
		  struct ether_addr *e_addr);

void ipv6_dst_add(struct glue_ctx *ctx, const struct in6_addr *addr,
		  struct ether_addr *e_addr);

#ifdef LOOK_ASIDE_BACKEND
/* State word coordinating the sleeping io thread with worker threads. */
extern rte_atomic32_t flag_sleep;

enum {
	IOTHREAD_BUSY = 0,  /* io thread is busy */
	IOTHREAD_SLEEP,     /* io thread is sleeping */
	IOTHREAD_PREEMPT,   /* io thread is preempted by another worker thread */
};

/* Block in kernel epoll with flag_sleep advertising SLEEP; on wakeup,
 * spin until any preempting worker has handed the flag back to BUSY.
 */
static inline int
sleep_with_lock(int efd, struct epoll_event *events, int max, int to)
{
	int rc;

	rte_atomic32_set(&flag_sleep, IOTHREAD_SLEEP);
	rc = k_epoll_pwait(efd, events, max, to, NULL);
	while (rte_atomic32_cmpset((volatile uint32_t *)&flag_sleep,
				   IOTHREAD_SLEEP, IOTHREAD_BUSY) == 0);

	return rc;
}

/* Worker-side tx flush: only runs if the io thread is asleep; takes the
 * flag to PREEMPT for the duration so the io thread spins on wakeup.
 */
static inline void
be_tx_with_lock(struct glue_ctx *ctx)
{
	if (rte_atomic32_cmpset((volatile uint32_t *)&flag_sleep,
				IOTHREAD_SLEEP, IOTHREAD_PREEMPT)) {
		while (be_tx(ctx) > 0) {};
		rte_atomic32_set(&flag_sleep, IOTHREAD_SLEEP);
	}
}

/* Kick the sleeping io thread's rx queue if a worker preempted it. */
static inline void
wake_lookaside_backend(struct glue_ctx *ctx)
{
	if (rte_atomic32_read(&flag_sleep) == IOTHREAD_PREEMPT)
		dev_rxq_wakeup(ctx->port_id);
}

static inline bool
io_thread_in_sleep(void)
{
	return rte_atomic32_read(&flag_sleep) == IOTHREAD_SLEEP;
}
#else
/* NOTE(review): the fallback maps the 4-arg sleep_with_lock() onto
 * k_epoll_wait (4 args) while the inline version wraps k_epoll_pwait -
 * presumably equivalent; verify the sym.h prototypes.
 */
#define sleep_with_lock k_epoll_wait
#define be_tx_with_lock(ctx) do {} while(0)
#define wake_lookaside_backend(ctx) do {} while(0)
#endif

#ifdef __cplusplus
}
#endif

#endif /* _TLE_GLUE_INTERNAL_H_ */
diff --git a/lib/libtle_glue/log.h b/lib/libtle_glue/log.h
new file mode 100644
index 0000000..da31ea3
--- /dev/null
+++ b/lib/libtle_glue/log.h
@@ -0,0 +1,77 @@
/*
 * Copyright (c) 2019 Ant Financial Services Group.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _GLUE_LOG_H_ +#define _GLUE_LOG_H_ + +#include <arpa/inet.h> +#include <stdint.h> +#include <stdio.h> + +#include <rte_vect.h> +#include <rte_memcpy.h> +#include <rte_spinlock.h> +#include <rte_log.h> +#include <rte_errno.h> + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * logging related macros. + */ + +#define GLUE_LOG(lvl, fmt, args...) RTE_LOG(lvl, USER1, fmt "\n", ##args) + +#define DUMMY_MACRO do {} while (0) + +#ifdef ENABLE_DEBUG +#define GLUE_DEBUG(fmt, arg...) fprintf(stderr, fmt "\n", ##arg) +#else +#define GLUE_DEBUG(fmt, arg...) DUMMY_MACRO +#endif + +#ifdef ENABLE_TRACE +#define TRACE(fmt, arg...) fprintf(stderr, fmt "\n", ##arg) +#define PKT_DUMP(p) rte_pktmbuf_dump(stderr, (p), 64) +#else +#define TRACE(fmt, arg...) DUMMY_MACRO +#define PKT_DUMP(p) DUMMY_MACRO +#endif + +#ifdef DEBUG_ARP +static inline void +print_arp(int af, const void *src, const struct ether_addr *mac, + const char *action) +{ + char str_ip[64]; + char str_mac[32]; + socklen_t sz; + + ether_format_addr(str_mac, sizeof(str_mac), mac); + sz = (af == AF_INET) ? sizeof(struct in_addr) : sizeof(struct in6_addr); + inet_ntop(af, src, str_ip, sz); + RTE_LOG(INFO, "%s ARP entry: %s\tmac=%s", action, str_ip, str_mac); +} +#else +#define print_arp(arg...) DUMMY_MACRO +#endif + +#ifdef __cplusplus +} +#endif + +#endif /* _GLUE_LOG_H_ */ diff --git a/lib/libtle_glue/ndp.h b/lib/libtle_glue/ndp.h new file mode 100644 index 0000000..a61ff5b --- /dev/null +++ b/lib/libtle_glue/ndp.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2019 Ant Financial Services Group. 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef _TLE_NDP_H_
#define _TLE_NDP_H_

/* IPv6 Neighbor Discovery option type codes (these values mirror the
 * ND_OPT_* constants of <netinet/icmp6.h> / RFC 4861 section 4.6).
 */
#define ND_OPT_SOURCE_LINKLAYER_ADDR	1
#define ND_OPT_TARGET_LINKLAYER_ADDR	2
#define ND_OPT_PREFIX_INFORMATION	3
#define ND_OPT_REDIRECTED_HEADER	4
#define ND_OPT_MTU			5

#ifdef __cplusplus
extern "C" {
#endif

#ifdef __cplusplus
}
#endif

#endif /* _TLE_NDP_H_ */
diff --git a/lib/libtle_glue/packetdrill.c b/lib/libtle_glue/packetdrill.c
new file mode 100644
index 0000000..79d1d52
--- /dev/null
+++ b/lib/libtle_glue/packetdrill.c
@@ -0,0 +1,544 @@
/*
 * Copyright (c) 2018 Ant Financial Services Group.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+ */ +#include <stdarg.h> +#include <stdlib.h> +#include <sys/time.h> +#include <arpa/inet.h> + +#include "packetdrill.h" +#include "tle_glue.h" +#include "internal.h" +#include "fd.h" + +#include <rte_arp.h> +#include <rte_common.h> +#include <rte_ethdev.h> +#include <rte_ip.h> +#include <rte_vhost.h> + +static int vhost_vid; +enum {VIRTIO_RXQ, VIRTIO_TXQ, VIRTIO_QNUM}; +static const char *sockname = "/tmp/sock0"; + +static int +new_device(int vid) +{ + vhost_vid = vid; + + /* Disable notifications. */ + rte_vhost_enable_guest_notification(vid, VIRTIO_RXQ, 0); + rte_vhost_enable_guest_notification(vid, VIRTIO_TXQ, 0); + + return 0; +} + +static void +destroy_device(int vid) +{ + RTE_SET_USED(vid); +} + +static const struct vhost_device_ops device_ops = +{ + .new_device = new_device, + .destroy_device = destroy_device, +}; + +static void +vhost_init(void) +{ + unlink(sockname); + + if (rte_vhost_driver_register(sockname, 0) != 0) + rte_exit(EXIT_FAILURE, "failed to register vhost driver \n"); + + if (rte_vhost_driver_callback_register(sockname, &device_ops) != 0) + rte_exit(EXIT_FAILURE, "failed to register vhost driver callbacks.\n"); + + if (rte_vhost_driver_start(sockname) < 0) + rte_exit(EXIT_FAILURE, "failed to start vhost driver.\n"); + + rte_log_set_level(RTE_LOGTYPE_USER1, RTE_LOG_NOTICE); +} + +static uint64_t +now_usecs(void) +{ + struct timeval tv; + + gettimeofday(&tv, NULL); + return ((uint64_t) tv.tv_sec * 1000000) + tv.tv_usec; +} + +static void +pd_free(void *userdata) +{ + RTE_SET_USED(userdata); +} + +static int +pd_socket(void *userdata, int domain, int type, int protocol) +{ + RTE_SET_USED(userdata); + return PRE(socket)(domain, type, protocol); +} + +static int +pd_bind(void *userdata, int sockfd, const struct sockaddr *addr, + socklen_t addrlen) +{ + RTE_SET_USED(userdata); + return PRE(bind)(sockfd, addr, addrlen); +} + +static int +pd_listen(void *userdata, int sockfd, int backlog) +{ + RTE_SET_USED(userdata); + return PRE(listen)(sockfd, 
backlog); +} + +static int +pd_accept(void *userdata, int sockfd, struct sockaddr *addr, + socklen_t *addrlen) +{ + RTE_SET_USED(userdata); + return PRE(accept)(sockfd, addr, addrlen); +} + +static int +pd_connect(void *userdata, int sockfd, const struct sockaddr *addr, + socklen_t addrlen) +{ + RTE_SET_USED(userdata); + return PRE(connect)(sockfd, addr, addrlen); +} + +static ssize_t +pd_read(void *userdata, int fd, void *buf, size_t count) +{ + RTE_SET_USED(userdata); + return PRE(read)(fd, buf, count); +} + +static ssize_t +pd_readv(void *userdata, int fd, const struct iovec *iov, int iovcnt) +{ + RTE_SET_USED(userdata); + return PRE(readv)(fd, iov, iovcnt); +} + +static ssize_t +pd_recv(void *userdata, int sockfd, void *buf, size_t len, int flags) +{ + RTE_SET_USED(userdata); + return PRE(recv)(sockfd, buf, len, flags); +} + +static ssize_t +pd_recvfrom(void *userdata, int sockfd, void *buf, size_t len, + int flags, struct sockaddr *src_addr, socklen_t *addrlen) +{ + RTE_SET_USED(userdata); + return PRE(recvfrom)(sockfd, buf, len, flags, src_addr, addrlen); +} + +static ssize_t +pd_recvmsg(void *userdata, int sockfd, struct msghdr *msg, int flags) +{ + RTE_SET_USED(userdata); + return PRE(recvmsg)(sockfd, msg, flags); +} + +static ssize_t +pd_write(void *userdata, int fd, const void *buf, size_t count) +{ + RTE_SET_USED(userdata); + return PRE(write)(fd, buf, count); +} + +static ssize_t +pd_writev(void *userdata, int fd, const struct iovec *iov, int iovcnt) +{ + RTE_SET_USED(userdata); + return PRE(writev)(fd, iov, iovcnt); +} + +static ssize_t +pd_send(void *userdata, int sockfd, const void *buf, size_t len, int flags) +{ + RTE_SET_USED(userdata); + return PRE(send)(sockfd, buf, len, flags); +} + +static ssize_t +pd_sendto(void *userdata, int sockfd, const void *buf, size_t len, int flags, + const struct sockaddr *dest_addr, socklen_t addrlen) +{ + RTE_SET_USED(userdata); + return PRE(sendto)(sockfd, buf, len, flags, dest_addr, addrlen); +} + +static ssize_t 
pd_sendmsg(void *userdata, int sockfd, const struct msghdr *msg, int flags)
{
	RTE_SET_USED(userdata);
	return PRE(sendmsg)(sockfd, msg, flags);
}

/*
 * fcntl/ioctl adapters: extract exactly one optional argument and pass it on.
 * NOTE(review): the vararg is always read as void *.  That assumes every
 * command's argument is pointer-sized; for int-argument commands (e.g.
 * F_SETFL) this relies on ABI-specific behavior — confirm on the targeted
 * platforms.
 */
static int
pd_fcntl(void *userdata, int fd, int cmd, ...)
{
	void *arg;
	va_list ap;

	va_start(ap, cmd);
	arg = va_arg(ap, void *);
	va_end(ap);

	RTE_SET_USED(userdata);
	return PRE(fcntl)(fd, cmd, arg);
}

static int
pd_ioctl(void *userdata, int fd, unsigned long request, ...)
{
	void *arg;
	va_list ap;

	va_start(ap, request);
	arg = va_arg(ap, void *);
	va_end(ap);

	RTE_SET_USED(userdata);
	return PRE(ioctl)(fd, request, arg);
}

static int
pd_close(void *userdata, int fd)
{
	RTE_SET_USED(userdata);
	return PRE(close)(fd);
}

static int
pd_shutdown(void *userdata, int sockfd, int how)
{
	RTE_SET_USED(userdata);
	return PRE(shutdown)(sockfd, how);
}

static int
pd_getsockopt(void *userdata, int sockfd, int level, int optname,
	      void *optval, socklen_t *optlen)
{
	RTE_SET_USED(userdata);
	return PRE(getsockopt)(sockfd, level, optname, optval, optlen);
}

static int
pd_setsockopt(void *userdata, int sockfd, int level, int optname,
	      const void *optval, socklen_t optlen)
{
	RTE_SET_USED(userdata);
	return PRE(setsockopt)(sockfd, level, optname, optval, optlen);
}

static int
pd_poll(void *userdata, struct pollfd *fds, nfds_t nfds, int timeout)
{
	RTE_SET_USED(userdata);
	return PRE(poll)(fds, nfds, timeout);
}

/*
 * Copy @count bytes from a flat buffer into a chain of mbufs allocated from
 * the socket-0 mempool, one mempool segment per RTE_MBUF_DEFAULT_DATAROOM
 * bytes.  Exits the process if allocation fails.
 * NOTE(review): the array is sized nb_mbufs + 1, apparently so the VLA is
 * never zero-length when count == 0 — confirm callers can pass count == 0.
 */
static struct rte_mbuf *
from_buf_to_mbuf(const void *buf, size_t count)
{
	struct rte_mempool *mp = get_mempool_by_socket(0);
	uint16_t nb_mbufs = (count + RTE_MBUF_DEFAULT_DATAROOM - 1) /
			    RTE_MBUF_DEFAULT_DATAROOM;
	struct rte_mbuf *mbufs[nb_mbufs + 1];
	uint16_t i, copy_len;
	size_t done = 0;
	char *dst;

	if (unlikely(rte_pktmbuf_alloc_bulk(mp, mbufs, nb_mbufs) < 0))
		rte_exit(EXIT_FAILURE, "allocate mbuf fails\n");

	for (i = 0; i < nb_mbufs; ++i) {
		copy_len = RTE_MIN((size_t)RTE_MBUF_DEFAULT_DATAROOM,
				   count - done);
		dst = rte_pktmbuf_mtod(mbufs[i], char *);
		rte_memcpy(dst, (const char *)buf + done, copy_len);
		done += copy_len;
		mbufs[i]->data_len = copy_len;
		/* link segments into a chain as we fill them */
		if (i > 0)
			mbufs[i-1]->next = mbufs[i];
	}

	/* chain-level totals live in the head mbuf only */
	mbufs[0]->pkt_len = count;
	mbufs[0]->nb_segs = nb_mbufs;

	return mbufs[0];
}

/* Send @count bytes of data starting from @buf to the TCP stack.
 * Return 0 on success or -1 on error.
 */
static int
pd_netdev_send(void *userdata, const void *buf, size_t count)
{
	struct ether_hdr *hdr;
	struct rte_mbuf *m;

	RTE_SET_USED(userdata);

	m = from_buf_to_mbuf(buf, count);

	/* add l2 header
	 * NOTE(review): the ethertype is hard-coded to IPv4, so IPv6 test
	 * packets injected through this path would carry the wrong ethertype
	 * — confirm whether packetdrill IPv6 scripts use this backend.
	 */
	hdr = (struct ether_hdr *)rte_pktmbuf_prepend(m, sizeof(struct ether_hdr));
	hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv4);

	if (rte_vhost_enqueue_burst(vhost_vid, VIRTIO_RXQ, &m, 1) == 1)
		return 0;

	return -1;
}

/*
 * Drain up to @len bytes from mbuf (chain) @m into @buf.
 *
 * @ispeek  != 0: leave the chain untouched and return the original head.
 * @ispeek  == 0: consumed segments are freed, a partially-consumed segment
 *                is advanced with rte_pktmbuf_adj(), and the new head (if
 *                any) gets its pkt_len reduced by the bytes consumed.
 * @needcpy == 0: bytes are discarded instead of copied (same consumption
 *                semantics, no memcpy).
 *
 * Returns the remaining chain head, or NULL when everything was consumed.
 * With @len == 0 the chain is returned unmodified.
 */
static inline struct rte_mbuf *
from_mbuf_to_buf(struct rte_mbuf *m, char *buf, size_t len, int ispeek, int needcpy)
{
	void *src;
	uint32_t done = 0;
	uint32_t left = len, orig_pkt_len;
	uint16_t copy_len, seg_len;
	struct rte_mbuf *m_next, *orig_pkt;

	if (len == 0)
		return m;

	orig_pkt = m;
	orig_pkt_len = m->pkt_len;

	do {
		seg_len = rte_pktmbuf_data_len(m);
		copy_len = RTE_MIN(seg_len, left);
		src = rte_pktmbuf_mtod(m, void *);
		if (needcpy)
			rte_memcpy(buf + done, src, copy_len);
		done += copy_len;
		left -= copy_len;
		if (copy_len < seg_len) {
			/* segment only partially consumed: trim its front */
			if (!ispeek) {
				rte_pktmbuf_adj(m, copy_len);
			}
			break;
		}
		m_next = m->next;
		if (!ispeek) {
			rte_pktmbuf_free_seg(m);
		}
		m = m_next;
	} while (left && m);

	if (m && !ispeek)
		m->pkt_len = orig_pkt_len - done;

	if(ispeek)
		return orig_pkt;
	else
		return m;
}

/* Sniff the next packet leaving the TCP stack.
 * Put packet data in @buf. @count is passed in as the buffer size.
 * The actual number of bytes received should be put in @count.
 * Set @count to 0 if received nothing.
 * Set @time_usecs to the receive timestamp.
 * Return 0 on success or -1 on error. */
static int
pd_netdev_recv(void *userdata, void *buf, size_t *count, long long *time_usecs)
{
	struct rte_mbuf *m;
	struct rte_mempool *mp = get_mempool_by_socket(0);

	RTE_SET_USED(userdata);

	/* busy-poll the vhost TX ring until one packet shows up; as written
	 * this never sets *count = 0 or returns -1 — it blocks (spinning)
	 * forever when the stack emits nothing */
	while (rte_vhost_dequeue_burst(vhost_vid, VIRTIO_TXQ, mp, &m, 1) == 0);

	/* remove l2 header: packetdrill compares from the IP header on */
	rte_pktmbuf_adj(m, sizeof(struct ether_hdr));

	*count = m->pkt_len;
	from_mbuf_to_buf(m, buf, *count, 0, 1);

	*time_usecs = now_usecs();
	return 0;
}

static int
pd_usleep(void *userdata, useconds_t usec)
{
	RTE_SET_USED(userdata);
	return usleep(usec);
}

static int
pd_gettimeofday(void *userdata, struct timeval *tv, struct timezone *tz)
{
	RTE_SET_USED(userdata);
	return gettimeofday(tv, tz);
}

static int
pd_epoll_create(void *userdata, int size)
{
	RTE_SET_USED(userdata);
	return PRE(epoll_create)(size);
}

static int
pd_epoll_ctl(void *userdata, int epfd, int op, int fd,
	     struct epoll_event *event)
{
	RTE_SET_USED(userdata);
	return PRE(epoll_ctl)(epfd, op, fd, event);
}

static int
pd_epoll_wait(void *userdata, int epfd, struct epoll_event *events,
	      int maxevents, int timeout)
{
	RTE_SET_USED(userdata);
	return PRE(epoll_wait)(epfd, events, maxevents, timeout);
}

/* NOTE(review): unlike the other callbacks this calls libc pipe() directly,
 * not a PRE() wrapper — presumably pipes stay on the kernel path; confirm. */
static int
pd_pipe(void *userdata, int pipefd[2])
{
	RTE_SET_USED(userdata);
	return pipe(pipefd);
}

static int
pd_splice(void *userdata, int fd_in, loff_t *off_in, int fd_out,
	  loff_t *off_out, size_t len, unsigned int flags)
{
	RTE_SET_USED(userdata);
	return PRE(splice)(fd_in, off_in, fd_out, off_out, len, flags);
}

/* handshake lock between packetdrill_interface_init() and the io thread */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

/*
 * Background service thread: seeds the DPDK_IP* environment variables
 * (presumably consumed by the glue context initialization — confirm against
 * init.c), then services glue epoll events in an endless loop.
 */
static void *
io(void *arg)
{
	int epfd;
	struct in_addr ipv4;
	struct ether_addr mac = { .addr_bytes = { 0xee, 0xff, 0xff, 0xff, 0xff, 0xff}, };
	struct epoll_event events[128];

	RTE_SET_USED(arg);

	setenv(DPDK_IP, "192.168.0.2", 1);
	setenv(DPDK_IP_MASK, "16", 1);
	setenv(DPDK_IP_GATEWAY, "192.168.0.1", 1);
+ setenv(DPDK_IPV6, "fd3d:fa7b:d17d::0", 1); + setenv(DPDK_IPV6_MASK, "48", 1); + setenv(DPDK_IPV6_GATEWAY, "fd3d:fa7b:d17d:8888::0", 1); + + epfd = PRE(epoll_create)(0); + + inet_pton(AF_INET, "192.168.0.1", &ipv4); + + ipv4_dst_add(default_ctx, &ipv4, &mac); + + pthread_mutex_unlock(&lock); + + while (1) { + PRE(epoll_wait)(epfd, events, 128, 0); + } + + return NULL; +} + +void +packetdrill_interface_init(const char *flags, + struct packetdrill_interface *ifc) +{ + int argc = 0; + char *argv[16]; + pthread_t tid; + + RTE_SET_USED(flags); + + argv[argc++] = strdup("test"); + argv[argc++] = strdup("-l"); + argv[argc++] = strdup("0"); + argv[argc++] = strdup("--no-pci"); + argv[argc++] = strdup("--in-memory"); + argv[argc++] = strdup("--single-file-segments"); + argv[argc++] = strdup("--"); + + if (rte_eal_init(argc, argv) < 0) + rte_exit(EXIT_FAILURE, "Failed to init DPDK\n"); + + fd_init(); + + vhost_init(); + + if (rte_eal_hotplug_add("vdev", "virtio_user0", "path=/tmp/sock0") < 0) + rte_exit(EXIT_FAILURE, "hot plug virtio-user failed\n"); + + pthread_mutex_lock(&lock); + + pthread_create(&tid, NULL, io, NULL); + + pthread_mutex_lock(&lock); + + ifc->free = pd_free; + ifc->socket = pd_socket; + ifc->bind = pd_bind; + ifc->listen = pd_listen; + ifc->accept = pd_accept; + ifc->connect = pd_connect; + ifc->read = pd_read; + ifc->readv = pd_readv; + ifc->recv = pd_recv; + ifc->recvfrom = pd_recvfrom; + ifc->recvmsg = pd_recvmsg; + ifc->write = pd_write; + ifc->writev = pd_writev; + ifc->send = pd_send; + ifc->sendto = pd_sendto; + ifc->sendmsg = pd_sendmsg; + ifc->fcntl = pd_fcntl; + ifc->ioctl = pd_ioctl; + ifc->close = pd_close; + ifc->shutdown = pd_shutdown; + ifc->getsockopt = pd_getsockopt; + ifc->setsockopt = pd_setsockopt; + ifc->poll = pd_poll; + ifc->netdev_send = pd_netdev_send; + ifc->netdev_receive = pd_netdev_recv; + ifc->usleep = pd_usleep; + ifc->gettimeofday = pd_gettimeofday; + ifc->epoll_create = pd_epoll_create; + ifc->epoll_ctl = pd_epoll_ctl; + 
ifc->epoll_wait = pd_epoll_wait; + ifc->pipe = pd_pipe; + ifc->splice = pd_splice; +} diff --git a/lib/libtle_glue/packetdrill.h b/lib/libtle_glue/packetdrill.h new file mode 100644 index 0000000..6f84a87 --- /dev/null +++ b/lib/libtle_glue/packetdrill.h @@ -0,0 +1,111 @@ +/* + * Copyright 2015 Google Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ +/* + * Author: xiaoj@google.com (Xiao Jia) + * + * Interface for packetdrill. + * + * To be tested against as a shared object (*.so) file, implement this + * interface, export a function "packetdrill_interface_init", and + * initialize the interface struct passed in with your own functions. 
+ */ + +#ifndef __PACKETDRILL_H__ +#define __PACKETDRILL_H__ + +#include <poll.h> +#include <sys/socket.h> +#include <sys/time.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <sys/epoll.h> +#include <unistd.h> + +struct packetdrill_interface { + void *userdata; + void (*free)(void *userdata); + int (*socket)(void *userdata, int domain, int type, int protocol); + int (*bind)(void *userdata, int sockfd, const struct sockaddr *addr, + socklen_t addrlen); + int (*listen)(void *userdata, int sockfd, int backlog); + int (*accept)(void *userdata, int sockfd, struct sockaddr *addr, + socklen_t *addrlen); + int (*connect)(void *userdata, int sockfd, const struct sockaddr *addr, + socklen_t addrlen); + ssize_t (*read)(void *userdata, int fd, void *buf, size_t count); + ssize_t (*readv)(void *userdata, int fd, const struct iovec *iov, + int iovcnt); + ssize_t (*recv)(void *userdata, int sockfd, void *buf, size_t len, + int flags); + ssize_t (*recvfrom)(void *userdata, int sockfd, void *buf, size_t len, + int flags, struct sockaddr *src_addr, + socklen_t *addrlen); + ssize_t (*recvmsg)(void *userdata, int sockfd, struct msghdr *msg, + int flags); + ssize_t (*write)(void *userdata, int fd, const void *buf, size_t count); + ssize_t (*writev)(void *userdata, int fd, const struct iovec *iov, + int iovcnt); + ssize_t (*send)(void *userdata, int sockfd, const void *buf, size_t len, + int flags); + ssize_t (*sendto)(void *userdata, int sockfd, const void *buf, + size_t len, int flags, + const struct sockaddr *dest_addr, socklen_t addrlen); + ssize_t (*sendmsg)(void *userdata, int sockfd, const struct msghdr *msg, + int flags); + int (*fcntl)(void *userdata, int fd, int cmd, ...); + int (*ioctl)(void *userdata, int fd, unsigned long request, ...); + int (*close)(void *userdata, int fd); + int (*shutdown)(void *userdata, int sockfd, int how); + int (*getsockopt)(void *userdata, int sockfd, int level, int optname, + void *optval, socklen_t *optlen); + int (*setsockopt)(void 
*userdata, int sockfd, int level, int optname, + const void *optval, socklen_t optlen); + int (*poll)(void *userdata, struct pollfd *fds, nfds_t nfds, + int timeout); + /* Send @count bytes of data starting from @buf to the TCP stack. + * Return 0 on success or -1 on error. */ + int (*netdev_send)(void *userdata, const void *buf, size_t count); + /* Sniff the next packet leaving the TCP stack. + * Put packet data in @buf. @count is passed in as the buffer size. + * The actual number of bytes received should be put in @count. + * Set @count to 0 if received nothing. + * Set @time_usecs to the receive timestamp. + * Return 0 on success or -1 on error. */ + int (*netdev_receive)(void *userdata, void *buf, size_t *count, + long long *time_usecs); + int (*usleep)(void *userdata, useconds_t usec); + int (*gettimeofday)(void *userdata, struct timeval *tv, + struct timezone *tz); + int (*epoll_create)(void *userdata, int size); + int (*epoll_ctl)(void *userdata, int epfd, int op, int fd, + struct epoll_event *event); + int (*epoll_wait)(void *userdata, int epfd, struct epoll_event *events, + int maxevents, int timeout); + int (*pipe)(void *userdata, int pipefd[2]); + int (*splice)(void *userdata, int fd_in, loff_t *off_in, int fd_out, + loff_t *off_out, size_t len, unsigned int flags); +}; + +typedef void (*packetdrill_interface_init_t)(const char *flags, + struct packetdrill_interface *); + +void +packetdrill_interface_init(const char *flags, struct packetdrill_interface *ifc); + +#endif /* __PACKETDRILL_H__ */ diff --git a/lib/libtle_glue/poll.c b/lib/libtle_glue/poll.c new file mode 100644 index 0000000..ebc0110 --- /dev/null +++ b/lib/libtle_glue/poll.c @@ -0,0 +1,138 @@ +/* + * Copyright (c) 2018 Ant Financial Services Group. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <signal.h>
#include <poll.h>

#include "fd.h"
#include "ctx.h"
#include "sym.h"
#include "log.h"
#include "util.h"
#include "internal.h"
#include "tle_glue.h"

/*
 * poll() replacement that multiplexes glue (user-space stack) fds and
 * kernel fds in one call:
 *   1. kernel fds are collected into k_fds[]; glue fds are fast-path
 *      checked with fd_ready();
 *   2. all-kernel fd sets degrade to a plain k_poll();
 *   3. otherwise poll_common() is polled non-blocking first, and only if
 *      nothing is ready are the kernel fds bridged in through a temporary
 *      epoll fd and a blocking poll_common() with the caller's timeout.
 */
int
PRE(poll)(struct pollfd *fds, nfds_t nfds, int timeout)
{
	int efd;
	int total = 0, j;
	int tmp_ev;
	uint32_t i;
	uint32_t k_n = 0;
	int k_fds[nfds];
	struct sock *so;
	struct glue_ctx *ctx;
	struct epoll_event k_ev;
	struct epoll_event events[nfds];

	for (i = 0; i < nfds; ++i) {
		if (is_kernel_fd(fds[i].fd)) {
			k_fds[k_n++] = i;
			continue;
		}

		/* NOTE(review): an invalid sock is skipped with revents left
		 * untouched — should it be cleared to 0 instead? confirm */
		so = fd2sock(fds[i].fd);
		if (!so->valid)
			continue;

		fds[i].revents = fd_ready(fds[i].fd, fds[i].events);
		if (fds[i].revents) {
			total++;
			continue;
		}

		/* We fill sock->event here as we need this when
		 * we filter events in poll_common(). But it was
		 * originally set by epoll_ctl(). Now we have to
		 * assume that there are no application which
		 * uses epoll and poll at the same time.
		 */
		so->event.events = fds[i].events;
		so->event.data.u32 = i; /* store idx */
	}

	if (k_n == nfds)
		return k_poll(fds, nfds, timeout);

	if (total > 0)
		return total;

	/* thread <> context binding happens here */
	if (RTE_PER_LCORE(glue_ctx) == NULL) {
		ctx = &ctx_array[glue_ctx_alloc()];
		RTE_PER_LCORE(glue_ctx) = ctx;
	} else
		ctx = RTE_PER_LCORE(glue_ctx);

	/* non-blocking probe of the user-space stack first */
	total = poll_common(ctx, events, nfds, 0, -1);

	/* We assume kernel I/O events are not as important as user ones */
	if (total > 0)
		goto format;

	efd = k_epoll_create(1);
	if (efd < 0)
		rte_panic("k_epoll_create failed %d", errno);

	for (i = 0; i < k_n; ++i) {
		k_ev.events = fds[k_fds[i]].events;
		k_ev.data.u32 = k_fds[i]; /* store idx */
		k_epoll_ctl(efd, EPOLL_CTL_ADD, fds[k_fds[i]].fd, &k_ev);
	}

	total = poll_common(ctx, events, nfds, timeout, efd);
	k_close(efd);
format:
	for (j = 0; j < total; ++j) {
		tmp_ev = events[j].events;
		/* a bare HUP is widened with POLLERR plus whatever IN/OUT the
		 * caller asked for — presumably mirroring Linux semantics
		 * where hangup implies readability; confirm the exact-equality
		 * test (== rather than &) is intended */
		if (tmp_ev == POLLHUP) {
			tmp_ev |= POLLERR | (fds[events[j].data.u32].events &
					     (POLLIN | POLLOUT));
		}
		fds[events[j].data.u32].revents = tmp_ev;
	}

	return total;
}

/*
 * ppoll() shim: converts the timespec to milliseconds and delegates.
 * NOTE(review): this calls plain poll(), not PRE(poll) — presumably relying
 * on symbol interposition to land back in the glue implementation; confirm.
 * Also the millisecond conversion is done in int, which can overflow for
 * very large tv_sec — confirm callers' timeout ranges.
 */
int
PRE(ppoll)(struct pollfd *fds, nfds_t nfds,
	   const struct timespec *tmo_p, const sigset_t *sigmask)
{
	int timeout;

	if (sigmask != NULL)
		rte_panic("ppoll with signal is not supported");

	if (tmo_p == NULL)
		timeout = -1;
	else
		timeout = tmo_p->tv_sec * 1000 + tmo_p->tv_nsec / 1000000;

	return poll(fds, nfds, timeout);
}

/* _FORTIFY_SOURCE stub: the checked poll variant ignores the compile-time
 * buffer size and forwards to poll(). */
extern int __poll_chk(struct pollfd *fds, nfds_t nfds, int timeout,
		      __SIZE_TYPE__ fdslen);
int
__poll_chk(struct pollfd *fds, nfds_t nfds, int timeout,
	   __SIZE_TYPE__ fdslen __rte_unused)
{
	return poll(fds, nfds, timeout);
}
diff --git a/lib/libtle_glue/port.c b/lib/libtle_glue/port.c
new file mode 100644
index 0000000..7a4cf2e
--- /dev/null
+++ b/lib/libtle_glue/port.c
@@ -0,0 +1,246 @@
/*
 * Copyright (c) 2018 Ant Financial Services Group.
+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <sys/eventfd.h> +#include <unistd.h> + +#include <rte_ethdev.h> +#include <rte_eth_ring.h> + +#include "log.h" +#include "ctx.h" +#include "config.h" +#include "internal.h" + +int stopped; + +static struct rte_mempool *mpool[RTE_MAX_NUMA_NODES]; + +struct rte_mempool * +get_mempool_by_socket(int32_t socket_id) +{ + struct rte_mempool *mp; + char name[RTE_MEMPOOL_NAMESIZE]; + + if (socket_id == SOCKET_ID_ANY) + socket_id = 0; + + if (mpool[socket_id]) + return mpool[socket_id]; + + snprintf(name, sizeof(name), "MP%u", socket_id); + mp = rte_pktmbuf_dynamic_pool_create(name, MAX_MBUFS - 1, + MBUF_PERCORE_CACHE, 0, + RTE_MBUF_DEFAULT_BUF_SIZE, + socket_id, MBUF_DYNAMIC_SIZE); + + if (mp == NULL) + rte_panic("Failed to create mbuf mempool"); + + mpool[socket_id] = mp; + return mp; +} + +static void +update_rss_conf(uint16_t port_id) +{ + struct rte_eth_rss_conf rss_conf = { + .rss_key = NULL, + .rss_key_len = 0, + .rss_hf = ETH_RSS_IP | ETH_RSS_TCP | ETH_RSS_UDP, + }; + + if (rte_eth_dev_rss_hash_update(port_id, &rss_conf) < 0) + rte_panic("Failed to update rss hash"); +} + +static void +queue_init(uint16_t port_id, uint16_t nb_queues, + struct rte_eth_dev_info *dev_info, + struct rte_eth_conf *port_conf) +{ + uint16_t q; + int32_t socket_id, rc; + uint16_t nb_rxd = 1024, nb_txd = 1024; + struct rte_mempool *mp; + struct rte_eth_txconf txq_conf = dev_info->default_txconf; + struct rte_eth_rxconf 
rxq_conf = dev_info->default_rxconf; + + socket_id = rte_eth_dev_socket_id(port_id); + mp = get_mempool_by_socket(socket_id); + + dev_info->default_rxconf.rx_drop_en = 1; + + rc = rte_eth_dev_adjust_nb_rx_tx_desc(port_id, &nb_rxd, &nb_txd); + if (rc < 0) + rte_panic("Cannot adjust number of desc"); + + rxq_conf.offloads = port_conf->rxmode.offloads; + txq_conf.offloads = port_conf->txmode.offloads; + + /* faster free of tx entries */ + txq_conf.tx_free_thresh = nb_txd - 64; + + for (q = 0; q < nb_queues; q++) { + rc = rte_eth_rx_queue_setup(port_id, q, nb_rxd, + socket_id, &rxq_conf, mp); + if (rc < 0) + rte_panic("rx queue=%u setup failed: %d", q, rc); + + rc = setup_rx_cb(port_id, q); + if (rc < 0) + rte_panic("rx queue=%u rx setup failed: %d", q, rc); + } + + for (q = 0; q < nb_queues; q++) { + rc = rte_eth_tx_queue_setup(port_id, q, nb_txd, + socket_id, &txq_conf); + if (rc < 0) + rte_panic("tx queue=%u setup failed: %d", q, rc); + } +} + +uint64_t rx_offload = + DEV_RX_OFFLOAD_IPV4_CKSUM | + DEV_RX_OFFLOAD_UDP_CKSUM | + DEV_RX_OFFLOAD_TCP_CKSUM; +/* nice to have: + DEV_RX_OFFLOAD_CRC_STRIP | + DEV_RX_OFFLOAD_TCP_LRO | + DEV_RX_OFFLOAD_HEADER_SPLIT | + DEV_RX_OFFLOAD_SCATTER | + DEV_RX_OFFLOAD_TIMESTAMP +*/ + +uint64_t tx_offload = + DEV_TX_OFFLOAD_UDP_CKSUM | + DEV_TX_OFFLOAD_TCP_CKSUM | + DEV_TX_OFFLOAD_TCP_TSO | + DEV_TX_OFFLOAD_MULTI_SEGS; + +int +dev_rxq_wakeup(uint16_t port_id) +{ + int fd; + uint16_t qid; + uint32_t vec, efd_idx; + struct rte_eth_dev *dev; + struct rte_intr_handle *intr_handle; + + RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV); + + dev = &rte_eth_devices[port_id]; + intr_handle = dev->intr_handle; + if (!intr_handle) + return -ENOTSUP; + if (!intr_handle->intr_vec) + return -EPERM; + + for (qid = 0; qid < dev->data->nb_rx_queues; qid++) { + vec = intr_handle->intr_vec[qid]; + efd_idx = (vec >= RTE_INTR_VEC_RXTX_OFFSET) ? 
			  (vec - RTE_INTR_VEC_RXTX_OFFSET) : vec;
		/* kick each rx queue's interrupt eventfd so sleeping pollers
		 * wake up */
		fd = intr_handle->efds[efd_idx];
		if (eventfd_write(fd, (eventfd_t) 1) < 0)
			return -errno;
	}

	return 0;
}

/*
 * (Re)configure port 0 with one rx/tx queue pair per glue context:
 * stop the port (waking any pollers first), renegotiate offloads against
 * the device capabilities, set up queues and RSS, and restart.
 * Steps are numbered 0-5 below.
 */
void
port_reconfig(void)
{
	int32_t rc;
	struct rte_eth_dev_info dev_info;
	uint16_t port_id = 0; /* We use and only use port 0 */
	uint16_t nb_port;
	uint16_t nb_queues = nb_ctx;

	struct rte_eth_conf port_conf = {
		.intr_conf = {
			.rxq = 1,
		},
	};

	/* 0. dev number check */
	nb_port = rte_eth_dev_count_avail();
	if (nb_port < 1 || nb_port >2)
		rte_panic("One port is mandatory with an optional loopback device\n");

	stopped = 1;
	rte_wmb();
	/* wake up all rxqs */
	if (nb_ctx > 1)
		dev_rxq_wakeup(port_id);

	usleep(1); /* fix me: this cannot guarantee correctness */

	rte_eth_dev_stop(port_id);

	/* 1. offloading check and set*/
	rte_eth_dev_info_get(port_id, &dev_info);
	rx_offload &= dev_info.rx_offload_capa;
	port_conf.rxmode.offloads = rx_offload;
	tx_offload &= dev_info.tx_offload_capa;
	port_conf.txmode.offloads = tx_offload;

	GLUE_LOG(INFO, "configure queues = %d, offloads: rx = %"PRIx64", tx = %"PRIx64,
		 nb_queues, rx_offload, tx_offload);

	/* 2. dev configure */
	rc = rte_eth_dev_configure(port_id, nb_queues, nb_queues, &port_conf);
	if (rc != 0)
		rte_panic("Failed to configure device, %d", rc);

	/* 3. queue setup */
	queue_init(port_id, nb_queues, &dev_info, &port_conf);

	/* 4. rss conf */
	if (nb_queues > 1)
		update_rss_conf(port_id);

	/* 5.
dev start */ + if (rte_eth_dev_start(port_id) < 0) + rte_panic("Failed to start device"); + + stopped = 0; +} + +uint16_t +create_loopback(uint32_t socket_id) +{ + int ret; + struct rte_ring* lb_queue; + static uint16_t lb_port_id = 0xFFFF; + const char *ring_name = "loopback-ring"; + + if (lb_port_id != 0xFFFF) + return lb_port_id; + + lb_queue = rte_ring_create(ring_name, MAX_PKTS_BURST * 8, socket_id, + RING_F_SP_ENQ | RING_F_SC_DEQ); + if (!lb_queue) + rte_panic("Failed to create ring for loopback\n"); + ret = rte_eth_from_ring(lb_queue); + if (ret < 0) + rte_panic("Failed to create ethdev from ring\n"); + lb_port_id = ret; + + if (setup_rx_cb(lb_port_id, 0) < 0) + rte_panic("Failed to set up rx cb for loopback\n"); + + return lb_port_id; +} diff --git a/lib/libtle_glue/rxcb.c b/lib/libtle_glue/rxcb.c new file mode 100644 index 0000000..51f31c9 --- /dev/null +++ b/lib/libtle_glue/rxcb.c @@ -0,0 +1,834 @@ +/* + * Copyright (c) 2018 Ant Financial Services Group. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

#include <rte_ethdev.h>
#include <rte_arp.h>
#include <rte_ip.h>
#include <rte_tcp.h>
#include <rte_udp.h>

#include <netinet/in.h>
#include <netinet/ip6.h>

#include "log.h"
#include "ctx.h"
#include "internal.h"

/* maps a packet-type mask to a named rx-callback */
struct ptype2cb {
	uint32_t mask;
	const char *name;
	rte_rx_callback_fn fn;
};

/* bit flags describing which ptypes a device can classify in hardware */
enum {
	ETHER_ARP_PTYPE = 0x1,
	IPV4_PTYPE = 0x2,
	IPV4_EXT_PTYPE = 0x4,
	IPV6_PTYPE = 0x8,
	IPV6_EXT_PTYPE = 0x10,
	TCP_PTYPE = 0x20,
	UDP_PTYPE = 0x40,
	ICMP_PTYPE = 0x80,
};

/*
 * Pack header lengths into a single 64-bit value.  The shift positions
 * (7/16/24/40/49) presumably match DPDK's rte_mbuf tx_offload bit-field
 * layout (l2_len:7, l3_len:9, l4_len:8, tso_segsz:16, outer_l3_len:9,
 * outer_l2_len:7) — confirm against rte_mbuf.h of the pinned DPDK version.
 */
static inline uint64_t
_mbuf_tx_offload(uint64_t il2, uint64_t il3, uint64_t il4, uint64_t tso,
		 uint64_t ol3, uint64_t ol2)
{
	return il2 | il3 << 7 | il4 << 16 | tso << 24 | ol3 << 40 | ol2 << 49;
}

/* Record l2/l3/l4 header lengths on the mbuf; -1 if they exceed pkt_len. */
static inline int32_t
fill_pkt_hdr_len(struct rte_mbuf *m, uint32_t l2, uint32_t l3, uint32_t l4)
{
	if (l2 + l3 + l4 > m->pkt_len)
		return -1;
	m->tx_offload = _mbuf_tx_offload(l2, l3, l4, 0, 0, 0);
	return 0;
}

/* True when any fragment bits other than DF are set (MF flag or a non-zero
 * fragment offset), i.e. the packet is one piece of a fragmented datagram. */
static inline int
is_ipv4_frag(const struct ipv4_hdr *iph)
{
	const uint16_t mask = rte_cpu_to_be_16(~IPV4_HDR_DF_FLAG);

	return ((mask & iph->fragment_offset) != 0);
}

/* TCP header length in bytes, from the data-offset field (32-bit words). */
static inline uint32_t
get_tcp_header_size(struct rte_mbuf *m, uint32_t l2_len, uint32_t l3_len)
{
	const struct tcp_hdr *tcp;

	tcp = rte_pktmbuf_mtod_offset(m, struct tcp_hdr *, l2_len + l3_len);
	return (tcp->data_off >> 4) * 4;
}

/* Trim trailing padding so pkt_len matches the IPv4 total_length field;
 * -1 when the header claims more bytes than the mbuf actually holds. */
static inline int32_t
adjust_ipv4_pktlen(struct rte_mbuf *m, uint32_t l2_len)
{
	uint32_t plen, trim;
	const struct ipv4_hdr *iph;

	iph = rte_pktmbuf_mtod_offset(m, const struct ipv4_hdr *, l2_len);
	plen = rte_be_to_cpu_16(iph->total_length) + l2_len;
	if (plen < m->pkt_len) {
		trim = m->pkt_len - plen;
		rte_pktmbuf_trim(m, trim);
	} else if (plen > m->pkt_len)
		return -1;

	return 0;
}

/* IPv6 counterpart of adjust_ipv4_pktlen(): payload_len excludes the fixed
 * header, so it is added back before comparing against pkt_len. */
static inline int32_t
adjust_ipv6_pktlen(struct rte_mbuf *m, uint32_t l2_len)
{
	uint32_t plen, trim;
	const struct ipv6_hdr *iph;

	iph = rte_pktmbuf_mtod_offset(m, const struct ipv6_hdr *, l2_len);
	plen =
rte_be_to_cpu_16(iph->payload_len) + sizeof(*iph) + l2_len; + if (plen < m->pkt_len) { + trim = m->pkt_len - plen; + rte_pktmbuf_trim(m, trim); + } else if (plen > m->pkt_len) + return -1; + + return 0; +} + +static inline uint32_t +get_ipv4_hdr_len(struct rte_mbuf *m, uint32_t l2, uint32_t proto, uint32_t frag) +{ + const struct ipv4_hdr *iph; + int32_t dlen, len; + + dlen = rte_pktmbuf_data_len(m); + dlen -= l2; + + iph = rte_pktmbuf_mtod_offset(m, const struct ipv4_hdr *, l2); + len = (iph->version_ihl & IPV4_HDR_IHL_MASK) * IPV4_IHL_MULTIPLIER; + + if (frag != 0 && is_ipv4_frag(iph)) { + m->packet_type &= ~RTE_PTYPE_L4_MASK; + m->packet_type |= RTE_PTYPE_L4_FRAG; + } + + if (len > dlen || (proto <= IPPROTO_MAX && iph->next_proto_id != proto)) + m->packet_type = RTE_PTYPE_UNKNOWN; + + return len; +} + +static inline uint32_t +get_ipv6x_hdr_len(struct rte_mbuf *m, uint32_t l2, uint32_t *fproto) +{ + const struct ipv6_hdr *ip6h; + const struct ip6_ext *ipx; + uint32_t nproto; + int32_t dlen, len, ofs; + + ip6h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr*, l2); + nproto = ip6h->proto; + len = sizeof(struct ipv6_hdr); + + dlen = rte_pktmbuf_data_len(m); + dlen -= l2; + + ofs = l2 + len; + ipx = rte_pktmbuf_mtod_offset(m, const struct ip6_ext *, ofs); + + while (ofs > 0 && len < dlen) { + switch (nproto) { + case IPPROTO_HOPOPTS: + case IPPROTO_ROUTING: + case IPPROTO_DSTOPTS: + ofs = (ipx->ip6e_len + 1) << 3; + break; + case IPPROTO_AH: + ofs = (ipx->ip6e_len + 2) << 2; + break; + case IPPROTO_FRAGMENT: + /* + * tso_segsz is not used by RX, so use it as temporary + * buffer to store the fragment offset. 
+ */ + m->tso_segsz = l2 + len; + ofs = sizeof(struct ip6_frag); + m->packet_type &= ~RTE_PTYPE_L4_MASK; + m->packet_type |= RTE_PTYPE_L4_FRAG; + break; + case IPPROTO_TCP: + case IPPROTO_UDP: + case IPPROTO_ICMPV6: + ofs = 0; + if (*fproto == 0) + *fproto = nproto; + break; + default: + ofs = 0; + } + + if (ofs > 0) { + nproto = ipx->ip6e_nxt; + len += ofs; + ipx += ofs / sizeof(*ipx); + } + } + + /* unrecognized or invalid packet. */ + if (*fproto == 0 || len > dlen) + m->packet_type = RTE_PTYPE_UNKNOWN; + + return len; +} + +static inline uint32_t +get_ipv6_hdr_len(struct rte_mbuf *m, uint32_t l2, uint32_t fproto) +{ + const struct ipv6_hdr *iph; + + iph = rte_pktmbuf_mtod_offset(m, const struct ipv6_hdr *, + sizeof(struct ether_hdr)); + + if (iph->proto == fproto) + return sizeof(struct ipv6_hdr); + else + return get_ipv6x_hdr_len(m, l2, &fproto); +} + +static inline struct rte_mbuf* +process_ipv4_frag(struct rte_mbuf *m, struct glue_ctx *ctx, + uint32_t l2_len, uint32_t l3_len) +{ + struct ipv4_hdr* iph; + + m->l2_len = l2_len; + m->l3_len = l3_len; + /* fixme: ip checksum should be checked here. + * After reassemble, the ip checksum would be invalid. 
+ */ + m = rte_ipv4_frag_reassemble_packet(ctx->frag_tbl, + &ctx->frag_dr, m, rte_rdtsc(), + rte_pktmbuf_mtod_offset(m, struct ipv4_hdr*, m->l2_len)); + rte_ip_frag_free_death_row(&ctx->frag_dr, 3); + if (m == NULL) + return NULL; + iph = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr*, m->l2_len); + switch (iph->next_proto_id) { + case IPPROTO_TCP: + m->packet_type &= ~RTE_PTYPE_L4_MASK; + m->packet_type |= RTE_PTYPE_L4_TCP; + break; + case IPPROTO_UDP: + m->packet_type &= ~RTE_PTYPE_L4_MASK; + m->packet_type |= RTE_PTYPE_L4_UDP; + break; + } + return m; +} + +static inline struct rte_mbuf* +process_ipv6_frag(struct rte_mbuf *m, struct glue_ctx *ctx, + uint32_t l2_len, uint32_t l3_len) +{ + struct ipv6_hdr* ip6h; + + m->l2_len = l2_len; + m->l3_len = l3_len; + m = rte_ipv6_frag_reassemble_packet(ctx->frag_tbl, + &ctx->frag_dr, m, rte_rdtsc(), + rte_pktmbuf_mtod_offset(m, struct ipv6_hdr*, l2_len), + rte_pktmbuf_mtod_offset(m, struct ipv6_extension_fragment*, + m->tso_segsz)); + rte_ip_frag_free_death_row(&ctx->frag_dr, 3); + if (m == NULL) + return NULL; + ip6h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr*, m->l2_len); + switch (ip6h->proto) { + case IPPROTO_TCP: + m->packet_type &= ~RTE_PTYPE_L4_MASK; + m->packet_type |= RTE_PTYPE_L4_TCP; + break; + case IPPROTO_UDP: + m->packet_type &= ~RTE_PTYPE_L4_MASK; + m->packet_type |= RTE_PTYPE_L4_UDP; + break; + } + return m; +} + +static inline struct rte_mbuf * +fill_ptypes_and_hdr_len(struct glue_ctx *ctx, struct rte_mbuf *m) +{ + uint32_t dlen, l2_len, l3_len, l4_len, proto; + const struct ether_hdr *eth; + uint32_t ptypes; + uint16_t etp; + int32_t error = 0; + + dlen = rte_pktmbuf_data_len(m); + + /* L2 */ + l2_len = sizeof(*eth); + + eth = rte_pktmbuf_mtod(m, const struct ether_hdr *); + etp = eth->ether_type; + while (etp == rte_be_to_cpu_16(ETHER_TYPE_VLAN)) { + etp = rte_pktmbuf_mtod_offset(m, struct vlan_hdr*, l2_len)->eth_proto; + l2_len += sizeof(struct vlan_hdr); + } + + if (etp == 
rte_be_to_cpu_16(ETHER_TYPE_ARP)) + return arp_recv(ctx, m, l2_len); + + if (etp == rte_be_to_cpu_16(ETHER_TYPE_IPv4)) { + const struct ipv4_hdr *hdr; + + /* L3 */ + hdr = rte_pktmbuf_mtod_offset(m, const struct ipv4_hdr *, l2_len); + error = adjust_ipv4_pktlen(m, l2_len); + if (error) { + rte_pktmbuf_free(m); + return NULL; + } + l3_len = get_ipv4_hdr_len(m, l2_len, IPPROTO_MAX + 1, 1); + + if ((m->packet_type & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_FRAG) { + m = process_ipv4_frag(m, ctx, l2_len, l3_len); + if (m == NULL) + return NULL; + hdr = rte_pktmbuf_mtod_offset(m, const struct ipv4_hdr*, + m->l2_len); + l3_len = get_ipv4_hdr_len(m, m->l2_len, + IPPROTO_MAX + 1, 0); + } + + /* L4 */ + switch (hdr->next_proto_id) { + case IPPROTO_ICMP: + return icmp_recv(ctx, m, l2_len, l3_len); + case IPPROTO_TCP: + ptypes = RTE_PTYPE_L4_TCP | + RTE_PTYPE_L3_IPV4_EXT_UNKNOWN | + RTE_PTYPE_L2_ETHER; + l4_len = get_tcp_header_size(m, l2_len, l3_len); + break; + case IPPROTO_UDP: + ptypes = RTE_PTYPE_L4_UDP | + RTE_PTYPE_L3_IPV4_EXT_UNKNOWN | + RTE_PTYPE_L2_ETHER; + l4_len = sizeof(struct udp_hdr); + break; + default: + GLUE_LOG(ERR, "drop ipv4 pkt of unknow L4: (%d)", + hdr->next_proto_id); + rte_pktmbuf_free(m); + return NULL; + } + + } else if (etp == rte_be_to_cpu_16(ETHER_TYPE_IPv6) && + dlen >= l2_len + sizeof(struct ipv6_hdr) + sizeof(struct udp_hdr)) { + /* L3 */ + error = adjust_ipv6_pktlen(m, l2_len); + if (error) { + rte_pktmbuf_free(m); + return NULL; + } + proto = 0; + l3_len = get_ipv6x_hdr_len(m, l2_len, &proto); + + if ((m->packet_type & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_FRAG) { + m = process_ipv6_frag(m, ctx, l2_len, l3_len); + if (m == NULL) + return NULL; + l3_len = get_ipv6x_hdr_len(m, m->l2_len, &proto); + } + + /* L4 */ + switch (proto) { + case IPPROTO_TCP: + ptypes = RTE_PTYPE_L4_TCP | + RTE_PTYPE_L3_IPV6_EXT_UNKNOWN | + RTE_PTYPE_L2_ETHER; + l4_len = get_tcp_header_size(m, l2_len, l3_len); + break; + case IPPROTO_UDP: + ptypes = RTE_PTYPE_L4_UDP | + 
				RTE_PTYPE_L3_IPV6_EXT_UNKNOWN |
				RTE_PTYPE_L2_ETHER;
			l4_len = sizeof(struct udp_hdr);
			break;
		case IPPROTO_ICMPV6:
			/* ICMPv6 is consumed by the glue layer itself,
			 * not delivered to a stream.
			 */
			return icmp6_recv(ctx, m, l2_len, l3_len);
		default:
			GLUE_DEBUG("drop ipv6 pkt of unknown L4: (%x)", proto);
			rte_pktmbuf_free(m);
			return NULL;
		}
	} else {
		GLUE_DEBUG("Drop unknown L3 packet: %x", etp);
		rte_pktmbuf_free(m);
		return NULL;
	}

	/* classification succeeded: record ptypes and the
	 * l2/l3/l4 header lengths in the mbuf for the stack.
	 */
	m->packet_type = ptypes;
	error = fill_pkt_hdr_len(m, l2_len, l3_len, l4_len);
	if (error) {
		rte_pktmbuf_free(m);
		return NULL;
	}

	return m;
}

/* exclude NULLs from the final list of packets. */
static inline uint32_t
compress_pkt_list(struct rte_mbuf *pkt[], uint32_t nb_pkt, uint32_t nb_zero)
{
	uint32_t i, j, k, l;

	/* Walk backwards from the end of the array; stop early once all
	 * known holes (nb_zero) have been filled.  Note "j--" both tests
	 * and decrements, so inside the loop j is a valid index.
	 */
	for (j = nb_pkt; nb_zero != 0 && j-- != 0; ) {

		/* found a hole. */
		if (pkt[j] == NULL) {

			/* find how big is it.
			 * If the hole extends to index 0, "i--" wraps i to
			 * UINT32_MAX; the unsigned arithmetic below (l = i + 1
			 * == 0, nb_pkt -= j - i == j + 1) still yields the
			 * right result.
			 */
			for (i = j; i-- != 0 && pkt[i] == NULL; )
				;
			/* fill the hole. */
			for (k = j + 1, l = i + 1; k != nb_pkt; k++, l++)
				pkt[l] = pkt[k];

			nb_pkt -= j - i;
			nb_zero -= j - i;
			j = i + 1;
		}
	}

	/* number of non-NULL entries now packed at the front */
	return nb_pkt;
}

/* Fill l2/l3/l4 header lengths for a packet whose ptype @tp was already
 * (partially) classified by hardware; reassembles IPv4/IPv6 fragments
 * via the ctx fragment table.  Returns NULL (mbuf consumed/freed) on
 * drop or when the fragment is not yet complete.
 */
static inline struct rte_mbuf *
common_fill_hdr_len(struct rte_mbuf *m, uint32_t tp, struct glue_ctx *ctx)
{
	uint32_t l4_len, l3_len, l2_len = sizeof(struct ether_hdr);
	int32_t error = 0;

	switch (tp) {
	/* possibly fragmented packets.
*/ + case (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L2_ETHER): + case (RTE_PTYPE_L3_IPV4_EXT | RTE_PTYPE_L2_ETHER): + l3_len = get_ipv4_hdr_len(m, l2_len, IPPROTO_MAX + 1, 1); + if ((m->packet_type & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_FRAG) { + m = process_ipv4_frag(m, ctx, l2_len, l3_len); + if (m == NULL) + return NULL; + tp = m->packet_type & (RTE_PTYPE_L2_MASK | + RTE_PTYPE_L3_MASK | + RTE_PTYPE_L4_MASK); + } + break; + case (RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L2_ETHER): + case (RTE_PTYPE_L3_IPV6_EXT | RTE_PTYPE_L2_ETHER): + l3_len = get_ipv6_hdr_len(m, l2_len, IPPROTO_MAX + 1); + if ((m->packet_type & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_FRAG) { + m = process_ipv6_frag(m, ctx, l2_len, l3_len); + if (m == NULL) + return NULL; + tp = m->packet_type & (RTE_PTYPE_L2_MASK | + RTE_PTYPE_L3_MASK | + RTE_PTYPE_L4_MASK); + } + break; + } + + switch (tp) { + /* non fragmented tcp packets. */ + case (RTE_PTYPE_L4_TCP | RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L2_ETHER): + l3_len = sizeof(struct ipv4_hdr); + l4_len = get_tcp_header_size(m, l2_len, l3_len); + error = adjust_ipv4_pktlen(m, l2_len); + break; + case (RTE_PTYPE_L4_TCP | RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L2_ETHER): + l3_len = sizeof(struct ipv6_hdr); + l4_len = get_tcp_header_size(m, l2_len, l3_len); + error = adjust_ipv6_pktlen(m, l2_len); + break; + case (RTE_PTYPE_L4_TCP | RTE_PTYPE_L3_IPV4_EXT | RTE_PTYPE_L2_ETHER): + l3_len = get_ipv4_hdr_len(m, l2_len, + IPPROTO_TCP, 0); + l4_len = get_tcp_header_size(m, l2_len, l3_len); + error = adjust_ipv4_pktlen(m, l2_len); + break; + case (RTE_PTYPE_L4_TCP | RTE_PTYPE_L3_IPV6_EXT | RTE_PTYPE_L2_ETHER): + l3_len = get_ipv6_hdr_len(m, l2_len, IPPROTO_TCP); + l4_len = get_tcp_header_size(m, l2_len, l3_len); + error = adjust_ipv6_pktlen(m, l2_len); + break; + + /* non fragmented udp packets. 
*/ + case (RTE_PTYPE_L4_UDP | RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L2_ETHER): + l3_len = sizeof(struct ipv4_hdr); + l4_len = sizeof(struct udp_hdr); + error = adjust_ipv4_pktlen(m, l2_len); + break; + case (RTE_PTYPE_L4_UDP | RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L2_ETHER): + l3_len = sizeof(struct ipv6_hdr); + l4_len = sizeof(struct udp_hdr); + error = adjust_ipv6_pktlen(m, l2_len); + break; + case (RTE_PTYPE_L4_UDP | RTE_PTYPE_L3_IPV4_EXT | RTE_PTYPE_L2_ETHER): + l3_len = get_ipv4_hdr_len(m, l2_len, + IPPROTO_UDP, 0); + l4_len = sizeof(struct udp_hdr); + error = adjust_ipv4_pktlen(m, l2_len); + break; + case (RTE_PTYPE_L4_UDP | RTE_PTYPE_L3_IPV6_EXT | RTE_PTYPE_L2_ETHER): + l3_len = get_ipv6_hdr_len(m, l2_len, IPPROTO_UDP); + l4_len = sizeof(struct udp_hdr); + error = adjust_ipv6_pktlen(m, l2_len); + break; + default: + GLUE_LOG(ERR, "drop unknown pkt"); + rte_pktmbuf_free(m); + return NULL; + } + + if (error) { + rte_pktmbuf_free(m); + return NULL; + } + error = fill_pkt_hdr_len(m, l2_len, l3_len, l4_len); + if (error) { + rte_pktmbuf_free(m); + return NULL; + } + return m; +} + + +/* + * HW can recognize L2-arp/L3 with/without extensions/L4 (i40e) + */ +static uint16_t +type0_rx_callback(uint16_t port, + uint16_t queue, + struct rte_mbuf *pkt[], + uint16_t nb_pkts, + uint16_t max_pkts, + void *user_param) +{ + uint32_t j, tp, l2_len, l3_len; + struct glue_ctx *ctx; + uint16_t nb_zero = 0; + + RTE_SET_USED(port); + RTE_SET_USED(queue); + RTE_SET_USED(max_pkts); + + ctx = user_param; + + for (j = 0; j != nb_pkts; j++) { + tp = pkt[j]->packet_type & (RTE_PTYPE_L4_MASK | + RTE_PTYPE_L3_MASK | RTE_PTYPE_L2_MASK); + + switch (tp) { + case (RTE_PTYPE_L2_ETHER_ARP): + arp_recv(ctx, pkt[j], sizeof(struct ether_hdr)); + pkt[j] = NULL; + nb_zero++; + break; + case (RTE_PTYPE_L4_ICMP | RTE_PTYPE_L3_IPV4 | + RTE_PTYPE_L2_ETHER): + case (RTE_PTYPE_L4_ICMP | RTE_PTYPE_L3_IPV4_EXT | + RTE_PTYPE_L2_ETHER): + l2_len = sizeof(struct ether_hdr); + l3_len = get_ipv4_hdr_len(pkt[j], l2_len, 
IPPROTO_ICMP, 0); + icmp_recv(ctx, pkt[j], l2_len, l3_len); + pkt[j] = NULL; + nb_zero++; + break; + case (RTE_PTYPE_L4_ICMP | RTE_PTYPE_L3_IPV6 | + RTE_PTYPE_L2_ETHER): + case (RTE_PTYPE_L4_ICMP | RTE_PTYPE_L3_IPV6_EXT | + RTE_PTYPE_L2_ETHER): + l2_len = sizeof(struct ether_hdr); + l3_len = get_ipv6_hdr_len(pkt[j], l2_len, IPPROTO_ICMPV6); + icmp6_recv(ctx, pkt[j], l2_len, l3_len); + pkt[j] = NULL; + nb_zero++; + break; + default: + if (common_fill_hdr_len(pkt[j], tp, ctx) == NULL) { + pkt[j] = NULL; + nb_zero++; + } + break; + } + } + + if (nb_zero == 0) + return nb_pkts; + + return compress_pkt_list(pkt, nb_pkts, nb_zero); +} + +/* + * HW can recognize L2/L3/L4 and fragments; but cannot recognize ARP + * nor ICMP (ixgbe). + */ +static uint16_t +type1_rx_callback(uint16_t port, + uint16_t queue, + struct rte_mbuf *pkt[], + uint16_t nb_pkts, + uint16_t max_pkts, + void *user_param) +{ + uint32_t j, tp, l2_len, l3_len; + struct glue_ctx *ctx; + uint16_t nb_zero = 0; + const struct ether_hdr *eth; + const struct ipv4_hdr *ip4; + const struct ipv6_hdr *ip6; + uint16_t etp; + + RTE_SET_USED(port); + RTE_SET_USED(queue); + RTE_SET_USED(max_pkts); + + ctx = user_param; + + for (j = 0; j != nb_pkts; j++) { + tp = pkt[j]->packet_type & (RTE_PTYPE_L4_MASK | RTE_PTYPE_L3_MASK | + RTE_PTYPE_L2_MASK); + + switch (tp) { + case RTE_PTYPE_L2_ETHER: + eth = rte_pktmbuf_mtod(pkt[j], const struct ether_hdr *); + etp = eth->ether_type; + if (etp == rte_be_to_cpu_16(ETHER_TYPE_ARP)) + arp_recv(ctx, pkt[j], sizeof(*eth)); + pkt[j] = NULL; + nb_zero++; + break; + case (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L2_ETHER): + case (RTE_PTYPE_L3_IPV4_EXT | RTE_PTYPE_L2_ETHER): + ip4 = rte_pktmbuf_mtod_offset(pkt[j], + const struct ipv4_hdr *, + sizeof(*eth)); + if (ip4->next_proto_id == IPPROTO_ICMP) { + l2_len = sizeof(struct ether_hdr); + l3_len = get_ipv4_hdr_len(pkt[j], l2_len, + IPPROTO_ICMP, 0); + icmp_recv(ctx, pkt[j], l2_len, l3_len); + } else + rte_pktmbuf_free(pkt[j]); + + pkt[j] = NULL; + 
nb_zero++; + break; + case (RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L2_ETHER): + case (RTE_PTYPE_L3_IPV6_EXT | RTE_PTYPE_L2_ETHER): + ip6 = rte_pktmbuf_mtod_offset(pkt[j], + const struct ipv6_hdr *, + sizeof(*eth)); + if (ip6->proto == IPPROTO_ICMPV6) { + l2_len = sizeof(struct ether_hdr); + l3_len = get_ipv6_hdr_len(pkt[j], l2_len, + IPPROTO_ICMPV6); + icmp6_recv(ctx, pkt[j], l2_len, l3_len); + } else + rte_pktmbuf_free(pkt[j]); + + pkt[j] = NULL; + nb_zero++; + break; + default: + if (common_fill_hdr_len(pkt[j], tp, ctx) == NULL) { + pkt[j] = NULL; + nb_zero++; + } + break; + } + } + + if (nb_zero == 0) + return nb_pkts; + + return compress_pkt_list(pkt, nb_pkts, nb_zero); +} + +/* + * generic, assumes HW doesn't recognize any packet type. + */ +uint16_t +typen_rx_callback(uint16_t port, + uint16_t queue, + struct rte_mbuf *pkt[], + uint16_t nb_pkts, + uint16_t max_pkts, + void *user_param) +{ + uint32_t j; + uint16_t nb_zero; + struct glue_ctx *ctx; + + RTE_SET_USED(port); + RTE_SET_USED(queue); + RTE_SET_USED(max_pkts); + + ctx = user_param; + + nb_zero = 0; + for (j = 0; j != nb_pkts; j++) { + /* fix me: now we avoid checking ip checksum */ + pkt[j]->ol_flags &= (~PKT_RX_IP_CKSUM_BAD); + pkt[j]->packet_type = 0; + pkt[j] = fill_ptypes_and_hdr_len(ctx, pkt[j]); + nb_zero += (pkt[j] == NULL); + } + + if (nb_zero == 0) + return nb_pkts; + + return compress_pkt_list(pkt, nb_pkts, nb_zero); +} + +static uint32_t +get_ptypes(uint16_t port_id) +{ + uint32_t smask; + int32_t i, rc; + const uint32_t pmask = + RTE_PTYPE_L2_MASK | RTE_PTYPE_L3_MASK | RTE_PTYPE_L4_MASK; + + smask = 0; + rc = rte_eth_dev_get_supported_ptypes(port_id, pmask, NULL, 0); + if (rc < 0) { + RTE_LOG(ERR, USER1, + "%s(port=%u) failed to get supported ptypes;\n", + __func__, port_id); + return smask; + } + + uint32_t ptype[rc]; + rc = rte_eth_dev_get_supported_ptypes(port_id, pmask, ptype, rc); + + for (i = 0; i != rc; i++) { + switch (ptype[i]) { + case RTE_PTYPE_L2_ETHER_ARP: + smask |= ETHER_ARP_PTYPE; + 
break; + case RTE_PTYPE_L3_IPV4: + case RTE_PTYPE_L3_IPV4_EXT_UNKNOWN: + smask |= IPV4_PTYPE; + break; + case RTE_PTYPE_L3_IPV4_EXT: + smask |= IPV4_EXT_PTYPE; + break; + case RTE_PTYPE_L3_IPV6: + case RTE_PTYPE_L3_IPV6_EXT_UNKNOWN: + smask |= IPV6_PTYPE; + break; + case RTE_PTYPE_L3_IPV6_EXT: + smask |= IPV6_EXT_PTYPE; + break; + case RTE_PTYPE_L4_TCP: + smask |= TCP_PTYPE; + break; + case RTE_PTYPE_L4_UDP: + smask |= UDP_PTYPE; + break; + case RTE_PTYPE_L4_ICMP: + smask |= ICMP_PTYPE; + break; + } + } + + return smask; +} + +/* In rx callbacks, we need to check and make sure below things are done, + * either by hw or by sw: + * 1. filter out arp packets, and handle arp packets properly + * - for arp request packet, reply arp if it's requesting myself. + * 2. fill l2, l3, l4 header length + * + * 3. GSO/GRO setup (TODO) + * + */ +int +setup_rx_cb(uint16_t port_id, uint16_t qid) +{ + int32_t rc; + uint32_t i, n, smask; + const void *cb; + struct glue_ctx *ctx; + const struct ptype2cb *ptype2cb; + + static const struct ptype2cb tcp_arp_ptype2cb[] = { + { /* i40e */ + .mask = ETHER_ARP_PTYPE | + ICMP_PTYPE | + IPV4_PTYPE | IPV4_EXT_PTYPE | + IPV6_PTYPE | IPV6_EXT_PTYPE | + TCP_PTYPE | UDP_PTYPE, + .name = "HW l2-arp/l3x/l4-tcp ptype", + .fn = type0_rx_callback, + }, + { /* ixgbe does not support ARP ptype */ + .mask = IPV4_PTYPE | IPV4_EXT_PTYPE | + IPV6_PTYPE | IPV6_EXT_PTYPE | + TCP_PTYPE | UDP_PTYPE, + .name = "HW l3x/l4-tcp ptype", + .fn = type1_rx_callback, + }, + { /* virtio */ + .mask = 0, + .name = "HW does not support any ptype", + .fn = typen_rx_callback, + }, + }; + + ctx = glue_ctx_lookup(port_id, qid); + if (ctx == NULL) { + GLUE_LOG(ERR, "no ctx fount by port(%d) and queue (%d)", + port_id, qid); + return -EINVAL; + } + + smask = get_ptypes(port_id); + + ptype2cb = tcp_arp_ptype2cb; + n = RTE_DIM(tcp_arp_ptype2cb); + + for (i = 0; i != n; i++) { + if ((smask & ptype2cb[i].mask) == ptype2cb[i].mask) { + cb = rte_eth_add_rx_callback(port_id, qid, + 
ptype2cb[i].fn, ctx); + rc = -rte_errno; + GLUE_LOG(ERR, "%s(port=%u), setup RX callback \"%s\";", + __func__, port_id, ptype2cb[i].name); + return ((cb == NULL) ? rc : 0); + } + } + + GLUE_LOG(ERR, "%s(port=%u) failed to find an appropriate callback", + __func__, port_id); + return -ENOENT; +} diff --git a/lib/libtle_glue/rxtx.c b/lib/libtle_glue/rxtx.c new file mode 100644 index 0000000..b80a3ac --- /dev/null +++ b/lib/libtle_glue/rxtx.c @@ -0,0 +1,573 @@ +/* + * Copyright (c) 2018 Ant Financial Services Group. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "sym.h" + +#include <rte_common.h> +#include <rte_mbuf.h> +#include <rte_ip.h> +#include <rte_udp.h> +#include <rte_atomic.h> + +#include <tle_tcp.h> + +#include <stddef.h> +#include <fcntl.h> + +#include "tle_glue.h" +#include "fd.h" +#include "util.h" +#include "internal.h" + +rte_atomic32_t thr_cnt; + +#define MAX_UDP_PKT_LEN ((2 << 16) - 1 - sizeof(struct ipv4_hdr) - sizeof(struct udp_hdr)) + +static inline struct rte_mbuf * +from_mbuf_to_buf(struct rte_mbuf *m, char *buf, + size_t len, int ispeek, int needcpy) +{ + void *src; + uint32_t done = 0; + uint32_t left = len, orig_pkt_len; + uint16_t copy_len, seg_len, segs; + struct rte_mbuf *m_next, *orig_pkt; + + if (len == 0) + return m; + + orig_pkt = m; + orig_pkt_len = m->pkt_len; + segs = m->nb_segs; + + do { + seg_len = rte_pktmbuf_data_len(m); + copy_len = RTE_MIN(seg_len, left); + src = rte_pktmbuf_mtod(m, void *); + if (needcpy) + rte_memcpy(buf + done, src, copy_len); + done += copy_len; + left -= copy_len; + if (copy_len < seg_len) { + if (!ispeek) + rte_pktmbuf_adj(m, copy_len); + break; + } + m_next = m->next; + if (!ispeek) { + rte_pktmbuf_free_seg(m); + segs--; + } + m = m_next; + } while (left && m); + + if (m && !ispeek) { + m->nb_segs = segs; + m->pkt_len = orig_pkt_len - done; + } + + if(ispeek) + return orig_pkt; + else + return m; +} + +static inline bool +is_peer_closed(struct sock *so) +{ + if (errno == EAGAIN && tle_event_state(&so->erev) == TLE_SEV_UP) + return true; + + return false; +} + +static ssize_t +_recv(int sockfd, void *buf, size_t len, struct sockaddr *src_addr, int flags) +{ + int rx; + ssize_t rc; + ssize_t recvlen; + size_t tmplen; + struct sock *so; + struct rte_mbuf *m; + struct epoll_event event; + int needcpy; + + if (RTE_PER_LCORE(_lcore_id) == LCORE_ID_ANY) { + RTE_PER_LCORE(_lcore_id) = rte_atomic32_add_return(&thr_cnt, 1); + } + + so = fd2sock(sockfd); + + if (so->s == NULL) { + if (IS_UDP(so) && is_nonblock(so, flags)) + errno = EAGAIN; + else + 
errno = ENOTCONN; + return -1; + } + + if (so->rx_left) { + m = so->rx_left; + so->rx_left = NULL; + if (src_addr) { + OPS(so)->getname(so, src_addr, 1); + /* fixme: cannot get addr for UDP in this way */ + } + } else { + rc = OPS(so)->recv(so->s, &m, 1, src_addr); + if (rc == 0) { + if (is_nonblock(so, flags)) { + /* socket closed, return 0 */ + if (is_peer_closed(so)) { + GLUE_DEBUG("peer closed: %d", sockfd); + return 0; + } + + /* According to linux stack, + * receive from shutdown tcp socket returns 0. + * And receive from shutdown udp socket generate + * EAGAIN. In special case, we return ESHUTDOWN + * to notify upper application. + */ + if (so->shutdown & RECV_SHUTDOWN) { + if (so->proto == PROTO_TCP) + return 0; + else { +#ifdef LOOK_ASIDE_BACKEND + errno = ESHUTDOWN; +#else + errno = EAGAIN; +#endif + return -1; + } + } + return -1; + } + + do { + /* in blocking mode, recv from shutdown socket + * return 0 immediately */ + if (so->shutdown & RECV_SHUTDOWN) + return 0; + + /* some error occured, return -1 */ + if (errno != EAGAIN) + return -1; + + /* socket closed, return 0 */ + if (is_peer_closed(so)) { + GLUE_DEBUG("peer closed: %d", sockfd); + return 0; + } + + epoll_kernel_wait(CTX(so), -1, &event, 1, 1, &rx); + + be_process(CTX(so)); + } while((rc = OPS(so)->recv(so->s, &m, 1, src_addr)) == 0); + } + } + + /* get one pkt */ + if (!so->option.timestamp) + so->s->timestamp = m->timestamp; + + needcpy = 1; + recvlen = RTE_MIN(m->pkt_len, len); + if (flags & MSG_TRUNC) { + if (IS_UDP(so)) + recvlen = m->pkt_len; + else + /* According to linux manual, data will be discarded + * if recv TCP stream with MSG_TRUNC flag */ + needcpy = 0; + } + + so->rx_left = from_mbuf_to_buf(m, buf, len, flags & MSG_PEEK, needcpy); + + if (((flags & MSG_PEEK) == 0) && IS_UDP(so) && so->rx_left) { + rte_pktmbuf_free(so->rx_left); + so->rx_left = NULL; + } + + /* UDP socket only receive one pkt at one time */ + if (IS_UDP(so) || (flags & MSG_PEEK)) { + return recvlen; + } + /* 
TCP socket: try best to fill buf */ + len -= recvlen; + buf = (char*)buf + recvlen; + while (len) { + if (OPS(so)->recv(so->s, &m, 1, src_addr) == 0) + break; + + tmplen = (m->pkt_len < len) ? m->pkt_len : len; + so->rx_left = from_mbuf_to_buf(m, buf, tmplen, 0, needcpy); + len -= tmplen; + recvlen += tmplen; + buf = (char*)buf + tmplen; + } + + if (so->rx_left) + tle_event_raise(&so->rxev); + + /* may send window increase ACK after receive*/ + if (recvlen > 0) + be_tx_with_lock(CTX(so)); + + return recvlen; +} + +ssize_t PRE(recv)(int sockfd, void *buf, size_t len, int flags) +{ + if (is_kernel_fd(sockfd)) + return k_read(sockfd, buf, len); + + return _recv(sockfd, buf, len, NULL, flags); +} + +ssize_t PRE(recvfrom)(int sockfd, void *buf, size_t len, int flags, + struct sockaddr *src_addr, socklen_t *addrlen) +{ + ssize_t rc; + if (is_kernel_fd(sockfd)) + return k_recv(sockfd, buf, len, flags); + + if (src_addr && !addrlen) { + errno = EINVAL; + return -1; + } + rc = _recv(sockfd, buf, len, src_addr, flags); + if (rc >= 0 && src_addr) { + if (src_addr->sa_family == AF_INET) { + *addrlen = sizeof(struct sockaddr_in); + } else { + *addrlen = sizeof(struct sockaddr_in6); + } + } + return rc; +} + +#define RECV_CONTINUE (-2) +static inline ssize_t +try_recvmsg(struct sock *so, struct msghdr *msg, int flags) +{ + ssize_t sz; + + if (so->s == NULL) { + if (IS_UDP(so) && is_nonblock(so, flags)) + errno = EAGAIN; + else + errno = ENOTCONN; + return -1; + } + + sz = OPS(so)->readv(so->s, msg, flags); + if (sz >= 0) { /* get data */ + /* may send window increase ACK after receive*/ + if (sz > 0) + be_tx_with_lock(CTX(so)); + return sz; + } + else if (errno != EAGAIN) /* error occurred */ + return -1; + else if (is_peer_closed(so)) { + GLUE_DEBUG("peer closed: %d", so->fd); + return 0; + } else if (is_nonblock(so, flags)) + return -1; + + return RECV_CONTINUE; +} + +ssize_t PRE(recvmsg)(int sockfd, struct msghdr *msg, int flags) +{ + ssize_t sz; + struct sock *so; + + if 
(is_kernel_fd(sockfd)) + return k_recvmsg(sockfd, msg, flags); + + so = fd2sock(sockfd); + + if (so->rx_left == NULL && OPS(so)->readv && + (flags & MSG_PEEK) == 0 && + ((flags & MSG_TRUNC) == 0 || so->proto == PROTO_UDP)) { + /* udp_readv supports MSG_TRUNC, tcp_readv not yet. + * so only udp socket implement with readv interface. + */ + sz = try_recvmsg(so, msg, flags); + if (sz != RECV_CONTINUE) + return sz; + } + + /* 1. rx_left != NULL; 2. get no data, fall back to blocking read */ + + if (so->rx_left != NULL && msg != NULL && msg->msg_control != NULL) { + if (so->option.timestamp) + tle_set_timestamp(msg, so->rx_left); + else + msg->msg_controllen = 0; + } + + sz = PRE(recvfrom)(sockfd, msg->msg_iov[0].iov_base, + msg->msg_iov[0].iov_len, flags, + (struct sockaddr *)msg->msg_name, + &msg->msg_namelen); + + return sz; +} + +ssize_t PRE(read)(int fd, void *buf, size_t count) +{ + if (is_kernel_fd(fd)) + return k_read(fd, buf, count); + + return _recv(fd, buf, count, NULL, 0); +} + +#define DECONST(type, var) ((type)(uintptr_t)(const void *)(var)) + +ssize_t PRE(readv)(int fd, const struct iovec *iov, int iovcnt) +{ + ssize_t sz; + struct sock *so; + struct msghdr msg; + + if (is_kernel_fd(fd)) + return k_readv(fd, iov, iovcnt); + + if (RTE_PER_LCORE(_lcore_id) == LCORE_ID_ANY) { + RTE_PER_LCORE(_lcore_id) = rte_atomic32_add_return(&thr_cnt, 1); + } + + so = fd2sock(fd); + + if (so->rx_left == NULL && OPS(so)->readv) { + memset(&msg, 0, sizeof(msg)); + msg.msg_iov = DECONST(struct iovec *, iov); + msg.msg_iovlen = iovcnt; + sz = try_recvmsg(so, &msg, 0); + if (sz != RECV_CONTINUE) + return sz; + } + + /* 1. rx_left != NULL; 2. get no data, fall back to blocking read */ + + /* fixme: when so->rx_left != NULL, also needs readv. 
+ * maybe need to modify readv interface args of ops */ + return _recv(fd, iov[0].iov_base, iov[0].iov_len, NULL, 0); +} + +static ssize_t +_send(int sockfd, const void *buf, size_t len, + const struct sockaddr *peer, int flags) +{ + struct sock *so = fd2sock(sockfd); + struct rte_mempool *mp = get_mempool_by_socket(0); /* fix me */ + uint16_t nb_mbufs = (len + RTE_MBUF_DEFAULT_DATAROOM - 1) + / RTE_MBUF_DEFAULT_DATAROOM; + uint16_t i, cnt, copy_len; + int rc; + struct rte_mbuf *mbufs[nb_mbufs + 1]; + size_t done = 0; + uint32_t left = 0; + char *dst; + int blocking = !is_nonblock(so, flags); + + if (RTE_PER_LCORE(_lcore_id) == LCORE_ID_ANY) { + RTE_PER_LCORE(_lcore_id) = rte_atomic32_add_return(&thr_cnt, 1); + } + + if (!blocking && len > def_sndbuf && so->proto == PROTO_TCP) { + len = def_sndbuf; + nb_mbufs = (len + RTE_MBUF_DEFAULT_DATAROOM - 1) + / RTE_MBUF_DEFAULT_DATAROOM; + } + + if (unlikely(len == 0)) { + if (so->proto == PROTO_TCP) + return 0; + else + nb_mbufs = 1; + } + + if (unlikely(len > MAX_UDP_PKT_LEN && IS_UDP(so))) { + errno = EMSGSIZE; + return -1; + } + + if (blocking) + be_process(get_ctx()); + + if (unlikely(rte_pktmbuf_alloc_bulk(mp, mbufs, nb_mbufs) < 0)) { + errno = ENOMEM; + return -1; + } + + for (i = 0; i < nb_mbufs; ++i) { + copy_len = RTE_MIN((size_t)RTE_MBUF_DEFAULT_DATAROOM, + len - done); + dst = rte_pktmbuf_mtod(mbufs[i], char *); + rte_memcpy(dst, (const char *)buf + done, copy_len); + done += copy_len; + mbufs[i]->data_len = copy_len; + mbufs[i]->pkt_len = copy_len; + } + + cnt = 0; +do_send: + rc = OPS(so)->send(so, mbufs + cnt, nb_mbufs - cnt, peer); + + cnt += rc; + + if (cnt > 0) + be_tx_with_lock(CTX(so)); + + if (cnt > 0 && blocking) + be_process(get_ctx()); + + if (blocking && + cnt < nb_mbufs && + (rc > 0 || errno == EAGAIN) && + tle_event_state(&so->erev) != TLE_SEV_UP) { + be_process(get_ctx()); + goto do_send; + } + + for (i = cnt; i < nb_mbufs; ++i) { + left += mbufs[i]->pkt_len; + rte_pktmbuf_free_seg(mbufs[i]); + } 
+ + if (cnt == 0) + return -1; + else + return len - left; +} + +ssize_t PRE(send)(int sockfd, const void *buf, size_t len, int flags) +{ + if (is_kernel_fd(sockfd)) + return k_write(sockfd, buf, len); + + /* MSG_NOSIGNAL means "Do not generate SIGPIPE". Ignore this flag */ + flags &= ~MSG_NOSIGNAL; + + return _send(sockfd, buf, len, NULL, flags); +} + +ssize_t PRE(sendto)(int sockfd, const void *buf, size_t len, int flags, + const struct sockaddr *dest_addr, socklen_t addrlen) +{ + if (is_kernel_fd(sockfd)) + return k_sendto(sockfd, buf, len, flags, dest_addr, addrlen); + + /* MSG_NOSIGNAL means "Do not generate SIGPIPE". Ignore this flag */ + flags &= ~MSG_NOSIGNAL; + + return _send(sockfd, buf, len, dest_addr, flags); +} + +ssize_t PRE(sendmsg)(int sockfd, const struct msghdr *msg, int flags) +{ + ssize_t ret; + struct sock *so; + + if (is_kernel_fd(sockfd)) + return k_sendmsg(sockfd, msg, flags); + + /* MSG_NOSIGNAL means "Do not generate SIGPIPE". Ignore this flag */ + flags &= ~MSG_NOSIGNAL; + + so = fd2sock(sockfd); + if (OPS(so)->writev) { + ret = OPS(so)->writev(so, msg->msg_iov, msg->msg_iovlen, + msg->msg_name); + if (ret < 0) { + if (errno != EAGAIN || is_nonblock(so, flags)) + return -1; + } else { + /* TODO: blocking && ret < total length */ + be_tx_with_lock(CTX(so)); + return ret; + } + + /* fall through to blocking send */ + } + + return _send(sockfd, msg->msg_iov[0].iov_base, msg->msg_iov[0].iov_len, + (struct sockaddr *)msg->msg_name, flags); +} + +ssize_t PRE(write)(int fd, const void *buf, size_t count) +{ + if (is_kernel_fd(fd)) + return k_write(fd, buf, count); + + return _send(fd, buf, count, NULL, 0); +} + +ssize_t PRE(writev)(int fd, const struct iovec *iov, int iovcnt) +{ + ssize_t ret; + struct sock *so; + + if (is_kernel_fd(fd)) + return k_writev(fd, iov, iovcnt); + + if (RTE_PER_LCORE(_lcore_id) == LCORE_ID_ANY) { + RTE_PER_LCORE(_lcore_id) = rte_atomic32_add_return(&thr_cnt, 1); + } + + so = fd2sock(fd); + if (OPS(so)->writev) { + ret 
= OPS(so)->writev(so, iov, iovcnt, NULL); + if (ret < 0) { + if (errno != EAGAIN || is_nonblock(so, 0)) + return -1; + } else { + /* TODO: blocking && ret < total length */ + be_tx_with_lock(CTX(so)); + return ret; + } + + /* fall through to blocking send */ + } + + return _send(fd, iov[0].iov_base, iov[0].iov_len, NULL, 0); +} + +/* advanced functions */ +ssize_t PRE(splice)(int fd_in, loff_t *off_in, int fd_out, + loff_t *off_out, size_t len, unsigned int flags) +{ + if (is_kernel_fd(fd_in) && is_kernel_fd(fd_out)) + return k_splice(fd_in, off_in, fd_out, off_out, len, flags); + + rte_panic("splice is not supported yet"); + errno = EOPNOTSUPP; + return -1; +} + +ssize_t PRE(sendfile)(int out_fd, int in_fd, off_t *offset, size_t count) +{ + if (is_kernel_fd(out_fd) && is_kernel_fd(in_fd)) + return k_sendfile(out_fd, in_fd, offset, count); + + rte_panic("sendfile is not supported yet"); + errno = EOPNOTSUPP; + return -1; +} diff --git a/lib/libtle_glue/select.c b/lib/libtle_glue/select.c new file mode 100644 index 0000000..b3b8539 --- /dev/null +++ b/lib/libtle_glue/select.c @@ -0,0 +1,201 @@ +/* + * Copyright (c) 2018 Ant Financial Services Group. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include <signal.h> +#include <sys/select.h> +#include <sys/time.h> +#include <sys/types.h> +#include <unistd.h> + +#include "fd.h" +#include "ctx.h" +#include "sym.h" +#include "log.h" +#include "util.h" +#include "internal.h" +#include "tle_glue.h" + +#define FD_ZERO_N(s, n) do { memset((s)->fds_bits, 0, n/sizeof(long)); } while(0) + +static int +fdset_to_events_user(int nfds, fd_set *fdset, int *total, int event) +{ + int i, num = 0; + struct sock *so; + const struct tle_event *ev; + + for (i = fd_table.fd_base; i < nfds; ++i) { + if (!FD_ISSET(i, fdset)) + continue; + + so = fd2sock(i); /* fix me: check if fd is opened */ + + switch (event) { + case EPOLLIN: + ev = &so->rxev; + break; + case EPOLLOUT: + ev = &so->txev; + break; + case EPOLLERR: + ev = &so->erev; + break; + default: + rte_panic("non-sense value\n"); + } + /* Check event is ready */ + if (TLE_SEV_UP == tle_event_state(ev)) { + *total = *total + 1; + } else { + FD_CLR(i, fdset); + num++; + } + + /* We fill sock->event here as we need this when + * we filter events in poll_common(). But it was + * originally set by epoll_ctl(). Now we have to + * assume that there are no application which + * uses epoll/poll/select at the same time. 
+ */ + so->event.events |= event; + so->event.data.u32 = i; + } + + return num; +} + +static int +fdset_to_events_kernel(int nfds, fd_set *fdset, int efd, int event) +{ + int i, num = 0; + struct epoll_event k_ev; + + for (i = 0; i < nfds; ++i) { + if (!FD_ISSET(i, fdset)) + continue; + + k_ev.events = event; + k_ev.data.u32 = i; + k_epoll_ctl(efd, EPOLL_CTL_ADD, i, &k_ev); + num++; + } + + return num; +} + +int +PRE(select)(int nfds, fd_set *readfds, fd_set *writefds, + fd_set *exceptfds, struct timeval *timeout) +{ + int to; + struct glue_ctx *ctx; + int j, efd, total = 0, max = 0; + + /* thread <> context binding happens here */ + if (RTE_PER_LCORE(glue_ctx) == NULL) { + ctx = &ctx_array[glue_ctx_alloc()]; + RTE_PER_LCORE(glue_ctx) = ctx; + } else + ctx = RTE_PER_LCORE(glue_ctx); + + /* step 0, process some packets */ + be_process(ctx); + + /* step 1, check if any userspace events are ready */ + + if (readfds) + max += fdset_to_events_user(nfds, readfds, + &total, EPOLLIN); + if (writefds) + max += fdset_to_events_user(nfds, writefds, + &total, EPOLLOUT); + if (exceptfds) + max += fdset_to_events_user(nfds, writefds, + &total, EPOLLERR); + if (total > 0) { + /* userspace events go firstly */ + if (readfds) + FD_ZERO_N(readfds, fd_table.fd_base); + if (writefds) + FD_ZERO_N(writefds, fd_table.fd_base); + if (exceptfds) + FD_ZERO_N(exceptfds, fd_table.fd_base); + + return total; + } + + /* step 2, only wait for kernel events? 
*/ + if (max == 0) + return k_select(nfds, readfds, writefds, exceptfds, timeout); + + /* step 3, slow path: wait for I/O and kernel events */ + efd = k_epoll_create(1); + if (efd < 0) + rte_panic("k_epoll_create failed %d", errno); + + nfds = RTE_MIN(nfds, fd_table.fd_base); + if (readfds) + max += fdset_to_events_kernel(nfds, readfds, + efd, EPOLLIN); + if (writefds) + max += fdset_to_events_kernel(nfds, writefds, + efd, EPOLLOUT); + if (exceptfds) + max += fdset_to_events_kernel(nfds, exceptfds, + efd, EPOLLERR); + + struct epoll_event events[max]; + + if (timeout) + to = timeout->tv_sec * 1000 + timeout->tv_usec / 1000; + else + to = -1; + total = poll_common(ctx, events, max, to, efd); + + k_close(efd); + for (j = 0; j < total; ++j) { + if (events[j].events & EPOLLIN) + FD_SET(events[j].data.fd, readfds); + + if (events[j].events & EPOLLOUT) + FD_SET(events[j].data.fd, writefds); + + if ((events[j].events & (EPOLLHUP | EPOLLERR)) && exceptfds) + FD_SET(events[j].data.fd, exceptfds); + } + return total; +} + +int +PRE(pselect)(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, + const struct timespec *timeout, const sigset_t *sigmask) +{ + struct timeval tv, *tv_to; + + if (sigmask != NULL) + rte_panic("pselect with signal is not supported"); + + if (timeout) { + tv.tv_usec = timeout->tv_nsec / 1000; + tv.tv_sec = timeout->tv_sec; + tv_to = &tv; + } else + tv_to = NULL; + + return select(nfds, readfds, writefds, exceptfds, tv_to); +} diff --git a/lib/libtle_glue/sock.h b/lib/libtle_glue/sock.h new file mode 100644 index 0000000..fcd6362 --- /dev/null +++ b/lib/libtle_glue/sock.h @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2018 Ant Financial Services Group. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef _SOCK_H_
#define _SOCK_H_

#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#include <tle_event.h>
#include <tle_ctx.h>

#include "ctx.h"

#ifdef __cplusplus
extern "C" {
#endif

/* default send/receive buffer sizes (defined in a .c file) */
extern unsigned int def_sndbuf;
extern unsigned int def_rcvbuf;

/* Fallback definitions for TCP socket options that may be missing from
 * older libc headers; the numeric values match the Linux kernel ABI.
 */
#ifndef TCP_FASTOPEN
#define TCP_FASTOPEN 23
#endif

#ifndef TCP_USER_TIMEOUT
#define TCP_USER_TIMEOUT 18
#endif

#ifndef TCP_FASTOPEN_CONNECT
#define TCP_FASTOPEN_CONNECT 30
#endif

struct sock;

/* Per-protocol operations table (one instance for TCP, one for UDP).
 * Note the mixed receiver types: some ops take the glue-level
 * struct sock, others operate directly on the tle_stream.
 */
struct proto {
	/* socket-option get/set for options this stack implements */
	int (*setsockopt)(struct sock *sk, int optname, const void *optval,
			  socklen_t optlen);
	int (*getsockopt)(struct sock *sk, int optname, void *optval,
			  socklen_t *option);
	/* fill @addr with local (peer == 0) or remote (peer != 0) address */
	int (*getname)(struct sock *sk, struct sockaddr *addr, int peer);

	/* connection setup primitives mirroring the BSD socket calls */
	int (*bind)(struct sock *sk, const struct sockaddr *addr);
	int (*listen)(struct sock *sk, int backlog);
	int (*connect)(struct sock *sk, const struct sockaddr *addr);
	int (*accept)(struct sock *sk, struct sockaddr *addr,
		      socklen_t *addrlen, int flags);

	/* mbuf-based I/O: receive up to @num mbufs / send @num mbufs;
	 * @addr / @dst_addr carry the datagram peer address (UDP).
	 */
	ssize_t (*recv)(struct tle_stream *s, struct rte_mbuf *pkt[],
			uint16_t num, struct sockaddr *addr);
	ssize_t (*send)(struct sock *sk, struct rte_mbuf *pkt[],
			uint16_t num, const struct sockaddr *dst_addr);

	/* scatter/gather I/O; optional — callers check for NULL */
	ssize_t (*readv)(struct tle_stream *s, struct msghdr *msg, int flags);
	ssize_t (*writev)(struct sock *sk, const struct iovec *iov,
			  int iovcnt, const struct sockaddr *dst_addr);

	/* teardown: shutdown(2)-style half-close and full close */
	int (*shutdown)(struct sock *sk, int how);
	int (*close)(struct tle_stream *s);

	/* push updated per-socket options down to the stream */
	void (*update_cfg)(struct sock *sk);

char name[32]; +}; + +enum { + PROTO_TCP, + PROTO_UDP +}; + +#define RECV_SHUTDOWN 1 +#define SEND_SHUTDOWN 2 + +extern struct proto udp_prot; +extern struct proto tcp_prot; +extern struct proto *supported_proto_ops[]; + +struct sock { + int fd; + uint32_t cid:8, /* ctx id for indexing ctx_array */ + domain:8, /* for AF_INET, AF_INET6 */ + proto:8, /* PROTO_TCP, PROTO_UDP */ + valid:1, + epoll:1, + ubind:1, + ubindany:1, + nonblock:1, + tcp_connected:1, + shutdown:2; + struct tle_stream *s; + struct rte_mbuf *rx_left; + tle_stream_options_t option; + union { + struct epoll_event event; + int shadow_efd; + }; + struct tle_event txev; + struct tle_event rxev; + struct tle_event erev; +} __rte_cache_aligned; + +#define CTX(so) (&ctx_array[so->cid]) +#define OPS(so) (supported_proto_ops[so->proto]) +#define IS_TCP(so) (so->proto == PROTO_TCP) +#define IS_UDP(so) (so->proto == PROTO_UDP) + +static inline int +is_nonblock(struct sock *so, int flags) +{ + return (flags & MSG_DONTWAIT) || so->nonblock; +} + +static inline struct tle_ctx * +get_sock_ctx(struct sock *so) +{ + if (IS_TCP(so)) + return CTX(so)->tcp_ctx; + else + return CTX(so)->udp_ctx; +} + +static inline size_t +get_sockaddr_len(sa_family_t family) +{ + switch (family) { + case AF_INET: + return sizeof(struct sockaddr_in); + case AF_INET6: + return sizeof(struct sockaddr_in6); + case AF_UNSPEC: + return sizeof(sa_family_t); + default: + return 0; + } +} + +#ifdef __cplusplus +} +#endif + +#endif /*_SOCK_H_ */ diff --git a/lib/libtle_glue/socket.c b/lib/libtle_glue/socket.c new file mode 100644 index 0000000..31b28be --- /dev/null +++ b/lib/libtle_glue/socket.c @@ -0,0 +1,720 @@ +/* + * Copyright (c) 2018 Ant Financial Services Group. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "sym.h" + +#include <stdarg.h> +#include <unistd.h> +#include <fcntl.h> +#include <sys/ioctl.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <netinet/tcp.h> +#include <netinet/udp.h> + +#include "tle_glue.h" +#include "fd.h" +#include "log.h" +#include "util.h" +#include "internal.h" +#include "sock.h" + +struct proto *supported_proto_ops[] = { + [PROTO_TCP] = &tcp_prot, + [PROTO_UDP] = &udp_prot, +}; + +/* for setup, settings, and destroy */ +int PRE(socket)(int domain, int type, int protocol) +{ + int fd; + struct sock *so; + + if ((domain != AF_INET && domain != AF_INET6) || + (type != SOCK_STREAM && type != SOCK_DGRAM)) + return k_socket(domain, type, protocol); + + if (domain == AF_INET) { + if (default_ctx->ipv4 == 0 && !default_ctx->lo4_enabled) { + errno = EAFNOSUPPORT; + return -1; + } + } else { + if (IN6_IS_ADDR_UNSPECIFIED(&default_ctx->ipv6) && + !default_ctx->lo6_enabled) { + errno = EAFNOSUPPORT; + return -1; + } + } + + fd = get_unused_fd(); + if (fd < 0) { + errno = ENFILE; + return -1; + } + so = fd2sock(fd); + so->cid = get_cid(); + if (type == SOCK_STREAM) + so->proto = PROTO_TCP; + else /* type == SOCK_DGRAM */ + so->proto = PROTO_UDP; + + so->domain = domain; + so->option.raw = 0; + so->option.mulloop = 1; + so->option.multtl = 1; + if (type == SOCK_STREAM) { + so->option.tcpquickack = 1; + /* linux default value: 2 hours */ + so->option.keepidle = 2 * 60 * 60; + /* linux default value: 75seconds */ + so->option.keepintvl = 75; + /* linux default value: 9 */ + so->option.keepcnt = 9; + } + + 
sock_alloc_events(so); + + GLUE_DEBUG("socket fd = %d", fd); + printf("socket fd = %d", fd); + return fd; +} + +int PRE(bind)(int sockfd, const struct sockaddr *addr, socklen_t addrlen) +{ + struct sock *so; + + if (is_kernel_fd(sockfd)) + return k_bind(sockfd, addr, addrlen); + + so = fd2sock(sockfd); + if (so->s) { + /* The socket is already bound to an address */ + errno = EINVAL; + return -1; + } + + if (addrlen < get_sockaddr_len(addr->sa_family)) { + errno = EINVAL; + return -1; + } + + so->cid = get_cid(); /* allow ctx reset as stream is null */ + if (OPS(so)->bind) + return OPS(so)->bind(so, addr); + + errno = EOPNOTSUPP; + return -1; +} + +int PRE(listen)(int sockfd, int backlog) +{ + struct sock *so; + + if (is_kernel_fd(sockfd)) + return k_listen(sockfd, backlog); + + so = fd2sock(sockfd); + + if (OPS(so)->listen) + return OPS(so)->listen(so, backlog); + + errno = EOPNOTSUPP; + return -1; +} + +int PRE(accept)(int sockfd, struct sockaddr *addr, socklen_t *addrlen) +{ + struct sock *so; + + if (is_kernel_fd(sockfd)) + return k_accept(sockfd, addr, addrlen); + + so = fd2sock(sockfd); + if (OPS(so)->accept) + return OPS(so)->accept(so, addr, addrlen, 0); + + errno = EOPNOTSUPP; + return -1; +} + +int PRE(accept4)(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) +{ + int fd; + struct sock *so; + + if (is_kernel_fd(sockfd)) + return k_accept4(sockfd, addr, addrlen, flags); + + fd = PRE(accept)(sockfd, addr, addrlen); + + /* inherit NONBLOCK flag */ + if (fd >= 0 && (flags & SOCK_NONBLOCK)) { + so = fd2sock(fd); + so->nonblock = 1; + } + + return fd; +} + +int PRE(connect)(int sockfd, const struct sockaddr *addr, socklen_t addrlen) +{ + struct sock *so; + + if (is_kernel_fd(sockfd)) + return k_connect(sockfd, addr, addrlen); + + if (addrlen < get_sockaddr_len(addr->sa_family)) { + errno = EINVAL; + return -1; + } + + so = fd2sock(sockfd); + so->cid = get_cid(); + + if (!(is_nonblock(so, 0))) + mac_check(CTX(so), addr); + + if 
(OPS(so)->connect) + return OPS(so)->connect(so, addr); + + errno = EOPNOTSUPP; + return -1; +} + +unsigned int def_sndbuf = 212992; +unsigned int def_rcvbuf = 212992; +static struct linger ling; + +int PRE(getsockopt)(int sockfd, int level, int optname, + void *optval, socklen_t *optlen) +{ + struct sock *so; + union { + int val; + uint64_t val64; + struct linger ling; + struct timeval tm; + } *p = optval; + + + if (is_kernel_fd(sockfd)) + return k_getsockopt(sockfd, level, optname, optval, optlen); + + if (!optval && !optlen) + return -1; + + so = fd2sock(sockfd); + + switch (level) { + case IPPROTO_IP: + switch (optname) { + case IP_OPTIONS: + *optlen = 0; + return 0; + case IP_MULTICAST_LOOP: + p->val = so->option.mulloop; + return 0; + case IP_MULTICAST_TTL: + p->val = so->option.multtl; + return 0; + } + break; + case IPPROTO_IPV6: + switch (optname) { + case IPV6_V6ONLY: + p->val = so->option.ipv6only; + return 0; + } + break; + case SOL_SOCKET: + /* man socket(7), see /usr/include/asm-generic/socket.h */ + switch (optname) { + case SO_REUSEADDR: + p->val = so->option.reuseaddr; + return 0; + case SO_REUSEPORT: + p->val = so->option.reuseport; + return 0; + case SO_ERROR: + if (TLE_SEV_DOWN == tle_event_state(&so->erev)) + p->val = 0; + else + p->val = ECONNREFUSED; + /* fixe me: ETIMEDOUT */ + return 0; + case SO_LINGER: + p->ling.l_onoff = 0; + return 0; + case SO_SNDBUF: + p->val = def_sndbuf; + return 0; + case SO_RCVBUF: + p->val = def_rcvbuf; + return 0; + case SO_ACCEPTCONN: + if (IS_TCP(so) + && TCP_STREAM(so->s)->tcb.state == TCP_ST_LISTEN) + p->val = 1; + else + p->val = 0; + return 0; + case SO_KEEPALIVE: + p->val = so->option.keepalive; + return 0; + case SO_TYPE: + if (IS_TCP(so)) + p->val = SOCK_STREAM; + else + p->val = SOCK_DGRAM; + return 0; + case SO_OOBINLINE: + p->val = so->option.oobinline; + return 0; + case SO_TIMESTAMP: + p->val = so->option.timestamp; + return 0; + case SO_PROTOCOL: + if (so->proto == PROTO_TCP) + p->val = 
IPPROTO_TCP; + else + p->val = IPPROTO_UDP; + return 0; + default: + break; + } + + break; + case SOL_TCP: + case SOL_UDP: + return OPS(so)->getsockopt(so, optname, optval, optlen); + } + + GLUE_LOG(WARNING, "getsockopt(%d) with level = %d, optname = %d", + sockfd, level, optname); + errno = EOPNOTSUPP; + return -1; +} + +int PRE(setsockopt)(int sockfd, int level, int optname, + const void *optval, socklen_t optlen) +{ + int val; + struct sock *so; + if (is_kernel_fd(sockfd)) + return k_setsockopt(sockfd, level, optname, optval, optlen); + if (!optval && !optlen) + return -1; + + val = 0; /* just to make compiler happy */ + switch (optlen) { + case sizeof(char): + val = *(const char *)optval; + break; + case sizeof(int): + val = *(const int *)optval; + break; + } + + so = fd2sock(sockfd); + + switch (level) { + case IPPROTO_IP: + switch (optname) { + case IP_RECVERR: + /* needed by netperf */ + return 0; + case IP_MULTICAST_LOOP: + if (val == 0) + so->option.mulloop = 0; + else + so->option.mulloop = 1; + if (so->s != NULL) + so->s->option.mulloop = so->option.mulloop; + return 0; + case IP_MULTICAST_TTL: + if (val > 255 || val < -1) { + errno = EINVAL; + return -1; + } + if(val == -1) { + val = 1; + } + so->option.multtl = val; + if (so->s != NULL) + so->s->option.multtl = so->option.multtl; + return 0; + case IP_ADD_MEMBERSHIP: + if (optlen < sizeof(struct ip_mreq)) { + errno = EINVAL; + return -1; + } + const struct ip_mreq* mreq = (const struct ip_mreq*)optval; + if (mreq->imr_multiaddr.s_addr == INADDR_ANY) { + errno = EINVAL; + return -1; + } + errno = EOPNOTSUPP; + return -1; + case IP_MTU_DISCOVER: + return 0; + case IP_TOS: + return 0; + case IP_RECVTOS: + return 0; + } + break; + case IPPROTO_IPV6: + switch (optname) { + case IPV6_V6ONLY: + if (val == 0) + so->option.ipv6only = 0; + else + so->option.ipv6only = 1; + if (so->s != NULL) + so->s->option.ipv6only = so->option.ipv6only; + return 0; + case IPV6_TCLASS: + return 0; + case IPV6_RECVTCLASS: + 
return 0; + } + break; + case SOL_SOCKET: + switch (optname) { + case SO_REUSEADDR: + if (val == 0) + so->option.reuseaddr = 0; + else + so->option.reuseaddr = 1; + if (so->s != NULL) + so->s->option.reuseaddr = so->option.reuseaddr; + return 0; + case SO_LINGER: + ling = *(const struct linger *)optval; + if (ling.l_onoff == 0) + return 0; + else { + GLUE_LOG(ERR, "app is enabling SO_LINGER which is not really supported"); + return 0; + } + break; + case SO_KEEPALIVE: + if (val == 0) + so->option.keepalive = 0; + else + so->option.keepalive = 1; + if (so->s != NULL) { + so->s->option.keepalive = so->option.keepalive; + if (so->proto == PROTO_TCP) + tle_tcp_stream_set_keepalive(so->s); + } + return 0; + case SO_REUSEPORT: + if (val == 0) + so->option.reuseport = 0; + else + so->option.reuseport = 1; + if (so->s != NULL) + so->s->option.reuseport = so->option.reuseport; + return 0; + case SO_SNDBUF: + def_sndbuf = val; + return 0; + case SO_RCVBUF: + def_rcvbuf = val; + return 0; + case SO_DONTROUTE: + /* needed by netperf */ + return 0; + case SO_BROADCAST: + /* needed by nc */ + /* todo: only supported for DGRAM */ + return 0; + case SO_TIMESTAMP: + so->option.timestamp = !!val; + if (so->s != NULL) + so->s->option.timestamp = so->option.timestamp; + return 0; + case SO_OOBINLINE: + if (val == 0) + so->option.oobinline = 0; + else + so->option.oobinline = 1; + if (so->s != NULL) + so->s->option.oobinline = so->option.oobinline; + return 0; + default: + break; + } + break; + case IPPROTO_TCP: + case IPPROTO_UDP: + return OPS(so)->setsockopt(so, optname, optval, optlen); + } + + GLUE_LOG(WARNING, "setsockopt(%d) with level = %d, optname = %d\n", + sockfd, level, optname); + errno = EOPNOTSUPP; + return -1; +} + +/* + * Refer to glibc/sysdeps/unix/sysv/linux/fcntl.c + */ +int PRE(fcntl)(int fd, int cmd, ...) 
+{
+	int rc;
+	void *arg;
+	va_list ap;
+	struct sock *so;
+
+	va_start(ap, cmd);
+	arg = va_arg(ap, void *);
+	va_end(ap);
+
+	if (is_kernel_fd(fd))
+		return k_fcntl(fd, cmd, arg);
+
+	so = fd2sock(fd);
+	switch (cmd) {
+	case F_SETFL:
+		if ((unsigned long)arg & O_NONBLOCK)
+			so->nonblock = 1;
+		else
+			so->nonblock = 0;
+		rc = 0;
+		break;
+	case F_GETFL:
+		if (so->nonblock)
+			rc = O_NONBLOCK | O_RDWR;
+		else
+			rc = O_RDWR;
+		break;
+	case F_SETFD:
+		rc = 0;
+		break;
+	default:
+		rc = -1;
+		errno = EOPNOTSUPP;
+		GLUE_LOG(WARNING, "fcntl(%d) with cmd = %d", fd, cmd);
+	}
+
+	return rc;
+}
+
+/*
+ * Refer to musl/src/misc/ioctl.c
+ */
+int PRE(ioctl)(int fd, unsigned long int request, ...)
+{
+	int rc;
+	void *arg;
+	va_list ap;
+	uint16_t left;
+	struct sock *so;
+	struct rte_mbuf *m;
+
+	va_start(ap, request);
+	arg = va_arg(ap, void *);
+	va_end(ap);
+
+	if (is_kernel_fd(fd))
+		return k_ioctl(fd, request, arg);
+
+	so = fd2sock(fd);
+
+	switch (request) {
+	case FIONREAD: /* SIOCINQ */
+		if (so->s == NULL)
+			*(int *)arg = 0;
+		else if (IS_TCP(so)) {
+			left = tle_tcp_stream_inq(so->s);
+			if (so->rx_left)
+				left += rte_pktmbuf_pkt_len(so->rx_left);
+			*(int *)arg = left;
+		} else {
+			if (so->rx_left)
+				*(int *)arg = rte_pktmbuf_pkt_len(so->rx_left);
+			else {
+				if (tle_udp_stream_recv(so->s, &m , 1) == 0)
+					*(int *)arg = 0;
+				else {
+					*(int *)arg = rte_pktmbuf_pkt_len(m);
+					so->rx_left = m;
+				}
+			}
+		}
+		rc = 0;
+		break;
+	case FIONBIO:
+		if (*(int *)arg)
+			so->nonblock = 1;
+		else
+			so->nonblock = 0;
+		rc = 0;
+		break;
+	case SIOCGSTAMP:
+		/* fix: so->s may be NULL before bind/connect (FIONREAD
+		 * above guards this); dereferencing it here crashed.
+		 */
+		if (so->s == NULL || so->s->timestamp == 0) {
+			errno = ENOENT;
+			rc = -1;
+		} else {
+			((struct timeval*)arg)->tv_sec = so->s->timestamp >> 20;
+			((struct timeval*)arg)->tv_usec = so->s->timestamp & 0xFFFFFUL;
+			rc = 0;
+		}
+		break;
+	default:
+		errno = EOPNOTSUPP;
+		rc = -1;
+		GLUE_LOG(WARNING, "ioctl(%d) with request = %ld", fd, request);
+	}
+
+	return rc;
+}
+
+int PRE(shutdown)(int sockfd, int how)
+{
+	struct sock *so;
+
+	if
(is_kernel_fd(sockfd)) + return k_shutdown(sockfd, how); + + so = fd2sock(sockfd); + switch (how) { + case SHUT_RD: + so->shutdown |= RECV_SHUTDOWN; + break; + case SHUT_WR: + so->shutdown |= SEND_SHUTDOWN; + break; + case SHUT_RDWR: + so->shutdown = RECV_SHUTDOWN | SEND_SHUTDOWN; + break; + } + if (OPS(so)->shutdown) + return OPS(so)->shutdown(so, how); + + errno = EOPNOTSUPP; + return -1; +} + +static inline int +getname(int sockfd, struct sockaddr *uaddr, socklen_t *addrlen, int peer) +{ + struct sock *so; + size_t socklen; + int rc; + + so = fd2sock(sockfd); + + /* This is ugly, but netperf ask for local addr (before any + * connect or bind) to check family. + * + * To formally fix this, we shall bind a local address in advance + */ + socklen = get_sockaddr_len(so->domain); + /* fixme: It is not conform to linux standard, fix it later. */ + if (*addrlen < socklen) { + errno = EINVAL; + return -1; + } + *addrlen = socklen; + + if (so->s == NULL) { + if (peer) { + errno = ENOTCONN; + return -1; + } else { + memset(uaddr, 0, socklen); + uaddr->sa_family = so->domain; + return 0; + } + } + + if (OPS(so)->getname) { + rc = OPS(so)->getname(so, uaddr, peer); + if (rc < 0) + return rc; + if (peer) { + if ((uaddr->sa_family == AF_INET && + ((struct sockaddr_in*)uaddr)->sin_addr.s_addr == 0) || + (uaddr->sa_family == AF_INET6 && + IN6_IS_ADDR_UNSPECIFIED(&((struct sockaddr_in6*) + uaddr)->sin6_addr))) { + errno = ENOTCONN; + return -1; + } + } + if (uaddr->sa_family == AF_INET && so->domain == AF_INET6) + trans_4mapped6_addr(uaddr); + return rc; + } + + errno = EOPNOTSUPP; + return -1; +} + +int PRE(getsockname)(int sockfd, struct sockaddr *addr, socklen_t *addrlen) +{ + if (is_kernel_fd(sockfd)) + return k_getsockname(sockfd, addr, addrlen); + + return getname(sockfd, addr, addrlen, 0); +} + +int PRE(getpeername)(int sockfd, struct sockaddr *addr, socklen_t *addrlen) +{ + if (is_kernel_fd(sockfd)) + return k_getpeername(sockfd, addr, addrlen); + + return 
getname(sockfd, addr, addrlen, 1); +} + +int PRE(close)(int fd) +{ + struct sock *so; + + if (is_kernel_fd(fd)) + return k_close(fd); + + GLUE_DEBUG("close fd = %d", fd); + + so = fd2sock(fd); + if (unlikely(so->valid == 0)) { + errno = EBADF; + return -1; + } else if (unlikely(so->epoll)) { + k_close(so->shadow_efd); + glue_ctx_free(CTX(so)); + } else if (so->s) { + if (OPS(so)->close) + OPS(so)->close(so->s); + + if (IS_TCP(so)) + be_tx_with_lock(CTX(so)); + + if (so->rx_left) + rte_pktmbuf_free(so->rx_left); + } + + tle_event_idle_err(&so->erev); + tle_event_idle(&so->rxev); + tle_event_idle(&so->txev); + + memset(((int*)so) + 1, 0, sizeof(*so) - sizeof(int)); + put_free_fd(fd); + return 0; +} diff --git a/lib/libtle_glue/sym.c b/lib/libtle_glue/sym.c new file mode 100644 index 0000000..39b1707 --- /dev/null +++ b/lib/libtle_glue/sym.c @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2018 Ant Financial Services Group. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#ifndef __USE_GNU
+#define __USE_GNU
+#endif
+#include <dlfcn.h>
+
+#include <rte_debug.h>
+
+#include "sym.h"
+#include "log.h"
+
+#ifdef PRELOAD
+/* Definitions of the kernel-syscall fallback pointers (declared in sym.h);
+ * fix: removed the duplicated k_epoll_create1 definition.
+ */
+int (*k_epoll_create)(int size);
+int (*k_epoll_create1)(int flags);
+int (*k_epoll_ctl)(int epfd, int op, int fd, struct epoll_event *event);
+int (*k_epoll_wait)(int epfd, struct epoll_event *events, int maxevents, int timeout);
+int (*k_epoll_pwait)(int epfd, struct epoll_event *events, int maxevents, int timeout, const sigset_t *sigmask);
+int (*k_poll)(struct pollfd *fds, nfds_t nfds, int timeout);
+int (*k_select)(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
+int (*k_pselect)(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, const struct timespec *timeout, const sigset_t *sigmask);
+int (*k_socket)(int domain, int type, int protocol);
+int (*k_listen)(int sockfd, int backlog);
+int (*k_bind)(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
+int (*k_accept)(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
+int (*k_accept4)(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
+int (*k_connect)(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
+int (*k_getsockopt)(int sockfd, int level, int optname, void *optval, socklen_t *optlen);
+int (*k_setsockopt)(int sockfd, int level, int optname, const void *optval, socklen_t optlen);
+int (*k_fcntl)(int fd, int cmd, ...
/* arg */ ); +int (*k_ioctl)(int d, int request, ...); +int (*k_shutdown)(int sockfd, int how); +int (*k_close)(int fd); +ssize_t (*k_recv)(int sockfd, void *buf, size_t len, int flags); +ssize_t (*k_recvfrom)(int sockfd, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen); +ssize_t (*k_recvmsg)(int sockfd, struct msghdr *msg, int flags); +ssize_t (*k_read)(int fd, void *buf, size_t count); +ssize_t (*k_readv)(int fd, const struct iovec *iov, int iovcnt); +ssize_t (*k_send)(int sockfd, const void *buf, size_t len, int flags); +ssize_t (*k_sendto)(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen); +ssize_t (*k_sendmsg)(int sockfd, const struct msghdr *msg, int flags); +ssize_t (*k_write)(int fd, const void *buf, size_t count); +ssize_t (*k_writev)(int fd, const struct iovec *iov, int iovcnt); +ssize_t (*k_splice)(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags); +ssize_t (*k_sendfile)(int out_fd, int in_fd, off_t *offset, size_t count); +int (*k_getsockname)(int sockfd, struct sockaddr *addr, socklen_t *addrlen); +int (*k_getpeername)(int sockfd, struct sockaddr *addr, socklen_t *addrlen); + +#define INIT_FUNC(func, handle) do { \ + k_##func = dlsym(handle, #func); \ + if ((error = dlerror()) != NULL) { \ + rte_panic(#func "is not init"); \ + } \ + RTE_ASSERT(k_##func); \ +} while (0) + +#endif + +void +symbol_init(void) +{ +#ifdef PRELOAD + void *handle; + char *error; + + TRACE("in %s", __func__); + + handle = dlopen("libc.so.6", RTLD_NOW); + error = dlerror(); + if (!handle) { + fprintf(stderr, "%s\n", error); + exit(EXIT_FAILURE); + } + + INIT_FUNC(epoll_create, handle); + INIT_FUNC(epoll_create1, handle); + INIT_FUNC(epoll_create1, handle); + INIT_FUNC(epoll_ctl, handle); + INIT_FUNC(epoll_wait, handle); + INIT_FUNC(epoll_pwait, handle); + INIT_FUNC(socket, handle); + INIT_FUNC(listen, handle); + INIT_FUNC(bind, handle); + INIT_FUNC(accept, 
handle); + INIT_FUNC(accept4, handle); + INIT_FUNC(connect, handle); + INIT_FUNC(getsockopt, handle); + INIT_FUNC(setsockopt, handle); + INIT_FUNC(fcntl, handle); + INIT_FUNC(ioctl, handle); + INIT_FUNC(shutdown, handle); + INIT_FUNC(close, handle); + INIT_FUNC(recv, handle); + INIT_FUNC(recvfrom, handle); + INIT_FUNC(recvmsg, handle); + INIT_FUNC(read, handle); + INIT_FUNC(readv, handle); + INIT_FUNC(send, handle); + INIT_FUNC(sendto, handle); + INIT_FUNC(sendmsg, handle); + INIT_FUNC(write, handle); + INIT_FUNC(writev, handle); + INIT_FUNC(splice, handle); + INIT_FUNC(sendfile, handle); + INIT_FUNC(poll, handle); + INIT_FUNC(getsockname, handle); + INIT_FUNC(getpeername, handle); + INIT_FUNC(select, handle); + INIT_FUNC(pselect, handle); + + dlclose(handle); +#endif +} diff --git a/lib/libtle_glue/sym.h b/lib/libtle_glue/sym.h new file mode 100644 index 0000000..b5a333d --- /dev/null +++ b/lib/libtle_glue/sym.h @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2018 Ant Financial Services Group. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef _TLE_KSYM_H_ +#define _TLE_KSYM_H_ + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include <sys/socket.h> + +#include <sys/epoll.h> +#include <unistd.h> +#include <sys/types.h> +#include <poll.h> +#include <sys/uio.h> +#include <sys/sendfile.h> +#include <sys/select.h> +#include <sys/time.h> + +#include "tle_glue.h" + +#ifdef __cplusplus +extern "C" { +#endif + +void symbol_init(void); + +#ifdef PRELOAD +int (*k_epoll_create)(int size); +int (*k_epoll_create1)(int flags); +int (*k_epoll_ctl)(int epfd, int op, int fd, struct epoll_event *event); +int (*k_epoll_wait)(int epfd, struct epoll_event *events, int maxevents, int timeout); +int (*k_epoll_pwait)(int epfd, struct epoll_event *events, int maxevents, int timeout, const sigset_t *sigmask); +int (*k_poll)(struct pollfd *fds, nfds_t nfds, int timeout); +int (*k_select)(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); +int (*k_pselect)(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, const struct timespec *timeout, const sigset_t *sigmask); + +int (*k_socket)(int domain, int type, int protocol); +int (*k_listen)(int sockfd, int backlog); +int (*k_bind)(int sockfd, const struct sockaddr *addr, socklen_t addrlen); +int (*k_accept)(int sockfd, struct sockaddr *addr, socklen_t *addrlen); +int (*k_accept4)(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags); +int (*k_connect)(int sockfd, const struct sockaddr *addr, socklen_t addrlen); +int (*k_getsockopt)(int sockfd, int level, int optname, void *optval, socklen_t *optlen); +int (*k_setsockopt)(int sockfd, int level, int optname, const void *optval, socklen_t optlen); +int (*k_fcntl)(int fd, int cmd, ... 
/* arg */ ); +int (*k_ioctl)(int d, int request, ...); +int (*k_shutdown)(int sockfd, int how); +int (*k_close)(int fd); +ssize_t (*k_recv)(int sockfd, void *buf, size_t len, int flags); +ssize_t (*k_recvfrom)(int sockfd, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen); +ssize_t (*k_recvmsg)(int sockfd, struct msghdr *msg, int flags); +ssize_t (*k_read)(int fd, void *buf, size_t count); +ssize_t (*k_readv)(int fd, const struct iovec *iov, int iovcnt); +ssize_t (*k_send)(int sockfd, const void *buf, size_t len, int flags); +ssize_t (*k_sendto)(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen); +ssize_t (*k_sendmsg)(int sockfd, const struct msghdr *msg, int flags); +ssize_t (*k_write)(int fd, const void *buf, size_t count); +ssize_t (*k_writev)(int fd, const struct iovec *iov, int iovcnt); +ssize_t (*k_splice)(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags); +ssize_t (*k_sendfile)(int out_fd, int in_fd, off_t *offset, size_t count); +int (*k_getsockname)(int sockfd, struct sockaddr *addr, socklen_t *addrlen); +int (*k_getpeername)(int sockfd, struct sockaddr *addr, socklen_t *addrlen); +#else +#define k_epoll_create epoll_create +#define k_epoll_create1 epoll_create1 +#define k_epoll_ctl epoll_ctl +#define k_epoll_wait epoll_wait +#define k_epoll_pwait epoll_pwait +#define k_poll poll +#define k_select select +#define k_pselect pselect +#define k_socket socket +#define k_listen listen +#define k_bind bind +#define k_accept accept +#define k_accept4 accept4 +#define k_connect connect +#define k_getsockopt getsockopt +#define k_setsockopt setsockopt +#define k_fcntl fcntl +#define k_ioctl ioctl +#define k_shutdown shutdown +#define k_close close +#define k_recv recv +#define k_recvfrom recvfrom +#define k_recvmsg recvmsg +#define k_read read +#define k_readv readv +#define k_send send +#define k_sendto sendto +#define k_sendmsg sendmsg 
+#define k_write write +#define k_writev writev +#define k_splice splice +#define k_sendfile sendfile +#define k_getsockname getsockname +#define k_getpeername getpeername +#endif + +#ifdef __cplusplus +} +#endif + +#endif /* _TLE_KSYM_H_ */ diff --git a/lib/libtle_glue/tcp.c b/lib/libtle_glue/tcp.c new file mode 100644 index 0000000..e5186c0 --- /dev/null +++ b/lib/libtle_glue/tcp.c @@ -0,0 +1,558 @@ +/* + * Copyright (c) 2018 Ant Financial Services Group. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <stdarg.h> +#include <unistd.h> +#include <fcntl.h> +#include <sys/ioctl.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <netinet/tcp.h> +#include <netinet/udp.h> + +#include <tle_tcp.h> + +#include "sym.h" +#include "fd.h" +#include "log.h" +#include "util.h" +#include "internal.h" +#include "sock.h" + +#define MAX_TCP_KEEPIDLE 32767 +#define MAX_TCP_KEEPINTVL 32767 +#define MAX_TCP_KEEPCNT 127 + +static inline void +foo_support(const char *msg) +{ + GLUE_LOG(WARNING, "%s, return ok without really supporting it", msg); +} + +static int +tcp_setsockopt(struct sock *sk, int optname, + const void *optval, socklen_t optlen) +{ + int val; + + val = 0; /* just to make compiler happy */ + if (optlen == sizeof(val)) + val = *(const int *)optval; + + /* man tcp(7) or see /usr/include/netinet/tcp.h */ + switch (optname) { + case TCP_NODELAY: /* antonym: TCP_CORK */ + if (val == 0) + sk->option.tcpnodelay = 0; + else + sk->option.tcpnodelay = 1; + if (sk->s != NULL) + sk->s->option.tcpnodelay = sk->option.tcpnodelay; + return 0; + case TCP_CORK: + if (val == 0) + sk->option.tcpcork = 0; + else + sk->option.tcpcork = 1; + if (sk->s != NULL) + sk->s->option.tcpcork = sk->option.tcpcork; + return 0; + case TCP_KEEPIDLE: + if (val <= 0 || val > MAX_TCP_KEEPIDLE) { + errno = EINVAL; + return -1; + } + sk->option.keepidle = val; + if (sk->s != NULL) { + sk->s->option.keepidle = sk->option.keepidle; + tle_tcp_stream_set_keepalive(sk->s); + } + return 0; + case TCP_KEEPINTVL: + if (val <= 0 || val > MAX_TCP_KEEPINTVL) { + errno = EINVAL; + return -1; + } + sk->option.keepintvl = val; + if (sk->s != NULL) { + sk->s->option.keepintvl = sk->option.keepintvl; + tle_tcp_stream_set_keepalive(sk->s); + } + return 0; + case TCP_KEEPCNT: + if (val <= 0 || val > MAX_TCP_KEEPCNT) { + errno = EINVAL; + return -1; + } + sk->option.keepcnt = val; + if (sk->s != NULL) + sk->s->option.keepcnt = sk->option.keepcnt; + return 0; + case TCP_USER_TIMEOUT: + 
foo_support("set TCP_USER_TIMEOUT"); + return 0; + case TCP_DEFER_ACCEPT: + if (val == 0) + return 0; + break; + case TCP_FASTOPEN: + case TCP_FASTOPEN_CONNECT: + if (val == 0) + return 0; + break; + case TCP_QUICKACK: + /* Based on below info, it's safe to just return 0: + * "This flag is not permanent, it only enables a + * switch to or from quickack mode. Subsequent + * operationof the TCP protocol will once again ..." + */ + if (val == 0) + sk->option.tcpquickack = 0; + else + sk->option.tcpquickack = 8; + if (sk->s != NULL) + sk->s->option.tcpquickack = sk->option.tcpquickack; + return 0; + case TCP_CONGESTION: + /* only support NewReno; but we return success for + * any kind of setting. + */ + foo_support("set TCP_CONGESTION"); + return 0; + default: + break; + } + + GLUE_LOG(WARNING, "setsockopt(%d) with level = SOL_TCP, optname = %d\n", + sock2fd(sk), optname); + errno = EOPNOTSUPP; + return -1; +} + +static int +tcp_getsockopt(struct sock *sk, int optname, + void *optval, socklen_t *optlen) +{ + int rc; + union { + int val; + uint64_t val64; + struct linger ling; + struct timeval tm; + } *p = optval; + + RTE_SET_USED(optlen); + + /* man tcp(7) or see /usr/include/netinet/tcp.h */ + switch (optname) { + case TCP_MAXSEG: + p->val = 64 * 1024; + return 0; + case TCP_FASTOPEN: + case TCP_FASTOPEN_CONNECT: + p->val = 0; + return 0; + case TCP_INFO: + /* needed by netperf */ + rc = tle_tcp_stream_get_info(sk->s, optval, optlen); + if (rc < 0) { + errno = -rc; + return -1; + } + return 0; + case TCP_CONGESTION: + strncpy(optval, "NewReno", *optlen); + ((char *)optval)[*optlen - 1] = '\0'; + return 0; + case TCP_CORK: + p->val = sk->option.tcpcork; + return 0; + case TCP_QUICKACK: + p->val = sk->option.tcpquickack != 0 ? 
1 : 0; + return 0; + case TCP_NODELAY: + p->val = sk->option.tcpnodelay; + return 0; + case TCP_KEEPIDLE: + p->val = sk->option.keepidle; + return 0; + case TCP_KEEPINTVL: + p->val = sk->option.keepintvl; + return 0; + case TCP_KEEPCNT: + p->val = sk->option.keepcnt; + return 0; + default: + break; + } + + GLUE_LOG(WARNING, "getsockopt(%d) with level = SOL_TCP, optname = %d", + sock2fd(sk), optname); + errno = EOPNOTSUPP; + return -1; +} + +static int +tcp_getname(struct sock *sk, struct sockaddr *addr, int peer) +{ + int rc; + int addrlen; + struct tle_tcp_stream_addr a; + + rc = tle_tcp_stream_get_addr(sk->s, &a); + if (rc) { + errno = -rc; + return -1; + } + + if (a.local.ss_family == AF_INET) + addrlen = sizeof(struct sockaddr_in); + else + addrlen = sizeof(struct sockaddr_in6); + + if (peer) + memcpy(addr, &a.remote, addrlen); + else + memcpy(addr, &a.local, addrlen); + + addr->sa_family = a.local.ss_family; + + return 0; +} + +static int +tcp_bind(struct sock *sk, const struct sockaddr *addr) +{ + sk->s = open_bind(sk, addr, NULL); + if (sk->s == NULL) + return -1; + return 0; +} + +static int +tcp_listen(struct sock *sk, int backlog) +{ + int32_t rc; + + if (backlog < 0) { + errno = EINVAL; + return -1; + } + + /* + * if socket is unbind, should call open_bind to assign an ramdon addres + * before listening + */ + if (sk->s == NULL) { + sk->s = open_bind(sk, NULL, NULL); + if (sk->s == NULL) + return -1; + } + + rc = tle_tcp_stream_listen(sk->s); + if (rc) { + errno = -rc; + return -1; + } + + return 0; +} + +static int +tcp_connect(struct sock *sk, const struct sockaddr *addr) +{ + int rc; + int rx; + int ret; + struct epoll_event event; + struct sockaddr_storage laddr; + struct sockaddr_storage raddr; + struct sockaddr_in *addr4; + struct sockaddr_in6 *addr6; + struct sockaddr *local = NULL; + + /* TODO: For multi-thread case, we shall properly manage local + * L4 port so that packets coming back can be put into the same + * queue pair. 
+ */ + if (sk->s) { + struct tle_tcp_stream *ts = TCP_STREAM(sk->s); + /* case 1: bind happens before connect; + * case 2: connect after a previous connect, failed + * or succeeded. + */ + if (ts->tcb.err != 0) { + errno = ts->tcb.err; + return -1; + } + + int state = ts->tcb.state; + + if (state >= TCP_ST_ESTABLISHED && sk->tcp_connected == 0) { + sk->tcp_connected = 1; + return 0; /* connect succeeds */ + } + + if (state == TCP_ST_CLOSED) { + if (tcp_getname(sk, (struct sockaddr *)&laddr, 0) == 0) + local = (struct sockaddr *)&laddr; + tle_tcp_stream_close(sk->s); + sk->s = NULL; + goto do_connect; /* case 1 */ + } else if (state >= TCP_ST_SYN_SENT && + state < TCP_ST_ESTABLISHED) + errno = EALREADY; + else if (state >= TCP_ST_ESTABLISHED) + errno = EISCONN; + else + errno = EINVAL; + return -1; + } + +do_connect: + sk->s = open_bind(sk, local, addr); + if (sk->s == NULL) /* errno is set */ + return -1; + + if (sk->domain == AF_INET) { + addr4 = (struct sockaddr_in*)&raddr; + addr4->sin_family = AF_INET; + addr4->sin_port = sk->s->port.src; + addr4->sin_addr.s_addr = sk->s->ipv4.addr.src; + } else { + addr6 = (struct sockaddr_in6*)&raddr; + addr6->sin6_family = AF_INET6; + addr6->sin6_port = sk->s->port.src; + rte_memcpy(&addr6->sin6_addr, &sk->s->ipv6.addr.src, + sizeof(struct in6_addr)); + } + rc = tle_tcp_stream_connect(sk->s, (const struct sockaddr*)&raddr); + if (rc < 0) { + errno = -rc; + return -1; + } + + if (is_nonblock(sk, 0)) { + be_tx_with_lock(CTX(sk)); + errno = EINPROGRESS; /* It could not be ready so fast */ + return -1; + } + + do { + be_process(CTX(sk)); + + if (tle_event_state(&sk->txev) == TLE_SEV_UP) { + sk->tcp_connected = 1; + tle_event_down(&sk->txev); + ret = 0; + break; + } + + if (tle_event_state(&sk->erev) == TLE_SEV_UP) { + tle_event_down(&sk->erev); + errno = ECONNREFUSED; + ret = -1; + break; + } + + /* fix me: timeout? 
*/ + epoll_kernel_wait(CTX(sk), -1, &event, 1, 1, &rx); + } while (1); + + return ret; +} + +static void tcp_update_cfg(struct sock *sk); + +static int +tcp_accept(struct sock *sk, struct sockaddr *addr, + socklen_t *addrlen, int flags) +{ + int fd; + int rx; + struct sock *newsk; + struct tle_stream *rs; + struct epoll_event event; + struct tle_tcp_stream_addr a; + + if (sk->s == NULL) { + errno = EINVAL; + return -1; + } + + fd = get_unused_fd(); + if (fd < 0) { + errno = ENFILE; + return -1; + } + + newsk = fd2sock(fd); +again: + if (tle_tcp_stream_accept(sk->s, &rs, 1) == 0) { + if (rte_errno != EAGAIN) { + errno = rte_errno; + return -1; + } + + if (is_nonblock(sk, flags)) { + newsk->valid = 0; + put_free_fd(fd); + errno = EAGAIN; + return -1; + } + + epoll_kernel_wait(CTX(sk), -1, &event, 1, 1, &rx); + be_process(CTX(sk)); + goto again; + } + + newsk->s = rs; + newsk->cid = sk->cid; + newsk->domain = sk->domain; + newsk->proto = sk->proto; + newsk->option.raw = 0; + newsk->option.tcpquickack = 1; + newsk->option.mulloop = 1; + newsk->option.multtl = 1; + newsk->option.keepidle = 2 * 60 * 60; + newsk->option.keepintvl = 75; + newsk->option.keepcnt = 9; + newsk->s->option.raw = newsk->option.raw; + sock_alloc_events(newsk); + tcp_update_cfg(newsk); + + if (addr) { + /* We assume this function never fails */ + tle_tcp_stream_get_addr(rs, &a); + + *addrlen = sizeof(struct sockaddr_in); + memcpy(addr, &a.remote, *addrlen); + } + + GLUE_DEBUG("accept fd = %d", fd); + return fd; +} + +static ssize_t +tcp_send(struct sock *sk, struct rte_mbuf *pkt[], + uint16_t num, const struct sockaddr *dst_addr) +{ + uint16_t rc; + RTE_SET_USED(dst_addr); + + if (sk->s == NULL) { + errno = EPIPE; + return 0; + } + + rc = tle_tcp_stream_send(sk->s, pkt, num); + if (rc == 0) + errno = rte_errno; + return rc; +} + +static ssize_t +tcp_recv(struct tle_stream *s, struct rte_mbuf *pkt[], + uint16_t num, struct sockaddr *addr) +{ + uint16_t rc; + + RTE_SET_USED(addr); + + /* optimize me: 
merge multiple mbufs into one */ + rc = tle_tcp_stream_recv(s, pkt, num); + if (rc == 0) + errno = rte_errno; + + return rc; +} + +static ssize_t +tcp_readv(struct tle_stream *ts, struct msghdr *msg, int flags __rte_unused) +{ + ssize_t rc; + + rc = tle_tcp_stream_recvmsg(ts, msg); + if (rc < 0) + errno = rte_errno; + return rc; +} + +static ssize_t +tcp_writev(struct sock *sk, const struct iovec *iov, + int iovcnt, const struct sockaddr *dst_addr) +{ + ssize_t rc; + struct rte_mempool *mp = get_mempool_by_socket(0); /* fix me */ + + RTE_SET_USED(dst_addr); + + if (sk->s == NULL) { + errno = EPIPE; + return -1; + } + + rc = tle_tcp_stream_writev(sk->s, mp, iov, iovcnt); + if (rc < 0) + errno = rte_errno; + return rc; +} + +static int +tcp_shutdown(struct sock *sk, int how) +{ + int ret; + + /* Refer to linux/net/ipv4/tcp.c:tcp_shutdown() */ + if (how == SHUT_RD) + return 0; + + ret = tle_tcp_stream_shutdown(sk->s, how); + if (ret < 0) + errno = rte_errno; + else + be_tx_with_lock(CTX(sk)); /* Make sure fin is sent */ + return ret; + +} + +static void +tcp_update_cfg(struct sock *sk) +{ + struct tle_tcp_stream_cfg prm = {0}; + + prm.recv_ev = &sk->rxev; + prm.send_ev = &sk->txev; + prm.err_ev = &sk->erev; + tle_tcp_stream_update_cfg(&sk->s, &prm, 1); +} + +struct proto tcp_prot = { + .name = "TCP", + .setsockopt = tcp_setsockopt, + .getsockopt = tcp_getsockopt, + .getname = tcp_getname, + .bind = tcp_bind, + .listen = tcp_listen, + .connect = tcp_connect, + .accept = tcp_accept, + .recv = tcp_recv, + .send = tcp_send, + .readv = tcp_readv, + .writev = tcp_writev, + .shutdown = tcp_shutdown, + .close = tle_tcp_stream_close, + .update_cfg = tcp_update_cfg, +}; diff --git a/lib/libtle_glue/tle_glue.h b/lib/libtle_glue/tle_glue.h new file mode 100644 index 0000000..38357e4 --- /dev/null +++ b/lib/libtle_glue/tle_glue.h @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2018 Ant Financial Services Group. 
+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _TLE_GLUE_H_ +#define _TLE_GLUE_H_ + +#include <sys/types.h> +#include <sys/epoll.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include <signal.h> +#include <poll.h> + +#ifdef __cplusplus +extern "C" { +#endif + +#ifdef PRELOAD + +#define PRE(name) name + +#else + +#define PRE(name) tle_ ## name + +#endif + +void glue_init1(int argc, char **argv); + +/* epoll */ +int PRE(epoll_create)(int size); +int PRE(epoll_create1)(int flags); +int PRE(epoll_ctl)(int epfd, int op, int fd, struct epoll_event *event); +int PRE(epoll_wait)(int epfd, struct epoll_event *events, int maxevents, int timeout); +int PRE(epoll_pwait)(int epfd, struct epoll_event *events, + int maxevents, int timeout, const sigset_t *sigmask); + +/* for setup, settings, and destroy */ +int PRE(socket)(int domain, int type, int protocol); +int PRE(listen)(int sockfd, int backlog); +int PRE(bind)(int sockfd, const struct sockaddr *addr, socklen_t addrlen); +int PRE(accept)(int sockfd, struct sockaddr *addr, socklen_t *addrlen); +int PRE(accept4)(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags); +int PRE(connect)(int sockfd, const struct sockaddr *addr, socklen_t addrlen); +int PRE(getsockopt)(int sockfd, int level, int optname, + void *optval, socklen_t *optlen); +int PRE(setsockopt)(int sockfd, int level, int optname, + const void *optval, 
socklen_t optlen); +int PRE(getsockname)(int sockfd, struct sockaddr *addr, socklen_t *addrlen); +int PRE(getpeername)(int sockfd, struct sockaddr *addr, socklen_t *addrlen); +int PRE(fcntl)(int fd, int cmd, ... /* arg */ ); +int PRE(ioctl)(int d, unsigned long int request, ...); +int PRE(shutdown)(int sockfd, int how); +int PRE(close)(int fd); + +/* for recv */ +ssize_t PRE(recv)(int sockfd, void *buf, size_t len, int flags); +ssize_t PRE(recvfrom)(int sockfd, void *buf, size_t len, int flags, + struct sockaddr *src_addr, socklen_t *addrlen); +ssize_t PRE(recvmsg)(int sockfd, struct msghdr *msg, int flags); +ssize_t PRE(read)(int fd, void *buf, size_t count); +ssize_t PRE(readv)(int fd, const struct iovec *iov, int iovcnt); + +/* for send */ +ssize_t PRE(send)(int sockfd, const void *buf, size_t len, int flags); +ssize_t PRE(sendto)(int sockfd, const void *buf, size_t len, int flags, + const struct sockaddr *dest_addr, socklen_t addrlen); +ssize_t PRE(sendmsg)(int sockfd, const struct msghdr *msg, int flags); +ssize_t PRE(write)(int fd, const void *buf, size_t count); +ssize_t PRE(writev)(int fd, const struct iovec *iov, int iovcnt); + +/* advanced functions */ +ssize_t PRE(splice)(int fd_in, loff_t *off_in, int fd_out, + loff_t *off_out, size_t len, unsigned int flags); +ssize_t PRE(sendfile)(int out_fd, int in_fd, off_t *offset, size_t count); + +/* poll */ +int PRE(poll)(struct pollfd *fds, nfds_t nfds, int timeout); +int PRE(ppoll)(struct pollfd *fds, nfds_t nfds, + const struct timespec *tmo_p, const sigset_t *sigmask); + +/* select */ +int PRE(select)(int nfds, fd_set *readfds, fd_set *writefds, + fd_set *exceptfds, struct timeval *timeout); +int PRE(pselect)(int nfds, fd_set *readfds, fd_set *writefds, + fd_set *exceptfds, const struct timespec *timeout, + const sigset_t *sigmask); + +/* non-posix APIs */ +int fd_ready(int fd, int events); +void v_get_stats_snmp(unsigned long mibs[]); + +#ifdef __cplusplus +} +#endif + +#endif /* _TLE_GLUE_H_ */ diff --git 
a/lib/libtle_glue/udp.c b/lib/libtle_glue/udp.c new file mode 100644 index 0000000..9f199bc --- /dev/null +++ b/lib/libtle_glue/udp.c @@ -0,0 +1,419 @@ +/* + * Copyright (c) 2018 Ant Financial Services Group. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <stdarg.h> +#include <unistd.h> +#include <fcntl.h> +#include <sys/ioctl.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <netinet/in.h> + +#include <rte_ethdev.h> +#include <tle_udp.h> + +#include "sym.h" +#include "fd.h" +#include "log.h" +#include "util.h" +#include "internal.h" +#include "sock.h" + +static int +udp_setsockopt(__rte_unused struct sock *sk, __rte_unused int optname, + __rte_unused const void *optval, __rte_unused socklen_t optlen) +{ + return 0; +} + +static int +udp_getsockopt(__rte_unused struct sock *sk, __rte_unused int optname, + __rte_unused void *optval, __rte_unused socklen_t *optlen) +{ + return 0; +} + +static int +udp_getname(struct sock *sk, struct sockaddr *addr, int peer) +{ + struct tle_udp_stream_param p; + size_t addrlen; + int rc; + + rc = tle_udp_stream_get_param(sk->s, &p); + if (rc) { + errno = -rc; + return -1; + } + + addrlen = get_sockaddr_len(sk->domain); + if (peer) + memcpy(addr, &p.remote_addr, addrlen); + else + memcpy(addr, &p.local_addr, addrlen); + addr->sa_family = p.local_addr.ss_family; + return 0; +} + +static int +udp_bind(struct sock *sk, const struct sockaddr *addr) +{ + if (sk->ubind) { + errno = EINVAL; + return -1; + } 
+ + sk->s = open_bind(sk, addr, NULL); + if (sk->s != NULL) { + sk->ubind = 1; + if (is_any_addr(addr)) + sk->ubindany = 1; + return 0; + } + + return -1; +} + +static int +udp_connect(struct sock *sk, const struct sockaddr *addr) +{ + struct sockaddr_storage laddr; + + /* According to linux manual, connectionless sockets may dissolve the + * association by connecting to an address with the sa_family member of + * sockaddr set to AF_UNSPEC (supported on Linux since kernel 2.2). + */ + if (sk->ubind) { + if (udp_getname(sk, (struct sockaddr *)&laddr, 0)) + return -1; + if (addr->sa_family == AF_UNSPEC) { + addr = NULL; + if (sk->ubindany) + set_any_addr((struct sockaddr *)&laddr); + } + sk->s = open_bind(sk, (const struct sockaddr *)&laddr, addr); + } else { + if (addr->sa_family == AF_UNSPEC) { + tle_udp_stream_close(sk->s); + sk->s = NULL; + return 0; + } + sk->s = open_bind(sk, NULL, addr); + } + + if (sk->s) + return 0; + + return -1; +} + +static int +udp_addr_prepare(struct sock *sk, const struct sockaddr **p_dst_addr, + struct sockaddr_storage *addr) +{ + const struct sockaddr *dst_addr = *p_dst_addr; + + if (dst_addr != NULL && + dst_addr->sa_family == AF_INET6 && + IN6_IS_ADDR_V4MAPPED(&((const struct sockaddr_in6 *)dst_addr)->sin6_addr)) { + rte_memcpy(addr, dst_addr, sizeof(struct sockaddr_in6)); + dst_addr = (const struct sockaddr*)(addr); + *p_dst_addr = dst_addr; + retrans_4mapped6_addr((struct sockaddr_storage*)(addr)); + } + + if (sk->s == NULL) { + if (dst_addr == NULL) { + errno = EDESTADDRREQ; + return -1; + } + + sk->s = open_bind(sk, NULL, dst_addr); + if (sk->s == NULL) /* errno is set */ + return -1; + } else if (dst_addr != NULL) { + if (dst_addr->sa_family == AF_INET6 && sk->domain == AF_INET) { + errno = EINVAL; + return -1; + } + if (dst_addr->sa_family == AF_INET && sk->domain == AF_INET6) { + if (IN6_IS_ADDR_UNSPECIFIED(&sk->s->ipv6.addr.dst)) { + sk->s->type = TLE_V4; + sk->s->ipv4.addr.dst = 0; + } else { + errno = ENETUNREACH; + 
return -1; + } + } + } + + return 0; +} + +/* abstract client info from mbuf into s */ +static inline void +udp_pkt_addr(const struct rte_mbuf *m, struct sockaddr *addr, + __rte_unused uint16_t family) +{ + const struct ipv4_hdr *ip4h; + const struct ipv6_hdr *ip6h; + const struct udp_hdr *udph; + struct sockaddr_in *in4; + struct sockaddr_in6 *in6; + int off = -(m->l4_len + m->l3_len); + + udph = rte_pktmbuf_mtod_offset(m, struct udp_hdr *, -m->l4_len); + ip4h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *, off); + if ((ip4h->version_ihl>>4) == 4) { + addr->sa_family = AF_INET; + in4 = (struct sockaddr_in *)addr; + in4->sin_port = udph->src_port; + in4->sin_addr.s_addr = ip4h->src_addr; + } else { + addr->sa_family = AF_INET6; + ip6h = (const struct ipv6_hdr*)ip4h; + in6 = (struct sockaddr_in6 *)addr; + in6->sin6_port = udph->src_port; + rte_memcpy(&in6->sin6_addr, ip6h->src_addr, + sizeof(in6->sin6_addr)); + } +} + +static ssize_t +udp_send(struct sock *sk, struct rte_mbuf *pkt[], + uint16_t num, const struct sockaddr *dst_addr) +{ + uint16_t i; + struct sockaddr_storage addr; + + if (udp_addr_prepare(sk, &dst_addr, &addr) != 0) + return 0; + + /* chain them together as *one* message */ + for (i = 1; i < num; ++i) { + pkt[i-1]->next = pkt[i]; + pkt[0]->pkt_len += pkt[i]->pkt_len; + } + pkt[0]->nb_segs = num; + + if (tle_udp_stream_send(sk->s, &pkt[0], 1, dst_addr) == 0) { + errno = rte_errno; + return 0; + } + + return num; +} + +static ssize_t +udp_readv(struct tle_stream *s, struct msghdr *msg, int flags) +{ + int i; + ssize_t sz; + uint16_t rc; + uint32_t fin; + struct iovec iv; + struct rte_mbuf *m; + const struct iovec *iov = msg->msg_iov; + int iovcnt = msg->msg_iovlen; + + rc = tle_udp_stream_recv(s, &m, 1); + if (rc == 0) { + errno = rte_errno; + return -1; + } + + if (!s->option.timestamp) + s->timestamp = m->timestamp; + if (msg != NULL && msg->msg_control != NULL) { + if (s->option.timestamp) + tle_set_timestamp(msg, m); + else + msg->msg_controllen = 
0; + } + + if (msg != NULL && msg->msg_name != NULL) { + udp_pkt_addr(m, (struct sockaddr*)msg->msg_name, 0); + if (((struct sockaddr *)msg->msg_name)->sa_family == AF_INET) + msg->msg_namelen = sizeof(struct sockaddr_in); + else + msg->msg_namelen = sizeof(struct sockaddr_in6); + } + + for (i = 0, sz = 0; i != iovcnt; i++) { + iv = iov[i]; + sz += iv.iov_len; + fin = _mbus_to_iovec(&iv, &m, 1); + if (fin == 1) { + sz -= iv.iov_len; + break; + } + } + if (fin == 0) { + if (flags & MSG_TRUNC) + sz += m->pkt_len; + rte_pktmbuf_free_seg(m); + msg->msg_flags |= MSG_TRUNC; + } + return sz; +} + +static ssize_t +udp_writev(struct sock *sk, const struct iovec *iov, + int iovcnt, const struct sockaddr *dst_addr) +{ + struct rte_mempool *mp = get_mempool_by_socket(0); /* fix me */ + struct sockaddr_storage addr; + uint32_t slen, left_m, left_b, copy_len, left; + uint16_t i, rc, nb_mbufs; + char *dst, *src; + uint64_t ufo; + size_t total; + int j; + + if (udp_addr_prepare(sk, &dst_addr, &addr) != 0) + return -1; + + for (j = 0, total = 0; j < iovcnt; ++j) + total += iov[j].iov_len; + + ufo = tx_offload & DEV_TX_OFFLOAD_UDP_TSO; + if (ufo) + slen = RTE_MBUF_DEFAULT_DATAROOM; + else + slen = 1500 - 20; /* mtu - ip_hdr_len */ + + nb_mbufs = (total + 8 + slen - 1) / slen; + struct rte_mbuf *mbufs[nb_mbufs]; + if (unlikely(rte_pktmbuf_alloc_bulk(mp, mbufs, nb_mbufs) != 0)) { + errno = ENOMEM; + return -1; + } + + left_b = iov[0].iov_len; + for (i = 0, j = 0; i < nb_mbufs && j < iovcnt; ++i) { + /* first frag has udp hdr, its payload is 8 bytes less */ + if (i == 0) + slen -= 8; + else if (i == 1) + slen += 8; + left_m = slen; + while (left_m > 0 && j < iovcnt) { + copy_len = RTE_MIN(left_m, left_b); + dst = rte_pktmbuf_mtod_offset(mbufs[i], char *, + slen - left_m); + src = (char *)iov[j].iov_base + iov[j].iov_len - left_b; + rte_memcpy(dst, src, copy_len); + + left_m -= copy_len; + left_b -= copy_len; + if (left_b == 0) { + j++; + left_b = iov[j].iov_len; + } + } + 
mbufs[i]->data_len = slen; + mbufs[i]->pkt_len = slen; + } + + /* last seg */ + if (nb_mbufs == 1) { + mbufs[nb_mbufs - 1]->data_len = total; + mbufs[nb_mbufs - 1]->pkt_len = total; + } else { + mbufs[nb_mbufs - 1]->data_len = total - (nb_mbufs - 1) * slen + 8; + mbufs[nb_mbufs - 1]->pkt_len = total - (nb_mbufs - 1) * slen + 8; + } + + /* chain as *one* message */ + for (i = 1; i < nb_mbufs; ++i) + mbufs[i-1]->next = mbufs[i]; + mbufs[0]->nb_segs = nb_mbufs; + mbufs[0]->pkt_len = total; + nb_mbufs = 1; + + rc = tle_udp_stream_send(sk->s, mbufs, nb_mbufs, dst_addr); + for (i = rc, left = 0; i < nb_mbufs; ++i) { + left += mbufs[i]->pkt_len; + rte_pktmbuf_free(mbufs[i]); + } + + if (rc == 0) { + errno = rte_errno; + return -1; + } + + return total - left; +} + +static ssize_t +udp_recv(struct tle_stream *s, struct rte_mbuf *pkt[], uint16_t num, + struct sockaddr *addr) +{ + uint16_t rc; + + rc = tle_udp_stream_recv(s, pkt, num); + if (addr && num == 1 && rc == 1) + udp_pkt_addr(pkt[0], addr, 0); + + if (rc == 0) + errno = rte_errno; + return rc; +} + +static void +udp_update_cfg(struct sock *sk) +{ + struct tle_udp_stream_param prm; + memset(&prm, 0, sizeof(prm)); + + prm.recv_ev = &sk->rxev; + prm.send_ev = &sk->txev; + + tle_udp_stream_update_cfg(&sk->s, &prm, 1); +} + +static int +udp_shutdown(struct sock *sk, int how) +{ + int rc; + + if (sk->s == NULL) { + errno = ENOTCONN; + return -1; + } + + rc = tle_udp_stream_shutdown(sk->s, how); + if (rc < 0) { + errno = -rc; + return -1; + } + return 0; +} + +struct proto udp_prot = { + .name = "UDP", + .setsockopt = udp_setsockopt, + .getsockopt = udp_getsockopt, + .getname = udp_getname, + .bind = udp_bind, + .connect = udp_connect, + .recv = udp_recv, + .send = udp_send, + .readv = udp_readv, + .writev = udp_writev, + .shutdown = udp_shutdown, + .close = tle_udp_stream_close, + .update_cfg = udp_update_cfg, +}; diff --git a/lib/libtle_glue/util.c b/lib/libtle_glue/util.c new file mode 100644 index 0000000..69fc555 --- 
/dev/null +++ b/lib/libtle_glue/util.c @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2018 Ant Financial Services Group. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include <pthread.h> +#include <sched.h> +#include <unistd.h> + +#include "util.h" + +#define NUMA_NODE_PATH "/sys/devices/system/node" + +static unsigned +eal_cpu_socket_id(unsigned lcore_id) +{ + unsigned socket; + char path[PATH_MAX]; + + for (socket = 0; socket < RTE_MAX_NUMA_NODES; socket++) { + snprintf(path, sizeof(path), "%s/node%u/cpu%u", NUMA_NODE_PATH, + socket, lcore_id); + if (access(path, F_OK) == 0) + return socket; + } + return 0; +} + +uint32_t +get_socket_id(void) +{ + int err; + uint32_t i; + cpu_set_t cpuset; + + CPU_ZERO(&cpuset); + err = pthread_getaffinity_np(pthread_self(), + sizeof(cpuset), &cpuset); + if (err) + return 0; + + for (i = 0; i < CPU_SETSIZE; i++) + if (CPU_ISSET(i, &cpuset)) + break; + + return eal_cpu_socket_id(i); +} diff --git a/lib/libtle_glue/util.h b/lib/libtle_glue/util.h new file mode 100644 index 0000000..ac67d8b --- /dev/null +++ b/lib/libtle_glue/util.h @@ -0,0 +1,377 @@ +/* + * Copyright (c) 2018 Ant Financial Services Group. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _TLE_GLUE_UTIL_H_ +#define _TLE_GLUE_UTIL_H_ + +#include <stdarg.h> +#include <stdint.h> +#include <stdio.h> +#include <string.h> + +#include <tle_tcp.h> +#include <tle_udp.h> + +#include "../libtle_l4p/tcp_stream.h" + +#include "fd.h" +#include "ctx.h" +#include "sock.h" + +#ifdef __cplusplus +extern "C" { +#endif + +static inline void * +xstrdup(const void *old) +{ + void *new = strdup(old); + if (unlikely(new == NULL)) + rte_panic("Failed to strdup"); + return new; +} + +static inline void * +xmalloc(size_t size) +{ + void *p = malloc(size ? size : 1); + if (p == NULL) + rte_panic("Failed to malloc"); + return p; +} + +static inline char * +xvasprintf(const char *format, va_list args) +{ + va_list args2; + size_t needed; + char *s; + + va_copy(args2, args); + needed = vsnprintf(NULL, 0, format, args); + + s = xmalloc(needed + 1); + + vsnprintf(s, needed + 1, format, args2); + va_end(args2); + + return s; +} + +static inline char * +xasprintf(const char *format, ...) 
+{ + va_list args; + char *s; + + va_start(args, format); + s = xvasprintf(format, args); + va_end(args); + + return s; +} + +static inline char ** +grow_argv(char **argv, size_t cur_siz, size_t grow_by) +{ + char **p; + + p = realloc(argv, sizeof(char *) * (cur_siz + grow_by)); + if (unlikely(p == NULL)) + rte_panic("Failed to grow argv"); + return p; +} + +static inline void +release_argv(int argc, char **argv_to_release, char **argv) +{ + int i; + + for (i = 0; i < argc; ++i) + free(argv_to_release[i]); + + free(argv_to_release); + free(argv); +} + +static inline void +tle_event_attach(struct tle_event *ev, struct tle_evq *evq, const void *data) +{ + ev->head = evq; + ev->data = data; +} + +static inline void +sock_alloc_events(struct sock *so) +{ + tle_event_attach(&so->erev, CTX(so)->ereq, so); + tle_event_attach(&so->rxev, CTX(so)->rxeq, so); + tle_event_attach(&so->txev, CTX(so)->txeq, so); + tle_event_active(&so->erev, TLE_SEV_DOWN); +#ifndef LOOK_ASIDE_BACKEND + tle_event_active(&so->rxev, TLE_SEV_DOWN); + tle_event_active(&so->txev, TLE_SEV_DOWN); +#endif +} + +static inline void +sock_active_events(struct sock *so) +{ + tle_event_active(&so->erev, TLE_SEV_DOWN); + tle_event_active(&so->rxev, TLE_SEV_DOWN); + tle_event_active(&so->txev, TLE_SEV_DOWN); +} + +static inline const struct in6_addr* +select_local_addr_v6(const struct sockaddr *remote, struct glue_ctx *ctx) +{ + /* todo: implement route table to decide local address */ + + if (IN6_IS_ADDR_LOOPBACK(&((const struct sockaddr_in6 *)remote) + ->sin6_addr)) + return &in6addr_loopback; + else + return &ctx->ipv6; +} + +static inline in_addr_t +select_local_addr(const struct sockaddr *remote, struct glue_ctx *ctx) +{ + /* todo: implement route table to decide local address */ + in_addr_t remote_addr; + + remote_addr = ((const struct sockaddr_in*)remote)->sin_addr.s_addr; + if (remote_addr == htonl(INADDR_LOOPBACK)) + return htonl(INADDR_LOOPBACK); + else + return ctx->ipv4; +} + +static inline bool 
+is_any_addr(const struct sockaddr *addr) +{ + const struct sockaddr_in *addr4; + const struct sockaddr_in6 *addr6; + + if (addr->sa_family == AF_INET) { + addr4 = (const struct sockaddr_in *)addr; + if (addr4->sin_addr.s_addr == htonl(INADDR_ANY)) + return true; + else + return false; + } else if (addr->sa_family == AF_INET6) { + addr6 = (const struct sockaddr_in6 *)addr; + if (IN6_IS_ADDR_UNSPECIFIED(&addr6->sin6_addr)) + return true; + else + return false; + } else + return false; +} + +static inline void +set_any_addr(struct sockaddr *addr) +{ + struct sockaddr_in *addr4; + struct sockaddr_in6 *addr6; + + if (addr->sa_family == AF_INET) { + addr4 = (struct sockaddr_in *)addr; + addr4->sin_addr.s_addr = htonl(INADDR_ANY); + } else if (addr->sa_family == AF_INET6) { + addr6 = (struct sockaddr_in6 *)addr; + addr6->sin6_addr = in6addr_any; + } +} + +/* transform an IPv4 address(in struct sockaddr_in) to + * an IPv4 mapped IPv6 address(in struct sockaddr_in6) */ +static inline void +trans_4mapped6_addr(struct sockaddr *addr) +{ + struct sockaddr_in6 *addr6; + + if (addr->sa_family != AF_INET) + return; + + addr6 = (struct sockaddr_in6*)addr; + addr6->sin6_family = AF_INET6; + addr6->sin6_addr.s6_addr32[0] = 0; + addr6->sin6_addr.s6_addr32[1] = 0; + addr6->sin6_addr.s6_addr32[2] = 0xffff0000; + addr6->sin6_addr.s6_addr32[3] = ((struct sockaddr_in*)addr)->sin_addr.s_addr; +} + +/* transform an IPv4 mapped IPv6 address(in struct sockaddr_in6) to + * an IPv4 address(in struct sockaddr_in) */ +static inline void +retrans_4mapped6_addr(struct sockaddr_storage * addr) +{ + struct in6_addr* addr6; + if (addr->ss_family == AF_INET) + return; + + addr6 = &((struct sockaddr_in6*)addr)->sin6_addr; + if(IN6_IS_ADDR_V4MAPPED(addr6)) { + addr->ss_family = AF_INET; + ((struct sockaddr_in*)addr)->sin_addr.s_addr = addr6->__in6_u.__u6_addr32[3]; + } +} + +static inline struct tle_stream * +open_bind(struct sock *so, const struct sockaddr *local, + const struct sockaddr *remote) +{ + 
struct tle_stream *s; + struct sockaddr_storage *l, *r; + struct sockaddr_in *addr4; + struct sockaddr_in6 *addr6; + struct tle_tcp_stream_param pt = {0}; + struct tle_udp_stream_param pu = {0}; + + if (IS_TCP(so)) { + pt.option = so->option.raw; + l = &pt.addr.local; + r = &pt.addr.remote; + pt.cfg.err_ev = &so->erev; + pt.cfg.recv_ev = &so->rxev; + pt.cfg.send_ev = &so->txev; + } else { + pu.option = so->option.raw; + l = &pu.local_addr; + r = &pu.remote_addr; + pu.recv_ev = &so->rxev; + pu.send_ev = &so->txev; + } + + if (remote) { + memcpy(r, remote, get_sockaddr_len(remote->sa_family)); + retrans_4mapped6_addr(r); + if(r->ss_family == AF_INET) { + addr4 = (struct sockaddr_in*)r; + if (addr4->sin_addr.s_addr == 0) + addr4->sin_addr.s_addr = htonl(INADDR_LOOPBACK); + } else { + addr6 = (struct sockaddr_in6*)r; + if (IN6_IS_ADDR_UNSPECIFIED(&addr6->sin6_addr)) + rte_memcpy(&addr6->sin6_addr, &in6addr_loopback, + sizeof(struct in6_addr)); + } + } + + if (local) { + memcpy(l, local, get_sockaddr_len(local->sa_family)); + retrans_4mapped6_addr(l); + } else { + if (remote) + l->ss_family = r->ss_family; + else + l->ss_family = so->domain; + } + + if (!remote) + r->ss_family = l->ss_family; + + /* Endpoints of stream have different socket families */ + if (r->ss_family != l->ss_family) { + if (l->ss_family == AF_INET) { + errno = EINVAL; + return NULL; + } else { + /* if local addr is unbound, convert into remote family */ + if (IN6_IS_ADDR_UNSPECIFIED(&((struct sockaddr_in6*)l)->sin6_addr)) { + l->ss_family = AF_INET; + ((struct sockaddr_in*)l)->sin_addr.s_addr = 0; + } else { + errno = ENETUNREACH; + return NULL; + } + } + } + + if (l->ss_family == AF_INET) { + addr4 = (struct sockaddr_in*)l; + if (addr4->sin_addr.s_addr == htonl(INADDR_ANY) && remote) { + addr4->sin_addr.s_addr = + select_local_addr((struct sockaddr*)r, CTX(so)); + if (addr4->sin_addr.s_addr == htonl(INADDR_ANY)) { + errno = EADDRNOTAVAIL; + return NULL; + } + } + else if (addr4->sin_addr.s_addr != 
CTX(so)->ipv4 && + addr4->sin_addr.s_addr != htonl(INADDR_LOOPBACK) && + addr4->sin_addr.s_addr != htonl(INADDR_ANY)) { + errno = EADDRNOTAVAIL; + return NULL; + } + } else { + addr6 = (struct sockaddr_in6 *)l; + if (IN6_IS_ADDR_UNSPECIFIED(&addr6->sin6_addr) && remote) { + memcpy(&addr6->sin6_addr, + select_local_addr_v6((struct sockaddr*)r, CTX(so)), + sizeof(struct in6_addr)); + if (IN6_IS_ADDR_UNSPECIFIED(&addr6->sin6_addr)) { + errno = EADDRNOTAVAIL; + return NULL; + } + } + else if (memcmp(&addr6->sin6_addr, &CTX(so)->ipv6, + sizeof(struct in6_addr)) != 0 && + (!IN6_IS_ADDR_LOOPBACK(&addr6->sin6_addr)) && + (!IN6_IS_ADDR_UNSPECIFIED(&addr6->sin6_addr))) { + errno = EADDRNOTAVAIL; + return NULL; + } + } + + if (IS_TCP(so)) + s = tle_tcp_stream_open(CTX(so)->tcp_ctx, &pt); + else { + if (so->s == NULL) + s = tle_udp_stream_open(CTX(so)->udp_ctx, &pu); + else + s = tle_udp_stream_set(so->s, CTX(so)->udp_ctx, &pu); + } + + if (s == NULL) + errno = rte_errno; + + return s; +} + +static inline struct tle_stream * +open_bind_listen(struct sock *so, const struct sockaddr *local) +{ + struct tle_stream *s = open_bind(so, local, NULL); + + if (s == NULL) + return NULL; + + if (tle_tcp_stream_listen(s) != 0) { + tle_tcp_stream_close(s); + return NULL; + } + + return s; +} + +uint32_t get_socket_id(void); + +#ifdef __cplusplus +} +#endif + +#endif /*_TLE_GLUE_UTIL_H_ */ diff --git a/lib/libtle_glue/zerocopy.h b/lib/libtle_glue/zerocopy.h new file mode 100644 index 0000000..a37f8f5 --- /dev/null +++ b/lib/libtle_glue/zerocopy.h @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2018 Ant Financial Services Group. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef _TLE_GLUE_ZEROCOPY_H_
#define _TLE_GLUE_ZEROCOPY_H_

#ifdef __cplusplus
extern "C" {
#endif

/**
 * This API performs a recv operation on the specified socket, and it is
 * optimized for zero copy: the caller does not need to prepare a
 * buffer; instead, on success the caller receives a pointer to the
 * received data.
 * @param sockfd
 *   the file descriptor for the socket.
 * @param buf
 *   after successfully receiving some payload, the pointer to the
 *   received buffer will be stored in *buf.
 * @return
 *   the number of bytes received, or -1 if an error occurred, or 0
 *   if a stream socket peer has performed an orderly shutdown.
 */
ssize_t recv_zc(int sockfd, void **buf);

/**
 * This API performs a send operation on the specified socket, and it is
 * optimized for zero copy: the caller does not need to free the buffer,
 * and must not touch it after calling this API; the buffer will be
 * freed after an ack from the socket peer.
 * @param sockfd
 *   the file descriptor for the socket.
 * @param buf
 *   the pointer to the payload buffer to be sent.
 * @param len
 *   the length of the payload buffer to be sent.
 * @return
 *   the number of bytes sent, or -1 if an error occurred.
 */
ssize_t send_zc(int sockfd, const void *buf, size_t len);

#ifdef __cplusplus
}
#endif

#endif /* _TLE_GLUE_ZEROCOPY_H_ */