diff options
author | Konstantin Ananyev <konstantin.ananyev@intel.com> | 2016-06-03 16:43:13 +0100 |
---|---|---|
committer | Konstantin Ananyev <konstantin.ananyev@intel.com> | 2016-06-07 14:17:17 +0100 |
commit | 3395610ea65d66fb96ab98d6915a7ffbd584c34e (patch) | |
tree | 1e4b4ad406679913ee8c490a2f9be6e3d610dbf3 | |
parent | b0a4a8b51228d049e8472757349569a6d53c27c5 (diff) |
Initial commit of tldk code.
Change-Id: Ib96fdd2c57bae0a51ed420137c35eb8e2ee58473
Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
Signed-off-by: Ed Warnicke <eaw@cisco.com>
-rw-r--r-- | Makefile | 58 | ||||
-rw-r--r-- | README | 66 | ||||
-rw-r--r-- | dpdk/Makefile | 192 | ||||
-rw-r--r-- | dpdk/README | 34 | ||||
-rw-r--r-- | examples/udpfwd/Makefile | 44 | ||||
-rw-r--r-- | examples/udpfwd/README | 134 | ||||
-rw-r--r-- | examples/udpfwd/be.cfg | 5 | ||||
-rw-r--r-- | examples/udpfwd/fe.cfg | 24 | ||||
-rw-r--r-- | examples/udpfwd/fwdtbl.h | 117 | ||||
-rw-r--r-- | examples/udpfwd/main.c | 1810 | ||||
-rw-r--r-- | examples/udpfwd/netbe.h | 251 | ||||
-rw-r--r-- | examples/udpfwd/parse.c | 586 | ||||
-rw-r--r-- | examples/udpfwd/parse.h | 75 | ||||
-rw-r--r-- | examples/udpfwd/pkt.c | 579 | ||||
-rw-r--r-- | lib/libtle_udp/Makefile | 50 | ||||
-rw-r--r-- | lib/libtle_udp/buf_cage.c | 81 | ||||
-rw-r--r-- | lib/libtle_udp/buf_cage.h | 231 | ||||
-rw-r--r-- | lib/libtle_udp/event.c | 104 | ||||
-rw-r--r-- | lib/libtle_udp/misc.h | 296 | ||||
-rw-r--r-- | lib/libtle_udp/osdep.h | 56 | ||||
-rw-r--r-- | lib/libtle_udp/port_bitmap.h | 112 | ||||
-rw-r--r-- | lib/libtle_udp/tle_event.h | 257 | ||||
-rw-r--r-- | lib/libtle_udp/tle_udp_impl.h | 373 | ||||
-rw-r--r-- | lib/libtle_udp/udp_ctl.c | 723 | ||||
-rw-r--r-- | lib/libtle_udp/udp_impl.h | 161 | ||||
-rw-r--r-- | lib/libtle_udp/udp_rxtx.c | 767 |
26 files changed, 7186 insertions, 0 deletions
diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..4a4fe31 --- /dev/null +++ b/Makefile @@ -0,0 +1,58 @@ +# Copyright (c) 2016 Intel Corporation. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +TLDK_ROOT := $(CURDIR) +export TLDK_ROOT + +DPDK_VERSION=16.04 +LOCAL_RTE_SDK=$(TLDK_ROOT)/dpdk/_build/dpdk-$(DPDK_VERSION)/ + +ifeq ($(RTE_SDK),) + export RTE_SDK=$(LOCAL_RTE_SDK) +endif + +# Default target, can be overriden by command line or environment +RTE_TARGET ?= x86_64-native-linuxapp-gcc + +DIRS-y += lib/libtle_udp +DIRS-y += examples/udpfwd + +MAKEFLAGS += --no-print-directory + +# output directory +O ?= $(TLDK_ROOT)/${RTE_TARGET} +BASE_OUTPUT ?= $(abspath $(O)) + +.PHONY: all +all: $(DIRS-y) + +.PHONY: clean +clean: $(DIRS-y) + +.PHONY: $(DIRS-y) +$(DIRS-y): $(RTE_SDK)/mk/rte.vars.mk + @echo "== $@" + $(Q)$(MAKE) -C $(@) \ + M=$(CURDIR)/$(@)/Makefile \ + O=$(BASE_OUTPUT) \ + BASE_OUTPUT=$(BASE_OUTPUT) \ + CUR_SUBDIR=$(CUR_SUBDIR)/$(@) \ + S=$(CURDIR)/$(@) \ + RTE_TARGET=$(RTE_TARGET) \ + $(filter-out $(DIRS-y),$(MAKECMDGOALS)) + +$(RTE_SDK)/mk/rte.vars.mk: +ifeq ($(RTE_SDK),$(LOCAL_RTE_SDK)) + @make RTE_TARGET=$(RTE_TARGET) config all -C $(TLDK_ROOT)/dpdk/ +endif + @@ -0,0 +1,66 @@ +OVERVIEW +======== + +TLDK project scope is: +1) Implement a set of libraries for L4 protocol processing (UDP, TCP etc.) + for both IPv4 and IPv6. + + The goal is to provide lightweight, high performance and highly adaptable + implementation for L4(UDP, TCP etc.) 
protocol processing. The provided API + are not planned to be compatible with BSD socket API. These libraries are + suppose to be built on top of DPDK. + + Note: these libraries are not supposed to be a 'complete' network stack. + + Implementation of ARP, IP, ETHER, etc layers and related routing tables, + code for setup, manage and perform actual IO over underlying devices are + all out of scope of these libraries. + + Implementation of ARP, IP etc. layers and their related routing tables + are out of scope of these libraries. Similarly, the setup, management and + actual IO on underlying NIC devices are out of scope too. + + The libraries only need to know about underlying devices plus what + HW offloads are supported, underlying device MTU and L3/L2 addresses to + fill into L3/L2 headers for the outgoing packets. + + These libraries should be developed in such manner, they could be used + independently from implementations of 2) and 3). + +2) Create VPP graph nodes, plugins etc using those libraries to implement + a host stack. + +3) Create such mechanisms (netlink agents, packaging, etc) necessary to make + the resulting host stack easily usable by existing non-vpp aware software. + +INSTALLATION GUIDE +================= + +1. Obtain latest DPDK. + (refer to http://dpdk.org for information how to download and build it). +2. Make sure that RTE_SDK and RTE_TARGET DPDK related environment variables + are setup correctly. +3. Go to the TLDK root directory and type: 'make all'. +4. Run sample applications. + +As an example: +export RTE_SDK=/opt/DPDK +export RTE_TARGET=x86_64-native-linuxapp-gcc +cd tldk +make all +./x86_64-native-linuxapp-gcc/app/udpfwd ... 
+ +CONTENTS +======== + +$(TLDK_ROOT) +| ++----lib +| | +| +--libtle_udp - implementation of the UDP datagram processing +| ++----examples + | + +-- udpfwd - sample app to demonstrate and test libtle_udp usage + (refer to examples/udpfwd/README for more information) + diff --git a/dpdk/Makefile b/dpdk/Makefile new file mode 100644 index 0000000..7653a4e --- /dev/null +++ b/dpdk/Makefile @@ -0,0 +1,192 @@ +# Copyright (c) 2015 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# Scripts require non-POSIX parts of bash +SHELL := /bin/bash + +DPDK_VERSION ?= 16.04 +DPDK_BUILD_DIR ?= $(CURDIR)/_build +DPDK_INSTALL_DIR ?= $(DPDK_BUILD_DIR)/dpdk-$(DPDK_VERSION)/$(RTE_TARGET) +DPDK_PKTMBUF_HEADROOM ?= 128 +DPDK_DOWNLOAD_DIR ?= $(HOME)/Downloads +DPDK_MARCH ?= native +DPDK_TUNE ?= generic +DPDK_DEBUG ?= n + +B := $(DPDK_BUILD_DIR) +I := $(DPDK_INSTALL_DIR) +DPDK_BASE_URL ?= https://nexus.fd.io/content/repositories/thirdparty/ +DPDK_TARBALL := dpdk-$(DPDK_VERSION).tar.gz +DPDK_TAR_URL := $(DPDK_BASE_URL)/$(DPDK_TARBALL) +DPDK_2.1.0_TARBALL_MD5_CKSUM := 205a0d12bfd6eb717d57506272f43519 +DPDK_2.2.0_TARBALL_MD5_CKSUM := 22e2fd68cd5504f43fe9a5a6fd6dd938 +DPDK_16.04_TARBALL_MD5_CKSUM := 0728d506d7f56eb64233e824fa3c098a +DPDK_SOURCE := $(B)/dpdk-$(DPDK_VERSION) + +ifneq (,$(findstring clang,$(CC))) +DPDK_CC=clang +else +DPDK_CC=gcc +endif + + +ifeq (,$(DPDK_TARGET)) +DPDK_TARGET := x86_64-native-linuxapp-$(DPDK_CC) +endif + +JOBS := $(shell grep processor /proc/cpuinfo | wc -l) + +# compiler/linker custom arguments +DPDK_CPU_CFLAGS := -pie -fPIC +DPDK_CPU_LDFLAGS := +DPDK_EXTRA_LDFLAGS := -g + +ifeq ($(DPDK_DEBUG),n) +DPDK_EXTRA_CFLAGS := -g -mtune=$(DPDK_TUNE) +else +DPDK_EXTRA_CFLAGS := -g -O0 +endif + +# translate gcc march values to DPDK arch +ifeq ($(DPDK_MARCH),native) +DPDK_MACHINE:=native # autodetect host CPU +else ifeq ($(DPDK_MARCH),corei7) +DPDK_MACHINE:=nhm # Nehalem / Westmere +else ifeq ($(DPDK_MARCH),corei7-avx) +DPDK_MACHINE:=snb # Sandy Bridge +else ifeq ($(DPDK_MARCH),core-avx-i) +DPDK_MACHINE:=ivb # Ivy Bridge +else ifeq ($(DPDK_MARCH),core-avx2) +DPDK_MACHINE:=hsw # Haswell +else ifeq ($(DPDK_MARCH),armv7a) +DPDK_MACHINE:=armv7a # ARMv7 +else ifeq ($(DPDK_MARCH),armv8a) +DPDK_MACHINE:=armv8a # ARMv8 +else +$(error Unknown DPDK_MARCH) +endif + +# assemble DPDK make arguments +DPDK_MAKE_ARGS := -C $(DPDK_SOURCE) -j $(JOBS) \ + T=$(DPDK_TARGET) \ + RTE_CONFIG_TEMPLATE=../custom-config \ + RTE_OUTPUT=$(I) \ + 
EXTRA_CFLAGS="$(DPDK_EXTRA_CFLAGS)" \ + EXTRA_LDFLAGS="$(DPDK_EXTRA_LDFLAGS)" \ + CPU_CFLAGS="$(DPDK_CPU_CFLAGS)" \ + CPU_LDFLAGS="$(DPDK_CPU_LDFLAGS)" \ + $(DPDK_MAKE_EXTRA_ARGS) + +DPDK_SOURCE_FILES := $(shell [ -e $(DPDK_SOURCE) ] && find $(DPDK_SOURCE) -name "*.[chS]") + +define set +@if grep -q CONFIG_$1 $@ ; \ + then sed -i -e 's/.*\(CONFIG_$1=\).*/\1$2/' $@ ; \ + else echo CONFIG_$1=$2 >> $@ ; \ +fi +endef + +all: build + +$(B)/custom-config: $(B)/.patch.ok Makefile + @echo --- generating custom config from $(DPDK_SOURCE)/config/defconfig_$(DPDK_TARGET) --- + @cpp -undef -ffreestanding -x assembler-with-cpp $(DPDK_SOURCE)/config/defconfig_$(DPDK_TARGET) $@ + $(call set,RTE_MACHINE,$(DPDK_MACHINE)) + @# modify options + $(call set,RTE_MAX_LCORE,256) + $(call set,RTE_PKTMBUF_HEADROOM,$(DPDK_PKTMBUF_HEADROOM)) + $(call set,RTE_LIBEAL_USE_HPET,y) + $(call set,RTE_BUILD_COMBINE_LIBS,y) + $(call set,RTE_LIBRTE_I40E_16BYTE_RX_DESC,y) + $(call set,RTE_LIBRTE_I40E_ITR_INTERVAL,16) + @# enable debug init for device drivers + $(call set,RTE_LIBRTE_I40E_DEBUG_INIT,$(DPDK_DEBUG)) + $(call set,RTE_LIBRTE_IXGBE_DEBUG_INIT,$(DPDK_DEBUG)) + $(call set,RTE_LIBRTE_E1000_DEBUG_INIT,$(DPDK_DEBUG)) + $(call set,RTE_LIBRTE_VIRTIO_DEBUG_INIT,$(DPDK_DEBUG)) + $(call set,RTE_LIBRTE_VMXNET3_DEBUG_INIT,$(DPDK_DEBUG)) + $(call set,RTE_LIBRTE_PMD_BOND,y) + $(call set,RTE_LIBRTE_IP_FRAG,y) + @# not needed + $(call set,RTE_LIBRTE_TIMER,n) + $(call set,RTE_LIBRTE_CFGFILE,n) + $(call set,RTE_LIBRTE_LPM,y) + $(call set,RTE_LIBRTE_ACL,n) + $(call set,RTE_LIBRTE_POWER,n) + $(call set,RTE_LIBRTE_DISTRIBUTOR,n) + $(call set,RTE_LIBRTE_REORDER,n) + $(call set,RTE_LIBRTE_PORT,n) + $(call set,RTE_LIBRTE_TABLE,n) + $(call set,RTE_LIBRTE_PIPELINE,n) + $(call set,RTE_KNI_KMOD,n) + @rm -f .config.ok + +$(CURDIR)/$(DPDK_TARBALL): + @mkdir -p $(B) + @if [ -e $(DPDK_DOWNLOAD_DIR)/$(DPDK_TARBALL) ] ; \ + then cp $(DPDK_DOWNLOAD_DIR)/$(DPDK_TARBALL) $(CURDIR) ; \ + else curl -o $(CURDIR)/$(DPDK_TARBALL) -LO 
$(DPDK_TAR_URL) ; \ + fi + @rm -f $(B)/.download.ok + +$(B)/.download.ok: $(CURDIR)/$(DPDK_TARBALL) + @openssl md5 $< | cut -f 2 -d " " - > $(B)/$(DPDK_TARBALL).md5sum + @([ "$$(<$(B)/$(DPDK_TARBALL).md5sum)" = "$(DPDK_$(DPDK_VERSION)_TARBALL_MD5_CKSUM)" ] || \ + ( echo "Bad Checksum! Please remove $< and retry" && \ + rm $(B)/$(DPDK_TARBALL).md5sum && false )) + @touch $@ + +.PHONY: download +download: $(B)/.download.ok + +$(B)/.extract.ok: $(B)/.download.ok + @echo --- extracting $(DPDK_TARBALL) --- + @tar --directory $(B) --extract --file $(CURDIR)/$(DPDK_TARBALL) --gzip + @touch $@ + +.PHONY: extract +extract: $(B)/.extract.ok + +$(B)/.patch.ok: $(B)/.extract.ok +ifneq ($(wildcard $(CURDIR)/dpdk-$(DPDK_VERSION)_patches/*.patch),) + @echo --- patching --- + for f in $(CURDIR)/dpdk-$(DPDK_VERSION)_patches/*.patch ; do \ + echo Applying patch: $$(basename $$f) ; \ + patch -p1 -d $(DPDK_SOURCE) < $$f ; \ + done +endif + @touch $@ + +.PHONY: patch +patch: $(B)/.patch.ok + +$(B)/.config.ok: $(B)/.patch.ok $(B)/custom-config + @make $(DPDK_MAKE_ARGS) config + @touch $@ + +.PHONY: config +config: $(B)/.config.ok + +$(B)/.build.ok: $(DPDK_SOURCE_FILES) + @if [ ! -e $(B)/.config.ok ] ; then echo 'Please run "make config" first' && false ; fi + @make $(DPDK_MAKE_ARGS) install + @cp $(I)/.config $(B)/.config + @touch $@ + +.PHONY: build +build: $(B)/.build.ok + +.PHONY: clean +clean: + @rm -rf $(B) $(I) + diff --git a/dpdk/README b/dpdk/README new file mode 100644 index 0000000..95154dc --- /dev/null +++ b/dpdk/README @@ -0,0 +1,34 @@ + +Changes needed to DPDK are stored here as git patch files. Maintaining these +files using “git format-patch” and “git am” will make it simpler to manage +these changes. Patches made to DPDK should only be temporary until they are +accepted upstream and made available in the next DPDK release. + +The following is the method used to generate these patches: + +1. 
Git clone the DPDK to a new directory: + # git clone http://dpdk.org/git/dpdk dpdk + +2. Create a branch based on the DPDK release you wish to patch. +Note, “git tag” will show you all the release tags. The following example is +for DPDK release tag “v2.2.0” and will create a branch named “two_dot_two”. + # cd dpdk + # git checkout -b two_dot_two v2.2.0 + +3. Apply all the existing VPP DPDK patches to this new branch. + # git am <VPP directory>/dpdk/dpdk-2.2.0_patches/* + +4. Make your changes and commit your change to your DPDK repository. + # <edit files> + # git commit -s + +5. Create the patch files with format-patch. This creates all the patch files +for your branch (two_dot_two), with your latest commits as the last ones. + # git format-patch master..two_dot_two + +6. Copy, add and commit the new patches into the VPP patches directory. + # cp <new patch files> <VPP directory>/dpdk/dpdk-2.2.0_patches + # cd <VPP directory> + # git add dpdk/dpdk-2.2.0_patches/<new patch files> + # git commit -s + diff --git a/examples/udpfwd/Makefile b/examples/udpfwd/Makefile new file mode 100644 index 0000000..c23947a --- /dev/null +++ b/examples/udpfwd/Makefile @@ -0,0 +1,44 @@ +# Copyright (c) 2016 Intel Corporation. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +ifeq ($(RTE_SDK),) +$(error "Please define RTE_SDK environment variable") +endif + +ifeq ($(RTE_TARGET),) +$(error "Please define RTE_TARGET environment variable") +endif + +ifeq ($(TLDK_ROOT),) +$(error "Please define TLDK_ROOT environment variable") +endif + +include $(RTE_SDK)/mk/rte.vars.mk + +# binary name +APP = udpfwd + +# all source are stored in SRCS-y +SRCS-y += parse.c +SRCS-y += pkt.c +SRCS-y += main.c + +CFLAGS += $(WERROR_FLAGS) +CFLAGS += -I$(TLDK_ROOT)/$(RTE_TARGET)/include + +LDLIBS += -L$(TLDK_ROOT)/$(RTE_TARGET)/lib +LDLIBS += -ltle_udp + +EXTRA_CFLAGS += -O3 + +include $(RTE_SDK)/mk/rte.extapp.mk diff --git a/examples/udpfwd/README b/examples/udpfwd/README new file mode 100644 index 0000000..8ab7e98 --- /dev/null +++ b/examples/udpfwd/README @@ -0,0 +1,134 @@ +Introduction +============ + +udpfwd is a sample application to demonstrate and test libtle_udp. +Depending on configuration it can do simple send/recv or both over +opened udp streams. Also it implements ability to do UDP datagram +forwarding between different streams, so it is possible to use that +application as some sort of 'UDP proxy'. +The application can reassemble input fragmented IP packets, +and fragment outgoing IP packets (if destination MTU is less then packet size). +To build and run the application DPDK and TLDK libraries are required. + +Logically the application is divided into two parts: + +- Back End (BE) +BE is responsible for: + - RX over DPDK ports and feed them into UDP TLDK context(s) + (via tle_udp_rx_bulk). + - retrieve packets ready to be send out from UDP TLDK context(s) + and TX them over destined DPDK port. +Right now only one RX/TX queue per port is used. +Each BE lcore can serve multiple DPDK ports, TLDK UDP contexts. + +- Front End (FE) +FE responsibility is to open configured UDP streams and perform +send/recv over them. These streams can belong to different UDP contexts. + +Right now each lcore can act as BE or FE (but not both simultaneously). 
+Master lcore can act as FE only. + +Usage +===== + +udpfwd <EAL parameters> -- \ + -P | --promisc /* promiscuous mode enabled. */ \ + -R | --rbufs <num> /* max recv buffers per stream. */ \ + -S | --sbufs <num> /* max send buffers per stream. */ \ + -s | --streams <num> /* streams to open per context. */ \ + -b | --becfg <filename> /* backend configuration file. */ \ + -f | --fecfg <filename> /* frontend configuration file. */ \ + <port0_params> <port1_params> ... <portN_params> + +port_params: port=<uint>,lcore=<uint>,\ +[rx_offload=<uint>,tx_offload=<uint>,mtu=<uint>,ipv4=<ipv4>,ipv6=<ipv6>] + +port_params are used to configure the particular DPDK device (rte_ethdev port), +and specify BE lcore that will do RX/TX from/to the device and manage +BE part of corresponding UDP context. + +port - DPDK port id (right now on each port is used just one RX, + one TX queue). +lcore - EAL lcore id to do IO over that port (rx_burst/tx_burst). + several ports can be managed by the same lcore, + but same port can't belong to more than one lcore. +rx_offload - RX HW offload capabilities to enable/use on this port. + (bitmask of DEV_RX_OFFLOAD_* values). +tx_offload - TX HW offload capabilities to enable/use on this port. + (bitmask of DEV_TX_OFFLOAD_* values). +mtu - MTU to be used on that port + ( = UDP data size + L2/L3/L4 headers sizes, default=1514). +ipv4 - ipv4 address to assign to that port. +ipv6 - ipv6 address to assign to that port. + +At least one of ipv4/ipv6 values have to be specified for each port. + +As an example: +udpfwd --lcores='3,6' -w 01:00.0 -- \ +--promisc --rbufs 0x1000 --sbufs 0x1000 --streams 0x100 \ +--fecfg ./fe.cfg --becfg ./be.cfg \ +port=0,lcore=6,rx_offload=0xf,tx_offload=0,\ +ipv4=192.168.1.233,ipv6=2001:4860:b002::28 + +Will create TLDK UDP context on lcore=6 (BE lcore) to manage DPDK port 0. +Will assign IPv4 address 192.168.1.233 and IPv6 address 2001:4860:b002::28 +to that port. 
+The following supported by DPDK RX HW offloads: + DEV_RX_OFFLOAD_VLAN_STRIP, + DEV_RX_OFFLOAD_IPV4_CKSUM, + DEV_RX_OFFLOAD_UDP_CKSUM, + DEV_RX_OFFLOAD_TCP_CKSUM +will be enabled on that port. +No HW TX offloads will be enabled. + +Fornt-End (FE) and Back-End (BE) configuration files format: +------------------------------------------------------------ + - each record on a separate line. + - lines started with '#' are treated as comments. + - empty lines (containing whitespace chars only) are ignored. + - kvargs style format for each record. + - each FE record correspond to at least one stream to be opened + (could be multiple streams in case of op="fwd"). + - each BE record define a ipv4/ipv6 destination. + +FE config record format: +------------------------ + +lcore=<uint>,op=<"rx|tx|echo|fwd">,\ +laddr=<ip>,lport=<uint16>,raddr=<ip>,rport=<uint16>,\ +[txlen=<uint>,fwladdr=<ip>,fwlport=<uint16>,fwraddr=<ip>,fwrport=<uint16> + +lcore - EAL lcore to manage that stream(s). +op - operation to perform on that stream: + "rx" - do receive only on that stream. + "tx" - do send only on that stream. + "echo" - mimic recvfrom(..., &addr);sendto(..., &addr); + on that stream. + "fwd" - forward packets between streams. +laddr - local address for the stream to open. +lport - local port for the stream to open. +raddr - remote address for the stream to open. +rport - remote port for the stream to open. +txlen - data length to send with each packet ("tx" mode only). +fwladdr - local address for the forwarding stream(s) to open + ("fwd mode only). +fwlport - local port for the forwarding stream(s) to open + ("fwd mode only). +fwraddr - remote address for the forwarding stream(s) to open + ("fwd mode only). +fwrport - remote port for the forwarding stream(s) to open + ("fwd mode only). + +Refer to fe.cfg for an example. 
+ +BE config record format: +------------------------ + +port=<uint>,addr=<ipv4/ipv6>,masklen=<uint>,mac=<ether> + +port - port number to be used to send packets to the destination. +addr - destionation network address. +masklen - desitantion network prefix length. +mac - destination ethernet address. + +Refer to fe.cfg for an example. diff --git a/examples/udpfwd/be.cfg b/examples/udpfwd/be.cfg new file mode 100644 index 0000000..5c1d173 --- /dev/null +++ b/examples/udpfwd/be.cfg @@ -0,0 +1,5 @@ +# +# udpfwd BE cconfig file exaple +# +port=0,masklen=16,addr=192.168.0.0,mac=01:de:ad:be:ef:01 +port=0,addr=2001:4860:b002::,masklen=64,mac=01:de:ad:be:ef:01 diff --git a/examples/udpfwd/fe.cfg b/examples/udpfwd/fe.cfg new file mode 100644 index 0000000..2706323 --- /dev/null +++ b/examples/udpfwd/fe.cfg @@ -0,0 +1,24 @@ +# +# udpfwd FE config file example +# + +# open IPv4 stream with local_addr=192.168.1.233:32768, +# and remote_addr as wildcard (any remote addressi/port allowed). +# use it echo mode - for any received packet - send it back to the source +lcore=3,op=echo,laddr=192.168.1.233,lport=0x8000,raddr=0.0.0.0,rport=0 + +# open IPv4 stream with specified local/remote address/port and +# do send only over that stream. +lcore=3,op=tx,laddr=192.168.1.233,lport=0x8001,raddr=192.168.1.56,rport=0x200,txlen=72 + +# open IPv6 stream with specified local port (512) probably over multiple +# eth ports, and do recv only over that stream. +lcore=3,op=rx,laddr=::,lport=0x200,raddr=::,rport=0,txlen=72 + +# fwd mode example. +# open IPv4 stream on local port 11211 (memcached) over all possible ports. +# for each new flow, sort of tunnel will be created, i.e: +# new stream will be opend to communcate with forwarding remote address, +# so all packets with <laddr=A:11211,raddr=X:N> will be forwarded to +# <laddr=[B]:M,raddr=[2001:4860:b002::56]:11211> and visa-versa. 
+lcore=3,op=fwd,laddr=0.0.0.0,lport=11211,raddr=0.0.0.0,rport=0,fwladdr=::,fwlport=0,fwraddr=2001:4860:b002::56,fwrport=11211 diff --git a/examples/udpfwd/fwdtbl.h b/examples/udpfwd/fwdtbl.h new file mode 100644 index 0000000..1c4265e --- /dev/null +++ b/examples/udpfwd/fwdtbl.h @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __FWDTBL_H__ +#define __FWDTBL_H__ + +struct fwd4_key { + uint32_t port; + struct in_addr addr; +} __attribute__((__packed__)); + +struct fwd6_key { + uint32_t port; + struct in6_addr addr; +} __attribute__((__packed__)); + +union fwd_key { + struct fwd4_key k4; + struct fwd6_key k6; +}; + +static struct rte_hash * +fwd_tbl_key_prep(const struct netfe_lcore *fe, uint16_t family, + const struct sockaddr *sa, union fwd_key *key) +{ + struct rte_hash *h; + const struct sockaddr_in *sin4; + const struct sockaddr_in6 *sin6; + + if (family == AF_INET) { + h = fe->fw4h; + sin4 = (const struct sockaddr_in *)sa; + key->k4.port = sin4->sin_port; + key->k4.addr = sin4->sin_addr; + } else { + h = fe->fw6h; + sin6 = (const struct sockaddr_in6 *)sa; + key->k6.port = sin6->sin6_port; + key->k6.addr = sin6->sin6_addr; + } + + return h; +} + +static int +fwd_tbl_add(struct netfe_lcore *fe, uint16_t family, const struct sockaddr *sa, + struct netfe_stream *data) +{ + int32_t rc; + struct rte_hash *h; + union fwd_key key; + + h = fwd_tbl_key_prep(fe, family, sa, &key); + rc = 
rte_hash_add_key_data(h, &key, data); + return rc; +} + +static struct netfe_stream * +fwd_tbl_lkp(struct netfe_lcore *fe, uint16_t family, const struct sockaddr *sa) +{ + int rc; + void *d; + struct rte_hash *h; + union fwd_key key; + + h = fwd_tbl_key_prep(fe, family, sa, &key); + rc = rte_hash_lookup_data(h, &key, &d); + if (rc < 0) + d = NULL; + return d; +} + +static int +fwd_tbl_init(struct netfe_lcore *fe, uint16_t family, uint32_t lcore) +{ + int32_t rc; + struct rte_hash **h; + struct rte_hash_parameters hprm; + char buf[RTE_HASH_NAMESIZE]; + + if (family == AF_INET) { + snprintf(buf, sizeof(buf), "fwd4tbl@%u", lcore); + h = &fe->fw4h; + hprm.key_len = sizeof(struct fwd4_key); + } else { + snprintf(buf, sizeof(buf), "fwd6tbl@%u", lcore); + h = &fe->fw6h; + hprm.key_len = sizeof(struct fwd6_key); + } + + hprm.name = buf; + hprm.entries = RTE_MAX(2 * fe->snum, 0x10U); + hprm.socket_id = rte_lcore_to_socket_id(lcore); + hprm.hash_func = NULL; + hprm.hash_func_init_val = 0; + + *h = rte_hash_create(&hprm); + if (*h == NULL) + rc = (rte_errno != 0) ? -rte_errno : -ENOMEM; + else + rc = 0; + return rc; +} + +#endif /* __FWDTBL_H__ */ diff --git a/examples/udpfwd/main.c b/examples/udpfwd/main.c new file mode 100644 index 0000000..a907355 --- /dev/null +++ b/examples/udpfwd/main.c @@ -0,0 +1,1810 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "netbe.h" +#include "parse.h" + +#define MAX_RULES 0x100 +#define MAX_TBL8 0x800 + +#define RX_RING_SIZE 0x400 +#define TX_RING_SIZE 0x800 + +#define MPOOL_CACHE_SIZE 0x100 +#define MPOOL_NB_BUF 0x20000 + +#define FRAG_MBUF_BUF_SIZE (RTE_PKTMBUF_HEADROOM + TLE_UDP_MAX_HDR) +#define FRAG_TTL MS_PER_S +#define FRAG_TBL_BUCKET_ENTRIES 16 + +#define FIRST_PORT 0x8000 + +#define RX_CSUM_OFFLOAD (DEV_RX_OFFLOAD_IPV4_CKSUM | DEV_RX_OFFLOAD_UDP_CKSUM) +#define TX_CSUM_OFFLOAD (DEV_TX_OFFLOAD_IPV4_CKSUM | DEV_TX_OFFLOAD_UDP_CKSUM) + +#define OPT_SHORT_PROMISC 'P' +#define OPT_LONG_PROMISC "promisc" + +#define OPT_SHORT_RBUFS 'R' +#define OPT_LONG_RBUFS "rbufs" + +#define OPT_SHORT_SBUFS 'S' +#define OPT_LONG_SBUFS "sbufs" + +#define OPT_SHORT_STREAMS 's' +#define OPT_LONG_STREAMS "streams" + +#define OPT_SHORT_FECFG 'f' +#define OPT_LONG_FECFG "fecfg" + +#define OPT_SHORT_BECFG 'b' +#define OPT_LONG_BECFG "becfg" + +RTE_DEFINE_PER_LCORE(struct netfe_lcore *, _fe); + +#include "fwdtbl.h" + +static const struct option long_opt[] = { + {OPT_LONG_BECFG, 1, 0, OPT_SHORT_BECFG}, + {OPT_LONG_FECFG, 1, 0, OPT_SHORT_FECFG}, + {OPT_LONG_PROMISC, 0, 0, OPT_SHORT_PROMISC}, + {OPT_LONG_RBUFS, 1, 0, OPT_SHORT_RBUFS}, + {OPT_LONG_SBUFS, 1, 0, OPT_SHORT_SBUFS}, + {OPT_LONG_STREAMS, 1, 0, OPT_SHORT_STREAMS}, + {NULL, 0, 0, 0} +}; + +static volatile int force_quit; + +static struct netbe_cfg becfg; +static struct rte_mempool *mpool[RTE_MAX_NUMA_NODES + 1]; +static struct rte_mempool *frag_mpool[RTE_MAX_NUMA_NODES + 1]; + +static const struct rte_eth_conf port_conf_default = { + .rxmode = { + .max_rx_pkt_len = ETHER_MAX_VLAN_FRAME_LEN, + .hw_vlan_strip = 1, + .jumbo_frame = 1, + }, +}; + +#include "parse.h" + +static void +sig_handle(int signum) +{ + RTE_LOG(ERR, USER1, "%s(%d)\n", __func__, signum); + force_quit = 1; +} + +/* + * Initilise DPDK port. + * In current version, only one queue per port is used. 
+ */ +static int +port_init(struct netbe_port *uprt, struct rte_mempool *mp) +{ + int32_t socket, rc; + uint16_t q; + struct rte_eth_conf port_conf; + struct rte_eth_dev_info dev_info; + + const uint16_t rx_rings = 1, tx_rings = 1; + + rte_eth_dev_info_get(uprt->id, &dev_info); + if ((dev_info.rx_offload_capa & uprt->rx_offload) != uprt->rx_offload) { + RTE_LOG(ERR, USER1, + "port#%u supported/requested RX offloads don't match, " + "supported: %#x, requested: %#x;\n", + uprt->id, dev_info.rx_offload_capa, uprt->rx_offload); + return -EINVAL; + } + if ((dev_info.tx_offload_capa & uprt->tx_offload) != uprt->tx_offload) { + RTE_LOG(ERR, USER1, + "port#%u supported/requested TX offloads don't match, " + "supported: %#x, requested: %#x;\n", + uprt->id, dev_info.tx_offload_capa, uprt->tx_offload); + return -EINVAL; + } + + port_conf = port_conf_default; + if ((uprt->rx_offload & RX_CSUM_OFFLOAD) != 0) { + RTE_LOG(ERR, USER1, "%s(%u): enabling RX csum offload;\n", + __func__, uprt->id); + port_conf.rxmode.hw_ip_checksum = 1; + } + + port_conf.rxmode.max_rx_pkt_len = uprt->mtu + ETHER_CRC_LEN; + + rc = rte_eth_dev_configure(uprt->id, rx_rings, tx_rings, &port_conf); + RTE_LOG(NOTICE, USER1, + "%s: rte_eth_dev_configure(%u) returns %d;\n", + __func__, uprt->id, rc); + if (rc != 0) + return rc; + + socket = rte_eth_dev_socket_id(uprt->id); + + dev_info.default_rxconf.rx_drop_en = 1; + + dev_info.default_txconf.tx_free_thresh = TX_RING_SIZE / 2; + if (uprt->tx_offload != 0) { + RTE_LOG(ERR, USER1, "%s(%u): enabling full featured TX;\n", + __func__, uprt->id); + dev_info.default_txconf.txq_flags = 0; + } + + for (q = 0; q < rx_rings; q++) { + rc = rte_eth_rx_queue_setup(uprt->id, q, RX_RING_SIZE, + socket, NULL, mp); + if (rc < 0) + return rc; + } + + for (q = 0; q < tx_rings; q++) { + rc = rte_eth_tx_queue_setup(uprt->id, q, TX_RING_SIZE, + socket, &dev_info.default_txconf); + if (rc < 0) + return rc; + } + + + return 0; +} + +/* + * Check that lcore is enabled, not master, 
and not in use already. + */ +static int +check_lcore(uint32_t lc) +{ + if (rte_lcore_is_enabled(lc) == 0) { + RTE_LOG(ERR, USER1, "lcore %u is not enabled\n", lc); + return -EINVAL; + } + if (rte_get_master_lcore() == lc) { + RTE_LOG(ERR, USER1, "lcore %u is not slave\n", lc); + return -EINVAL; + } + if (rte_eal_get_lcore_state(lc) == RUNNING) { + RTE_LOG(ERR, USER1, "lcore %u already running %p\n", + lc, lcore_config[lc].f); + return -EINVAL; + } + return 0; +} + +static void +log_netbe_prt(const struct netbe_port *uprt) +{ + RTE_LOG(NOTICE, USER1, + "uprt %p = <id = %u, lcore = %u, " + "mtu = %u, rx_offload = %u, tx_offload = %u,\n" + "ipv4 = %#x, " + "ipv6 = %04hx:%04hx:%04hx:%04hx:%04hx:%04hx:%04hx:%04hx, " + "mac = %02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx>;\n", + uprt, uprt->id, uprt->lcore, + uprt->mtu, uprt->rx_offload, uprt->tx_offload, + uprt->ipv4, + uprt->ipv6.s6_addr16[0], uprt->ipv6.s6_addr16[1], + uprt->ipv6.s6_addr16[2], uprt->ipv6.s6_addr16[3], + uprt->ipv6.s6_addr16[4], uprt->ipv6.s6_addr16[5], + uprt->ipv6.s6_addr16[6], uprt->ipv6.s6_addr16[7], + uprt->mac.addr_bytes[0], uprt->mac.addr_bytes[1], + uprt->mac.addr_bytes[2], uprt->mac.addr_bytes[3], + uprt->mac.addr_bytes[4], uprt->mac.addr_bytes[5]); +} + +static void +log_netbe_cfg(const struct netbe_cfg *ucfg) +{ + uint32_t i; + + RTE_LOG(NOTICE, USER1, + "ucfg @ %p, prt_num = %u\n", ucfg, ucfg->prt_num); + + for (i = 0; i != ucfg->prt_num; i++) + log_netbe_prt(ucfg->prt + i); +} + +static int +pool_init(uint32_t sid) +{ + int32_t rc; + struct rte_mempool *mp; + char name[RTE_MEMPOOL_NAMESIZE]; + + snprintf(name, sizeof(name), "MP%u", sid); + mp = rte_pktmbuf_pool_create(name, MPOOL_NB_BUF, MPOOL_CACHE_SIZE, 0, + RTE_MBUF_DEFAULT_BUF_SIZE, sid - 1); + if (mp == NULL) { + rc = -rte_errno; + RTE_LOG(ERR, USER1, "%s(%d) failed with error code: %d\n", + __func__, sid - 1, rc); + return rc; + } + + mpool[sid] = mp; + return 0; +} + +static int +frag_pool_init(uint32_t sid) +{ + int32_t rc; + struct 
rte_mempool *frag_mp; + char frag_name[RTE_MEMPOOL_NAMESIZE]; + + snprintf(frag_name, sizeof(frag_name), "frag_MP%u", sid); + frag_mp = rte_pktmbuf_pool_create(frag_name, MPOOL_NB_BUF, + MPOOL_CACHE_SIZE, 0, FRAG_MBUF_BUF_SIZE, sid - 1); + if (frag_mp == NULL) { + rc = -rte_errno; + RTE_LOG(ERR, USER1, "%s(%d) failed with error code: %d\n", + __func__, sid - 1, rc); + return rc; + } + + frag_mpool[sid] = frag_mp; + return 0; +} + + +/* + * Setup all enabled ports. + */ +static void +netbe_port_init(struct netbe_cfg *cfg, int argc, char *argv[]) +{ + int32_t rc; + uint32_t i, n, sid; + + n = RTE_MIN(RTE_DIM(cfg->prt), (uint32_t)argc); + + rc = 0; + for (i = 0; i != n; i++) { + rc = parse_netbe_arg(cfg->prt + i, argv[i]); + if (rc != 0) + break; + + rc = check_lcore(cfg->prt[i].lcore); + if (rc != 0) + break; + + sid = rte_lcore_to_socket_id(cfg->prt[i].lcore) + 1; + assert(sid < RTE_DIM(mpool)); + + if (mpool[sid] == NULL && (rc = pool_init(sid)) != 0) + break; + + if (frag_mpool[sid] == NULL && (rc = frag_pool_init(sid)) != 0) + break; + + rc = port_init(cfg->prt + i, mpool[sid]); + if (rc != 0) + break; + + rte_eth_macaddr_get(cfg->prt[i].id, &cfg->prt[i].mac); + if (cfg->promisc) + rte_eth_promiscuous_enable(cfg->prt[i].id); + } + + if (rc != 0) + rte_exit(EXIT_FAILURE, + "%s: processing of \"%s\" failed with error code: %d\n", + __func__, argv[i], rc); + + cfg->prt_num = i; + log_netbe_cfg(cfg); +} + +/* + * UDP IPv4 destination lookup callback. + */ +static int +lpm4_dst_lookup(void *data, const struct in_addr *addr, + struct tle_udp_dest *res) +{ + int32_t rc; + uint32_t idx; + struct netbe_lcore *lc; + struct tle_udp_dest *dst; + + lc = data; + + rc = rte_lpm_lookup(lc->lpm4, rte_be_to_cpu_32(addr->s_addr), &idx); + if (rc == 0) { + dst = &lc->dst4[idx]; + rte_memcpy(res, dst, dst->l2_len + dst->l3_len + + offsetof(struct tle_udp_dest, hdr)); + } + return rc; +} + +/* + * UDP IPv6 destination lookup callback. 
+ */ +static int +lpm6_dst_lookup(void *data, const struct in6_addr *addr, + struct tle_udp_dest *res) +{ + int32_t rc; + uint8_t idx; + struct netbe_lcore *lc; + struct tle_udp_dest *dst; + uintptr_t p; + + lc = data; + p = (uintptr_t)addr->s6_addr; + + rc = rte_lpm6_lookup(lc->lpm6, (uint8_t *)p, &idx); + if (rc == 0) { + dst = &lc->dst6[idx]; + rte_memcpy(res, dst, dst->l2_len + dst->l3_len + + offsetof(struct tle_udp_dest, hdr)); + } + return rc; +} + +static int +netbe_add_ipv4_route(struct netbe_lcore *lc, const struct netbe_dest *dst, + uint8_t idx) +{ + int32_t rc; + uint32_t addr, depth; + char str[INET_ADDRSTRLEN]; + + depth = dst->prfx; + addr = rte_be_to_cpu_32(dst->ipv4.s_addr); + + inet_ntop(AF_INET, &dst->ipv4, str, sizeof(str)); + rc = rte_lpm_add(lc->lpm4, addr, depth, idx); + RTE_LOG(NOTICE, USER1, "%s(lcore=%u,port=%u,dev=%p," + "ipv4=%s/%u,mtu=%u," + "mac=%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx) " + "returns %d;\n", + __func__, lc->id, dst->port, lc->dst4[idx].dev, + str, depth, lc->dst4[idx].mtu, + dst->mac.addr_bytes[0], dst->mac.addr_bytes[1], + dst->mac.addr_bytes[2], dst->mac.addr_bytes[3], + dst->mac.addr_bytes[4], dst->mac.addr_bytes[5], + rc); + return rc; +} + +static int +netbe_add_ipv6_route(struct netbe_lcore *lc, const struct netbe_dest *dst, + uint8_t idx) +{ + int32_t rc; + uint32_t depth; + char str[INET6_ADDRSTRLEN]; + + depth = dst->prfx; + + rc = rte_lpm6_add(lc->lpm6, (uint8_t *)(uintptr_t)dst->ipv6.s6_addr, + depth, idx); + + inet_ntop(AF_INET6, &dst->ipv6, str, sizeof(str)); + RTE_LOG(NOTICE, USER1, "%s(lcore=%u,port=%u,dev=%p," + "ipv6=%s/%u,mtu=%u," + "mac=%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx) " + "returns %d;\n", + __func__, lc->id, dst->port, lc->dst6[idx].dev, + str, depth, lc->dst4[idx].mtu, + dst->mac.addr_bytes[0], dst->mac.addr_bytes[1], + dst->mac.addr_bytes[2], dst->mac.addr_bytes[3], + dst->mac.addr_bytes[4], dst->mac.addr_bytes[5], + rc); + return rc; +} + +static int +lcore_lpm_init(struct netbe_lcore 
*lc) +{ + int32_t sid; + char str[RTE_LPM_NAMESIZE]; + const struct rte_lpm_config lpm4_cfg = { + .max_rules = MAX_RULES, + .number_tbl8s = MAX_TBL8, + }; + const struct rte_lpm6_config lpm6_cfg = { + .max_rules = MAX_RULES, + .number_tbl8s = MAX_TBL8, + }; + + sid = rte_lcore_to_socket_id(lc->id); + + snprintf(str, sizeof(str), "LPM4%u\n", lc->id); + lc->lpm4 = rte_lpm_create(str, sid, &lpm4_cfg); + RTE_LOG(NOTICE, USER1, "%s(lcore=%u): lpm4=%p;\n", + __func__, lc->id, lc->lpm4); + if (lc->lpm4 == NULL) + return -ENOMEM; + + snprintf(str, sizeof(str), "LPM6%u\n", lc->id); + lc->lpm6 = rte_lpm6_create(str, sid, &lpm6_cfg); + RTE_LOG(NOTICE, USER1, "%s(lcore=%u): lpm6=%p;\n", + __func__, lc->id, lc->lpm6); + if (lc->lpm6 == NULL) + return -ENOMEM; + + return 0; +} + +static void +fill_dst(struct tle_udp_dest *dst, struct netbe_dev *bed, + const struct netbe_dest *bdp, uint16_t l3_type, int32_t sid) +{ + struct ether_hdr *eth; + struct ipv4_hdr *ip4h; + struct ipv6_hdr *ip6h; + + static const struct ipv4_hdr ipv4_tmpl = { + .version_ihl = 4 << 4 | sizeof(*ip4h) / IPV4_IHL_MULTIPLIER, + .time_to_live = 64, + .next_proto_id = IPPROTO_UDP, + }; + + static const struct ipv6_hdr ipv6_tmpl = { + .vtc_flow = 6 << 4, + .proto = IPPROTO_UDP, + .hop_limits = 64, + }; + + dst->dev = bed->dev; + dst->head_mp = frag_mpool[sid + 1]; + dst->mtu = RTE_MIN(bdp->mtu, bed->port.mtu); + dst->l2_len = sizeof(*eth); + + eth = (struct ether_hdr *)dst->hdr; + + ether_addr_copy(&bed->port.mac, ð->s_addr); + ether_addr_copy(&bdp->mac, ð->d_addr); + eth->ether_type = rte_cpu_to_be_16(l3_type); + + if (l3_type == ETHER_TYPE_IPv4) { + dst->l3_len = sizeof(*ip4h); + ip4h = (struct ipv4_hdr *)(eth + 1); + ip4h[0] = ipv4_tmpl; + } else if (l3_type == ETHER_TYPE_IPv6) { + dst->l3_len = sizeof(*ip6h); + ip6h = (struct ipv6_hdr *)(eth + 1); + ip6h[0] = ipv6_tmpl; + } +} + + +/* + * BE lcore setup routine. 
+ */ +static int +lcore_init(struct netbe_lcore *lc, const struct tle_udp_ctx_param *ctx_prm, + const struct netbe_port prt[], uint32_t prt_num) +{ + int32_t rc, sid; + uint32_t i; + uint64_t frag_cycles; + struct tle_udp_ctx_param cprm; + struct tle_udp_dev_param dprm; + + lc->id = prt[0].lcore; + lc->prt_num = prt_num; + + sid = rte_lcore_to_socket_id(lc->id); + + rc = lcore_lpm_init(lc); + if (rc != 0) + return rc; + + cprm = *ctx_prm; + cprm.socket_id = sid; + cprm.lookup4 = lpm4_dst_lookup; + cprm.lookup4_data = lc; + cprm.lookup6 = lpm6_dst_lookup; + cprm.lookup6_data = lc; + + /* to facilitate both IPv4 and IPv6. */ + cprm.max_streams *= 2; + + frag_cycles = (rte_get_tsc_hz() + MS_PER_S - 1) / MS_PER_S * FRAG_TTL; + lc->ftbl = rte_ip_frag_table_create(cprm.max_streams, + FRAG_TBL_BUCKET_ENTRIES, cprm.max_streams, frag_cycles, sid); + RTE_LOG(NOTICE, USER1, "%s(lcore=%u): frag_tbl=%p;\n", + __func__, lc->id, lc->ftbl); + + lc->ctx = tle_udp_create(&cprm); + RTE_LOG(NOTICE, USER1, "%s(lcore=%u): udp_ctx=%p;\n", + __func__, lc->id, lc->ctx); + + if (lc->ctx == NULL || lc->ftbl == NULL) + rc = ENOMEM; + + for (i = 0; i != prt_num && rc == 0; i++) { + + memset(&dprm, 0, sizeof(dprm)); + + lc->prt[i].rxqid = 0; + lc->prt[i].txqid = 0; + lc->prt[i].port = prt[i]; + + dprm.rx_offload = prt[i].rx_offload; + dprm.tx_offload = prt[i].tx_offload; + dprm.local_addr4.s_addr = prt[i].ipv4; + memcpy(&dprm.local_addr6, &prt[i].ipv6, sizeof(prt[i].ipv6)); + + lc->prt[i].dev = tle_udp_add_dev(lc->ctx, &dprm); + RTE_LOG(NOTICE, USER1, "%s(lcore=%u, port=%u), udp_dev: %p;\n", + __func__, lc->id, prt[i].id, lc->prt[i].dev); + if (lc->prt[i].dev == NULL) + rc = -rte_errno; + } + + if (rc != 0) { + RTE_LOG(ERR, USER1, "%s(lcore=%u) failed with error code: %d\n", + __func__, lc->id, rc); + tle_udp_destroy(lc->ctx); + rte_ip_frag_table_destroy(lc->ftbl); + rte_lpm_free(lc->lpm4); + rte_lpm6_free(lc->lpm6); + } + + return rc; +} + +static int +prt_lcore_cmp(const void *s1, const void 
*s2) +{ + const struct netbe_port *p1, *p2; + + p1 = s1; + p2 = s2; + return p1->lcore - p2->lcore; +} + +static void +netbe_lcore_init(struct netbe_cfg *cfg, const struct tle_udp_ctx_param *ctx_prm) +{ + int32_t rc; + uint32_t i, k, n, num; + struct netbe_port sp[RTE_DIM(cfg->prt)]; + + num = cfg->prt_num; + memcpy(sp, cfg->prt, sizeof(sp[0]) * num); + qsort(sp, num, sizeof(sp[0]), prt_lcore_cmp); + + /* Fill ports to be used by each lcore. */ + + k = 0; + n = 0; + rc = 0; + for (i = 0; i != num && rc == 0; i++) { + if (sp[n].lcore != sp[i].lcore) { + rc = lcore_init(cfg->cpu + k, ctx_prm, sp + n, i - n); + n = i; + k++; + } + } + + if (rc == 0 && i != n) { + rc = lcore_init(cfg->cpu + k, ctx_prm, sp + n, i - n); + k++; + } + + if (rc != 0) + rte_exit(EXIT_FAILURE, "%s: failed with error code: %d\n", + __func__, rc); + + cfg->cpu_num = k; +} + +static void +netbe_lcore_fini(struct netbe_cfg *cfg) +{ + uint32_t i; + + for (i = 0; i != cfg->cpu_num; i++) { + tle_udp_destroy(cfg->cpu[i].ctx); + rte_ip_frag_table_destroy(cfg->cpu[i].ftbl); + rte_lpm_free(cfg->cpu[i].lpm4); + rte_lpm6_free(cfg->cpu[i].lpm6); + } + + memset(cfg->cpu, 0, sizeof(cfg->cpu)); + cfg->cpu_num = 0; +} + +static int +netbe_add_dest(struct netbe_lcore *lc, uint32_t dev_idx, uint16_t family, + const struct netbe_dest *dst, uint32_t dnum) +{ + int32_t rc, sid; + uint16_t l3_type; + uint32_t i, n, m; + struct tle_udp_dest *dp; + + if (family == AF_INET) { + n = lc->dst4_num; + dp = lc->dst4 + n; + m = RTE_DIM(lc->dst4); + l3_type = ETHER_TYPE_IPv4; + } else { + n = lc->dst6_num; + dp = lc->dst6 + n; + m = RTE_DIM(lc->dst6); + l3_type = ETHER_TYPE_IPv6; + } + + if (n + dnum >= m) { + RTE_LOG(ERR, USER1, "%s(lcore=%u, family=%hu, dnum=%u) exceeds " + "maximum allowed number of destinations(%u);\n", + __func__, lc->id, family, dnum, m); + return -ENOSPC; + } + + sid = rte_lcore_to_socket_id(lc->id); + rc = 0; + + for (i = 0; i != dnum && rc == 0; i++) { + fill_dst(dp + i, lc->prt + dev_idx, dst + i, 
l3_type, sid); + if (family == AF_INET) + rc = netbe_add_ipv4_route(lc, dst + i, n + i); + else + rc = netbe_add_ipv6_route(lc, dst + i, n + i); + } + + if (family == AF_INET) + lc->dst4_num = n + i; + else + lc->dst6_num = n + i; + + return rc; +} + +static int +netbe_port2lcore(struct netbe_cfg *cfg, uint32_t port, struct netbe_lcore **plc) +{ + uint32_t i, j; + struct netbe_lcore *lc; + + for (i = 0; i != cfg->cpu_num; i++) { + lc = cfg->cpu + i; + for (j = 0; j != cfg->prt_num; j++) { + if (lc->prt[j].port.id == port) { + *plc = lc; + return j; + } + } + } + + return -ENOENT; +} + +static int +netbe_dest_cmp(const void *s1, const void *s2) +{ + const struct netbe_dest *p1, *p2; + + p1 = s1; + p2 = s2; + if (p1->port == p2->port) + return p1->family - p2->family; + else + return p1->port - p2->port; +} + +static int +netbe_dest_init(const char *fname, struct netbe_cfg *cfg) +{ + int32_t rc; + uint32_t f, i, j, p; + struct netbe_lcore *lc; + struct netbe_dest_prm prm; + + rc = netbe_parse_dest(fname, &prm); + if (rc != 0) + return rc; + + qsort(prm.dest, prm.nb_dest, sizeof(prm.dest[0]), netbe_dest_cmp); + + rc = 0; + for (i = 0; i != prm.nb_dest; i = j) { + + p = prm.dest[i].port; + f = prm.dest[i].family; + for (j = i + 1; j != prm.nb_dest && p == prm.dest[j].port && + f == prm.dest[j].family; + j++) + ; + + rc = netbe_port2lcore(cfg, p, &lc); + if (rc < 0) { + RTE_LOG(ERR, USER1, "%s(%s) error at line %u: " + "port %u not managed by any lcore;\n", + __func__, fname, prm.dest[i].line, p); + break; + } + + rc = netbe_add_dest(lc, rc, f, prm.dest + i, j - i); + if (rc != 0) + break; + } + + free(prm.dest); + return rc; +} + +static void +netfe_stream_close(struct netfe_lcore *fe, uint32_t dec) +{ + uint32_t sidx; + + fe->sidx -= dec; + sidx = fe->sidx; + tle_event_free(fe->fs[sidx].txev); + tle_event_free(fe->fs[sidx].rxev); + tle_udp_stream_close(fe->fs[sidx].s); + memset(&fe->fs[sidx], 0, sizeof(fe->fs[sidx])); +} + +static void +netfe_stream_dump(const struct 
netfe_stream *fes) +{ + struct sockaddr_in *l4, *r4; + struct sockaddr_in6 *l6, *r6; + uint16_t lport, rport; + struct tle_udp_stream_param sprm; + char laddr[INET6_ADDRSTRLEN]; + char raddr[INET6_ADDRSTRLEN]; + + tle_udp_stream_get_param(fes->s, &sprm); + + if (sprm.local_addr.ss_family == AF_INET) { + + l4 = (struct sockaddr_in *)&sprm.local_addr; + r4 = (struct sockaddr_in *)&sprm.remote_addr; + + lport = l4->sin_port; + rport = r4->sin_port; + + } else if (sprm.local_addr.ss_family == AF_INET6) { + + l6 = (struct sockaddr_in6 *)&sprm.local_addr; + r6 = (struct sockaddr_in6 *)&sprm.remote_addr; + + lport = l6->sin6_port; + rport = r6->sin6_port; + + } else { + RTE_LOG(ERR, USER1, "stream@%p - unknown family=%hu\n", + fes->s, sprm.local_addr.ss_family); + return; + } + + format_addr(&sprm.local_addr, laddr, sizeof(laddr)); + format_addr(&sprm.remote_addr, raddr, sizeof(raddr)); + + RTE_LOG(INFO, USER1, + "stream@%p={" + "family=%hu,laddr=%s,lport=%hu,raddr=%s,rport=%hu," + "stats={" + "rxp=%" PRIu64 ",txp=%" PRIu64 ",drops=%" PRIu64 "," + "rxev[IDLE, DOWN, UP]=[%" PRIu64 ", %" PRIu64 ", %" PRIu64 "]," + "txev[IDLE, DOWN, UP]=[%" PRIu64 ", %" PRIu64 ", %" PRIu64 "]," + "}};\n", + fes->s, + sprm.local_addr.ss_family, + laddr, ntohs(lport), raddr, ntohs(rport), + fes->stat.rxp, fes->stat.txp, fes->stat.drops, + fes->stat.rxev[TLE_SEV_IDLE], + fes->stat.rxev[TLE_SEV_DOWN], + fes->stat.rxev[TLE_SEV_UP], + fes->stat.txev[TLE_SEV_IDLE], + fes->stat.txev[TLE_SEV_DOWN], + fes->stat.txev[TLE_SEV_UP]); +} + + +/* + * helper function: opens IPv4 and IPv6 streams for selected port. 
+ */ +static struct netfe_stream * +netfe_stream_open(struct netfe_lcore *fe, struct tle_udp_stream_param *sprm, + uint32_t lcore, uint16_t op, uint32_t bidx) +{ + int32_t rc; + uint32_t sidx; + struct netfe_stream *fes; + + sidx = fe->sidx; + fes = fe->fs + sidx; + if (sidx >= fe->snum) { + rte_errno = ENOBUFS; + return NULL; + } + + fes->rxev = tle_event_alloc(fe->rxeq, &fe->fs[sidx]); + fes->txev = tle_event_alloc(fe->txeq, &fe->fs[sidx]); + sprm->recv_ev = fes->rxev; + if (op != FWD) + sprm->send_ev = fes->txev; + + RTE_LOG(ERR, USER1, "%s(%u) [%u]={op=%hu, rxev=%p, txev=%p}\n", + __func__, lcore, sidx, op, fes->rxev, fes->txev); + if (fes->rxev == NULL || fes->txev == NULL) { + netfe_stream_close(fe, 0); + rte_errno = ENOMEM; + return NULL; + } + + if (op == TXONLY || op == FWD) { + tle_event_active(fes->txev, TLE_SEV_DOWN); + fes->stat.txev[TLE_SEV_DOWN]++; + } + + if (op != TXONLY) { + tle_event_active(fes->rxev, TLE_SEV_DOWN); + fes->stat.rxev[TLE_SEV_DOWN]++; + } + + fes->s = tle_udp_stream_open(becfg.cpu[bidx].ctx, sprm); + if (fes->s == NULL) { + rc = rte_errno; + netfe_stream_close(fe, 0); + rte_errno = rc; + return NULL; + } + + fes->op = op; + fes->family = sprm->local_addr.ss_family; + + fe->sidx = sidx + 1; + return fes; + +} + +static inline int +netfe_addr_eq(struct sockaddr_storage *l, struct sockaddr_storage *r, + uint16_t family) +{ + struct sockaddr_in *l4, *r4; + struct sockaddr_in6 *l6, *r6; + + if (family == AF_INET) { + l4 = (struct sockaddr_in *)l; + r4 = (struct sockaddr_in *)r; + return (l4->sin_port == r4->sin_port && + l4->sin_addr.s_addr == r4->sin_addr.s_addr); + } else { + l6 = (struct sockaddr_in6 *)l; + r6 = (struct sockaddr_in6 *)r; + return (l6->sin6_port == r6->sin6_port && + memcmp(&l6->sin6_addr, &r6->sin6_addr, + sizeof(l6->sin6_addr))); + } +} + +static inline void +netfe_pkt_addr(const struct rte_mbuf *m, struct sockaddr_storage *ps, + uint16_t family) +{ + const struct ipv4_hdr *ip4h; + const struct ipv6_hdr *ip6h; + 
const struct udp_hdr *udph; + struct sockaddr_in *in4; + struct sockaddr_in6 *in6; + + NETFE_PKT_DUMP(m); + + udph = rte_pktmbuf_mtod_offset(m, struct udp_hdr *, -m->l4_len); + + if (family == AF_INET) { + in4 = (struct sockaddr_in *)ps; + ip4h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *, + -(m->l4_len + m->l3_len)); + in4->sin_port = udph->src_port; + in4->sin_addr.s_addr = ip4h->src_addr; + } else { + in6 = (struct sockaddr_in6 *)ps; + ip6h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr *, + -(m->l4_len + m->l3_len)); + in6->sin6_port = udph->src_port; + rte_memcpy(&in6->sin6_addr, ip6h->src_addr, + sizeof(in6->sin6_addr)); + } +} + +static inline uint32_t +pkt_eq_addr(struct rte_mbuf *pkt[], uint32_t num, uint16_t family, + struct sockaddr_storage *cur, struct sockaddr_storage *nxt) +{ + uint32_t i; + + for (i = 0; i != num; i++) { + netfe_pkt_addr(pkt[i], nxt, family); + if (netfe_addr_eq(cur, nxt, family) == 0) + break; + } + + return i; +} + +static inline void +pkt_buf_empty(struct pkt_buf *pb) +{ + uint32_t i; + + for (i = 0; i != pb->num; i++) + rte_pktmbuf_free(pb->pkt[i]); + + pb->num = 0; +} + +static inline void +pkt_buf_fill(uint32_t lcore, struct pkt_buf *pb, uint32_t dlen) +{ + uint32_t i; + int32_t sid; + + sid = rte_lcore_to_socket_id(lcore) + 1; + + for (i = pb->num; i != RTE_DIM(pb->pkt); i++) { + pb->pkt[i] = rte_pktmbuf_alloc(mpool[sid]); + if (pb->pkt[i] == NULL) + break; + rte_pktmbuf_append(pb->pkt[i], dlen); + } + + pb->num = i; +} + +static struct netfe_stream * +find_fwd_dst(uint32_t lcore, struct netfe_stream *fes, + const struct sockaddr *sa) +{ + uint32_t rc; + struct netfe_stream *fed; + struct netfe_lcore *fe; + struct tle_udp_stream_param sprm; + + fe = RTE_PER_LCORE(_fe); + + fed = fwd_tbl_lkp(fe, fes->family, sa); + if (fed != NULL) + return fed; + + /* create a new stream and put it into the fwd table. */ + + sprm = fes->fwdprm.prm; + + /* open forward stream with wildcard remote addr. 
*/ + memset(&sprm.remote_addr.ss_family + 1, 0, + sizeof(sprm.remote_addr) - sizeof(sprm.remote_addr.ss_family)); + fed = netfe_stream_open(fe, &sprm, lcore, FWD, fes->fwdprm.bidx); + if (fed == NULL) + return NULL; + + rc = fwd_tbl_add(fe, fes->family, sa, fed); + if (rc != 0) { + netfe_stream_close(fe, 1); + fed = NULL; + } + + fed->fwdprm.prm.remote_addr = *(const struct sockaddr_storage *)sa; + return fed; +} + +static inline void +netfe_tx_process(uint32_t lcore, struct netfe_stream *fes) +{ + uint32_t i, k, n; + + /* refill with new mbufs. */ + pkt_buf_fill(lcore, &fes->pbuf, fes->txlen); + + n = fes->pbuf.num; + if (n == 0) + return; + + k = tle_udp_stream_send(fes->s, fes->pbuf.pkt, n, NULL); + NETFE_TRACE("%s(%u): tle_udp_stream_send(%p, %u) returns %u\n", + __func__, lcore, fes->s, n, k); + fes->stat.txp += k; + fes->stat.drops += n - k; + + if (k == 0) + return; + + /* adjust pbuf array. */ + fes->pbuf.num = n - k; + for (i = k; i != n; i++) + fes->pbuf.pkt[i - k] = fes->pbuf.pkt[i]; +} + + +static inline void +netfe_fwd(uint32_t lcore, struct netfe_stream *fes) +{ + uint32_t i, j, k, n, x; + uint16_t family; + void *pi0, *pi1, *pt; + struct rte_mbuf **pkt; + struct netfe_stream *fed; + struct sockaddr_storage in[2]; + + family = fes->family; + n = fes->pbuf.num; + pkt = fes->pbuf.pkt; + + if (n == 0) + return; + + in[0].ss_family = family; + in[1].ss_family = family; + pi0 = &in[0]; + pi1 = &in[1]; + + netfe_pkt_addr(pkt[0], pi0, family); + + x = 0; + for (i = 0; i != n; i = j) { + + j = i + pkt_eq_addr(&pkt[i + 1], + n - i - 1, family, pi0, pi1) + 1; + + fed = find_fwd_dst(lcore, fes, (const struct sockaddr *)pi0); + if (fed != NULL) { + + k = tle_udp_stream_send(fed->s, pkt + i, j - i, + (const struct sockaddr *) + &fes->fwdprm.prm.remote_addr); + + NETFE_TRACE("%s(%u): tle_udp_stream_send(%p, %u) " + "returns %u\n", + __func__, lcore, fed->s, j - i, k); + fed->stat.txp += k; + fed->stat.drops += j - i - k; + fes->stat.fwp += k; + + } else { + 
NETFE_TRACE("%s(%u, %p): no fwd stream for %u pkts;\n", + __func__, lcore, fes->s, j - i); + for (k = i; k != j; k++) { + NETFE_TRACE("%s(%u, %p): free(%p);\n", + __func__, lcore, fes->s, pkt[k]); + rte_pktmbuf_free(pkt[j]); + } + fes->stat.drops += j - i; + } + + /* copy unforwarded mbufs. */ + for (i += k; i != j; i++, x++) + pkt[x] = pkt[i]; + + /* swap the pointers */ + pt = pi0; + pi0 = pi1; + pi1 = pt; + } + + fes->pbuf.num = x; + + if (x != 0) { + tle_event_raise(fes->txev); + fes->stat.txev[TLE_SEV_UP]++; + } + + if (n == RTE_DIM(fes->pbuf.pkt)) { + tle_event_active(fes->rxev, TLE_SEV_UP); + fes->stat.rxev[TLE_SEV_UP]++; + } +} + +static inline void +netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes) +{ + uint32_t k, n; + + n = fes->pbuf.num; + k = RTE_DIM(fes->pbuf.pkt) - n; + + /* packet buffer is full, can't receive any new packets. */ + if (k == 0) { + tle_event_idle(fes->rxev); + fes->stat.rxev[TLE_SEV_IDLE]++; + return; + } + + n = tle_udp_stream_recv(fes->s, fes->pbuf.pkt + n, k); + if (n == 0) + return; + + NETFE_TRACE("%s(%u): tle_udp_stream_recv(%p, %u) returns %u\n", + __func__, lcore, fes->s, k, n); + + fes->pbuf.num += n; + fes->stat.rxp += n; + + /* free all received mbufs. */ + if (fes->op == RXONLY) + pkt_buf_empty(&fes->pbuf); + /* mark stream as writable */ + else if (k == RTE_DIM(fes->pbuf.pkt)) { + if (fes->op == RXTX) { + tle_event_active(fes->txev, TLE_SEV_UP); + fes->stat.txev[TLE_SEV_UP]++; + } else if (fes->op == FWD) { + tle_event_raise(fes->txev); + fes->stat.txev[TLE_SEV_UP]++; + } + } +} + +static inline void +netfe_rxtx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes) +{ + uint32_t i, j, k, n; + uint16_t family; + void *pi0, *pi1, *pt; + struct rte_mbuf **pkt; + struct sockaddr_storage in[2]; + + family = fes->family; + n = fes->pbuf.num; + pkt = fes->pbuf.pkt; + + /* there is nothing to send. 
*/ + if (n == 0) { + tle_event_idle(fes->txev); + fes->stat.txev[TLE_SEV_IDLE]++; + return; + } + + in[0].ss_family = family; + in[1].ss_family = family; + pi0 = &in[0]; + pi1 = &in[1]; + + netfe_pkt_addr(pkt[0], pi0, family); + + for (i = 0; i != n; i = j) { + + j = i + pkt_eq_addr(&pkt[i + 1], + n - i - 1, family, pi0, pi1) + 1; + + k = tle_udp_stream_send(fes->s, pkt + i, j - i, + (const struct sockaddr *)pi0); + + NETFE_TRACE("%s(%u): tle_udp_stream_send(%p, %u) returns %u\n", + __func__, lcore, fes->s, j - i, k); + fes->stat.txp += k; + fes->stat.drops += j - i - k; + + i += k; + + /* stream send buffer is full */ + if (i != j) + break; + + /* swap the pointers */ + pt = pi0; + pi0 = pi1; + pi1 = pt; + } + + /* not able to send anything. */ + if (i == 0) + return; + + if (n == RTE_DIM(fes->pbuf.pkt)) { + /* mark stream as readable */ + tle_event_active(fes->rxev, TLE_SEV_UP); + fes->stat.rxev[TLE_SEV_UP]++; + } + + /* adjust pbuf array. */ + fes->pbuf.num = n - i; + for (j = i; j != n; j++) + pkt[j - i] = pkt[j]; +} + +static int +netfe_lcore(void *arg) +{ + size_t sz; + int32_t rc; + uint32_t i, j, n, lcore, snum; + const struct netfe_lcore_prm *prm; + struct netfe_lcore *fe; + struct tle_evq_param eprm; + struct tle_udp_stream_param sprm; + struct netfe_stream *fes, *fs[MAX_PKT_BURST]; + + lcore = rte_lcore_id(); + prm = arg; + + snum = prm->max_streams; + RTE_LOG(NOTICE, USER1, "%s(lcore=%u, nb_streams=%u, max_streams=%u)\n", + __func__, lcore, prm->nb_streams, snum); + + memset(&eprm, 0, sizeof(eprm)); + eprm.socket_id = rte_lcore_to_socket_id(lcore); + eprm.max_events = snum; + + sz = sizeof(*fe) + snum * sizeof(fe->fs[0]); + fe = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + rte_lcore_to_socket_id(lcore)); + + if (fe == NULL) { + RTE_LOG(ERR, USER1, "%s:%d failed to allocate %zu bytes\n", + __func__, __LINE__, sz); + return -ENOMEM; + } + + RTE_PER_LCORE(_fe) = fe; + + fe->snum = snum; + fe->fs = (struct netfe_stream *)(fe + 1); + + fe->rxeq = 
tle_evq_create(&eprm); + fe->txeq = tle_evq_create(&eprm); + + RTE_LOG(ERR, USER1, "%s(%u) rx evq=%p, tx evq=%p\n", + __func__, lcore, fe->rxeq, fe->txeq); + if (fe->rxeq == NULL || fe->txeq == NULL) + return -ENOMEM; + + rc = fwd_tbl_init(fe, AF_INET, lcore); + RTE_LOG(ERR, USER1, "%s(%u) fwd_tbl_init(%u) returns %d\n", + __func__, lcore, AF_INET, rc); + if (rc != 0) + return rc; + + rc = fwd_tbl_init(fe, AF_INET6, lcore); + RTE_LOG(ERR, USER1, "%s(%u) fwd_tbl_init(%u) returns %d\n", + __func__, lcore, AF_INET6, rc); + if (rc != 0) + return rc; + + /* open all requested streams. */ + for (i = 0; i != prm->nb_streams; i++) { + sprm = prm->stream[i].sprm.prm; + fes = netfe_stream_open(fe, &sprm, lcore, prm->stream[i].op, + prm->stream[i].sprm.bidx); + if (fes == NULL) { + rc = -rte_errno; + break; + } + + netfe_stream_dump(fes); + + if (prm->stream[i].op == FWD) { + fes->fwdprm = prm->stream[i].fprm; + rc = fwd_tbl_add(fe, + prm->stream[i].fprm.prm.remote_addr.ss_family, + (const struct sockaddr *) + &prm->stream[i].fprm.prm.remote_addr, + fes); + if (rc != 0) { + netfe_stream_close(fe, 1); + break; + } + } else if (prm->stream[i].op == TXONLY) { + fes->txlen = prm->stream[i].txlen; + fes->raddr = sprm.remote_addr; + } + } + + while (fe->sidx >= prm->nb_streams && force_quit == 0) { + + n = tle_evq_get(fe->rxeq, (const void **)(uintptr_t)fs, + RTE_DIM(fs)); + + if (n != 0) { + NETFE_TRACE("%s(%u): tle_evq_get(rxevq=%p) " + "returns %u\n", + __func__, lcore, fe->rxeq, n); + for (j = 0; j != n; j++) + netfe_rx_process(lcore, fs[j]); + } + + n = tle_evq_get(fe->txeq, (const void **)(uintptr_t)fs, + RTE_DIM(fs)); + + if (n != 0) { + NETFE_TRACE("%s(%u): tle_evq_get(txevq=%p) " + "returns %u\n", + __func__, lcore, fe->txeq, n); + for (j = 0; j != n; j++) { + if (fs[j]->op == RXTX) + netfe_rxtx_process(lcore, fs[j]); + else if (fs[j]->op == FWD) + netfe_fwd(lcore, fs[j]); + else if (fs[j]->op == TXONLY) + netfe_tx_process(lcore, fs[j]); + } + } + } + + RTE_LOG(NOTICE, 
USER1, "%s(lcore=%u) finish\n", + __func__, lcore); + + while (fe->sidx != 0) { + + i = fe->sidx - 1; + netfe_stream_dump(fe->fs + i); + netfe_stream_close(fe, 1); + } + + tle_evq_destroy(fe->txeq); + tle_evq_destroy(fe->rxeq); + rte_free(fe); + + return rc; +} + +static inline void +netbe_rx(struct netbe_lcore *lc, uint32_t pidx) +{ + uint32_t j, k, n; + struct rte_mbuf *pkt[MAX_PKT_BURST]; + struct rte_mbuf *rp[MAX_PKT_BURST]; + int32_t rc[MAX_PKT_BURST]; + + n = rte_eth_rx_burst(lc->prt[pidx].port.id, + lc->prt[pidx].rxqid, pkt, RTE_DIM(pkt)); + if (n == 0) + return; + + lc->prt[pidx].rx_stat.in += n; + NETBE_TRACE("%s(%u): rte_eth_rx_burst(%u, %u) returns %u\n", + __func__, lc->id, lc->prt[pidx].port.id, lc->prt[pidx].rxqid, + n); + + k = tle_udp_rx_bulk(lc->prt[pidx].dev, pkt, rp, rc, n); + + lc->prt[pidx].rx_stat.up += k; + lc->prt[pidx].rx_stat.drop += n - k; + NETBE_TRACE("%s(%u): tle_udp_rx_bulk(%p, %u) returns %u\n", + __func__, lc->id, lc->prt[pidx].dev, n, k); + + for (j = 0; j != n - k; j++) { + NETBE_TRACE("%s:%d(port=%u) rp[%u]={%p, %d};\n", + __func__, __LINE__, lc->prt[pidx].port.id, + j, rp[j], rc[j]); + rte_pktmbuf_free(rp[j]); + } +} + +static inline void +netbe_tx(struct netbe_lcore *lc, uint32_t pidx) +{ + uint32_t j, k, n; + struct rte_mbuf **mb; + + n = lc->prt[pidx].tx_buf.num; + k = RTE_DIM(lc->prt[pidx].tx_buf.pkt) - n; + mb = lc->prt[pidx].tx_buf.pkt; + + if (k >= RTE_DIM(lc->prt[pidx].tx_buf.pkt) / 2) { + j = tle_udp_tx_bulk(lc->prt[pidx].dev, mb + n, k); + n += j; + lc->prt[pidx].tx_stat.down += j; + } + + if (n == 0) + return; + + NETBE_TRACE("%s(%u): tle_udp_tx_bulk(%p) returns %u,\n" + "total pkts to send: %u\n", + __func__, lc->id, lc->prt[pidx].dev, j, n); + + for (j = 0; j != n; j++) + NETBE_PKT_DUMP(mb[j]); + + k = rte_eth_tx_burst(lc->prt[pidx].port.id, + lc->prt[pidx].txqid, mb, n); + + lc->prt[pidx].tx_stat.out += k; + lc->prt[pidx].tx_stat.drop += n - k; + NETBE_TRACE("%s(%u): rte_eth_tx_burst(%u, %u, %u) returns %u\n", + 
__func__, lc->id, lc->prt[pidx].port.id, lc->prt[pidx].txqid, + n, k); + + lc->prt[pidx].tx_buf.num = n - k; + if (k != 0) + for (j = k; j != n; j++) + mb[j - k] = mb[j]; +} + +static int +netbe_lcore(void *arg) +{ + uint32_t i, j; + int32_t rc; + struct netbe_lcore *lc; + + lc = arg; + RTE_LOG(NOTICE, USER1, "%s(lcore=%u, udp_ctx: %p) start\n", + __func__, lc->id, lc->ctx); + + /* + * ??????? + * wait for FE lcores to start, so BE dont' drop any packets + * because corresponding streams not opened yet by FE. + * usefull when used with pcap PMDS. + * think better way, or should this timeout be a cmdlien parameter. + * ??????? + */ + rte_delay_ms(10); + + for (i = 0; i != lc->prt_num; i++) { + RTE_LOG(NOTICE, USER1, "%s:%u(port=%u, udp_dev: %p)\n", + __func__, i, lc->prt[i].port.id, lc->prt[i].dev); + rc = setup_rx_cb(&lc->prt[i].port, lc); + if (rc < 0) + sig_handle(SIGQUIT); + } + + while (force_quit == 0) { + for (i = 0; i != lc->prt_num; i++) { + netbe_rx(lc, i); + netbe_tx(lc, i); + } + } + + RTE_LOG(NOTICE, USER1, "%s(lcore=%u, udp_ctx: %p) finish\n", + __func__, lc->id, lc->ctx); + for (i = 0; i != lc->prt_num; i++) { + RTE_LOG(NOTICE, USER1, "%s:%u(port=%u) " + "rx_stats={" + "in=%" PRIu64 ",up=%" PRIu64 ",drop=%" PRIu64 "}, " + "tx_stats={" + "in=%" PRIu64 ",up=%" PRIu64 ",drop=%" PRIu64 "};\n", + __func__, i, lc->prt[i].port.id, + lc->prt[i].rx_stat.in, + lc->prt[i].rx_stat.up, + lc->prt[i].rx_stat.drop, + lc->prt[i].tx_stat.down, + lc->prt[i].tx_stat.out, + lc->prt[i].tx_stat.drop); + } + + + for (i = 0; i != lc->prt_num; i++) { + for (j = 0; j != lc->prt[i].tx_buf.num; j++) + rte_pktmbuf_free(lc->prt[i].tx_buf.pkt[j]); + } + + return 0; +} + +static int +netfe_lcore_cmp(const void *s1, const void *s2) +{ + const struct netfe_stream_prm *p1, *p2; + + p1 = s1; + p2 = s2; + return p1->lcore - p2->lcore; +} + +/* + * Helper functions, finds BE by given local and remote addresses. 
+ */ +static int +netbe_find4(const struct in_addr *laddr, const struct in_addr *raddr) +{ + uint32_t i, j; + int32_t rc; + uint32_t idx; + struct netbe_lcore *bc; + + if (laddr->s_addr == INADDR_ANY) { + + /* we have exactly one BE, use it for all traffic */ + if (becfg.cpu_num == 1) + return 0; + + /* search by remote address. */ + for (i = 0; i != becfg.cpu_num; i++) { + bc = becfg.cpu + i; + rc = rte_lpm_lookup(bc->lpm4, + rte_be_to_cpu_32(raddr->s_addr), &idx); + if (rc == 0) + return i; + } + } else { + + /* search by local address */ + for (i = 0; i != becfg.cpu_num; i++) { + bc = becfg.cpu + i; + for (j = 0; j != bc->prt_num; j++) + if (laddr->s_addr == bc->prt[j].port.ipv4) + return i; + } + } + + return -ENOENT; +} + +static int +netbe_find6(const struct in6_addr *laddr, const struct in6_addr *raddr) +{ + uint32_t i, j; + int32_t rc; + uint8_t idx; + struct netbe_lcore *bc; + + if (memcmp(laddr, &in6addr_any, sizeof(*laddr)) == 0) { + + /* we have exactly one BE, use it for all traffic */ + if (becfg.cpu_num == 1) + return 0; + + /* search by remote address. 
*/ + for (i = 0; i != becfg.cpu_num; i++) { + bc = becfg.cpu + i; + rc = rte_lpm6_lookup(bc->lpm6, + (uint8_t *)(uintptr_t)raddr->s6_addr, &idx); + if (rc == 0) + return i; + } + } else { + /* search by local address */ + for (i = 0; i != becfg.cpu_num; i++) { + bc = becfg.cpu + i; + for (j = 0; j != bc->prt_num; j++) + if (memcmp(laddr, &bc->prt[j].port.ipv6, + sizeof(*laddr)) == 0) + return i; + } + } + + return -ENOENT; +} + +static int +netbe_find(const struct tle_udp_stream_param *p) +{ + const struct sockaddr_in *l4, *r4; + const struct sockaddr_in6 *l6, *r6; + + if (p->local_addr.ss_family == AF_INET) { + l4 = (const struct sockaddr_in *)&p->local_addr; + r4 = (const struct sockaddr_in *)&p->remote_addr; + return netbe_find4(&l4->sin_addr, &r4->sin_addr); + } else if (p->local_addr.ss_family == AF_INET6) { + l6 = (const struct sockaddr_in6 *)&p->local_addr; + r6 = (const struct sockaddr_in6 *)&p->remote_addr; + return netbe_find6(&l6->sin6_addr, &r6->sin6_addr); + } + return -EINVAL; +} + +static int +netfe_sprm_flll_be(struct netfe_sprm *sp, uint32_t line) +{ + int32_t bidx; + + bidx = netbe_find(&sp->prm); + if (bidx < 0) { + RTE_LOG(ERR, USER1, "%s(line=%u): no BE for that stream\n", + __func__, line); + return -EINVAL; + } + sp->bidx = bidx; + return 0; +} + +/* start front-end processing. */ +static int +netfe_launch(struct netfe_lcore_prm *lprm) +{ + uint32_t i, j, k, lc, ln, mi; + struct netfe_lcore_prm feprm[RTE_MAX_LCORE]; + + /* determine on what BE each stream should be open. */ + for (i = 0; i != lprm->nb_streams; i++) { + + lc = lprm->stream[i].lcore; + ln = lprm->stream[i].line; + + if (netfe_sprm_flll_be(&lprm->stream[i].sprm, ln) != 0 || + (lprm->stream[i].op == FWD && + netfe_sprm_flll_be(&lprm->stream[i].fprm, + ln) != 0)) + return -EINVAL; + } + + /* group all fe parameters by lcore. 
*/ + + memset(feprm, 0, sizeof(feprm)); + qsort(lprm->stream, lprm->nb_streams, sizeof(lprm->stream[0]), + netfe_lcore_cmp); + + k = 0; + mi = UINT32_MAX; + for (i = 0; i != lprm->nb_streams; i = j) { + + lc = lprm->stream[i].lcore; + ln = lprm->stream[i].line; + + if (rte_lcore_is_enabled(lc) == 0) { + RTE_LOG(ERR, USER1, + "%s(line=%u): lcore %u is not enabled\n", + __func__, ln, lc); + return -EINVAL; + } + + if (rte_get_master_lcore() == lc) + mi = k; + else if (rte_eal_get_lcore_state(lc) == RUNNING) { + RTE_LOG(ERR, USER1, + "%s(line=%u): lcore %u already in use\n", + __func__, ln, lc); + return -EINVAL; + } + + for (j = i + 1; j != lprm->nb_streams && + lc == lprm->stream[j].lcore; + j++) + ; + + feprm[k].max_streams = lprm->max_streams; + feprm[k].nb_streams = j - i; + feprm[k].stream = lprm->stream + i; + k++; + } + + /* launch all slave FE lcores. */ + for (i = 0; i != k; i++) { + if (i != mi) + rte_eal_remote_launch(netfe_lcore, feprm + i, + feprm[i].stream[0].lcore); + } + + /* launch FE at master lcore. 
*/ + if (mi != UINT32_MAX) + netfe_lcore(feprm + mi); + + return 0; +} + +int +main(int argc, char *argv[]) +{ + int32_t opt, opt_idx, rc; + uint32_t i; + uint64_t v; + struct tle_udp_ctx_param ctx_prm; + struct netfe_lcore_prm feprm; + struct rte_eth_stats stats; + char fecfg_fname[PATH_MAX + 1]; + char becfg_fname[PATH_MAX + 1]; + + fecfg_fname[0] = 0; + becfg_fname[0] = 0; + + rc = rte_eal_init(argc, argv); + if (rc < 0) + rte_exit(EXIT_FAILURE, + "%s: rte_eal_init failed with error code: %d\n", + __func__, rc); + + argc -= rc; + argv += rc; + + optind = 0; + optarg = NULL; + while ((opt = getopt_long(argc, argv, "PR:S:b:f:s:", long_opt, + &opt_idx)) != EOF) { + if (opt == OPT_SHORT_PROMISC) { + becfg.promisc = 1; + } else if (opt == OPT_SHORT_RBUFS) { + rc = parse_uint_val(NULL, optarg, &v); + if (rc < 0) + rte_exit(EXIT_FAILURE, "%s: invalid value: %s " + "for option: \'%c\'\n", + __func__, optarg, opt); + ctx_prm.max_stream_rbufs = v; + } else if (opt == OPT_SHORT_SBUFS) { + rc = parse_uint_val(NULL, optarg, &v); + if (rc < 0) + rte_exit(EXIT_FAILURE, "%s: invalid value: %s " + "for option: \'%c\'\n", + __func__, optarg, opt); + ctx_prm.max_stream_sbufs = v; + } else if (opt == OPT_SHORT_STREAMS) { + rc = parse_uint_val(NULL, optarg, &v); + if (rc < 0) + rte_exit(EXIT_FAILURE, "%s: invalid value: %s " + "for option: \'%c\'\n", + __func__, optarg, opt); + ctx_prm.max_streams = v; + } else if (opt == OPT_SHORT_BECFG) { + snprintf(becfg_fname, sizeof(becfg_fname), "%s", + optarg); + } else if (opt == OPT_SHORT_FECFG) { + snprintf(fecfg_fname, sizeof(fecfg_fname), "%s", + optarg); + } else { + rte_exit(EXIT_FAILURE, + "%s: unknown option: \'%c\'\n", + __func__, opt); + } + } + + signal(SIGINT, sig_handle); + + netbe_port_init(&becfg, argc - optind, argv + optind); + netbe_lcore_init(&becfg, &ctx_prm); + + if ((rc = netbe_dest_init(becfg_fname, &becfg)) != 0) + sig_handle(SIGQUIT); + + for (i = 0; i != becfg.prt_num && rc == 0; i++) { + RTE_LOG(NOTICE, USER1, "%s: 
starting port %u\n", + __func__, becfg.prt[i].id); + rc = rte_eth_dev_start(becfg.prt[i].id); + if (rc != 0) { + RTE_LOG(ERR, USER1, + "%s: rte_eth_dev_start(%u) returned " + "error code: %d\n", + __func__, becfg.prt[i].id, rc); + sig_handle(SIGQUIT); + } + } + + feprm.max_streams = ctx_prm.max_streams * becfg.cpu_num; + if (rc == 0 && (rc = netfe_parse_cfg(fecfg_fname, &feprm)) != 0) + sig_handle(SIGQUIT); + + for (i = 0; rc == 0 && i != becfg.cpu_num; i++) { + rte_eal_remote_launch(netbe_lcore, becfg.cpu + i, + becfg.cpu[i].id); + } + + if (rc == 0 && (rc = netfe_launch(&feprm)) != 0) + sig_handle(SIGQUIT); + + rte_eal_mp_wait_lcore(); + + for (i = 0; i != becfg.prt_num; i++) { + RTE_LOG(NOTICE, USER1, "%s: stoping port %u\n", + __func__, becfg.prt[i].id); + rte_eth_stats_get(becfg.prt[i].id, &stats); + RTE_LOG(NOTICE, USER1, "port %u stats={\n" + "ipackets=%" PRIu64 ";" + "ibytes=%" PRIu64 ";" + "ierrors=%" PRIu64 ";\n" + "opackets=%" PRIu64 ";" + "obytes=%" PRIu64 ";" + "oerrors=%" PRIu64 ";\n" + "}\n", + becfg.prt[i].id, + stats.ipackets, + stats.ibytes, + stats.ierrors, + stats.opackets, + stats.obytes, + stats.oerrors); + rte_eth_dev_stop(becfg.prt[i].id); + } + + netbe_lcore_fini(&becfg); + + return 0; +} diff --git a/examples/udpfwd/netbe.h b/examples/udpfwd/netbe.h new file mode 100644 index 0000000..41bd452 --- /dev/null +++ b/examples/udpfwd/netbe.h @@ -0,0 +1,251 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __NETBE_H__ +#define __NETBE_H__ + +#include <stdint.h> +#include <stddef.h> +#include <inttypes.h> +#include <getopt.h> +#include <arpa/inet.h> +#include <assert.h> +#include <signal.h> + +#include <rte_config.h> +#include <rte_common.h> +#include <rte_eal.h> +#include <rte_lcore.h> +#include <rte_ethdev.h> +#include <rte_kvargs.h> +#include <rte_errno.h> +#include <rte_malloc.h> +#include <rte_cycles.h> +#include <rte_lpm.h> +#include <rte_lpm6.h> +#include <rte_hash.h> +#include <rte_ip.h> +#include <rte_ip_frag.h> +#include <rte_udp.h> +#include <tle_udp_impl.h> +#include <tle_event.h> + +#define MAX_PKT_BURST 0x20 + +/* + * BE related structures. + */ + +struct netbe_port { + uint32_t id; + uint32_t lcore; + uint32_t mtu; + uint32_t rx_offload; + uint32_t tx_offload; + uint32_t ipv4; + struct in6_addr ipv6; + struct ether_addr mac; +}; + +struct netbe_dest { + uint32_t line; + uint32_t port; + uint32_t mtu; + uint32_t prfx; + uint16_t family; + union { + struct in_addr ipv4; + struct in6_addr ipv6; + }; + struct ether_addr mac; +}; + +struct netbe_dest_prm { + uint32_t nb_dest; + struct netbe_dest *dest; +}; + +struct pkt_buf { + uint32_t num; + struct rte_mbuf *pkt[2 * MAX_PKT_BURST]; +}; + +struct netbe_dev { + uint16_t rxqid; + uint16_t txqid; + struct netbe_port port; + struct tle_udp_dev *dev; + struct { + uint64_t in; + uint64_t up; + uint64_t drop; + } rx_stat; + struct { + uint64_t down; + uint64_t out; + uint64_t drop; + } tx_stat; + struct pkt_buf tx_buf; +}; + +/* 8 bit LPM user data. 
*/ +#define LCORE_MAX_DST (UINT8_MAX + 1) + +struct netbe_lcore { + uint32_t id; + struct rte_lpm *lpm4; + struct rte_lpm6 *lpm6; + struct rte_ip_frag_tbl *ftbl; + struct tle_udp_ctx *ctx; + uint32_t prt_num; + uint32_t dst4_num; + uint32_t dst6_num; + struct netbe_dev prt[RTE_MAX_ETHPORTS]; + struct tle_udp_dest dst4[LCORE_MAX_DST]; + struct tle_udp_dest dst6[LCORE_MAX_DST]; + struct rte_ip_frag_death_row death_row; +}; + +struct netbe_cfg { + uint32_t promisc; + uint32_t prt_num; + uint32_t cpu_num; + struct netbe_port prt[RTE_MAX_ETHPORTS]; + struct netbe_lcore cpu[RTE_MAX_LCORE]; +}; + +/* + * FE related structures. + */ + +enum { + RXONLY, + TXONLY, + RXTX, + FWD, +}; + +struct netfe_sprm { + uint32_t bidx; /* BE index to use. */ + struct tle_udp_stream_param prm; +}; + +struct netfe_stream_prm { + uint32_t lcore; + uint32_t line; + uint16_t op; + uint16_t txlen; /* valid/used only for TXONLY op. */ + struct netfe_sprm sprm; + struct netfe_sprm fprm; /* valid/used only for FWD op. */ +}; + +struct netfe_lcore_prm { + uint32_t max_streams; + uint32_t nb_streams; + struct netfe_stream_prm *stream; +}; + +struct netfe_stream { + struct tle_udp_stream *s; + struct tle_event *rxev; + struct tle_event *txev; + uint16_t op; + uint16_t family; + uint16_t txlen; + struct { + uint64_t rxp; + uint64_t txp; + uint64_t fwp; + uint64_t drops; + uint64_t rxev[TLE_SEV_NUM]; + uint64_t txev[TLE_SEV_NUM]; + } stat; + struct pkt_buf pbuf; + struct sockaddr_storage raddr; + struct netfe_sprm fwdprm; +}; + +struct netfe_lcore { + uint32_t snum; /* max number of streams */ + uint32_t sidx; /* last open stream index */ + struct tle_evq *rxeq; + struct tle_evq *txeq; + struct rte_hash *fw4h; + struct rte_hash *fw6h; + struct netfe_stream *fs; +}; + +/* + * debug/trace macros. + */ + +#define DUMMY_MACRO do {} while (0) + +#ifdef NETFE_DEBUG +#define NETFE_TRACE(fmt, arg...) 
printf(fmt, ##arg) +#define NETFE_PKT_DUMP(p) rte_pktmbuf_dump(stdout, (p), 64) +#else +#define NETFE_TRACE(fmt, arg...) DUMMY_MACRO +#define NETFE_PKT_DUMP(p) DUMMY_MACRO +#endif + +#ifdef NETBE_DEBUG +#define NETBE_TRACE(fmt, arg...) printf(fmt, ##arg) +#define NETBE_PKT_DUMP(p) rte_pktmbuf_dump(stdout, (p), 64) +#else +#define NETBE_TRACE(fmt, arg...) DUMMY_MACRO +#define NETBE_PKT_DUMP(p) DUMMY_MACRO +#endif + +#define FUNC_STAT(v, c) do { \ + static uint64_t nb_call, nb_data; \ + nb_call++; \ + nb_data += (v); \ + if ((nb_call & ((c) - 1)) == 0) { \ + printf("%s#%d@%u: nb_call=%lu, avg(" #v ")=%#Lf\n", \ + __func__, __LINE__, rte_lcore_id(), nb_call, \ + (long double)nb_data / nb_call); \ + nb_call = 0; \ + nb_data = 0; \ + } \ +} while (0) + +#define FUNC_TM_STAT(v, c) do { \ + static uint64_t nb_call, nb_data; \ + static uint64_t cts, pts, sts; \ + cts = rte_rdtsc(); \ + if (pts != 0) \ + sts += cts - pts; \ + pts = cts; \ + nb_call++; \ + nb_data += (v); \ + if ((nb_call & ((c) - 1)) == 0) { \ + printf("%s#%d@%u: nb_call=%lu, " \ + "avg(" #v ")=%#Lf, " \ + "avg(cycles)=%#Lf, " \ + "avg(cycles/" #v ")=%#Lf\n", \ + __func__, __LINE__, rte_lcore_id(), nb_call, \ + (long double)nb_data / nb_call, \ + (long double)sts / nb_call, \ + (long double)sts / nb_data); \ + nb_call = 0; \ + nb_data = 0; \ + sts = 0; \ + } \ +} while (0) + +int setup_rx_cb(const struct netbe_port *uprt, struct netbe_lcore *lc); + +#endif /* __NETBE_H__ */ diff --git a/examples/udpfwd/parse.c b/examples/udpfwd/parse.c new file mode 100644 index 0000000..979145c --- /dev/null +++ b/examples/udpfwd/parse.c @@ -0,0 +1,586 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "netbe.h" +#include "parse.h" + +#define DEF_LINE_NUM 0x400 + +static const struct { + const char *name; + uint16_t op; +} name2feop[] = { + { .name = "rx", .op = RXONLY,}, + { .name = "tx", .op = TXONLY,}, + { .name = "echo", .op = RXTX,}, + { .name = "fwd", .op = FWD,}, +}; + +static int +parse_ipv4_val(__rte_unused const char *key, const char *val, void *prm) +{ + union parse_val *rv; + + rv = prm; + if (inet_pton(AF_INET, val, &rv->in.addr4) != 1) + return -EINVAL; + rv->in.family = AF_INET; + return 0; +} + +static int +parse_ipv6_val(__rte_unused const char *key, const char *val, void *prm) +{ + union parse_val *rv; + + rv = prm; + if (inet_pton(AF_INET6, val, &rv->in.addr6) != 1) + return -EINVAL; + rv->in.family = AF_INET6; + return 0; +} + +static int +parse_ip_val(__rte_unused const char *key, const char *val, void *prm) +{ + if (parse_ipv6_val(key, val, prm) != 0 && + parse_ipv4_val(key, val, prm) != 0) + return -EINVAL; + return 0; +} + + +#define PARSE_UINT8x16(s, v, l) \ +do { \ + char *end; \ + unsigned long t; \ + errno = 0; \ + t = strtoul((s), &end, 16); \ + if (errno != 0 || end[0] != (l) || t > UINT8_MAX) \ + return -EINVAL; \ + (s) = end + 1; \ + (v) = t; \ +} while (0) + +static int +parse_mac_val(__rte_unused const char *key, const char *val, void *prm) +{ + union parse_val *rv; + const char *s; + + rv = prm; + s = val; + + PARSE_UINT8x16(s, rv->mac.addr_bytes[0], ':'); + PARSE_UINT8x16(s, rv->mac.addr_bytes[1], ':'); + PARSE_UINT8x16(s, rv->mac.addr_bytes[2], ':'); + PARSE_UINT8x16(s, rv->mac.addr_bytes[3], 
':'); + PARSE_UINT8x16(s, rv->mac.addr_bytes[4], ':'); + PARSE_UINT8x16(s, rv->mac.addr_bytes[5], 0); + return 0; +} + +static int +parse_feop_val(__rte_unused const char *key, const char *val, void *prm) +{ + uint32_t i; + union parse_val *rv; + + rv = prm; + for (i = 0; i != RTE_DIM(name2feop); i++) { + if (strcmp(val, name2feop[i].name) == 0) { + rv->u64 = name2feop[i].op; + return 0; + } + } + + return -EINVAL; +} + +static int +parse_kvargs(const char *arg, const char *keys_man[], uint32_t nb_man, + const char *keys_opt[], uint32_t nb_opt, + const arg_handler_t hndl[], union parse_val val[]) +{ + uint32_t j, k; + struct rte_kvargs *kvl; + + kvl = rte_kvargs_parse(arg, NULL); + if (kvl == NULL) { + RTE_LOG(ERR, USER1, + "%s: invalid parameter: %s\n", + __func__, arg); + return -EINVAL; + } + + for (j = 0; j != nb_man; j++) { + if (rte_kvargs_count(kvl, keys_man[j]) == 0) { + RTE_LOG(ERR, USER1, + "%s: %s missing mandatory key: %s\n", + __func__, arg, keys_man[j]); + rte_kvargs_free(kvl); + return -EINVAL; + } + } + + for (j = 0; j != nb_man; j++) { + if (rte_kvargs_process(kvl, keys_man[j], hndl[j], + val + j) != 0) { + RTE_LOG(ERR, USER1, + "%s: %s invalid value for key: %s\n", + __func__, arg, keys_man[j]); + rte_kvargs_free(kvl); + return -EINVAL; + } + } + + for (j = 0; j != nb_opt; j++) { + k = j + nb_man; + if (rte_kvargs_process(kvl, keys_opt[j], hndl[k], + val + k) != 0) { + RTE_LOG(ERR, USER1, + "%s: %s invalid value for key: %s\n", + __func__, arg, keys_opt[j]); + rte_kvargs_free(kvl); + return -EINVAL; + } + } + + rte_kvargs_free(kvl); + return 0; +} + +int +parse_netbe_arg(struct netbe_port *prt, const char *arg) +{ + int32_t rc; + + static const char *keys_man[] = { + "port", + "lcore", + }; + + static const char *keys_opt[] = { + "mtu", + "rx_offload", + "tx_offload", + "ipv4", + "ipv6", + }; + + static const arg_handler_t hndl[] = { + parse_uint_val, + parse_uint_val, + parse_uint_val, + parse_uint_val, + parse_uint_val, + parse_ipv4_val, + 
parse_ipv6_val, + }; + + union parse_val val[RTE_DIM(hndl)]; + + memset(val, 0, sizeof(val)); + val[2].u64 = ETHER_MAX_VLAN_FRAME_LEN - ETHER_CRC_LEN; + + rc = parse_kvargs(arg, keys_man, RTE_DIM(keys_man), + keys_opt, RTE_DIM(keys_opt), hndl, val); + if (rc != 0) + return rc; + + prt->id = val[0].u64; + prt->lcore = val[1].u64; + prt->mtu = val[2].u64; + prt->rx_offload = val[3].u64; + prt->tx_offload = val[4].u64; + prt->ipv4 = val[5].in.addr4.s_addr; + prt->ipv6 = val[6].in.addr6; + + return 0; +} +static int +check_netbe_dest(const struct netbe_dest *dst) +{ + if (dst->port >= RTE_MAX_ETHPORTS) { + RTE_LOG(ERR, USER1, "%s(line=%u) invalid port=%u", + __func__, dst->line, dst->port); + return -EINVAL; + } else if ((dst->family == AF_INET && + dst->prfx > sizeof(struct in_addr) * CHAR_BIT) || + (dst->family == AF_INET6 && + dst->prfx > sizeof(struct in6_addr) * CHAR_BIT)) { + RTE_LOG(ERR, USER1, "%s(line=%u) invalid masklen=%u", + __func__, dst->line, dst->prfx); + return -EINVAL; + } else if (dst->mtu > ETHER_MAX_JUMBO_FRAME_LEN - ETHER_CRC_LEN) { + RTE_LOG(ERR, USER1, "%s(line=%u) invalid mtu=%u", + __func__, dst->line, dst->mtu); + return -EINVAL; + } + return 0; +} + +static int +parse_netbe_dest(struct netbe_dest *dst, const char *arg) +{ + int32_t rc; + + static const char *keys_man[] = { + "port", + "addr", + "masklen", + "mac", + }; + + static const char *keys_opt[] = { + "mtu", + }; + + static const arg_handler_t hndl[] = { + parse_uint_val, + parse_ip_val, + parse_uint_val, + parse_mac_val, + parse_uint_val, + }; + + union parse_val val[RTE_DIM(hndl)]; + + /* set default values. 
*/ + memset(val, 0, sizeof(val)); + val[4].u64 = ETHER_MAX_JUMBO_FRAME_LEN - ETHER_CRC_LEN; + + rc = parse_kvargs(arg, keys_man, RTE_DIM(keys_man), + keys_opt, RTE_DIM(keys_opt), hndl, val); + if (rc != 0) + return rc; + + dst->port = val[0].u64; + dst->family = val[1].in.family; + if (val[1].in.family == AF_INET) + dst->ipv4 = val[1].in.addr4; + else + dst->ipv6 = val[1].in.addr6; + dst->prfx = val[2].u64; + memcpy(&dst->mac, &val[3].mac, sizeof(dst->mac)); + dst->mtu = val[4].u64; + + return 0; +} + +int +netbe_parse_dest(const char *fname, struct netbe_dest_prm *prm) +{ + uint32_t i, ln, n, num; + int32_t rc; + size_t sz; + char *s; + FILE *f; + struct netbe_dest *dp; + char line[LINE_MAX]; + + f = fopen(fname, "r"); + if (f == NULL) { + RTE_LOG(ERR, USER1, "%s failed to open file \"%s\"\n", + __func__, fname); + return -EINVAL; + } + + n = 0; + num = 0; + dp = NULL; + + for (ln = 0; fgets(line, sizeof(line), f) != NULL; ln++) { + + /* skip spaces at the start. */ + for (s = line; isspace(s[0]); s++) + ; + + /* skip comment line. */ + if (s[0] == '#' || s[0] == 0) + continue; + + /* skip spaces at the end. 
*/ + for (i = strlen(s); i-- != 0 && isspace(s[i]); s[i] = 0) + ; + + if (n == num) { + num += DEF_LINE_NUM; + sz = sizeof(dp[0]) * num; + dp = realloc(dp, sizeof(dp[0]) * num); + if (dp == NULL) { + RTE_LOG(ERR, USER1, + "%s(%s) allocation of %zu bytes " + "failed\n", + __func__, fname, sz); + rc = -ENOMEM; + break; + } + } + + dp[n].line = ln + 1; + if ((rc = parse_netbe_dest(dp + n, s)) != 0 || + (rc = check_netbe_dest(dp + n)) != 0) { + RTE_LOG(ERR, USER1, "%s(%s) failed to parse line %u\n", + __func__, fname, dp[n].line); + break; + } + n++; + } + + fclose(f); + + if (rc != 0) { + free(dp); + dp = NULL; + n = 0; + } + + prm->dest = dp; + prm->nb_dest = n; + return rc; +} + +static void +pv2saddr(struct sockaddr_storage *ss, const union parse_val *pva, + const union parse_val *pvp) +{ + ss->ss_family = pva->in.family; + if (pva->in.family == AF_INET) { + struct sockaddr_in *si = (struct sockaddr_in *)ss; + si->sin_addr = pva->in.addr4; + si->sin_port = rte_cpu_to_be_16((uint16_t)pvp->u64); + } else { + struct sockaddr_in6 *si = (struct sockaddr_in6 *)ss; + si->sin6_addr = pva->in.addr6; + si->sin6_port = rte_cpu_to_be_16((uint16_t)pvp->u64); + } +} + +static int +parse_netfe_arg(struct netfe_stream_prm *sp, const char *arg) +{ + int32_t rc; + + static const char *keys_man[] = { + "lcore", + "op", + "laddr", + "lport", + "raddr", + "rport", + }; + + static const char *keys_opt[] = { + "txlen", + "fwladdr", + "fwlport", + "fwraddr", + "fwrport", + }; + + static const arg_handler_t hndl[] = { + parse_uint_val, + parse_feop_val, + parse_ip_val, + parse_uint_val, + parse_ip_val, + parse_uint_val, + parse_uint_val, + parse_ip_val, + parse_uint_val, + parse_ip_val, + parse_uint_val, + }; + + union parse_val val[RTE_DIM(hndl)]; + + memset(val, 0, sizeof(val)); + rc = parse_kvargs(arg, keys_man, RTE_DIM(keys_man), + keys_opt, RTE_DIM(keys_opt), hndl, val); + if (rc != 0) + return rc; + + sp->lcore = val[0].u64; + sp->op = val[1].u64; + pv2saddr(&sp->sprm.prm.local_addr, 
val + 2, val + 3); + pv2saddr(&sp->sprm.prm.remote_addr, val + 4, val + 5); + sp->txlen = val[6].u64; + pv2saddr(&sp->fprm.prm.local_addr, val + 7, val + 8); + pv2saddr(&sp->fprm.prm.remote_addr, val + 9, val + 10); + + return 0; +} + +static const char * +format_feop(uint16_t op) +{ + uint32_t i; + + for (i = 0; i != RTE_DIM(name2feop); i++) { + if (name2feop[i].op == op) + return name2feop[i].name; + } + + return NULL; +} + +static int +is_addr_wc(const struct sockaddr_storage *sp) +{ + const struct sockaddr_in *i4; + const struct sockaddr_in6 *i6; + + if (sp->ss_family == AF_INET) { + i4 = (const struct sockaddr_in *)sp; + return (i4->sin_addr.s_addr == INADDR_ANY); + } else if (sp->ss_family == AF_INET6) { + i6 = (const struct sockaddr_in6 *)sp; + return (memcmp(&i6->sin6_addr, &in6addr_any, + sizeof(i6->sin6_addr)) == 0); + } + return 0; +} + +static int +check_netfe_arg(const struct netfe_stream_prm *sp) +{ + char buf[INET6_ADDRSTRLEN]; + + if (sp->sprm.prm.local_addr.ss_family != + sp->sprm.prm.remote_addr.ss_family) { + RTE_LOG(ERR, USER1, "invalid arg at line %u: " + "laddr and raddr for different protocols\n", + sp->line); + return -EINVAL; + } + + if (sp->op == TXONLY) { + if (sp->txlen > RTE_MBUF_DEFAULT_DATAROOM || sp->txlen == 0) { + RTE_LOG(ERR, USER1, "invalid arg at line %u: txlen=%u " + "exceeds allowed values: (0, %u]\n", + sp->line, sp->txlen, RTE_MBUF_DEFAULT_DATAROOM); + return -EINVAL; + } else if (is_addr_wc(&sp->sprm.prm.remote_addr)) { + RTE_LOG(ERR, USER1, "invalid arg at line %u: " + "raddr=%s are not allowed for op=%s;\n", + sp->line, + format_addr(&sp->sprm.prm.remote_addr, + buf, sizeof(buf)), + format_feop(sp->op)); + return -EINVAL; + } + } else if (sp->op == FWD) { + if (sp->fprm.prm.local_addr.ss_family != + sp->fprm.prm.remote_addr.ss_family) { + RTE_LOG(ERR, USER1, "invalid arg at line %u: " + "fwladdr and fwraddr for different protocols\n", + sp->line); + return -EINVAL; + } else if (is_addr_wc(&sp->fprm.prm.remote_addr)) { + 
RTE_LOG(ERR, USER1, "invalid arg at line %u: " + "fwaddr=%s are not allowed for op=%s;\n", + sp->line, + format_addr(&sp->fprm.prm.remote_addr, + buf, sizeof(buf)), + format_feop(sp->op)); + return -EINVAL; + } + } + + return 0; +} + +int +netfe_parse_cfg(const char *fname, struct netfe_lcore_prm *lp) +{ + uint32_t i, ln, n, num; + int32_t rc; + size_t sz; + char *s; + FILE *f; + struct netfe_stream_prm *sp; + char line[LINE_MAX]; + + f = fopen(fname, "r"); + if (f == NULL) { + RTE_LOG(ERR, USER1, "%s failed to open file \"%s\"\n", + __func__, fname); + return -EINVAL; + } + + n = 0; + num = 0; + sp = NULL; + + for (ln = 0; fgets(line, sizeof(line), f) != NULL; ln++) { + + /* skip spaces at the start. */ + for (s = line; isspace(s[0]); s++) + ; + + /* skip comment line. */ + if (s[0] == '#' || s[0] == 0) + continue; + + /* skip spaces at the end. */ + for (i = strlen(s); i-- != 0 && isspace(s[i]); s[i] = 0) + ; + + if (n == lp->max_streams) { + RTE_LOG(ERR, USER1, + "%s(%s) number of entries exceed max streams " + "value: %u\n", + __func__, fname, n); + rc = -EINVAL; + break; + } + + if (n == num) { + num += DEF_LINE_NUM; + sz = sizeof(sp[0]) * num; + sp = realloc(sp, sizeof(sp[0]) * num); + if (sp == NULL) { + RTE_LOG(ERR, USER1, + "%s(%s) allocation of %zu bytes " + "failed\n", + __func__, fname, sz); + rc = -ENOMEM; + break; + } + } + + sp[n].line = ln + 1; + if ((rc = parse_netfe_arg(sp + n, s)) != 0 || + (rc = check_netfe_arg(sp + n)) != 0) { + RTE_LOG(ERR, USER1, "%s(%s) failed to parse line %u\n", + __func__, fname, sp[n].line); + break; + } + n++; + } + + fclose(f); + + if (rc != 0) { + free(sp); + sp = NULL; + n = 0; + } + + lp->stream = sp; + lp->nb_streams = n; + return rc; +} diff --git a/examples/udpfwd/parse.h b/examples/udpfwd/parse.h new file mode 100644 index 0000000..911c874 --- /dev/null +++ b/examples/udpfwd/parse.h @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2016 Intel Corporation. 
+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __PARSE_H__ +#define __PARSE_H__ + +union parse_val { + uint64_t u64; + struct { + uint16_t family; + union { + struct in_addr addr4; + struct in6_addr addr6; + }; + } in; + struct ether_addr mac; +}; + +static int +parse_uint_val(__rte_unused const char *key, const char *val, void *prm) +{ + union parse_val *rv; + unsigned long v; + char *end; + + rv = prm; + errno = 0; + v = strtoul(val, &end, 0); + if (errno != 0 || end[0] != 0 || v > UINT32_MAX) + return -EINVAL; + + rv->u64 = v; + return 0; +} + +static const char * +format_addr(const struct sockaddr_storage *sp, char buf[], size_t len) +{ + const struct sockaddr_in *i4; + const struct sockaddr_in6 *i6; + const void *addr; + + if (sp->ss_family == AF_INET) { + i4 = (const struct sockaddr_in *)sp; + addr = &i4->sin_addr; + } else if (sp->ss_family == AF_INET6) { + i6 = (const struct sockaddr_in6 *)sp; + addr = &i6->sin6_addr; + } else + return NULL; + + + return inet_ntop(sp->ss_family, addr, buf, len); +} + +int parse_netbe_arg(struct netbe_port *prt, const char *arg); + +int netbe_parse_dest(const char *fname, struct netbe_dest_prm *prm); + +int netfe_parse_cfg(const char *fname, struct netfe_lcore_prm *lp); + +#endif /* __PARSE_H__ */ + diff --git a/examples/udpfwd/pkt.c b/examples/udpfwd/pkt.c new file mode 100644 index 0000000..b0d4452 --- /dev/null +++ b/examples/udpfwd/pkt.c @@ -0,0 +1,579 @@ +/* + * Copyright (c) 2016 Intel 
Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "netbe.h" +#include <netinet/ip6.h> + +static inline void +fill_pkt_hdr_len(struct rte_mbuf *m, uint32_t l2, uint32_t l3, uint32_t l4) +{ + m->l2_len = l2; + m->l3_len = l3; + m->l4_len = l4; +} + +static inline int +is_ipv4_frag(const struct ipv4_hdr *iph) +{ + const uint16_t mask = rte_cpu_to_be_16(~IPV4_HDR_DF_FLAG); + + return ((mask & iph->fragment_offset) != 0); +} + +static inline void +fill_ipv4_hdr_len(struct rte_mbuf *m, uint32_t l2, uint32_t proto, + uint32_t frag) +{ + const struct ipv4_hdr *iph; + int32_t dlen, len; + + dlen = rte_pktmbuf_data_len(m); + dlen -= l2 + sizeof(struct udp_hdr); + + iph = rte_pktmbuf_mtod_offset(m, const struct ipv4_hdr *, l2); + len = (iph->version_ihl & IPV4_HDR_IHL_MASK) * IPV4_IHL_MULTIPLIER; + + if (frag != 0 && is_ipv4_frag(iph)) { + m->packet_type &= ~RTE_PTYPE_L4_MASK; + m->packet_type |= RTE_PTYPE_L4_FRAG; + } + + if (len > dlen || (proto <= IPPROTO_MAX && iph->next_proto_id != proto)) + m->packet_type = RTE_PTYPE_UNKNOWN; + else + fill_pkt_hdr_len(m, l2, len, sizeof(struct udp_hdr)); +} + +static inline int +ipv6x_hdr(uint32_t proto) +{ + return (proto == IPPROTO_HOPOPTS || + proto == IPPROTO_ROUTING || + proto == IPPROTO_FRAGMENT || + proto == IPPROTO_AH || + proto == IPPROTO_NONE || + proto == IPPROTO_DSTOPTS); +} + +static inline void +fill_ipv6x_hdr_len(struct rte_mbuf *m, uint32_t l2, uint32_t nproto, + uint32_t fproto) +{ + 
const struct ip6_ext *ipx; + int32_t dlen, len, ofs; + + len = sizeof(struct ipv6_hdr); + + dlen = rte_pktmbuf_data_len(m); + dlen -= l2 + sizeof(struct udp_hdr); + + ofs = l2 + len; + ipx = rte_pktmbuf_mtod_offset(m, const struct ip6_ext *, ofs); + + while (ofs > 0 && len < dlen) { + + switch (nproto) { + case IPPROTO_HOPOPTS: + case IPPROTO_ROUTING: + case IPPROTO_DSTOPTS: + ofs = (ipx->ip6e_len + 1) << 3; + break; + case IPPROTO_AH: + ofs = (ipx->ip6e_len + 2) << 2; + break; + case IPPROTO_FRAGMENT: + /* + * tso_segsz is not used by RX, so use it as a temporary + * buffer to store the fragment offset. + */ + m->tso_segsz = ofs; + ofs = sizeof(struct ip6_frag); + m->packet_type &= ~RTE_PTYPE_L4_MASK; + m->packet_type |= RTE_PTYPE_L4_FRAG; + break; + default: + ofs = 0; + } + + if (ofs > 0) { + nproto = ipx->ip6e_nxt; + len += ofs; + ipx += ofs / sizeof(*ipx); + } + } + + /* unrecognised or invalid packet. */ + if ((ofs == 0 && nproto != fproto) || len > dlen) + m->packet_type = RTE_PTYPE_UNKNOWN; + else + fill_pkt_hdr_len(m, l2, len, sizeof(struct udp_hdr)); +} + +static inline void +fill_ipv6_hdr_len(struct rte_mbuf *m, uint32_t l2, uint32_t fproto) +{ + const struct ipv6_hdr *iph; + + iph = rte_pktmbuf_mtod_offset(m, const struct ipv6_hdr *, + sizeof(struct ether_hdr)); + + if (iph->proto == fproto) + fill_pkt_hdr_len(m, l2, sizeof(struct ipv6_hdr), + sizeof(struct udp_hdr)); + else if (ipv6x_hdr(iph->proto) != 0) + fill_ipv6x_hdr_len(m, l2, iph->proto, fproto); +} + +static inline void +fill_eth_hdr_len(struct rte_mbuf *m) +{ + uint32_t dlen, l2; + uint16_t etp; + const struct ether_hdr *eth; + + dlen = rte_pktmbuf_data_len(m); + + /* check that first segment is at least 42B long. 
*/ + if (dlen < sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + + sizeof(struct udp_hdr)) { + m->packet_type = RTE_PTYPE_UNKNOWN; + return; + } + + l2 = sizeof(*eth); + + eth = rte_pktmbuf_mtod(m, const struct ether_hdr *); + etp = eth->ether_type; + if (etp == rte_be_to_cpu_16(ETHER_TYPE_VLAN)) + l2 += sizeof(struct vlan_hdr); + + if (etp == rte_be_to_cpu_16(ETHER_TYPE_IPv4)) { + m->packet_type = RTE_PTYPE_L4_UDP | + RTE_PTYPE_L3_IPV4_EXT_UNKNOWN | + RTE_PTYPE_L2_ETHER; + fill_ipv4_hdr_len(m, l2, IPPROTO_UDP, 1); + } else if (etp == rte_be_to_cpu_16(ETHER_TYPE_IPv6) && + dlen >= l2 + sizeof(struct ipv6_hdr) + + sizeof(struct udp_hdr)) { + m->packet_type = RTE_PTYPE_L4_UDP | + RTE_PTYPE_L3_IPV6_EXT_UNKNOWN | + RTE_PTYPE_L2_ETHER; + fill_ipv6_hdr_len(m, l2, IPPROTO_UDP); + } else + m->packet_type = RTE_PTYPE_UNKNOWN; +} + +static inline void +fix_reassembled(struct rte_mbuf *m) +{ + /* update packet type. */ + m->packet_type &= ~RTE_PTYPE_L4_MASK; + m->packet_type |= RTE_PTYPE_L4_UDP; + + /* fix reassemble setting TX flags. */ + m->ol_flags &= ~PKT_TX_IP_CKSUM; + + /* fix l3_len after reassemble. */ + if (RTE_ETH_IS_IPV6_HDR(m->packet_type)) + m->l3_len = m->l3_len - sizeof(struct ipv6_extension_fragment); +} + +static struct rte_mbuf * +reassemble(struct rte_mbuf *m, struct rte_ip_frag_tbl *tbl, + struct rte_ip_frag_death_row *dr, uint64_t tms) +{ + if (RTE_ETH_IS_IPV4_HDR(m->packet_type)) { + + struct ipv4_hdr *iph; + + iph = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *, m->l2_len); + + /* process this fragment. */ + m = rte_ipv4_frag_reassemble_packet(tbl, dr, m, tms, iph); + + } else if (RTE_ETH_IS_IPV6_HDR(m->packet_type)) { + + struct ipv6_hdr *iph; + struct ipv6_extension_fragment *fhdr; + + iph = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr *, m->l2_len); + + /* + * we store fragment header offset in tso_segsz before + * temporary, just to avoid another scan of ipv6 header. 
+ */ + fhdr = rte_pktmbuf_mtod_offset(m, + struct ipv6_extension_fragment *, m->tso_segsz); + m->tso_segsz = 0; + + /* process this fragment. */ + m = rte_ipv6_frag_reassemble_packet(tbl, dr, m, tms, iph, fhdr); + + } else { + rte_pktmbuf_free(m); + m = NULL; + } + + /* got reassembled packet. */ + if (m != NULL) + fix_reassembled(m); + + return m; +} + +/* exclude NULLs from the final list of packets. */ +static inline uint32_t +compress_pkt_list(struct rte_mbuf *pkt[], uint32_t nb_pkt, uint32_t nb_zero) +{ + uint32_t i, j, k, l; + + for (j = nb_pkt; nb_zero != 0 && j-- != 0; ) { + + /* found a hole. */ + if (pkt[j] == NULL) { + + /* find how big it is. */ + for (i = j; i-- != 0 && pkt[i] == NULL; ) + ; + /* fill the hole. */ + for (k = j + 1, l = i + 1; k != nb_pkt; k++, l++) + pkt[l] = pkt[k]; + + nb_pkt -= j - i; + nb_zero -= j - i; + } + } + + return nb_pkt; +} + +/* + * HW can recognise L2/L3 with/without extensions/L4 (ixgbe/igb/fm10k) + */ +static uint16_t +type0_rx_callback(__rte_unused uint8_t port, __rte_unused uint16_t queue, + struct rte_mbuf *pkt[], uint16_t nb_pkts, + __rte_unused uint16_t max_pkts, void *user_param) +{ + uint32_t j, tp, x; + uint64_t cts; + struct netbe_lcore *lc; + + lc = user_param; + cts = 0; + + x = 0; + for (j = 0; j != nb_pkts; j++) { + + NETBE_PKT_DUMP(pkt[j]); + + tp = pkt[j]->packet_type & (RTE_PTYPE_L4_MASK | + RTE_PTYPE_L3_MASK | RTE_PTYPE_L2_MASK); + + switch (tp) { + /* non fragmented udp packets. 
*/ + case (RTE_PTYPE_L4_UDP | RTE_PTYPE_L3_IPV4 | + RTE_PTYPE_L2_ETHER): + fill_pkt_hdr_len(pkt[j], sizeof(struct ether_hdr), + sizeof(struct ipv4_hdr), + sizeof(struct udp_hdr)); + break; + case (RTE_PTYPE_L4_UDP | RTE_PTYPE_L3_IPV6 | + RTE_PTYPE_L2_ETHER): + fill_pkt_hdr_len(pkt[j], sizeof(struct ether_hdr), + sizeof(struct ipv6_hdr), + sizeof(struct udp_hdr)); + break; + case (RTE_PTYPE_L4_UDP | RTE_PTYPE_L3_IPV4_EXT | + RTE_PTYPE_L2_ETHER): + fill_ipv4_hdr_len(pkt[j], sizeof(struct ether_hdr), + UINT32_MAX, 0); + break; + case (RTE_PTYPE_L4_UDP | RTE_PTYPE_L3_IPV6_EXT | + RTE_PTYPE_L2_ETHER): + fill_ipv6_hdr_len(pkt[j], sizeof(struct ether_hdr), + IPPROTO_UDP); + break; + /* possibly fragmented udp packets. */ + case (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L2_ETHER): + case (RTE_PTYPE_L3_IPV4_EXT | RTE_PTYPE_L2_ETHER): + fill_ipv4_hdr_len(pkt[j], sizeof(struct ether_hdr), + IPPROTO_UDP, 1); + break; + case (RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L2_ETHER): + case (RTE_PTYPE_L3_IPV6_EXT | RTE_PTYPE_L2_ETHER): + fill_ipv6_hdr_len(pkt[j], sizeof(struct ether_hdr), + IPPROTO_UDP); + break; + default: + /* treat packet types as invalid. */ + pkt[j]->packet_type = RTE_PTYPE_UNKNOWN; + break; + } + + /* + * if it is a fragment, try to reassemble it, + * if by some reason it can't be done, then + * set pkt[] entry to NULL. + */ + if ((pkt[j]->packet_type & RTE_PTYPE_L4_MASK) == + RTE_PTYPE_L4_FRAG) { + cts = (cts == 0) ? rte_rdtsc() : cts; + pkt[j] = reassemble(pkt[j], lc->ftbl, &lc->death_row, + cts); + x += (pkt[j] == NULL); + } + } + + /* reassemble was invoked, cleanup its death-row. */ + if (cts != 0) + rte_ip_frag_free_death_row(&lc->death_row, 0); + + if (x == 0) + return nb_pkts; + + NETBE_TRACE("%s(port=%u, queue=%u, nb_pkts=%u): " + "%u non-reassembled fragments;\n", + __func__, port, queue, nb_pkts, x); + + return compress_pkt_list(pkt, nb_pkts, x); +} + +/* + * HW can recognise L2/L3/L4 and fragments (i40e). 
+ */ +static uint16_t +type1_rx_callback(__rte_unused uint8_t port, __rte_unused uint16_t queue, + struct rte_mbuf *pkt[], uint16_t nb_pkts, + __rte_unused uint16_t max_pkts, void *user_param) +{ + uint32_t j, tp, x; + uint64_t cts; + struct netbe_lcore *lc; + + lc = user_param; + cts = 0; + + x = 0; + for (j = 0; j != nb_pkts; j++) { + + NETBE_PKT_DUMP(pkt[j]); + + tp = pkt[j]->packet_type & (RTE_PTYPE_L4_MASK | + RTE_PTYPE_L3_MASK | RTE_PTYPE_L2_MASK); + + switch (tp) { + case (RTE_PTYPE_L4_UDP | RTE_PTYPE_L3_IPV4_EXT_UNKNOWN | + RTE_PTYPE_L2_ETHER): + fill_ipv4_hdr_len(pkt[j], sizeof(struct ether_hdr), + UINT32_MAX, 0); + break; + case (RTE_PTYPE_L4_UDP | RTE_PTYPE_L3_IPV6_EXT_UNKNOWN | + RTE_PTYPE_L2_ETHER): + fill_ipv6_hdr_len(pkt[j], sizeof(struct ether_hdr), + IPPROTO_UDP); + break; + case (RTE_PTYPE_L4_FRAG | RTE_PTYPE_L3_IPV4_EXT_UNKNOWN | + RTE_PTYPE_L2_ETHER): + fill_ipv4_hdr_len(pkt[j], sizeof(struct ether_hdr), + IPPROTO_UDP, 0); + break; + case (RTE_PTYPE_L4_FRAG | RTE_PTYPE_L3_IPV6_EXT_UNKNOWN | + RTE_PTYPE_L2_ETHER): + fill_ipv6_hdr_len(pkt[j], sizeof(struct ether_hdr), + IPPROTO_UDP); + break; + default: + /* treat packet types as invalid. */ + pkt[j]->packet_type = RTE_PTYPE_UNKNOWN; + break; + } + + /* + * if it is a fragment, try to reassemble it, + * if by some reason it can't be done, then + * set pkt[] entry to NULL. + */ + if ((pkt[j]->packet_type & RTE_PTYPE_L4_MASK) == + RTE_PTYPE_L4_FRAG) { + cts = (cts == 0) ? rte_rdtsc() : cts; + pkt[j] = reassemble(pkt[j], lc->ftbl, &lc->death_row, + cts); + x += (pkt[j] == NULL); + } + } + + /* reassemble was invoked, cleanup its death-row. */ + if (cts != 0) + rte_ip_frag_free_death_row(&lc->death_row, 0); + + if (x == 0) + return nb_pkts; + + NETBE_TRACE("%s(port=%u, queue=%u, nb_pkts=%u): " + "%u non-reassembled fragments;\n", + __func__, port, queue, nb_pkts, x); + + return compress_pkt_list(pkt, nb_pkts, x); +} + +/* + * generic, assumes HW doesn't recognise any packet type. 
+ */ +static uint16_t +typen_rx_callback(__rte_unused uint8_t port, __rte_unused uint16_t queue, + struct rte_mbuf *pkt[], uint16_t nb_pkts, + __rte_unused uint16_t max_pkts, void *user_param) +{ + uint32_t j, x; + uint64_t cts; + struct netbe_lcore *lc; + + lc = user_param; + cts = 0; + + x = 0; + for (j = 0; j != nb_pkts; j++) { + + NETBE_PKT_DUMP(pkt[j]); + fill_eth_hdr_len(pkt[j]); + + /* + * if it is a fragment, try to reassemble it, + * if by some reason it can't be done, then + * set pkt[] entry to NULL. + */ + if ((pkt[j]->packet_type & RTE_PTYPE_L4_MASK) == + RTE_PTYPE_L4_FRAG) { + cts = (cts == 0) ? rte_rdtsc() : cts; + pkt[j] = reassemble(pkt[j], lc->ftbl, &lc->death_row, + cts); + x += (pkt[j] == NULL); + } + } + + /* reassemble was invoked, cleanup its death-row. */ + if (cts != 0) + rte_ip_frag_free_death_row(&lc->death_row, 0); + + if (x == 0) + return nb_pkts; + + NETBE_TRACE("%s(port=%u, queue=%u, nb_pkts=%u): " + "%u non-reassembled fragments;\n", + __func__, port, queue, nb_pkts, x); + + return compress_pkt_list(pkt, nb_pkts, x); +} + +int +setup_rx_cb(const struct netbe_port *uprt, struct netbe_lcore *lc) +{ + int32_t i, rc; + uint32_t smask; + void *cb; + + const uint32_t pmask = RTE_PTYPE_L2_MASK | RTE_PTYPE_L3_MASK | + RTE_PTYPE_L4_MASK; + + enum { + ETHER_PTYPE = 0x1, + IPV4_PTYPE = 0x2, + IPV4_EXT_PTYPE = 0x4, + IPV6_PTYPE = 0x8, + IPV6_EXT_PTYPE = 0x10, + UDP_PTYPE = 0x20, + }; + + static const struct { + uint32_t mask; + const char *name; + rte_rx_callback_fn fn; + } ptype2cb[] = { + { + .mask = ETHER_PTYPE | IPV4_PTYPE | IPV4_EXT_PTYPE | + IPV6_PTYPE | IPV6_EXT_PTYPE | UDP_PTYPE, + .name = "HW l2/l3x/l4 ptype", + .fn = type0_rx_callback, + }, + { + .mask = ETHER_PTYPE | IPV4_PTYPE | IPV6_PTYPE | + UDP_PTYPE, + .name = "HW l2/l3/l4 ptype", + .fn = type1_rx_callback, + }, + { + .mask = 0, + .name = "no HW ptype", + .fn = typen_rx_callback, + }, + }; + + rc = rte_eth_dev_get_supported_ptypes(uprt->id, pmask, NULL, 0); + if (rc < 0) { + 
RTE_LOG(ERR, USER1, + "%s(port=%u) failed to get supported ptypes;\n", + __func__, uprt->id); + return rc; + } + + uint32_t ptype[rc]; + rc = rte_eth_dev_get_supported_ptypes(uprt->id, pmask, ptype, rc); + + smask = 0; + for (i = 0; i != rc; i++) { + switch (ptype[i]) { + case RTE_PTYPE_L2_ETHER: + smask |= ETHER_PTYPE; + break; + case RTE_PTYPE_L3_IPV4: + case RTE_PTYPE_L3_IPV4_EXT_UNKNOWN: + smask |= IPV4_PTYPE; + break; + case RTE_PTYPE_L3_IPV4_EXT: + smask |= IPV4_EXT_PTYPE; + break; + case RTE_PTYPE_L3_IPV6: + case RTE_PTYPE_L3_IPV6_EXT_UNKNOWN: + smask |= IPV6_PTYPE; + break; + case RTE_PTYPE_L3_IPV6_EXT: + smask |= IPV6_EXT_PTYPE; + break; + case RTE_PTYPE_L4_UDP: + smask |= UDP_PTYPE; + break; + } + } + + for (i = 0; i != RTE_DIM(ptype2cb); i++) { + if ((smask & ptype2cb[i].mask) == ptype2cb[i].mask) { + cb = rte_eth_add_rx_callback(uprt->id, 0, + ptype2cb[i].fn, lc); + rc = -rte_errno; + RTE_LOG(ERR, USER1, + "%s(port=%u), setup RX callback \"%s\" " + "returns %p;\n", + __func__, uprt->id, ptype2cb[i].name, cb); + return ((cb == NULL) ? rc : 0); + } + } + + /* no proper callback found. */ + RTE_LOG(ERR, USER1, + "%s(port=%u) failed to find an appropriate callback;\n", + __func__, uprt->id); + return -ENOENT; +} diff --git a/lib/libtle_udp/Makefile b/lib/libtle_udp/Makefile new file mode 100644 index 0000000..100755c --- /dev/null +++ b/lib/libtle_udp/Makefile @@ -0,0 +1,50 @@ +# Copyright (c) 2016 Intel Corporation. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+ifeq ($(RTE_SDK),)
+$(error "Please define RTE_SDK environment variable")
+endif
+
+# Default target, can be overwritten by command line or environment
+RTE_TARGET ?= x86_64-native-linuxapp-gcc
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# library name
+LIB = libtle_udp.a
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
+
+EXPORT_MAP := tle_udp_version.map
+
+LIBABIVER := 1
+
+#source files
+SRCS-y += buf_cage.c
+SRCS-y += event.c
+SRCS-y += udp_ctl.c
+SRCS-y += udp_rxtx.c
+
+# install this header file
+SYMLINK-y-include += tle_udp_impl.h
+SYMLINK-y-include += tle_event.h
+
+# this library depends on
+DEPDIRS-y += $(RTE_SDK)/lib/librte_eal
+DEPDIRS-y += $(RTE_SDK)/lib/librte_ether
+DEPDIRS-y += $(RTE_SDK)/lib/librte_mbuf
+DEPDIRS-y += $(RTE_SDK)/lib/librte_net
+DEPDIRS-y += $(RTE_SDK)/lib/librte_ip_frag
+
+include $(RTE_SDK)/mk/rte.extlib.mk
diff --git a/lib/libtle_udp/buf_cage.c b/lib/libtle_udp/buf_cage.c
new file mode 100644
index 0000000..0ae21b0
--- /dev/null
+++ b/lib/libtle_udp/buf_cage.c
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2016 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +#include <rte_errno.h> +#include <rte_malloc.h> +#include <rte_log.h> + +#include "buf_cage.h" +#include "osdep.h" + +struct bcg_store * +bcg_create(const struct bcg_store_prm *prm) +{ + struct buf_cage *bc; + struct bcg_store *st; + uintptr_t end, p; + size_t sz, tsz; + uint32_t n; + + if (prm == NULL || (prm->cage_align != 0 && + rte_is_power_of_2(prm->cage_align) == 0)) { + rte_errno = EINVAL; + return NULL; + } + + /* number of cages required. */ + n = (prm->max_bufs + prm->cage_bufs - 1) / prm->cage_bufs; + n = RTE_MAX(n, prm->min_cages); + + /* size of each cage. */ + sz = prm->cage_bufs * sizeof(bc->bufs[0]) + sizeof(*bc); + sz = RTE_ALIGN_CEIL(sz, prm->cage_align); + + /* total number of bytes required. */ + tsz = n * sz + RTE_ALIGN_CEIL(sizeof(*st), prm->cage_align); + + st = rte_zmalloc_socket(NULL, tsz, RTE_CACHE_LINE_SIZE, prm->socket_id); + if (st == NULL) { + UDP_LOG(ERR, "%s: allocation of %zu bytes on " + "socket %d failed\n", + __func__, tsz, prm->socket_id); + return NULL; + } + + st->prm = prm[0]; + bcg_queue_reset(&st->free); + + p = (uintptr_t)RTE_PTR_ALIGN_CEIL((st + 1), prm->cage_align); + end = p + n * sz; + + for (; p != end; p += sz) { + bc = (struct buf_cage *)p; + bc->st = st; + bc->num = prm->cage_bufs; + STAILQ_INSERT_TAIL(&st->free.queue, bc, ql); + } + + st->free.num = n; + st->nb_cages = n; + st->cage_sz = sz; + st->total_sz = tsz; + return st; +} + +void +bcg_destroy(struct bcg_store *st) +{ + rte_free(st); +} diff --git a/lib/libtle_udp/buf_cage.h b/lib/libtle_udp/buf_cage.h new file mode 100644 index 0000000..3b3c429 --- /dev/null +++ b/lib/libtle_udp/buf_cage.h @@ -0,0 +1,231 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _BUF_CAGE_H_ +#define _BUF_CAGE_H_ + +#include <rte_common.h> +#include <rte_atomic.h> +#include <rte_spinlock.h> +#include <sys/queue.h> + +#ifdef __cplusplus +extern "C" { +#endif + +struct bcg_store; + +struct buf_cage { + struct bcg_store *st; + STAILQ_ENTRY(buf_cage) ql; + uint32_t num; + uint32_t rp; + uint32_t wp; + const void *bufs[0]; +}; + +struct bcg_queue { + rte_spinlock_t lock; + uint32_t num; + STAILQ_HEAD(, buf_cage) queue; +}; + +struct bcg_store_prm { + void *user_data; + int32_t socket_id; /* NUMA socket to allocate memory from. */ + uint32_t max_bufs; /* total number of bufs to cage. */ + uint32_t min_cages; /* min number of cages per store. */ + uint32_t cage_bufs; /* min number of bufs per cage. */ + uint32_t cage_align; /* each cage to be aligned (power of 2). 
*/ +}; + +struct bcg_store { + struct bcg_queue free; + uint32_t nb_cages; + size_t cage_sz; + size_t total_sz; + struct bcg_store_prm prm; +} __rte_cache_aligned; + +struct bcg_store *bcg_create(const struct bcg_store_prm *prm); +void bcg_destroy(struct bcg_store *st); + +static inline int +bcg_store_full(const struct bcg_store *st) +{ + return st->nb_cages == st->free.num; +} + +static inline void +bcg_queue_reset(struct bcg_queue *bq) +{ + STAILQ_INIT(&bq->queue); + bq->num = 0; + rte_spinlock_init(&bq->lock); +} + +static inline void +bcg_reset(struct buf_cage *bc) +{ + bc->rp = 0; + bc->wp = 0; +} + +static inline void * +bcg_get_udata(struct buf_cage *bc) +{ + return bc->st->prm.user_data; +} + +static inline struct buf_cage * +__bcg_dequeue_head(struct bcg_queue *bq) +{ + struct buf_cage *bc; + + bc = STAILQ_FIRST(&bq->queue); + if (bc != NULL) { + STAILQ_REMOVE_HEAD(&bq->queue, ql); + bq->num--; + } + return bc; +} + +static inline struct buf_cage * +bcg_dequeue_head(struct bcg_queue *bq) +{ + struct buf_cage *bc; + + if (bq->num == 0) + return NULL; + + rte_compiler_barrier(); + + rte_spinlock_lock(&bq->lock); + bc = __bcg_dequeue_head(bq); + rte_spinlock_unlock(&bq->lock); + return bc; +} + +static inline uint32_t +__bcg_enqueue_head(struct bcg_queue *bq, struct buf_cage *bc) +{ + STAILQ_INSERT_HEAD(&bq->queue, bc, ql); + return ++bq->num; +} + +static inline uint32_t +bcg_enqueue_head(struct bcg_queue *bq, struct buf_cage *bc) +{ + uint32_t n; + + rte_spinlock_lock(&bq->lock); + n = __bcg_enqueue_head(bq, bc); + rte_spinlock_unlock(&bq->lock); + return n; +} + +static inline uint32_t +__bcg_enqueue_tail(struct bcg_queue *bq, struct buf_cage *bc) +{ + STAILQ_INSERT_TAIL(&bq->queue, bc, ql); + return ++bq->num; +} + +static inline uint32_t +bcg_enqueue_tail(struct bcg_queue *bq, struct buf_cage *bc) +{ + uint32_t n; + + rte_spinlock_lock(&bq->lock); + n = __bcg_enqueue_tail(bq, bc); + rte_spinlock_unlock(&bq->lock); + return n; +} + +static inline uint32_t 
+bcg_queue_append(struct bcg_queue *dst, struct bcg_queue *src) +{ + rte_spinlock_lock(&src->lock); + STAILQ_CONCAT(&dst->queue, &src->queue); + dst->num += src->num; + src->num = 0; + rte_spinlock_unlock(&src->lock); + return dst->num; +} + +static inline uint32_t +bcg_free_count(const struct buf_cage *bc) +{ + return bc->num - bc->wp; +} + + +static inline uint32_t +bcg_fill_count(const struct buf_cage *bc) +{ + return bc->wp - bc->rp; +} + +/* !!! if going to keep it - try to unroll copying stuff. !!! */ +static inline uint32_t +bcg_get(struct buf_cage *bc, const void *bufs[], uint32_t num) +{ + uint32_t i, n, r; + + r = bc->rp; + n = RTE_MIN(num, bc->wp - r); + for (i = 0; i != n; i++) + bufs[i] = bc->bufs[r + i]; + + bc->rp = r + n; + return n; +} + +static inline uint32_t +bcg_put(struct buf_cage *bc, const void *bufs[], uint32_t num) +{ + uint32_t i, n, w; + + w = bc->wp; + n = RTE_MIN(num, bc->num - w); + for (i = 0; i != n; i++) + bc->bufs[w + i] = bufs[i]; + + bc->wp = w + n; + return n; +} + + +static inline struct buf_cage * +bcg_alloc(struct bcg_store *st) +{ + return bcg_dequeue_head(&st->free); +} + +static inline uint32_t +bcg_free(struct buf_cage *bc) +{ + struct bcg_store *st; + + st = bc->st; + bcg_reset(bc); + return bcg_enqueue_head(&st->free, bc); +} + +#ifdef __cplusplus +} +#endif + +#endif /* _BUF_CAGE_H_ */ diff --git a/lib/libtle_udp/event.c b/lib/libtle_udp/event.c new file mode 100644 index 0000000..7e340e8 --- /dev/null +++ b/lib/libtle_udp/event.c @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <rte_errno.h> +#include <rte_malloc.h> +#include <rte_log.h> +#include <tle_event.h> + +#include "osdep.h" + +struct tle_evq * +tle_evq_create(const struct tle_evq_param *prm) +{ + struct tle_evq *evq; + size_t sz; + uint32_t i; + + if (prm == NULL) { + rte_errno = EINVAL; + return NULL; + } + + sz = sizeof(*evq) + sizeof(evq->events[0]) * prm->max_events; + evq = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + prm->socket_id); + if (evq == NULL) { + UDP_LOG(ERR, "allocation of %zu bytes for " + "new tle_evq(%u) on socket %d failed\n", + sz, prm->max_events, prm->socket_id); + return NULL; + } + + TAILQ_INIT(&evq->armed); + TAILQ_INIT(&evq->free); + + for (i = 0; i != prm->max_events; i++) { + evq->events[i].head = evq; + TAILQ_INSERT_TAIL(&evq->free, evq->events + i, ql); + } + + evq->nb_events = i; + evq->nb_free = i; + + return evq; +} + +void +tle_evq_destroy(struct tle_evq *evq) +{ + rte_free(evq); +} + +struct tle_event * +tle_event_alloc(struct tle_evq *evq, const void *data) +{ + struct tle_event *h; + + if (evq == NULL) { + rte_errno = EINVAL; + return NULL; + } + + rte_spinlock_lock(&evq->lock); + h = TAILQ_FIRST(&evq->free); + if (h != NULL) { + TAILQ_REMOVE(&evq->free, h, ql); + evq->nb_free--; + h->data = data; + } else + rte_errno = -ENOMEM; + rte_spinlock_unlock(&evq->lock); + return h; +} + +void +tle_event_free(struct tle_event *ev) +{ + struct tle_evq *q; + + if (ev == NULL) { + rte_errno = EINVAL; + return; + } + + q = ev->head; + rte_spinlock_lock(&q->lock); + ev->data = NULL; + ev->state = TLE_SEV_IDLE; + 
TAILQ_INSERT_HEAD(&q->free, ev, ql); + q->nb_free++; + rte_spinlock_unlock(&q->lock); +} diff --git a/lib/libtle_udp/misc.h b/lib/libtle_udp/misc.h new file mode 100644 index 0000000..3874647 --- /dev/null +++ b/lib/libtle_udp/misc.h @@ -0,0 +1,296 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _MISC_H_ +#define _MISC_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +static inline int +ymm_mask_cmp(const _ymm_t *da, const _ymm_t *sa, const _ymm_t *sm) +{ + uint64_t ret; + + ret = ((sa->u64[0] & sm->u64[0]) ^ da->u64[0]) | + ((sa->u64[1] & sm->u64[1]) ^ da->u64[1]) | + ((sa->u64[2] & sm->u64[2]) ^ da->u64[2]) | + ((sa->u64[3] & sm->u64[3]) ^ da->u64[3]); + + return (ret == 0); +} + +/* + * Setup tx_offload field inside mbuf using raw 64-bit field. + * Consider to move it into DPDK librte_mbuf. + */ +static inline uint64_t +_mbuf_tx_offload(uint64_t il2, uint64_t il3, uint64_t il4, uint64_t tso, + uint64_t ol3, uint64_t ol2) +{ + return il2 | il3 << 7 | il4 << 16 | tso << 24 | ol3 << 40 | ol2 << 49; +} + +/* + * Routines to calculate L3/L4 checksums in SW. + * Pretty similar to ones from DPDK librte_net/rte_ip.h, + * but provide better performance (at least for tested configurations), + * and extended functionality. + * Consider to move them into DPDK librte_net/rte_ip.h. + */ + +/* make compiler to generate: add %r1, %r2; adc $0, %r1. 
*/ +#define CKSUM_ADD_CARRY(s, v) do { \ + (s) += (v); \ + (s) = ((s) < (v)) ? (s) + 1 : (s); \ +} while (0) + +/** + * Process the non-complemented checksum of a buffer. + * Similar to rte_raw_cksum(), but provide better perfomance + * (at least on IA platforms). + * @param buf + * Pointer to the buffer. + * @param len + * Length of the buffer. + * @return + * The non-complemented checksum. + */ +static inline uint16_t +__raw_cksum(const uint8_t *buf, uint32_t size) +{ + uint64_t s, sum; + uint32_t i, n; + uint32_t dw1, dw2; + uint16_t w1, w2; + const uint64_t *b; + + b = (const uint64_t *)buf; + n = size / sizeof(*b); + sum = 0; + + /* main loop, consume 8 bytes per iteration. */ + for (i = 0; i != n; i++) { + s = b[i]; + CKSUM_ADD_CARRY(sum, s); + } + + /* consume the remainder. */ + n = size % sizeof(*b); + if (n != 0) { + /* position of the of last 8 bytes of data. */ + b = (const uint64_t *)((uintptr_t)(b + i) + n - sizeof(*b)); + /* calculate shift amount. */ + n = (sizeof(*b) - n) * CHAR_BIT; + s = b[0] >> n; + CKSUM_ADD_CARRY(sum, s); + } + + /* reduce to 16 bits */ + dw1 = sum; + dw2 = sum >> 32; + CKSUM_ADD_CARRY(dw1, dw2); + w1 = dw1; + w2 = dw1 >> 16; + CKSUM_ADD_CARRY(w1, w2); + return w1; +} + + +/** + * Process UDP or TCP checksum over possibly multi-segmented packet. + * @param mb + * The pointer to the mbuf with the packet. + * @param l4_ofs + * Offset to the beginning of the L4 header (should be in first segment). + * @param cksum + * Already pre-calculated pseudo-header checksum value. + * @return + * The complemented checksum. 
+ */ +static inline uint32_t +__udptcp_mbuf_cksum(const struct rte_mbuf *mb, uint16_t l4_ofs, + uint32_t cksum) +{ + uint32_t dlen, i, plen; + const struct rte_mbuf *ms; + const void *data; + + plen = rte_pktmbuf_pkt_len(mb); + ms = mb; + + for (i = l4_ofs; i < plen && ms != NULL; i += dlen) { + data = rte_pktmbuf_mtod_offset(ms, const void *, l4_ofs); + dlen = rte_pktmbuf_data_len(ms) - l4_ofs; + cksum += __raw_cksum(data, dlen); + ms = ms->next; + l4_ofs = 0; + } + + cksum = ((cksum & 0xffff0000) >> 16) + (cksum & 0xffff); + cksum = (~cksum) & 0xffff; + if (cksum == 0) + cksum = 0xffff; + + return cksum; +} + +/** + * Process the pseudo-header checksum of an IPv4 header. + * + * Depending on the ol_flags, the pseudo-header checksum expected by the + * drivers is not the same. For instance, when TSO is enabled, the IP + * payload length must not be included in the packet. + * + * When ol_flags is 0, it computes the standard pseudo-header checksum. + * + * @param ipv4_hdr + * The pointer to the contiguous IPv4 header. + * @param ipv4_len + * Length of the IPv4 header. + * @param ol_flags + * The ol_flags of the associated mbuf. + * @return + * The non-complemented checksum to set in the L4 header. + */ +static inline uint16_t +_ipv4x_phdr_cksum(const struct ipv4_hdr *ipv4_hdr, size_t ipv4h_len, + uint64_t ol_flags) +{ + uint32_t s0, s1; + + s0 = ipv4_hdr->src_addr; + s1 = ipv4_hdr->dst_addr; + CKSUM_ADD_CARRY(s0, s1); + + if (ol_flags & PKT_TX_TCP_SEG) + s1 = 0; + else + s1 = rte_cpu_to_be_16( + (uint16_t)(rte_be_to_cpu_16(ipv4_hdr->total_length) - + ipv4h_len)); + + s1 += rte_cpu_to_be_16(ipv4_hdr->next_proto_id); + CKSUM_ADD_CARRY(s0, s1); + + return __rte_raw_cksum_reduce(s0); +} + +/** + * Process the IPv4 UDP or TCP checksum. + * + * @param mb + * The pointer to the IPv4 packet. + * @param l4_ofs + * Offset to the beginning of the L4 header (should be in first segment). + * @param ipv4_hdr + * The pointer to the contiguous IPv4 header. 
+ * @return + * The complemented checksum to set in the IP packet. + */ +static inline int +_ipv4_udptcp_mbuf_cksum(const struct rte_mbuf *mb, uint16_t l4_ofs, + const struct ipv4_hdr *ipv4_hdr) +{ + uint32_t cksum; + + cksum = _ipv4x_phdr_cksum(ipv4_hdr, mb->l3_len, 0); + cksum = __udptcp_mbuf_cksum(mb, l4_ofs, cksum); + + return cksum; +} + +/** + * Process the IPv6 UDP or TCP checksum. + * + * @param mb + * The pointer to the IPv6 packet. + * @param l4_ofs + * Offset to the beginning of the L4 header (should be in first segment). + * @param ipv6_hdr + * The pointer to the contiguous IPv6 header. + * @return + * The complemented checksum to set in the IP packet. + */ +static inline int +_ipv6_udptcp_mbuf_cksum(const struct rte_mbuf *mb, uint16_t l4_ofs, + const struct ipv6_hdr *ipv6_hdr) +{ + uint32_t cksum; + + cksum = rte_ipv6_phdr_cksum(ipv6_hdr, 0); + cksum = __udptcp_mbuf_cksum(mb, l4_ofs, cksum); + + return cksum; +} + +static inline uint16_t +_ipv4x_cksum(const void *iph, size_t len) +{ + uint16_t cksum; + + cksum = __raw_cksum(iph, len); + return (cksum == 0xffff) ? cksum : ~cksum; +} + + +/* + * Analog of read-write locks, very much in favour of read side. + * Assumes, that there are no more then INT32_MAX concurrent readers. + * Consider to move into DPDK librte_eal. 
+ */ + +static inline int +rwl_try_acquire(rte_atomic32_t *p) +{ + return rte_atomic32_add_return(p, 1); +} + +static inline void +rwl_release(rte_atomic32_t *p) +{ + rte_atomic32_sub(p, 1); +} + +static inline int +rwl_acquire(rte_atomic32_t *p) +{ + int32_t rc; + + rc = rwl_try_acquire(p); + if (rc < 0) + rwl_release(p); + return rc; +} + +static inline void +rwl_down(rte_atomic32_t *p) +{ + while (rte_atomic32_cmpset((volatile uint32_t *)p, 0, INT32_MIN) == 0) + rte_pause(); +} + +static inline void +rwl_up(rte_atomic32_t *p) +{ + rte_atomic32_sub(p, INT32_MIN); +} + +#ifdef __cplusplus +} +#endif + +#endif /* _MISC_H_ */ diff --git a/lib/libtle_udp/osdep.h b/lib/libtle_udp/osdep.h new file mode 100644 index 0000000..6161242 --- /dev/null +++ b/lib/libtle_udp/osdep.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _OSDEP_H_ +#define _OSDEP_H_ + +#include <rte_vect.h> +#include <rte_log.h> + +#ifdef __cplusplus +extern "C" { +#endif + +#define UDP_LOG(lvl, fmt, args...) RTE_LOG(lvl, USER1, fmt, ##args) + +/* + * if no AVX support, define _ymm_t here. 
+ */ + +#ifdef __AVX__ + +#define _ymm_t rte_ymm_t + +#else + +#define YMM_SIZE (2 * sizeof(rte_xmm_t)) +#define YMM_MASK (YMM_SIZE - 1) + +typedef union _ymm { + xmm_t x[YMM_SIZE / sizeof(xmm_t)]; + uint8_t u8[YMM_SIZE / sizeof(uint8_t)]; + uint16_t u16[YMM_SIZE / sizeof(uint16_t)]; + uint32_t u32[YMM_SIZE / sizeof(uint32_t)]; + uint64_t u64[YMM_SIZE / sizeof(uint64_t)]; + double pd[YMM_SIZE / sizeof(double)]; +} _ymm_t; + +#endif /* __AVX__ */ + +#ifdef __cplusplus +} +#endif + +#endif /* _OSDEP_H_ */ diff --git a/lib/libtle_udp/port_bitmap.h b/lib/libtle_udp/port_bitmap.h new file mode 100644 index 0000000..6aff4e6 --- /dev/null +++ b/lib/libtle_udp/port_bitmap.h @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _PORT_BITMAP_H_ +#define _PORT_BITMAP_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * Simple implementation of bitmap for all possible UDP ports [0-UINT16_MAX]. + */ + +#define MAX_PORT_NUM (UINT16_MAX + 1) + +#define PORT_BLK(p) ((p) / (sizeof(uint32_t) * CHAR_BIT)) +#define PORT_IDX(p) ((p) % (sizeof(uint32_t) * CHAR_BIT)) + +#define MAX_PORT_BLK PORT_BLK(MAX_PORT_NUM) + +struct udp_pbm { + uint32_t nb_set; /* number of bits set. */ + uint32_t blk; /* last block with free entry. 
*/ + uint32_t bm[MAX_PORT_BLK]; +}; + +static inline void +udp_pbm_init(struct udp_pbm *pbm, uint32_t blk) +{ + pbm->bm[0] = 1; + pbm->nb_set = 1; + pbm->blk = blk; +} + +static inline void +udp_pbm_set(struct udp_pbm *pbm, uint16_t port) +{ + uint32_t i, b, v; + + i = PORT_BLK(port); + b = 1 << PORT_IDX(port); + v = pbm->bm[i]; + pbm->bm[i] = v | b; + pbm->nb_set += (v & b) == 0; +} + +static inline void +udp_pbm_clear(struct udp_pbm *pbm, uint16_t port) +{ + uint32_t i, b, v; + + i = PORT_BLK(port); + b = 1 << PORT_IDX(port); + v = pbm->bm[i]; + pbm->bm[i] = v & ~b; + pbm->nb_set -= (v & b) != 0; +} + + +static inline uint32_t +udp_pbm_check(const struct udp_pbm *pbm, uint16_t port) +{ + uint32_t i, v; + + i = PORT_BLK(port); + v = pbm->bm[i] >> PORT_IDX(port); + return v & 1; +} + +static inline uint16_t +udp_pbm_find_range(struct udp_pbm *pbm, uint32_t start_blk, uint32_t end_blk) +{ + uint32_t i, v; + uint16_t p; + + if (pbm->nb_set == MAX_PORT_NUM) + return 0; + + p = 0; + for (i = start_blk; i != end_blk; i++) { + i %= RTE_DIM(pbm->bm); + v = pbm->bm[i]; + if (v != UINT32_MAX) { + for (p = i * (sizeof(pbm->bm[0]) * CHAR_BIT); + (v & 1) != 0; v >>= 1, p++) + ; + + pbm->blk = i; + break; + } + } + return p; +} + +#ifdef __cplusplus +} +#endif + +#endif /* _PORT_BITMAP_H_ */ diff --git a/lib/libtle_udp/tle_event.h b/lib/libtle_udp/tle_event.h new file mode 100644 index 0000000..1a5c436 --- /dev/null +++ b/lib/libtle_udp/tle_event.h @@ -0,0 +1,257 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _SEV_IMPL_H_ +#define _SEV_IMPL_H_ + +#include <rte_common.h> +#include <rte_spinlock.h> +#include <rte_atomic.h> +#include <sys/queue.h> + +#ifdef __cplusplus +extern "C" { +#endif + +struct tle_evq; + +/** + * Possible states of the event. + */ +enum tle_ev_state { + TLE_SEV_IDLE, + TLE_SEV_DOWN, + TLE_SEV_UP, + TLE_SEV_NUM +}; + +struct tle_event { + TAILQ_ENTRY(tle_event) ql; + struct tle_evq *head; + const void *data; + enum tle_ev_state state; +} __rte_cache_aligned; + +struct tle_evq { + rte_spinlock_t lock; + uint32_t nb_events; + uint32_t nb_armed; + uint32_t nb_free; + TAILQ_HEAD(, tle_event) armed; + TAILQ_HEAD(, tle_event) free; + struct tle_event events[0]; +}; + +/** + * event queue creation parameters. + */ +struct tle_evq_param { + int32_t socket_id; /**< socket ID to allocate memory from. */ + uint32_t max_events; /**< max number of events in queue. */ +}; + +/** + * create event queue. + * @param prm + * Parameters used to create and initialise the queue. + * @return + * Pointer to new event queue structure, + * or NULL on error, with error code set in rte_errno. + * Possible rte_errno errors include: + * - EINVAL - invalid parameter passed to function + * - ENOMEM - out of memory + */ +struct tle_evq *tle_evq_create(const struct tle_evq_param *prm); + +/** + * Destroy given event queue. + * + * @param evq + * event queue to destroy + */ +void tle_evq_destroy(struct tle_evq *evq); + +/** + * allocate a new event within given event queue. + * @param evq + * event queue to allocate a new stream within. + * @param data + * User data to be associated with that event. + * @return + * Pointer to event structure that can be used in future tle_event API calls, + * or NULL on error, with error code set in rte_errno. 
+ * Possible rte_errno errors include: + * - EINVAL - invalid parameter passed to function + * - ENOMEM - max limit of allocated events reached for that context + */ +struct tle_event *tle_event_alloc(struct tle_evq *evq, const void *data); + +/** + * free an allocated event. + * @param ev + * Pointer to the event to free. + */ +void tle_event_free(struct tle_event *ev); + + +/** + * move event from DOWN to UP state. + * @param ev + * Pointer to the event. + */ +static inline void +tle_event_raise(struct tle_event *ev) +{ + struct tle_evq *q; + + if (ev->state != TLE_SEV_DOWN) + return; + + q = ev->head; + rte_compiler_barrier(); + + rte_spinlock_lock(&q->lock); + if (ev->state == TLE_SEV_DOWN) { + ev->state = TLE_SEV_UP; + TAILQ_INSERT_TAIL(&q->armed, ev, ql); + q->nb_armed++; + } + rte_spinlock_unlock(&q->lock); +} + +/** + * move event from UP to DOWN state. + * @param ev + * Pointer to the event. + */ +static inline void +tle_event_down(struct tle_event *ev) +{ + struct tle_evq *q; + + if (ev->state != TLE_SEV_UP) + return; + + q = ev->head; + rte_compiler_barrier(); + + rte_spinlock_lock(&q->lock); + if (ev->state == TLE_SEV_UP) { + ev->state = TLE_SEV_DOWN; + TAILQ_REMOVE(&q->armed, ev, ql); + q->nb_armed--; + } + rte_spinlock_unlock(&q->lock); +} + +/** + * move from IDLE to DOWN/UP state. + * @param ev + * Pointer to the event. + * @param st + * new state for the event. + */ +static inline void +tle_event_active(struct tle_event *ev, enum tle_ev_state st) +{ + struct tle_evq *q; + + if (st == ev->state) + return; + + q = ev->head; + rte_compiler_barrier(); + + rte_spinlock_lock(&q->lock); + if (st > ev->state) { + if (st == TLE_SEV_UP) { + TAILQ_INSERT_TAIL(&q->armed, ev, ql); + q->nb_armed++; + } + ev->state = st; + } + rte_spinlock_unlock(&q->lock); +} + +/** + * move event IDLE state. + * @param ev + * Pointer to the event. 
+ */ +static inline void +tle_event_idle(struct tle_event *ev) +{ + struct tle_evq *q; + + if (ev->state == TLE_SEV_IDLE) + return; + + q = ev->head; + rte_compiler_barrier(); + + rte_spinlock_lock(&q->lock); + if (ev->state == TLE_SEV_UP) { + TAILQ_REMOVE(&q->armed, ev, ql); + q->nb_armed--; + } + ev->state = TLE_SEV_IDLE; + rte_spinlock_unlock(&q->lock); +} + + +/* + * return up to *num* user data pointers associated with + * the events that were in the UP state. + * Each retrieved event is automatically moved into the DOWN state. + * @param evq + * event queue to retrieve events from. + * @param evd + * An array of user data pointers associated with the events retrieved. + * It must be large enough to store up to *num* pointers in it. + * @param num + * Number of elements in the *evd* array. + * @return + * number of of entries filled inside *evd* array. + */ +static inline int32_t +tle_evq_get(struct tle_evq *evq, const void *evd[], uint32_t num) +{ + uint32_t i, n; + struct tle_event *ev; + + if (evq->nb_armed == 0) + return 0; + + rte_compiler_barrier(); + + rte_spinlock_lock(&evq->lock); + n = RTE_MIN(num, evq->nb_armed); + for (i = 0; i != n; i++) { + ev = TAILQ_FIRST(&evq->armed); + ev->state = TLE_SEV_DOWN; + TAILQ_REMOVE(&evq->armed, ev, ql); + evd[i] = ev->data; + } + evq->nb_armed -= n; + rte_spinlock_unlock(&evq->lock); + return n; +} + + +#ifdef __cplusplus +} +#endif + +#endif /* _SEV_IMPL_H_ */ diff --git a/lib/libtle_udp/tle_udp_impl.h b/lib/libtle_udp/tle_udp_impl.h new file mode 100644 index 0000000..a5d17e1 --- /dev/null +++ b/lib/libtle_udp/tle_udp_impl.h @@ -0,0 +1,373 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _TLE_UDP_IMPL_H_ +#define _TLE_UDP_IMPL_H_ + +#include <stdint.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <rte_common.h> +#include <rte_mbuf.h> + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * <udp_ctx> - each such ctx represents an 'independent copy of the stack'. + * It owns set of <udp_stream>s and <udp_dev>s entities and provides + * (de)multiplexing input/output packets from/into UDP devices into/from + * UDP streams. + * <udp_dev> is an abstraction for the underlying device, that is able + * to RX/TX packets and may provide some HW offload capabilities. + * It is a user responsibility to add to the <udp_ctx> all <udp_dev>s, + * that context has to manage, before starting to do stream operations + * (open/send/recv,close) over that context. + * Right now adding/deleting <udp_dev>s to the context with open + * streams is not supported. + * <udp_stream> represents an UDP endpoint <addr, port> and is an analogy to + * socket entity. + * As with a socket, there are ability to do recv/send over it. + * <udp_stream> belongs to particular <udp_ctx> but is visible globally across + * the process, i.e. any thread within the process can do recv/send over it + * without any further synchronisation. + * While 'upper' layer API is thread safe, lower layer API (rx_bulk/tx_bulk) + * is not thread safe and is not supposed to be run on multiple threads + * in parallel. 
+ * So single thread can drive multiple <udp_ctx>s and do IO for them, + * but multiple threads can't drive same <udp_ctx> without some + * explicit synchronization. + */ + +struct tle_udp_ctx; +struct tle_udp_dev; + +/** + * UDP device parameters. + */ +struct tle_udp_dev_param { + uint32_t rx_offload; /**< DEV_RX_OFFLOAD_* supported. */ + uint32_t tx_offload; /**< DEV_TX_OFFLOAD_* supported. */ + struct in_addr local_addr4; /**< local IPv4 address assigned. */ + struct in6_addr local_addr6; /**< local IPv6 address assigned. */ +}; + +#define TLE_UDP_MAX_HDR 0x60 + +struct tle_udp_dest { + struct rte_mempool *head_mp; /**< MP for fragment feaders. */ + struct tle_udp_dev *dev; /**< device to send packets through. */ + uint16_t mtu; /**< MTU for given destination. */ + uint8_t l2_len; /**< L2 header lenght. */ + uint8_t l3_len; /**< L3 header lenght. */ + uint8_t hdr[TLE_UDP_MAX_HDR]; /**< L2/L3 headers. */ +}; + +/** + * UDP context creation parameters. + */ +struct tle_udp_ctx_param { + int32_t socket_id; /**< socket ID to allocate memory for. */ + uint32_t max_streams; /**< max number of streams in context. */ + uint32_t max_stream_rbufs; /**< max recv mbufs per stream. */ + uint32_t max_stream_sbufs; /**< max send mbufs per stream. */ + + int (*lookup4)(void *opaque, const struct in_addr *addr, + struct tle_udp_dest *res); + /**< will be called by send() to get IPv4 packet destination info. */ + void *lookup4_data; + /**< opaque data pointer for lookup4() callback. */ + + int (*lookup6)(void *opaque, const struct in6_addr *addr, + struct tle_udp_dest *res); + /**< will be called by send() to get IPv6 packet destination info. */ + void *lookup6_data; + /**< opaque data pointer for lookup6() callback. */ +}; + +/** + * create UDP context. + * @param ctx_prm + * Parameters used to create and initialise the UDP context. + * @return + * Pointer to UDP context structure that can be used in future UDP + * operations, or NULL on error, with error code set in rte_errno. 
+ * Possible rte_errno errors include: + * - EINVAL - invalid parameter passed to function + * - ENOMEM - out of memory + */ +struct tle_udp_ctx * +tle_udp_create(const struct tle_udp_ctx_param *ctx_prm); + +/** + * Destroy given UDP context. + * + * @param ctx + * UDP context to destroy + */ +void tle_udp_destroy(struct tle_udp_ctx *ctx); + +/** + * Add new device into the given UDP context. + * This function is not multi-thread safe. + * + * @param ctx + * UDP context to add new device into. + * @param dev_prm + * Parameters used to create and initialise new device inside the + * UDP context. + * @return + * Pointer to UDP device structure that can be used in future UDP + * operations, or NULL on error, with error code set in rte_errno. + * Possible rte_errno errors include: + * - EINVAL - invalid parameter passed to function + * - ENODEV - max possible value of open devices is reached + * - ENOMEM - out of memory + */ +struct tle_udp_dev * +tle_udp_add_dev(struct tle_udp_ctx *ctx, + const struct tle_udp_dev_param *dev_prm); + +/** + * Remove and destroy previously added device from the given UDP context. + * This function is not multi-thread safe. + * + * @param dev + * UDP device to remove and destroy. + * @return + * zero on successful completion. + * - -EINVAL - invalid parameter passed to function + */ +int tle_udp_del_dev(struct tle_udp_dev *dev); + +/** + * Flags to the UDP context that destinations info might be changed, + * so if it has any destinations data cached, then + * it has to be invalidated. + * @param ctx + * UDP context to invalidate. + */ +void tle_udp_ctx_invalidate(struct tle_udp_ctx *ctx); + +struct tle_udp_stream; + +/** + * Stream asynchronous notification mechanisms: + * a) recv/send callback. + * Stream recv/send notification callbacks behaviour is edge-triggered (ET). + * recv callback will be invoked if stream receive buffer was empty and + * new packet(s) have arrived. 
+ * send callback will be invoked when stream send buffer was full, + * and some packets belonging to that stream were sent + * (part of send buffer became free again). + * Note that both recv and send callbacks are called with sort of read lock + * held on that stream. So it is not permitted to call stream_close() + * within the callback function. Doing that would cause a deadlock. + * While it is allowed to call stream send/recv functions within the + * callback, it is not recommended: callback function will be invoked + * within tle_udp_rx_bulk/tle_udp_tx_bulk context and some heavy processing + * within the callback functions might cause performance degradation + * or even loss of packets for further streams. + * b) recv/send event. + * Stream recv/send events behavour is level-triggered (LT). + * receive event will be raised by either + * tle_udp_rx_burst() or tle_udp_stream_recv() as long as there are any + * remaining packets inside stream receive buffer. + * send event will be raised by either + * tle_udp_tx_burst() or tle_udp_stream_send() as long as there are any + * free space inside stream send buffer. + * Note that callback and event are mutually exclusive on <stream, op> basis. + * It is not possible to open a stream with both recv event and callback + * specified. + * Though it is possible to open a stream with recv callback and send event, + * or visa-versa. + * If the user doesn't need any notification mechanism for that stream, + * both event and callback could be set to zero. + */ + +/** + * Stream recv/send callback function and data. + */ +struct tle_udp_stream_cb { + void (*func)(void *, struct tle_udp_stream *); + void *data; +}; + +struct tle_event; + +/** + * UDP stream creation parameters. + */ +struct tle_udp_stream_param { + struct sockaddr_storage local_addr; /**< stream local address. */ + struct sockaddr_storage remote_addr; /**< stream remote address. 
*/ + + /* _cb and _ev are mutually exclusive */ + struct tle_event *recv_ev; /**< recv event to use. */ + struct tle_udp_stream_cb recv_cb; /**< recv callback to use. */ + + struct tle_event *send_ev; /**< send event to use. */ + struct tle_udp_stream_cb send_cb; /**< send callback to use. */ +}; + +/** + * create a new stream within given UDP context. + * @param ctx + * UDP context to create new stream within. + * @param prm + * Parameters used to create and initialise the new stream. + * @return + * Pointer to UDP stream structure that can be used in future UDP API calls, + * or NULL on error, with error code set in rte_errno. + * Possible rte_errno errors include: + * - EINVAL - invalid parameter passed to function + * - ENOFILE - max limit of open streams reached for that context + */ +struct tle_udp_stream * +tle_udp_stream_open(struct tle_udp_ctx *ctx, + const struct tle_udp_stream_param *prm); + +/** + * close an open stream. + * All packets still remaining in stream receive buffer will be freed. + * All packets still remaining in stream transmit buffer will be kept + * for father transmission. + * @param s + * Pointer to the stream to close. + * @return + * zero on successful completion. + * - -EINVAL - invalid parameter passed to function + */ +int tle_udp_stream_close(struct tle_udp_stream *s); + +/** + * get open stream parameters. + * @param s + * Pointer to the stream. + * @return + * zero on successful completion. + * - EINVAL - invalid parameter passed to function + */ +int +tle_udp_stream_get_param(const struct tle_udp_stream *s, + struct tle_udp_stream_param *prm); + +/** + * Take input mbufs and distribute them to open UDP streams. + * expects that for each input packet: + * - l2_len, l3_len, l4_len are setup correctly + * - (packet_type & (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L3_IPV6)) != 0, + * - (packet_type & RTE_PTYPE_L4_UDP) != 0, + * During delivery L3/L4 checksums will be verified + * (either relies on HW offload or in SW). 
+ * This function is not multi-thread safe. + * @param dev + * UDP device the packets were received from. + * @param pkt + * The burst of input packets that need to be processed. + * @param rp + * The array that will contain pointers of unprocessed packets at return. + * Should contain at least *num* elements. + * @param rc + * The array that will contain error code for corresponding rp[] entry: + * - ENOENT - no open stream matching this packet. + * - ENOBUFS - receive buffer of the destination stream is full. + * Should contain at least *num* elements. + * @param num + * Number of elements in the *pkt* input array. + * @return + * number of packets delivered to the UDP streams. + */ +uint16_t tle_udp_rx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[], + struct rte_mbuf *rp[], int32_t rc[], uint16_t num); + +/** + * Fill *pkt* with pointers to the packets that have to be transmitted + * over given UDP device. + * Output packets have to be ready to be passed straight to rte_eth_tx_burst() + * without any extra processing. + * UDP/IPv4 checksum either already calculated or appropriate mbuf fields set + * properly for HW offload. + * This function is not multi-thread safe. + * @param dev + * UDP device the output packets will be transmitted over. + * @param pkt + * An array of pointers to *rte_mbuf* structures that + * must be large enough to store up to *num* pointers in it. + * @param num + * Number of elements in the *pkt* array. + * @return + * number of of entries filled inside *pkt* array. + */ +uint16_t tle_udp_tx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[], + uint16_t num); + +/* + * return up to *num* mbufs that was received for given UDP stream. + * For each returned mbuf: + * data_off set to the start of the packet's UDP data + * l2_len, l3_len, l4_len are setup properly + * (so user can still extract L2/L3 address info if needed) + * packet_type RTE_PTYPE_L2/L3/L4 bits are setup properly. + * L3/L4 checksum is verified. 
+ * Packets with invalid L3/L4 checksum will be silently dropped. + * @param s + * UDP stream to receive packets from. + * @param pkt + * An array of pointers to *rte_mbuf* structures that + * must be large enough to store up to *num* pointers in it. + * @param num + * Number of elements in the *pkt* array. + * @return + * number of of entries filled inside *pkt* array. + */ +uint16_t tle_udp_stream_recv(struct tle_udp_stream *s, struct rte_mbuf *pkt[], + uint16_t num); + +/** + * Consume and queue up to *num* packets, that will be sent eventually + * by tle_udp_tx_bulk(). + * If *dst_addr* is NULL, then default remote address associated with that + * stream (if any) will be used. + * The main purpose of that function is to determine over which UDP dev + * given packets have to be sent out and do necessary preparations for that. + * Based on the *dst_addr* it does route lookup, fills L2/L3/L4 headers, + * and, if necessary, fragments packets. + * Depending on the underlying device information, it either does + * IP/UDP checksum calculations in SW or sets mbuf TX checksum + * offload fields properly. + * For each input mbuf the following conditions have to be met: + * - data_off point to the start of packet's UDP data. + * - there is enough header space to prepend L2/L3/L4 headers. + * @param s + * UDP stream to send packets over. + * @param pkt + * The burst of output packets that need to be send. + * @param num + * Number of elements in the *pkt* array. + * @param dst_addr + * Destination address to send packets to. + * @return + * number of packets successfully queued in the stream send buffer. 
+ */ +uint16_t tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[], + uint16_t num, const struct sockaddr *dst_addr); + +#ifdef __cplusplus +} +#endif + +#endif /* _TLE_UDP_IMPL_H_ */ diff --git a/lib/libtle_udp/udp_ctl.c b/lib/libtle_udp/udp_ctl.c new file mode 100644 index 0000000..36ec8a6 --- /dev/null +++ b/lib/libtle_udp/udp_ctl.c @@ -0,0 +1,723 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <string.h> +#include <rte_malloc.h> +#include <rte_errno.h> +#include <rte_ethdev.h> +#include <rte_ip.h> +#include <rte_udp.h> + +#include "udp_impl.h" +#include "misc.h" + +#define LPORT_START 0x8000 +#define LPORT_END MAX_PORT_NUM + +#define LPORT_START_BLK PORT_BLK(LPORT_START) +#define LPORT_END_BLK PORT_BLK(LPORT_END) + +#define MAX_BURST 0x20 + +static const struct in6_addr tle_udp6_any = IN6ADDR_ANY_INIT; +static const struct in6_addr tle_udp6_none = { + { + .__u6_addr32 = { + UINT32_MAX, UINT32_MAX, UINT32_MAX, UINT32_MAX + }, + }, +}; + +static int +check_dev_prm(const struct tle_udp_dev_param *dev_prm) +{ + /* no valid IPv4/IPv6 addresses provided. 
 */
	if (dev_prm->local_addr4.s_addr == INADDR_ANY &&
			memcmp(&dev_prm->local_addr6, &tle_udp6_any,
			sizeof(tle_udp6_any)) == 0)
		return -EINVAL;

	return 0;
}

/*
 * Mark a stream as not in use: the type sentinel is set and both RX and TX
 * use-counters are driven negative, so readers that try to acquire the
 * stream (see rwl_acquire() users) fail until the stream is reopened.
 */
static void
unuse_stream(struct tle_udp_stream *s)
{
	s->type = TLE_UDP_VNUM;
	rte_atomic32_set(&s->rx.use, INT32_MIN);
	rte_atomic32_set(&s->tx.use, INT32_MIN);
}

/*
 * One-time initialisation of a stream's fixed resources:
 * an RX ring sized from max_stream_rbufs and a TX buf-cage store.
 * On success the stream is appended to the context free list.
 * Returns 0 on success, positive errno value otherwise.
 */
static int
init_stream(struct tle_udp_ctx *ctx, struct tle_udp_stream *s)
{
	size_t sz;
	uint32_t n;
	struct bcg_store_prm sp;
	char name[RTE_RING_NAMESIZE];

	/* init RX part. */

	/* ring size must be a power of two and hold at least one entry. */
	n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U);
	n = rte_align32pow2(n);
	/* allocate the ring header and its slot array in one chunk. */
	sz = sizeof(*s->rx.q) + n * sizeof(s->rx.q->ring[0]);

	s->rx.q = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
		ctx->prm.socket_id);
	if (s->rx.q == NULL) {
		UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
			"failed with error code: %d\n",
			__func__, s, sz, ctx->prm.socket_id, rte_errno);
		return ENOMEM;
	}

	/* ring name only needs to be unique; derive it from the stream
	 * address and allocation size. */
	snprintf(name, sizeof(name), "%p@%zu", s, sz);
	rte_ring_init(s->rx.q, name, n, RING_F_SP_ENQ);

	/* init TX part.
*/ + + sp.socket_id = ctx->prm.socket_id; + sp.max_bufs = ctx->prm.max_stream_sbufs; + sp.min_cages = RTE_DIM(ctx->dev) + 1; + sp.cage_bufs = MAX_BURST; + sp.cage_align = RTE_CACHE_LINE_SIZE; + sp.user_data = s; + + s->tx.st = bcg_create(&sp); + if (s->tx.st == NULL) { + UDP_LOG(ERR, + "%s(%p): bcg_create() failed with error code: %d\n", + __func__, s, rte_errno); + return ENOMEM; + } + + s->ctx = ctx; + unuse_stream(s); + STAILQ_INSERT_TAIL(&ctx->streams.free, s, link); + + return 0; +} + +static void +fini_stream(struct tle_udp_stream *s) +{ + bcg_destroy(s->tx.st); + rte_free(s->rx.q); +} + +struct tle_udp_ctx * +tle_udp_create(const struct tle_udp_ctx_param *ctx_prm) +{ + struct tle_udp_ctx *ctx; + size_t sz; + uint32_t i; + + if (ctx_prm == NULL) { + rte_errno = EINVAL; + return NULL; + } + + sz = sizeof(*ctx); + ctx = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + ctx_prm->socket_id); + if (ctx == NULL) { + UDP_LOG(ERR, "allocation of %zu bytes for new udp_ctx " + "on socket %d failed\n", + sz, ctx_prm->socket_id); + return NULL; + } + + ctx->prm = *ctx_prm; + + sz = sizeof(*ctx->streams.buf) * ctx_prm->max_streams; + ctx->streams.buf = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + ctx_prm->socket_id); + if (ctx->streams.buf == NULL) { + UDP_LOG(ERR, "allocation of %zu bytes on socket %d " + "for %u udp_streams failed\n", + sz, ctx_prm->socket_id, ctx_prm->max_streams); + tle_udp_destroy(ctx); + return NULL; + } + + STAILQ_INIT(&ctx->streams.free); + for (i = 0; i != ctx_prm->max_streams && + init_stream(ctx, &ctx->streams.buf[i]) == 0; + i++) + ; + + if (i != ctx_prm->max_streams) { + UDP_LOG(ERR, "initalisation of %u-th stream failed", i); + tle_udp_destroy(ctx); + return NULL; + } + + for (i = 0; i != RTE_DIM(ctx->use); i++) + udp_pbm_init(ctx->use + i, LPORT_START_BLK); + + ctx->streams.nb_free = ctx->prm.max_streams; + return ctx; +} + +void +tle_udp_destroy(struct tle_udp_ctx *ctx) +{ + uint32_t i; + + if (ctx == NULL) { + rte_errno = EINVAL; 
+ return; + } + + if (ctx->streams.buf != 0) { + for (i = 0; i != ctx->prm.max_streams; i++) + fini_stream(&ctx->streams.buf[i]); + rte_free(ctx->streams.buf); + } + + for (i = 0; i != RTE_DIM(ctx->dev); i++) + tle_udp_del_dev(ctx->dev + i); + + rte_free(ctx); +} + +void +tle_udp_ctx_invalidate(struct tle_udp_ctx *ctx) +{ + RTE_SET_USED(ctx); +} + +static int +init_dev_proto(struct tle_udp_dev *dev, uint32_t idx, int32_t socket_id) +{ + size_t sz; + + sz = sizeof(*dev->dp[idx]); + dev->dp[idx] = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, + socket_id); + + if (dev->dp[idx] == NULL) { + UDP_LOG(ERR, "allocation of %zu bytes on " + "socket %d for %u-th device failed\n", + sz, socket_id, idx); + return ENOMEM; + } + + udp_pbm_init(&dev->dp[idx]->use, LPORT_START_BLK); + return 0; +} + +static struct tle_udp_dev * +find_free_dev(struct tle_udp_ctx *ctx) +{ + uint32_t i; + + if (ctx->nb_dev < RTE_DIM(ctx->dev)) { + for (i = 0; i != RTE_DIM(ctx->dev); i++) { + if (ctx->dev[i].ctx != ctx) + return ctx->dev + i; + } + } + + rte_errno = ENODEV; + return NULL; +} + +struct tle_udp_dev * +tle_udp_add_dev(struct tle_udp_ctx *ctx, + const struct tle_udp_dev_param *dev_prm) +{ + int32_t rc; + struct tle_udp_dev *dev; + + if (ctx == NULL || dev_prm == NULL || check_dev_prm(dev_prm) != 0) { + rte_errno = EINVAL; + return NULL; + } + + dev = find_free_dev(ctx); + if (dev == NULL) + return NULL; + rc = 0; + + /* device can handle IPv4 traffic */ + if (dev_prm->local_addr4.s_addr != INADDR_ANY) + rc = init_dev_proto(dev, TLE_UDP_V4, ctx->prm.socket_id); + + /* device can handle IPv6 traffic */ + if (rc == 0 && memcmp(&dev_prm->local_addr6, &tle_udp6_any, + sizeof(tle_udp6_any)) != 0) + rc = init_dev_proto(dev, TLE_UDP_V6, ctx->prm.socket_id); + + if (rc != 0) { + /* cleanup and return an error. */ + rte_free(dev->dp[TLE_UDP_V4]); + rte_free(dev->dp[TLE_UDP_V6]); + rte_errno = rc; + return NULL; + } + + /* setup RX data. 
*/ + if (dev_prm->local_addr4.s_addr != INADDR_ANY && + (dev_prm->rx_offload & DEV_RX_OFFLOAD_IPV4_CKSUM) == 0) + dev->rx.ol_flags[TLE_UDP_V4] |= PKT_RX_IP_CKSUM_BAD; + if ((dev_prm->rx_offload & DEV_RX_OFFLOAD_UDP_CKSUM) == 0) { + dev->rx.ol_flags[TLE_UDP_V4] |= PKT_RX_L4_CKSUM_BAD; + dev->rx.ol_flags[TLE_UDP_V6] |= PKT_RX_L4_CKSUM_BAD; + } + + /* setup TX data. */ + bcg_queue_reset(&dev->tx.beq); + bcg_queue_reset(&dev->tx.feq); + + if ((dev_prm->tx_offload & DEV_TX_OFFLOAD_UDP_CKSUM) != 0) { + dev->tx.ol_flags[TLE_UDP_V4] |= PKT_TX_IPV4 | PKT_TX_UDP_CKSUM; + dev->tx.ol_flags[TLE_UDP_V6] |= PKT_TX_IPV6 | PKT_TX_UDP_CKSUM; + } + if ((dev_prm->tx_offload & DEV_TX_OFFLOAD_IPV4_CKSUM) != 0) + dev->tx.ol_flags[TLE_UDP_V4] |= PKT_TX_IPV4 | PKT_TX_IP_CKSUM; + + dev->prm = *dev_prm; + dev->ctx = ctx; + ctx->nb_dev++; + + return dev; +} + +static void +empty_cage(struct buf_cage *bc) +{ + uint32_t i, n; + struct rte_mbuf *pkt[MAX_BURST]; + + do { + n = bcg_get(bc, (const void **)(uintptr_t)pkt, RTE_DIM(pkt)); + for (i = 0; i != n; i++) + rte_pktmbuf_free(pkt[i]); + } while (n != 0); + + bcg_free(bc); +} + +int +tle_udp_del_dev(struct tle_udp_dev *dev) +{ + uint32_t p; + struct buf_cage *bc; + struct tle_udp_ctx *ctx; + + ctx = dev->ctx; + + if (dev == NULL || dev->ctx == NULL) + return -EINVAL; + + p = dev - ctx->dev; + + if (p >= RTE_DIM(ctx->dev) || + (dev->dp[TLE_UDP_V4] == NULL && + dev->dp[TLE_UDP_V6] == NULL)) + return -EINVAL; + + /* emtpy TX queues. 
 */
	if (dev->tx.bc != NULL)
		empty_cage(dev->tx.bc);

	/* merge the FE queue into the BE queue, then drain everything. */
	bcg_queue_append(&dev->tx.beq, &dev->tx.feq);

	while ((bc = __bcg_dequeue_head(&dev->tx.beq)) != NULL)
		empty_cage(bc);

	rte_free(dev->dp[TLE_UDP_V4]);
	rte_free(dev->dp[TLE_UDP_V6]);
	/* wipe the slot so find_free_dev() can hand it out again. */
	memset(dev, 0, sizeof(*dev));
	ctx->nb_dev--;
	return 0;
}

/* block both RX and TX directions of the stream. */
static inline void
stream_down(struct tle_udp_stream *s)
{
	rwl_down(&s->rx.use);
	rwl_down(&s->tx.use);
}

/* re-enable both RX and TX directions of the stream. */
static inline void
stream_up(struct tle_udp_stream *s)
{
	rwl_up(&s->rx.use);
	rwl_up(&s->tx.use);
}

/*
 * linear scan over all devices for one with the given local IPv4
 * address and IPv4 support enabled; NULL when no such device exists.
 */
static struct tle_udp_dev *
find_ipv4_dev(struct tle_udp_ctx *ctx, const struct in_addr *addr)
{
	uint32_t i;

	for (i = 0; i != RTE_DIM(ctx->dev); i++) {
		if (ctx->dev[i].prm.local_addr4.s_addr == addr->s_addr &&
				ctx->dev[i].dp[TLE_UDP_V4] != NULL)
			return ctx->dev + i;
	}

	return NULL;
}

/* IPv6 counterpart of find_ipv4_dev(). */
static struct tle_udp_dev *
find_ipv6_dev(struct tle_udp_ctx *ctx, const struct in6_addr *addr)
{
	uint32_t i;

	for (i = 0; i != RTE_DIM(ctx->dev); i++) {
		if (memcmp(&ctx->dev[i].prm.local_addr6, addr,
				sizeof(*addr)) == 0 &&
				ctx->dev[i].dp[TLE_UDP_V6] != NULL)
			return ctx->dev + i;
	}

	return NULL;
}

/*
 * Bind a stream to its local <address, port>: pick the owning device
 * (or all devices for a wildcard address), acquire a local port number
 * and register the stream in the port-to-stream lookup tables.
 * Returns 0 on success, positive errno value otherwise.
 * NOTE(review): caller holds ctx->dev_lock (see tle_udp_stream_open).
 */
static int
stream_fill_dev(struct tle_udp_ctx *ctx, struct tle_udp_stream *s)
{
	struct tle_udp_dev *dev;
	struct udp_pbm *pbm;
	struct sockaddr_in *lin4;
	struct sockaddr_in6 *lin6;
	uint32_t i, p, sp, t;

	if (s->prm.local_addr.ss_family == AF_INET) {
		lin4 = (struct sockaddr_in *)&s->prm.local_addr;
		t = TLE_UDP_V4;
		p = lin4->sin_port;
	} else if (s->prm.local_addr.ss_family == AF_INET6) {
		lin6 = (struct sockaddr_in6 *)&s->prm.local_addr;
		t = TLE_UDP_V6;
		p = lin6->sin6_port;
	} else
		return EINVAL;

	p = ntohs(p);

	/* if local address is not wildcard, find device it belongs to.
*/ + if (t == TLE_UDP_V4 && lin4->sin_addr.s_addr != INADDR_ANY) { + dev = find_ipv4_dev(ctx, &lin4->sin_addr); + if (dev == NULL) + return ENODEV; + } else if (t == TLE_UDP_V6 && memcmp(&tle_udp6_any, &lin6->sin6_addr, + sizeof(tle_udp6_any)) != 0) { + dev = find_ipv6_dev(ctx, &lin6->sin6_addr); + if (dev == NULL) + return ENODEV; + } else + dev = NULL; + + if (dev != NULL) + pbm = &dev->dp[t]->use; + else + pbm = &ctx->use[t]; + + /* try to acquire local port number. */ + if (p == 0) { + p = udp_pbm_find_range(pbm, pbm->blk, LPORT_END_BLK); + if (p == 0 && pbm->blk > LPORT_START_BLK) + p = udp_pbm_find_range(pbm, LPORT_START_BLK, pbm->blk); + } else if (udp_pbm_check(pbm, p) != 0) + return EEXIST; + + if (p == 0) + return ENFILE; + + /* fill socket's dst port and type */ + + sp = htons(p); + s->type = t; + s->port.dst = sp; + + /* mark port as in-use */ + + udp_pbm_set(&ctx->use[t], p); + if (dev != NULL) { + udp_pbm_set(pbm, p); + dev->dp[t]->streams[sp] = s; + } else { + for (i = 0; i != RTE_DIM(ctx->dev); i++) { + if (ctx->dev[i].dp[t] != NULL) { + udp_pbm_set(&ctx->dev[i].dp[t]->use, p); + ctx->dev[i].dp[t]->streams[sp] = s; + } + } + } + + return 0; +} + +static int +stream_clear_dev(struct tle_udp_ctx *ctx, struct tle_udp_stream *s) +{ + struct tle_udp_dev *dev; + uint32_t i, p, sp, t; + + t = s->type; + sp = s->port.dst; + p = ntohs(sp); + + /* if local address is not wildcard, find device it belongs to. 
*/ + if (t == TLE_UDP_V4 && s->ipv4.addr.dst != INADDR_ANY) { + dev = find_ipv4_dev(ctx, (struct in_addr *)&s->ipv4.addr.dst); + if (dev == NULL) + return ENODEV; + } else if (t == TLE_UDP_V6 && memcmp(&tle_udp6_any, &s->ipv6.addr.dst, + sizeof(tle_udp6_any)) != 0) { + dev = find_ipv6_dev(ctx, (struct in6_addr *)&s->ipv6.addr.dst); + if (dev == NULL) + return ENODEV; + } else + dev = NULL; + + udp_pbm_clear(&ctx->use[t], p); + if (dev != NULL) { + udp_pbm_clear(&dev->dp[t]->use, p); + dev->dp[t]->streams[sp] = NULL; + } else { + for (i = 0; i != RTE_DIM(ctx->dev); i++) { + if (ctx->dev[i].dp[t] != NULL) { + udp_pbm_clear(&ctx->dev[i].dp[t]->use, p); + ctx->dev[i].dp[t]->streams[sp] = NULL; + } + } + } + + return 0; +} + +static struct tle_udp_stream * +get_stream(struct tle_udp_ctx *ctx) +{ + struct tle_udp_stream *s; + + s = NULL; + if (ctx->streams.nb_free == 0) + return s; + + rte_spinlock_lock(&ctx->streams.lock); + if (ctx->streams.nb_free != 0) { + s = STAILQ_FIRST(&ctx->streams.free); + STAILQ_REMOVE_HEAD(&ctx->streams.free, link); + ctx->streams.nb_free--; + } + rte_spinlock_unlock(&ctx->streams.lock); + return s; +} + +static void +put_stream(struct tle_udp_ctx *ctx, struct tle_udp_stream *s, int32_t head) +{ + s->type = TLE_UDP_VNUM; + rte_spinlock_lock(&ctx->streams.lock); + if (head != 0) + STAILQ_INSERT_HEAD(&ctx->streams.free, s, link); + else + STAILQ_INSERT_TAIL(&ctx->streams.free, s, link); + ctx->streams.nb_free++; + rte_spinlock_unlock(&ctx->streams.lock); +} + +static void +fill_ipv4_am(const struct sockaddr_in *in, uint32_t *addr, uint32_t *mask) +{ + *addr = in->sin_addr.s_addr; + *mask = (*addr == INADDR_ANY) ? 
INADDR_ANY : INADDR_NONE; +} + +static void +fill_ipv6_am(const struct sockaddr_in6 *in, rte_xmm_t *addr, rte_xmm_t *mask) +{ + const struct in6_addr *pm; + + memcpy(addr, &in->sin6_addr, sizeof(*addr)); + if (memcmp(&tle_udp6_any, addr, sizeof(*addr)) == 0) + pm = &tle_udp6_any; + else + pm = &tle_udp6_none; + + memcpy(mask, pm, sizeof(*mask)); +} + +static int +check_stream_prm(const struct tle_udp_stream_param *prm) +{ + if ((prm->local_addr.ss_family != AF_INET && + prm->local_addr.ss_family != AF_INET6) || + prm->local_addr.ss_family != prm->remote_addr.ss_family) + return EINVAL; + + /* callback and event notifications mechanisms are mutually exclusive */ + if ((prm->recv_ev != NULL && prm->recv_cb.func != NULL) || + (prm->send_ev != NULL && prm->send_cb.func != NULL)) + return EINVAL; + + return 0; +} + +struct tle_udp_stream * +tle_udp_stream_open(struct tle_udp_ctx *ctx, + const struct tle_udp_stream_param *prm) +{ + struct tle_udp_stream *s; + const struct sockaddr_in *rin; + int32_t rc; + + if (ctx == NULL || prm == NULL || check_stream_prm(prm) != 0) { + rte_errno = EINVAL; + return NULL; + } + + s = get_stream(ctx); + if (s == NULL) { + rte_errno = ENFILE; + return NULL; + + /* some TX still pending for that stream. */ + } else if (bcg_store_full(s->tx.st) == 0) { + put_stream(ctx, s, 0); + rte_errno = EAGAIN; + return NULL; + } + + /* copy input parameters. */ + s->prm = *prm; + + /* setup ports and port mask fields (except dst port). */ + rin = (const struct sockaddr_in *)&prm->remote_addr; + s->port.src = rin->sin_port; + s->pmsk.src = (s->port.src == 0) ? 0 : UINT16_MAX; + s->pmsk.dst = UINT16_MAX; + + /* setup src and dst addresses. 
*/ + if (prm->local_addr.ss_family == AF_INET) { + fill_ipv4_am((const struct sockaddr_in *)&prm->local_addr, + &s->ipv4.addr.dst, &s->ipv4.mask.dst); + fill_ipv4_am((const struct sockaddr_in *)&prm->remote_addr, + &s->ipv4.addr.src, &s->ipv4.mask.src); + } else if (prm->local_addr.ss_family == AF_INET6) { + fill_ipv6_am((const struct sockaddr_in6 *)&prm->local_addr, + &s->ipv6.addr.dst, &s->ipv6.mask.dst); + fill_ipv6_am((const struct sockaddr_in6 *)&prm->remote_addr, + &s->ipv6.addr.src, &s->ipv6.mask.src); + } + + rte_spinlock_lock(&ctx->dev_lock); + rc = stream_fill_dev(ctx, s); + rte_spinlock_unlock(&ctx->dev_lock); + + if (rc != 0) { + put_stream(ctx, s, 1); + s = NULL; + rte_errno = rc; + } else { + /* setup stream notification menchanism */ + s->rx.ev = prm->recv_ev; + s->rx.cb = prm->recv_cb; + s->tx.ev = prm->send_ev; + s->tx.cb = prm->send_cb; + + /* mark stream as avaialbe for RX/TX */ + if (s->tx.ev != NULL) + tle_event_raise(s->tx.ev); + stream_up(s); + } + + return s; +} + +int +tle_udp_stream_close(struct tle_udp_stream *s) +{ + uint32_t i, n; + int32_t rc; + struct tle_udp_ctx *ctx; + struct rte_mbuf *m[MAX_BURST]; + + static const struct tle_udp_stream_cb zcb; + + if (s == NULL || s->type >= TLE_UDP_VNUM) + return EINVAL; + + ctx = s->ctx; + + /* mark stream as unavaialbe for RX/TX. */ + stream_down(s); + + /* reset TX cages. */ + rte_spinlock_lock(&s->tx.lock); + memset(s->tx.cg, 0, sizeof(s->tx.cg)); + rte_spinlock_unlock(&s->tx.lock); + + /* reset stream events if any. 
*/ + if (s->rx.ev != NULL) { + tle_event_idle(s->rx.ev); + s->rx.ev = NULL; + } + if (s->tx.ev != NULL) { + tle_event_idle(s->tx.ev); + s->tx.ev = NULL; + } + + s->rx.cb = zcb; + s->tx.cb = zcb; + + /* free stream's destination port */ + rte_spinlock_lock(&ctx->dev_lock); + rc = stream_clear_dev(ctx, s); + rte_spinlock_unlock(&ctx->dev_lock); + + /* empty stream's RX queue */ + do { + n = rte_ring_dequeue_burst(s->rx.q, (void **)m, RTE_DIM(m)); + for (i = 0; i != n; i++) + rte_pktmbuf_free(m[i]); + } while (n != 0); + + /* + * mark the stream as free again. + * if there still are pkts queued for TX, + * then put this stream to the tail of free list. + */ + put_stream(ctx, s, bcg_store_full(s->tx.st)); + return rc; +} + +int +tle_udp_stream_get_param(const struct tle_udp_stream *s, + struct tle_udp_stream_param *prm) +{ + struct sockaddr_in *lin4; + struct sockaddr_in6 *lin6; + + if (prm == NULL || s == NULL || s->type >= TLE_UDP_VNUM) + return EINVAL; + + prm[0] = s->prm; + if (prm->local_addr.ss_family == AF_INET) { + lin4 = (struct sockaddr_in *)&prm->local_addr; + lin4->sin_port = s->port.dst; + } else if (s->prm.local_addr.ss_family == AF_INET6) { + lin6 = (struct sockaddr_in6 *)&prm->local_addr; + lin6->sin6_port = s->port.dst; + } + + return 0; +} diff --git a/lib/libtle_udp/udp_impl.h b/lib/libtle_udp/udp_impl.h new file mode 100644 index 0000000..fbdb743 --- /dev/null +++ b/lib/libtle_udp/udp_impl.h @@ -0,0 +1,161 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _UDP_IMPL_H_ +#define _UDP_IMPL_H_ + +#include <rte_spinlock.h> +#include <rte_vect.h> +#include <tle_udp_impl.h> +#include <tle_event.h> + +#include "buf_cage.h" +#include "port_bitmap.h" +#include "osdep.h" + +#ifdef __cplusplus +extern "C" { +#endif + +enum { + TLE_UDP_V4, + TLE_UDP_V6, + TLE_UDP_VNUM +}; + +union udp_ports { + uint32_t raw; + struct { + uint16_t src; + uint16_t dst; + }; +}; + +union udph { + uint64_t raw; + struct { + union udp_ports ports; + uint16_t len; + uint16_t cksum; + }; +}; + +union ipv4_addrs { + uint64_t raw; + struct { + uint32_t src; + uint32_t dst; + }; +}; + +union ipv6_addrs { + _ymm_t raw; + struct { + rte_xmm_t src; + rte_xmm_t dst; + }; +}; + +union ip_addrs { + union ipv4_addrs v4; + union ipv6_addrs v6; +}; + + +struct tle_udp_stream { + + STAILQ_ENTRY(tle_udp_stream) link; + struct tle_udp_ctx *ctx; + + uint8_t type; /* TLE_UDP_V4 or TLE_UDP_V6 */ + + struct { + struct rte_ring *q; + struct tle_event *ev; + struct tle_udp_stream_cb cb; + rte_atomic32_t use; + } rx; + + union udp_ports port; + union udp_ports pmsk; + + union { + struct { + union ipv4_addrs addr; + union ipv4_addrs mask; + } ipv4; + struct { + union ipv6_addrs addr; + union ipv6_addrs mask; + } ipv6; + }; + + struct { + rte_atomic32_t use; + rte_spinlock_t lock; + struct tle_event *ev; + struct tle_udp_stream_cb cb; + struct bcg_store *st; + struct buf_cage *cg[RTE_MAX_ETHPORTS]; + } tx __rte_cache_aligned; + + struct tle_udp_stream_param prm; +} __rte_cache_aligned; + +struct tle_udp_dport { + struct udp_pbm use; /* ports in use. */ + struct tle_udp_stream *streams[MAX_PORT_NUM]; /* port to stream. */ +}; + +struct tle_udp_dev { + struct tle_udp_ctx *ctx; + struct { + uint64_t ol_flags[TLE_UDP_VNUM]; + } rx; + struct { + /* used by FE. 
*/ + uint64_t ol_flags[TLE_UDP_VNUM]; + rte_atomic32_t packet_id[TLE_UDP_VNUM]; + struct bcg_queue feq; + + /* used by BE only. */ + struct bcg_queue beq __rte_cache_min_aligned; + struct buf_cage *bc; + } tx; + struct tle_udp_dev_param prm; /* copy of device paramaters. */ + struct tle_udp_dport *dp[TLE_UDP_VNUM]; /* device udp ports */ +}; + +struct tle_udp_ctx { + struct tle_udp_ctx_param prm; + + struct { + rte_spinlock_t lock; + uint32_t nb_free; /* number of free streams. */ + STAILQ_HEAD(, tle_udp_stream) free; + struct tle_udp_stream *buf; /* array of streams */ + } streams; + + rte_spinlock_t dev_lock; + uint32_t nb_dev; + struct udp_pbm use[TLE_UDP_VNUM]; /* all ports in use. */ + struct tle_udp_dev dev[RTE_MAX_ETHPORTS]; +}; + +#ifdef __cplusplus +} +#endif + +#endif /* _UDP_IMPL_H_ */ diff --git a/lib/libtle_udp/udp_rxtx.c b/lib/libtle_udp/udp_rxtx.c new file mode 100644 index 0000000..d5d248e --- /dev/null +++ b/lib/libtle_udp/udp_rxtx.c @@ -0,0 +1,767 @@ + +#include <rte_malloc.h> +#include <rte_errno.h> +#include <rte_ethdev.h> +#include <rte_ip.h> +#include <rte_ip_frag.h> +#include <rte_udp.h> + +#include "udp_impl.h" +#include "misc.h" + +static inline struct tle_udp_stream * +rx_stream_obtain(struct tle_udp_dev *dev, uint32_t type, uint32_t port) +{ + struct tle_udp_stream *s; + + if (type >= TLE_UDP_VNUM || dev->dp[type] == NULL) + return NULL; + + s = dev->dp[type]->streams[port]; + if (s == NULL) + return NULL; + + if (rwl_acquire(&s->rx.use) < 0) + return NULL; + + return s; +} + +static inline uint16_t +get_pkt_type(const struct rte_mbuf *m) +{ + uint32_t v; + + v = m->packet_type & + (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_MASK); + if (v == (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L4_UDP)) + return TLE_UDP_V4; + else if (v == (RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_UDP)) + return TLE_UDP_V6; + else + return TLE_UDP_VNUM; +} + +static inline union udp_ports +pkt_info(const struct tle_udp_dev *dev, struct rte_mbuf *m, + union udp_ports *ports, 
union ipv4_addrs *addr4, + union ipv6_addrs **addr6) +{ + uint32_t len; + union udp_ports ret, *up; + union ipv4_addrs *pa4; + + ret.src = get_pkt_type(m); + + len = m->l2_len; + if (ret.src == TLE_UDP_V4) { + pa4 = rte_pktmbuf_mtod_offset(m, union ipv4_addrs *, + len + offsetof(struct ipv4_hdr, src_addr)); + addr4->raw = pa4->raw; + m->ol_flags |= dev->rx.ol_flags[TLE_UDP_V4]; + } else if (ret.src == TLE_UDP_V6) { + *addr6 = rte_pktmbuf_mtod_offset(m, union ipv6_addrs *, + len + offsetof(struct ipv6_hdr, src_addr)); + m->ol_flags |= dev->rx.ol_flags[TLE_UDP_V6]; + } + + len += m->l3_len; + up = rte_pktmbuf_mtod_offset(m, union udp_ports *, + len + offsetof(struct udp_hdr, src_port)); + ports->raw = up->raw; + ret.dst = ports->dst; + return ret; +} + +/* + * Helper routine, enqueues packets to the stream and calls RX + * notification callback, if needed. + */ +static inline uint16_t +rx_stream(struct tle_udp_stream *s, void *mb[], struct rte_mbuf *rp[], + int32_t rc[], uint32_t num) +{ + uint32_t i, k, r; + + r = rte_ring_enqueue_burst(s->rx.q, mb, num); + + /* if RX queue was empty invoke user RX notification callback. 
*/ + if (s->rx.cb.func != NULL && r != 0 && rte_ring_count(s->rx.q) == r) + s->rx.cb.func(s->rx.cb.data, s); + + for (i = r, k = 0; i != num; i++, k++) { + rc[k] = ENOBUFS; + rp[k] = mb[i]; + } + + return r; +} + +static inline uint16_t +rx_stream6(struct tle_udp_stream *s, struct rte_mbuf *pkt[], + union ipv6_addrs *addr[], union udp_ports port[], + struct rte_mbuf *rp[], int32_t rc[], uint16_t num) +{ + uint32_t i, k, n; + void *mb[num]; + + k = 0; + n = 0; + + for (i = 0; i != num; i++) { + + if ((port[i].raw & s->pmsk.raw) != s->port.raw || + ymm_mask_cmp(&addr[i]->raw, &s->ipv6.addr.raw, + &s->ipv6.mask.raw) != 0) { + rc[k] = ENOENT; + rp[k] = pkt[i]; + k++; + } else { + mb[n] = pkt[i]; + n++; + } + } + + return rx_stream(s, mb, rp + k, rc + k, n); +} + +static inline uint16_t +rx_stream4(struct tle_udp_stream *s, struct rte_mbuf *pkt[], + union ipv4_addrs addr[], union udp_ports port[], + struct rte_mbuf *rp[], int32_t rc[], uint16_t num) +{ + uint32_t i, k, n; + void *mb[num]; + + k = 0; + n = 0; + + for (i = 0; i != num; i++) { + + if ((addr[i].raw & s->ipv4.mask.raw) != s->ipv4.addr.raw || + (port[i].raw & s->pmsk.raw) != + s->port.raw) { + rc[k] = ENOENT; + rp[k] = pkt[i]; + k++; + } else { + mb[n] = pkt[i]; + n++; + } + } + + return rx_stream(s, mb, rp + k, rc + k, n); +} + +uint16_t +tle_udp_rx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[], + struct rte_mbuf *rp[], int32_t rc[], uint16_t num) +{ + struct tle_udp_stream *s; + uint32_t i, j, k, n, p, t; + union udp_ports tp[num], port[num]; + union ipv4_addrs a4[num]; + union ipv6_addrs *pa6[num]; + + for (i = 0; i != num; i++) + tp[i] = pkt_info(dev, pkt[i], &port[i], &a4[i], &pa6[i]); + + k = 0; + for (i = 0; i != num; i = j) { + + for (j = i + 1; j != num && tp[j].raw == tp[i].raw; j++) + ; + + t = tp[i].src; + p = tp[i].dst; + s = rx_stream_obtain(dev, t, p); + if (s != NULL) { + + if (t == TLE_UDP_V4) + n = rx_stream4(s, pkt + i, a4 + i, + port + i, rp + k, rc + k, j - i); + else + n = 
rx_stream6(s, pkt + i, pa6 + i, port + i, + rp + k, rc + k, j - i); + + k += j - i - n; + + if (s->rx.ev != NULL) + tle_event_raise(s->rx.ev); + rwl_release(&s->rx.use); + + } else { + for (; i != j; i++) { + rc[k] = ENOENT; + rp[k] = pkt[i]; + k++; + } + } + } + + return num - k; +} + +static inline void +tx_cage_release(struct buf_cage *bc) +{ + struct tle_udp_stream *s; + uint32_t n; + + s = bcg_get_udata(bc); + n = bcg_free(bc); + + /* If stream is still open, then mark it as avaialble for writing. */ + if (rwl_try_acquire(&s->tx.use) > 0) { + + if (s->tx.ev != NULL) + tle_event_raise(s->tx.ev); + + /* if stream send buffer was full invoke TX callback */ + else if (s->tx.cb.func != NULL && n == 1) + s->tx.cb.func(s->tx.cb.data, s); + + } + + rwl_release(&s->tx.use); +} + +static inline void +tx_cage_update(struct tle_udp_dev *dev, struct buf_cage *bc) +{ + struct tle_udp_stream *s; + struct tle_udp_ctx *ctx; + uint32_t idx; + + ctx = dev->ctx; + s = bcg_get_udata(bc); + idx = dev - ctx->dev; + + /* mark cage as closed to the stream. */ + rte_spinlock_lock(&s->tx.lock); + if (bc == s->tx.cg[idx]) + s->tx.cg[idx] = NULL; + rte_spinlock_unlock(&s->tx.lock); +} + +uint16_t +tle_udp_tx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[], uint16_t num) +{ + struct buf_cage *bc; + uint32_t i, n; + + for (i = 0; i != num; i += n) { + + bc = dev->tx.bc; + if (bc == NULL) { + if (dev->tx.beq.num == 0) + bcg_queue_append(&dev->tx.beq, &dev->tx.feq); + bc = __bcg_dequeue_head(&dev->tx.beq); + if (bc == NULL) + break; + tx_cage_update(dev, bc); + dev->tx.bc = bc; + } + + n = bcg_get(bc, (const void **)(uintptr_t)&pkt[i], num - i); + + /* cage is empty, need to free it and notify related stream. 
*/ + if (bcg_fill_count(bc) == 0) { + tx_cage_release(bc); + dev->tx.bc = NULL; + } + } + + return i; +} + +static int +check_pkt_csum(const struct rte_mbuf *m, uint32_t type) +{ + const struct ipv4_hdr *l3h4; + const struct ipv6_hdr *l3h6; + const struct udp_hdr *l4h; + int32_t ret; + uint16_t csum; + + ret = 0; + l3h4 = rte_pktmbuf_mtod_offset(m, const struct ipv4_hdr *, m->l2_len); + l3h6 = rte_pktmbuf_mtod_offset(m, const struct ipv6_hdr *, m->l2_len); + + if ((m->ol_flags & PKT_RX_IP_CKSUM_BAD) != 0) { + csum = _ipv4x_cksum(l3h4, m->l3_len); + ret = (csum != UINT16_MAX); + } + + if (ret == 0 && (m->ol_flags & PKT_RX_L4_CKSUM_BAD) != 0) { + + /* + * for IPv4 it is allowed to have zero UDP cksum, + * for IPv6 valid UDP cksum is mandatory. + */ + if (type == TLE_UDP_V4) { + l4h = (const struct udp_hdr *)((uintptr_t)l3h4 + + m->l3_len); + csum = (l4h->dgram_cksum == 0) ? UINT16_MAX : + _ipv4_udptcp_mbuf_cksum(m, + m->l2_len + m->l3_len, l3h4); + } else + csum = _ipv6_udptcp_mbuf_cksum(m, + m->l2_len + m->l3_len, l3h6); + + ret = (csum != UINT16_MAX); + } + + return ret; +} + +/* exclude NULLs from the final list of packets. */ +static inline uint32_t +compress_pkt_list(struct rte_mbuf *pkt[], uint32_t nb_pkt, uint32_t nb_zero) +{ + uint32_t i, j, k, l; + + for (j = nb_pkt; nb_zero != 0 && j-- != 0; ) { + + /* found a hole. */ + if (pkt[j] == NULL) { + + /* find how big is it. */ + for (i = j; i-- != 0 && pkt[i] == NULL; ) + ; + /* fill the hole. */ + for (k = j + 1, l = i + 1; k != nb_pkt; k++, l++) + pkt[l] = pkt[k]; + + nb_pkt -= j - i; + nb_zero -= j - i; + } + } + + return nb_pkt; +} + +/* + * helper function, do the necessary pre-processing for the received packets + * before handiing them to the strem_recv caller. 
+ */ +static inline struct rte_mbuf * +recv_pkt_process(struct rte_mbuf *m, uint32_t type) +{ + uint64_t f; + + f = m->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD); + if (f != 0) { + if (check_pkt_csum(m, type) == 0) + m->ol_flags ^= f; + else { + rte_pktmbuf_free(m); + return NULL; + } + } + + rte_pktmbuf_adj(m, m->l2_len + m->l3_len + m->l4_len); + return m; +} + +uint16_t +tle_udp_stream_recv(struct tle_udp_stream *s, struct rte_mbuf *pkt[], + uint16_t num) +{ + uint32_t i, k, n; + + n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num); + if (n == 0) + return 0; + + /* + * if we still have packets to read, + * then rearm stream RX event. + */ + if (n == num && rte_ring_count(s->rx.q) != 0) { + if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL) + tle_event_raise(s->rx.ev); + rwl_release(&s->rx.use); + } + + k = 0; + for (i = 0; i != RTE_ALIGN_FLOOR(n, 4); i += 4) { + pkt[i] = recv_pkt_process(pkt[i], s->type); + pkt[i + 1] = recv_pkt_process(pkt[i + 1], s->type); + pkt[i + 2] = recv_pkt_process(pkt[i + 2], s->type); + pkt[i + 3] = recv_pkt_process(pkt[i + 3], s->type); + k += (pkt[i] == NULL) + (pkt[i + 1] == NULL) + + (pkt[i + 2] == NULL) + (pkt[i + 3] == NULL); + } + + switch (n % 4) { + case 3: + pkt[i + 2] = recv_pkt_process(pkt[i + 2], s->type); + k += (pkt[i + 2] == NULL); + case 2: + pkt[i + 1] = recv_pkt_process(pkt[i + 1], s->type); + k += (pkt[i + 1] == NULL); + case 1: + pkt[i] = recv_pkt_process(pkt[i], s->type); + k += (pkt[i] == NULL); + } + + return compress_pkt_list(pkt, n, k); +} + +static int32_t +udp_get_dest(struct tle_udp_stream *s, const void *dst_addr, + struct tle_udp_dest *dst) +{ + int32_t rc; + const struct in_addr *d4; + const struct in6_addr *d6; + struct tle_udp_ctx *ctx; + struct tle_udp_dev *dev; + + ctx = s->ctx; + + /* it is here just to keep gcc happy. 
*/ + d4 = NULL; + + if (s->type == TLE_UDP_V4) { + d4 = dst_addr; + rc = ctx->prm.lookup4(ctx->prm.lookup4_data, d4, dst); + } else if (s->type == TLE_UDP_V6) { + d6 = dst_addr; + rc = ctx->prm.lookup6(ctx->prm.lookup6_data, d6, dst); + } else + rc = -ENOENT; + + if (rc < 0 || dst->dev == NULL || dst->dev->ctx != ctx) + return -ENOENT; + + dev = dst->dev; + if (s->type == TLE_UDP_V4) { + struct ipv4_hdr *l3h; + l3h = (struct ipv4_hdr *)(dst->hdr + dst->l2_len); + l3h->src_addr = dev->prm.local_addr4.s_addr; + l3h->dst_addr = d4->s_addr; + } else { + struct ipv6_hdr *l3h; + l3h = (struct ipv6_hdr *)(dst->hdr + dst->l2_len); + rte_memcpy(l3h->src_addr, &dev->prm.local_addr6, + sizeof(l3h->src_addr)); + rte_memcpy(l3h->dst_addr, d6, sizeof(l3h->dst_addr)); + } + + return dev - ctx->dev; +} + +static inline int +udp_fill_mbuf(struct rte_mbuf *m, + uint32_t type, uint64_t ol_flags, uint32_t pid, + union udph udph, const struct tle_udp_dest *dst) +{ + uint32_t len, plen; + char *l2h; + union udph *l4h; + + len = dst->l2_len + dst->l3_len; + plen = m->pkt_len; + + /* copy to mbuf L2/L3 header template. */ + + l2h = rte_pktmbuf_prepend(m, len + sizeof(*l4h)); + if (l2h == NULL) + return -ENOBUFS; + + /* copy L2/L3 header */ + rte_memcpy(l2h, dst->hdr, len); + + /* copy UDP header */ + l4h = (union udph *)(l2h + len); + l4h->raw = udph.raw; + + /* setup mbuf TX offload related fields. */ + m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len, + sizeof(*l4h), 0, 0, 0); + m->ol_flags |= ol_flags; + + l4h->len = rte_cpu_to_be_16(plen + sizeof(*l4h)); + + /* update proto specific fields. 
*/ + + if (type == TLE_UDP_V4) { + struct ipv4_hdr *l3h; + l3h = (struct ipv4_hdr *)(l2h + dst->l2_len); + l3h->packet_id = rte_cpu_to_be_16(pid); + l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len + + sizeof(*l4h)); + + if ((ol_flags & PKT_TX_UDP_CKSUM) != 0) + l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len, + ol_flags); + else + l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h); + + if ((ol_flags & PKT_TX_IP_CKSUM) == 0) + l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len); + } else { + struct ipv6_hdr *l3h; + l3h = (struct ipv6_hdr *)(l2h + dst->l2_len); + l3h->payload_len = rte_cpu_to_be_16(plen + sizeof(*l4h)); + if ((ol_flags & PKT_TX_UDP_CKSUM) != 0) + l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags); + else + l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h); + } + + return 0; +} + +/* ??? + * probably this function should be there - + * rte_ipv[4,6]_fragment_packet should do that. + */ +static inline void +frag_fixup(const struct rte_mbuf *ms, struct rte_mbuf *mf, uint32_t type) +{ + struct ipv4_hdr *l3h; + + mf->ol_flags = ms->ol_flags; + mf->tx_offload = ms->tx_offload; + + if (type == TLE_UDP_V4 && (ms->ol_flags & PKT_TX_IP_CKSUM) == 0) { + l3h = rte_pktmbuf_mtod(mf, struct ipv4_hdr *); + l3h->hdr_checksum = _ipv4x_cksum(l3h, mf->l3_len); + } +} + +/* + * Returns negative for failure to fragment or actual number of fragments. 
+ */ +static inline int +fragment(struct rte_mbuf *pkt, struct rte_mbuf *frag[], uint32_t num, + uint32_t type, const struct tle_udp_dest *dst) +{ + int32_t frag_num, i; + uint16_t mtu; + void *eth_hdr; + + /* Remove the Ethernet header from the input packet */ + rte_pktmbuf_adj(pkt, dst->l2_len); + mtu = dst->mtu - dst->l2_len; + + /* fragment packet */ + if (type == TLE_UDP_V4) + frag_num = rte_ipv4_fragment_packet(pkt, frag, num, mtu, + dst->head_mp, dst->head_mp); + else + frag_num = rte_ipv6_fragment_packet(pkt, frag, num, mtu, + dst->head_mp, dst->head_mp); + + if (frag_num > 0) { + for (i = 0; i != frag_num; i++) { + + frag_fixup(pkt, frag[i], type); + + /* Move data_off to include l2 header first */ + eth_hdr = rte_pktmbuf_prepend(frag[i], dst->l2_len); + + /* copy l2 header into fragment */ + rte_memcpy(eth_hdr, dst->hdr, dst->l2_len); + } + } + + return frag_num; +} + +/* enqueue up to num packets to the destination device queue. */ +static inline uint16_t +queue_pkt_out(struct tle_udp_stream *s, struct bcg_queue *bq, uint32_t di, + const void *pkt[], uint16_t num) +{ + struct buf_cage *bc; + uint32_t i, n; + + rte_spinlock_lock(&s->tx.lock); + bc = s->tx.cg[di]; + + for (i = 0; i != num; i += n) { + if (bc == NULL) { + bc = bcg_alloc(s->tx.st); + if (bc == NULL) + break; + n = bcg_put(bc, pkt + i, num - i); + bcg_enqueue_tail(bq, bc); + } else + n = bcg_put(bc, pkt + i, num - i); + + if (n != num - i) + bc = NULL; + } + + s->tx.cg[di] = bc; + rte_spinlock_unlock(&s->tx.lock); + return i; +} + +/* + * etiher enqueue all num packets or none. + * assumes that all number of input packets not exceed size of buf_cage. 
+ */ +static inline uint16_t +queue_frg_out(struct tle_udp_stream *s, struct bcg_queue *bq, uint32_t di, + const void *pkt[], uint16_t num) +{ + struct buf_cage *bc, *bcp; + uint32_t n; + + rte_spinlock_lock(&s->tx.lock); + bc = s->tx.cg[di]; + + n = 0; + if (bc == NULL || bcg_free_count(bc) < num) { + bcp = bc; + bc = bcg_alloc(s->tx.st); + if (bc != NULL) { + if (bcp != NULL) + n = bcg_put(bcp, pkt, num); + n += bcg_put(bc, pkt, num - n); + bcg_enqueue_tail(bq, bc); + } + } else + n = bcg_put(bc, pkt, num); + + s->tx.cg[di] = bc; + rte_spinlock_unlock(&s->tx.lock); + return n; +} + +uint16_t +tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[], + uint16_t num, const struct sockaddr *dst_addr) +{ + int32_t di, frg, rc; + uint64_t ol_flags; + uint32_t i, k, n; + uint32_t mtu, pid, type; + const struct sockaddr_in *d4; + const struct sockaddr_in6 *d6; + const void *da; + union udph udph; + struct tle_udp_dest dst; + + type = s->type; + + /* start filling UDP header. */ + udph.raw = 0; + udph.ports.src = s->port.dst; + + /* figure out what destination addr/port to use. */ + if (dst_addr != NULL) { + if (dst_addr->sa_family != s->prm.remote_addr.ss_family) { + rte_errno = EINVAL; + return 0; + } + if (type == TLE_UDP_V4) { + d4 = (const struct sockaddr_in *)dst_addr; + da = &d4->sin_addr; + udph.ports.dst = d4->sin_port; + } else { + d6 = (const struct sockaddr_in6 *)dst_addr; + da = &d6->sin6_addr; + udph.ports.dst = d6->sin6_port; + } + } else { + udph.ports.dst = s->port.src; + if (type == TLE_UDP_V4) + da = &s->ipv4.addr.src; + else + da = &s->ipv6.addr.src; + } + + di = udp_get_dest(s, da, &dst); + if (di < 0) { + rte_errno = -di; + return 0; + } + + pid = rte_atomic32_add_return(&dst.dev->tx.packet_id[type], num) - num; + mtu = dst.mtu - dst.l2_len - dst.l3_len; + + /* mark stream as not closable. 
*/ + if (rwl_acquire(&s->tx.use) < 0) + return 0; + + for (i = 0, k = 0; k != num; k = i) { + + /* copy L2/L3/L4 headers into mbufs, setup mbufs metadata. */ + + frg = 0; + ol_flags = dst.dev->tx.ol_flags[type]; + + while (i != num && frg == 0) { + frg = pkt[i]->pkt_len > mtu; + if (frg != 0) + ol_flags &= ~PKT_TX_UDP_CKSUM; + rc = udp_fill_mbuf(pkt[i], type, ol_flags, pid + i, + udph, &dst); + if (rc != 0) { + rte_errno = -rc; + goto out; + } + i += (frg == 0); + } + + /* enqueue non-fragment packets to the destination device. */ + if (k != i) { + k += queue_pkt_out(s, &dst.dev->tx.feq, di, + (const void **)(uintptr_t)&pkt[k], i - k); + + /* stream TX queue is full. */ + if (k != i) + break; + } + + /* enqueue packet that need to be fragmented */ + if (i != num) { + + struct rte_mbuf *frag[RTE_LIBRTE_IP_FRAG_MAX_FRAG]; + + /* fragment the packet. */ + rc = fragment(pkt[i], frag, RTE_DIM(frag), type, &dst); + if (rc < 0) { + rte_errno = -rc; + break; + } + + n = queue_frg_out(s, &dst.dev->tx.feq, di, + (const void **)(uintptr_t)frag, rc); + if (n == 0) { + while (rc-- != 0) + rte_pktmbuf_free(frag[rc]); + break; + } + + /* all fragments enqueued, free the original packet. */ + rte_pktmbuf_free(pkt[i]); + i++; + } + } + + /* if possible, rearm socket write event. */ + if (k == num && s->tx.ev != NULL) + tle_event_raise(s->tx.ev); + +out: + rwl_release(&s->tx.use); + + /* + * remove pkt l2/l3 headers, restore ol_flags for unsent, but + * already modified packets. + */ + ol_flags = ~dst.dev->tx.ol_flags[type]; + for (n = k; n != i; n++) { + rte_pktmbuf_adj(pkt[n], dst.l2_len + dst.l3_len + sizeof(udph)); + pkt[n]->ol_flags &= ol_flags; + } + + return k; +} |