aboutsummaryrefslogtreecommitdiffstats
path: root/test/asf
diff options
context:
space:
mode:
Diffstat (limited to 'test/asf')
-rw-r--r--test/asf/README1
-rw-r--r--test/asf/asfframework.py1641
-rw-r--r--test/asf/debug_internal.py40
-rw-r--r--test/asf/test_adl.py108
-rw-r--r--test/asf/test_api_client.py20
-rw-r--r--test/asf/test_api_trace.py61
-rw-r--r--test/asf/test_bihash.py80
-rw-r--r--test/asf/test_buffers.py29
-rw-r--r--test/asf/test_cli.py91
-rw-r--r--test/asf/test_counters.py38
-rw-r--r--test/asf/test_crypto.py29
-rw-r--r--test/asf/test_endian.py43
-rw-r--r--test/asf/test_fib.py48
-rw-r--r--test/asf/test_http.py39
-rw-r--r--test/asf/test_http_static.py164
-rw-r--r--test/asf/test_lb_api.py82
-rw-r--r--test/asf/test_mactime.py163
-rw-r--r--test/asf/test_mpcap.py57
-rw-r--r--test/asf/test_node_variants.py148
-rw-r--r--test/asf/test_offload.py77
-rw-r--r--test/asf/test_perfmon.py48
-rw-r--r--test/asf/test_policer.py126
-rw-r--r--test/asf/test_prom.py58
-rw-r--r--test/asf/test_quic.py601
-rw-r--r--test/asf/test_session.py195
-rw-r--r--test/asf/test_sparse_vec.py34
-rw-r--r--test/asf/test_string.py59
-rw-r--r--test/asf/test_tap.py37
-rw-r--r--test/asf/test_tcp.py125
-rw-r--r--test/asf/test_tls.py154
-rw-r--r--test/asf/test_util.py40
-rw-r--r--test/asf/test_vapi.py82
-rw-r--r--test/asf/test_vcl.py1263
-rw-r--r--test/asf/test_vhost.py145
-rw-r--r--test/asf/test_vpe_api.py54
-rw-r--r--test/asf/test_vppinfra.py38
36 files changed, 6018 insertions, 0 deletions
diff --git a/test/asf/README b/test/asf/README
new file mode 100644
index 00000000000..997be7c91d1
--- /dev/null
+++ b/test/asf/README
@@ -0,0 +1 @@
+A Scapy Free (ASF) Test Framework \ No newline at end of file
diff --git a/test/asf/asfframework.py b/test/asf/asfframework.py
new file mode 100644
index 00000000000..24880044cec
--- /dev/null
+++ b/test/asf/asfframework.py
@@ -0,0 +1,1641 @@
+#!/usr/bin/env python3
+
+from __future__ import print_function
+import logging
+import sys
+import os
+import select
+import signal
+import subprocess
+import unittest
+import re
+import time
+import faulthandler
+import random
+import copy
+import platform
+import shutil
+from pathlib import Path
+from collections import deque
+from threading import Thread, Event
+from inspect import getdoc, isclass
+from traceback import format_exception
+from logging import FileHandler, DEBUG, Formatter
+from enum import Enum
+from abc import ABC, abstractmethod
+
+from config import config, max_vpp_cpus
+import hook as hookmodule
+from vpp_lo_interface import VppLoInterface
+from vpp_papi_provider import VppPapiProvider
+import vpp_papi
+from vpp_papi.vpp_stats import VPPStats
+from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
+from log import (
+ RED,
+ GREEN,
+ YELLOW,
+ double_line_delim,
+ single_line_delim,
+ get_logger,
+ colorize,
+)
+from vpp_object import VppObjectRegistry
+from util import is_core_present
+from test_result_code import TestResultCode
+
+logger = logging.getLogger(__name__)
+
+# Set up an empty logger for the testcase that can be overridden as necessary
+null_logger = logging.getLogger("VppAsfTestCase")
+null_logger.addHandler(logging.NullHandler())
+
+
+if config.debug_framework:
+ import debug_internal
+
+"""
+ Test framework module.
+
+ The module provides a set of tools for constructing and running tests and
+ representing the results.
+"""
+
+
+class VppDiedError(Exception):
+ """exception for reporting that the subprocess has died."""
+
+ signals_by_value = {
+ v: k
+ for k, v in signal.__dict__.items()
+ if k.startswith("SIG") and not k.startswith("SIG_")
+ }
+
+ def __init__(self, rv=None, testcase=None, method_name=None):
+ self.rv = rv
+ self.signal_name = None
+ self.testcase = testcase
+ self.method_name = method_name
+
+ try:
+ self.signal_name = VppDiedError.signals_by_value[-rv]
+ except (KeyError, TypeError):
+ pass
+
+ if testcase is None and method_name is None:
+ in_msg = ""
+ else:
+ in_msg = " while running %s.%s" % (testcase, method_name)
+
+ if self.rv:
+ msg = "VPP subprocess died unexpectedly%s with return code: %d%s." % (
+ in_msg,
+ self.rv,
+ " [%s]" % (self.signal_name if self.signal_name is not None else ""),
+ )
+ else:
+ msg = "VPP subprocess died unexpectedly%s." % in_msg
+
+ super(VppDiedError, self).__init__(msg)
+
+
+def pump_output(testclass):
+ """pump output from vpp stdout/stderr to proper queues"""
+ stdout_fragment = ""
+ stderr_fragment = ""
+ while not testclass.pump_thread_stop_flag.is_set():
+ readable = select.select(
+ [
+ testclass.vpp.stdout.fileno(),
+ testclass.vpp.stderr.fileno(),
+ testclass.pump_thread_wakeup_pipe[0],
+ ],
+ [],
+ [],
+ )[0]
+ if testclass.vpp.stdout.fileno() in readable:
+ read = os.read(testclass.vpp.stdout.fileno(), 102400)
+ if len(read) > 0:
+ split = read.decode("ascii", errors="backslashreplace").splitlines(True)
+ if len(stdout_fragment) > 0:
+ split[0] = "%s%s" % (stdout_fragment, split[0])
+ if len(split) > 0 and split[-1].endswith("\n"):
+ limit = None
+ else:
+ limit = -1
+ stdout_fragment = split[-1]
+ testclass.vpp_stdout_deque.extend(split[:limit])
+ if not config.cache_vpp_output:
+ for line in split[:limit]:
+ testclass.logger.info("VPP STDOUT: %s" % line.rstrip("\n"))
+ if testclass.vpp.stderr.fileno() in readable:
+ read = os.read(testclass.vpp.stderr.fileno(), 102400)
+ if len(read) > 0:
+ split = read.decode("ascii", errors="backslashreplace").splitlines(True)
+ if len(stderr_fragment) > 0:
+ split[0] = "%s%s" % (stderr_fragment, split[0])
+ if len(split) > 0 and split[-1].endswith("\n"):
+ limit = None
+ else:
+ limit = -1
+ stderr_fragment = split[-1]
+
+ testclass.vpp_stderr_deque.extend(split[:limit])
+ if not config.cache_vpp_output:
+ for line in split[:limit]:
+ testclass.logger.error("VPP STDERR: %s" % line.rstrip("\n"))
+ # ignoring the dummy pipe here intentionally - the
+ # flag will take care of properly terminating the loop
+
+
+def _is_platform_aarch64():
+ return platform.machine() == "aarch64"
+
+
+is_platform_aarch64 = _is_platform_aarch64()
+
+
+def _is_distro_ubuntu2204():
+ with open("/etc/os-release") as f:
+ for line in f.readlines():
+ if "jammy" in line:
+ return True
+ return False
+
+
+is_distro_ubuntu2204 = _is_distro_ubuntu2204()
+
+
+def _is_distro_debian11():
+ with open("/etc/os-release") as f:
+ for line in f.readlines():
+ if "bullseye" in line:
+ return True
+ return False
+
+
+is_distro_debian11 = _is_distro_debian11()
+
+
+def _is_distro_ubuntu2204():
+ with open("/etc/os-release") as f:
+ for line in f.readlines():
+ if "jammy" in line:
+ return True
+ return False
+
+
+class KeepAliveReporter(object):
+ """
+ Singleton object which reports test start to parent process
+ """
+
+ _shared_state = {}
+
+ def __init__(self):
+ self.__dict__ = self._shared_state
+ self._pipe = None
+
+ @property
+ def pipe(self):
+ return self._pipe
+
+ @pipe.setter
+ def pipe(self, pipe):
+ if self._pipe is not None:
+ raise Exception("Internal error - pipe should only be set once.")
+ self._pipe = pipe
+
+ def send_keep_alive(self, test, desc=None):
+ """
+ Write current test tmpdir & desc to keep-alive pipe to signal liveness
+ """
+ if self.pipe is None:
+ # if not running forked..
+ return
+
+ if isclass(test):
+ desc = "%s (%s)" % (desc, unittest.util.strclass(test))
+ else:
+ desc = test.id()
+
+ self.pipe.send((desc, config.vpp, test.tempdir, test.vpp.pid))
+
+
+class TestCaseTag(Enum):
+ # marks the suites that must run at the end
+ # using only a single test runner
+ RUN_SOLO = 1
+ # marks the suites broken on VPP multi-worker
+ FIXME_VPP_WORKERS = 2
+ # marks the suites broken when ASan is enabled
+ FIXME_ASAN = 3
+ # marks suites broken on Ubuntu-22.04
+ FIXME_UBUNTU2204 = 4
+ # marks suites broken on Debian-11
+ FIXME_DEBIAN11 = 5
+ # marks suites broken on debug vpp image
+ FIXME_VPP_DEBUG = 6
+
+
+def create_tag_decorator(e):
+ def decorator(cls):
+ try:
+ cls.test_tags.append(e)
+ except AttributeError:
+ cls.test_tags = [e]
+ return cls
+
+ return decorator
+
+
+tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
+tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
+tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
+tag_fixme_ubuntu2204 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2204)
+tag_fixme_debian11 = create_tag_decorator(TestCaseTag.FIXME_DEBIAN11)
+tag_fixme_vpp_debug = create_tag_decorator(TestCaseTag.FIXME_VPP_DEBUG)
+
+
+class DummyVpp:
+ returncode = None
+ pid = 0xCAFEBAFE
+
+ def poll(self):
+ pass
+
+ def terminate(self):
+ pass
+
+
+class CPUInterface(ABC):
+ cpus = []
+ skipped_due_to_cpu_lack = False
+
+ @classmethod
+ @abstractmethod
+ def get_cpus_required(cls):
+ pass
+
+ @classmethod
+ def assign_cpus(cls, cpus):
+ cls.cpus = cpus
+
+
+class VppAsfTestCase(CPUInterface, unittest.TestCase):
+ """This subclass is a base class for VPP test cases that are implemented as
+ classes. It provides methods to create and run test case.
+ """
+
+ extra_vpp_statseg_config = ""
+ extra_vpp_config = []
+ extra_vpp_plugin_config = []
+ logger = null_logger
+ vapi_response_timeout = 5
+ remove_configured_vpp_objects_on_tear_down = True
+
+ @classmethod
+ def has_tag(cls, tag):
+ """if the test case has a given tag - return true"""
+ try:
+ return tag in cls.test_tags
+ except AttributeError:
+ pass
+ return False
+
+ @classmethod
+ def is_tagged_run_solo(cls):
+ """if the test case class is timing-sensitive - return true"""
+ return cls.has_tag(TestCaseTag.RUN_SOLO)
+
+ @classmethod
+ def skip_fixme_asan(cls):
+ """if @tag_fixme_asan & ASan is enabled - mark for skip"""
+ if cls.has_tag(TestCaseTag.FIXME_ASAN):
+ vpp_extra_cmake_args = os.environ.get("VPP_EXTRA_CMAKE_ARGS", "")
+ if "DVPP_ENABLE_SANITIZE_ADDR=ON" in vpp_extra_cmake_args:
+ cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
+
+ @classmethod
+ def instance(cls):
+ """Return the instance of this testcase"""
+ return cls.test_instance
+
+ @classmethod
+ def set_debug_flags(cls, d):
+ cls.gdbserver_port = 7777
+ cls.debug_core = False
+ cls.debug_gdb = False
+ cls.debug_gdbserver = False
+ cls.debug_all = False
+ cls.debug_attach = False
+ if d is None:
+ return
+ dl = d.lower()
+ if dl == "core":
+ cls.debug_core = True
+ elif dl == "gdb" or dl == "gdb-all":
+ cls.debug_gdb = True
+ elif dl == "gdbserver" or dl == "gdbserver-all":
+ cls.debug_gdbserver = True
+ elif dl == "attach":
+ cls.debug_attach = True
+ else:
+ raise Exception("Unrecognized DEBUG option: '%s'" % d)
+ if dl == "gdb-all" or dl == "gdbserver-all":
+ cls.debug_all = True
+
+ @classmethod
+ def get_vpp_worker_count(cls):
+ if not hasattr(cls, "vpp_worker_count"):
+ if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
+ cls.vpp_worker_count = 0
+ else:
+ cls.vpp_worker_count = config.vpp_worker_count
+ return cls.vpp_worker_count
+
+ @classmethod
+ def get_cpus_required(cls):
+ return 1 + cls.get_vpp_worker_count()
+
+ @classmethod
+ def setUpConstants(cls):
+ """Set-up the test case class based on environment variables"""
+ cls.step = config.step
+ cls.plugin_path = ":".join(config.vpp_plugin_dir)
+ cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
+ cls.extern_plugin_path = ":".join(config.extern_plugin_dir)
+ debug_cli = ""
+ if cls.step or cls.debug_gdb or cls.debug_gdbserver:
+ debug_cli = "cli-listen localhost:5002"
+ size = re.search(r"\d+[gG]", config.coredump_size)
+ if size:
+ coredump_size = f"coredump-size {config.coredump_size}".lower()
+ else:
+ coredump_size = "coredump-size unlimited"
+ default_variant = config.variant
+ if default_variant is not None:
+ default_variant = "default { variant %s 100 }" % default_variant
+ else:
+ default_variant = ""
+
+ api_fuzzing = config.api_fuzz
+ if api_fuzzing is None:
+ api_fuzzing = "off"
+
+ cls.vpp_cmdline = [
+ config.vpp,
+ "unix",
+ "{",
+ "nodaemon",
+ debug_cli,
+ "full-coredump",
+ coredump_size,
+ "runtime-dir",
+ cls.tempdir,
+ "}",
+ "api-trace",
+ "{",
+ "on",
+ "}",
+ "api-segment",
+ "{",
+ "prefix",
+ cls.get_api_segment_prefix(),
+ "}",
+ "cpu",
+ "{",
+ "main-core",
+ str(cls.cpus[0]),
+ ]
+ if cls.extern_plugin_path not in (None, ""):
+ cls.extra_vpp_plugin_config.append("add-path %s" % cls.extern_plugin_path)
+ if cls.get_vpp_worker_count():
+ cls.vpp_cmdline.extend(
+ ["corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])]
+ )
+ cls.vpp_cmdline.extend(
+ [
+ "}",
+ "physmem",
+ "{",
+ "max-size",
+ "32m",
+ "}",
+ "statseg",
+ "{",
+ "socket-name",
+ cls.get_stats_sock_path(),
+ cls.extra_vpp_statseg_config,
+ "}",
+ "socksvr",
+ "{",
+ "socket-name",
+ cls.get_api_sock_path(),
+ "}",
+ "node { ",
+ default_variant,
+ "}",
+ "api-fuzz {",
+ api_fuzzing,
+ "}",
+ "plugins",
+ "{",
+ "plugin",
+ "dpdk_plugin.so",
+ "{",
+ "disable",
+ "}",
+ "plugin",
+ "rdma_plugin.so",
+ "{",
+ "disable",
+ "}",
+ "plugin",
+ "lisp_unittest_plugin.so",
+ "{",
+ "enable",
+ "}",
+ "plugin",
+ "unittest_plugin.so",
+ "{",
+ "enable",
+ "}",
+ ]
+ + cls.extra_vpp_plugin_config
+ + [
+ "}",
+ ]
+ )
+
+ if cls.extra_vpp_config is not None:
+ cls.vpp_cmdline.extend(cls.extra_vpp_config)
+
+ if not cls.debug_attach:
+ cls.logger.info("vpp_cmdline args: %s" % cls.vpp_cmdline)
+ cls.logger.info("vpp_cmdline: %s" % " ".join(cls.vpp_cmdline))
+
+ @classmethod
+ def wait_for_enter(cls):
+ if cls.debug_gdbserver:
+ print(double_line_delim)
+ print("Spawned GDB server with PID: %d" % cls.vpp.pid)
+ elif cls.debug_gdb:
+ print(double_line_delim)
+ print("Spawned VPP with PID: %d" % cls.vpp.pid)
+ else:
+ cls.logger.debug("Spawned VPP with PID: %d" % cls.vpp.pid)
+ return
+ print(single_line_delim)
+ print("You can debug VPP using:")
+ if cls.debug_gdbserver:
+ print(
+ f"sudo gdb {config.vpp} "
+ f"-ex 'target remote localhost:{cls.gdbserver_port}'"
+ )
+ print(
+ "Now is the time to attach gdb by running the above "
+ "command, set up breakpoints etc., then resume VPP from "
+ "within gdb by issuing the 'continue' command"
+ )
+ cls.gdbserver_port += 1
+ elif cls.debug_gdb:
+ print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
+ print(
+ "Now is the time to attach gdb by running the above "
+ "command and set up breakpoints etc., then resume VPP from"
+ " within gdb by issuing the 'continue' command"
+ )
+ print(single_line_delim)
+ input("Press ENTER to continue running the testcase...")
+
+ @classmethod
+ def attach_vpp(cls):
+ cls.vpp = DummyVpp()
+
+ @classmethod
+ def run_vpp(cls):
+ cls.logger.debug(f"Assigned cpus: {cls.cpus}")
+ cmdline = cls.vpp_cmdline
+
+ if cls.debug_gdbserver:
+ gdbserver = "/usr/bin/gdbserver"
+ if not os.path.isfile(gdbserver) or not os.access(gdbserver, os.X_OK):
+ raise Exception(
+ "gdbserver binary '%s' does not exist or is "
+ "not executable" % gdbserver
+ )
+
+ cmdline = [
+ gdbserver,
+ "localhost:{port}".format(port=cls.gdbserver_port),
+ ] + cls.vpp_cmdline
+ cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
+
+ try:
+ cls.vpp = subprocess.Popen(
+ cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE
+ )
+ except subprocess.CalledProcessError as e:
+ cls.logger.critical(
+ "Subprocess returned with non-0 return code: (%s)", e.returncode
+ )
+ raise
+ except OSError as e:
+ cls.logger.critical(
+ "Subprocess returned with OS error: (%s) %s", e.errno, e.strerror
+ )
+ raise
+ except Exception as e:
+ cls.logger.exception("Subprocess returned unexpected from %s:", cmdline)
+ raise
+
+ cls.wait_for_enter()
+
+ @classmethod
+ def wait_for_coredump(cls):
+ corefile = cls.tempdir + "/core"
+ if os.path.isfile(corefile):
+ cls.logger.error("Waiting for coredump to complete: %s", corefile)
+ curr_size = os.path.getsize(corefile)
+ deadline = time.time() + 60
+ ok = False
+ while time.time() < deadline:
+ cls.sleep(1)
+ size = curr_size
+ curr_size = os.path.getsize(corefile)
+ if size == curr_size:
+ ok = True
+ break
+ if not ok:
+ cls.logger.error(
+ "Timed out waiting for coredump to complete: %s", corefile
+ )
+ else:
+ cls.logger.error("Coredump complete: %s, size %d", corefile, curr_size)
+
+ @classmethod
+ def get_stats_sock_path(cls):
+ return "%s/stats.sock" % cls.tempdir
+
+ @classmethod
+ def get_api_sock_path(cls):
+ return "%s/api.sock" % cls.tempdir
+
+ @classmethod
+ def get_memif_sock_path(cls):
+ return "%s/memif.sock" % cls.tempdir
+
+ @classmethod
+ def get_api_segment_prefix(cls):
+ return os.path.basename(cls.tempdir) # Only used for VAPI
+
+ @classmethod
+ def get_tempdir(cls):
+ if cls.debug_attach:
+ tmpdir = f"{config.tmp_dir}/unittest-attach-gdb"
+ else:
+ tmpdir = f"{config.tmp_dir}/{get_testcase_dirname(cls.__name__)}"
+ if config.wipe_tmp_dir:
+ shutil.rmtree(tmpdir, ignore_errors=True)
+ os.mkdir(tmpdir)
+ return tmpdir
+
+ @classmethod
+ def create_file_handler(cls):
+ if config.log_dir is None:
+ cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
+ return
+
+ logdir = f"{config.log_dir}/{get_testcase_dirname(cls.__name__)}"
+ if config.wipe_tmp_dir:
+ shutil.rmtree(logdir, ignore_errors=True)
+ os.mkdir(logdir)
+ cls.file_handler = FileHandler(f"{logdir}/log.txt")
+
+ @classmethod
+ def setUpClass(cls):
+ """
+ Perform class setup before running the testcase
+ Remove shared memory files, start vpp and connect the vpp-api
+ """
+ super(VppAsfTestCase, cls).setUpClass()
+ cls.logger = get_logger(cls.__name__)
+ cls.logger.debug(f"--- START setUpClass() {cls.__name__} ---")
+ random.seed(config.rnd_seed)
+ if hasattr(cls, "parallel_handler"):
+ cls.logger.addHandler(cls.parallel_handler)
+ cls.logger.propagate = False
+ cls.set_debug_flags(config.debug)
+ cls.tempdir = cls.get_tempdir()
+ cls.create_file_handler()
+ cls.file_handler.setFormatter(
+ Formatter(fmt="%(asctime)s,%(msecs)03d %(message)s", datefmt="%H:%M:%S")
+ )
+ cls.file_handler.setLevel(DEBUG)
+ cls.logger.addHandler(cls.file_handler)
+ cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
+ os.chdir(cls.tempdir)
+ cls.logger.info(
+ "Temporary dir is %s, api socket is %s",
+ cls.tempdir,
+ cls.get_api_sock_path(),
+ )
+ cls.logger.debug("Random seed is %s", config.rnd_seed)
+ cls.setUpConstants()
+ cls.verbose = 0
+ cls.vpp_dead = False
+ cls.registry = VppObjectRegistry()
+ cls.vpp_startup_failed = False
+ cls.reporter = KeepAliveReporter()
+ # need to catch exceptions here because if we raise, then the cleanup
+ # doesn't get called and we might end with a zombie vpp
+ try:
+ if cls.debug_attach:
+ cls.attach_vpp()
+ else:
+ cls.run_vpp()
+ cls.reporter.send_keep_alive(cls, "setUpClass")
+ VppTestResult.current_test_case_info = TestCaseInfo(
+ cls.logger, cls.tempdir, cls.vpp.pid, config.vpp
+ )
+ cls.vpp_stdout_deque = deque()
+ cls.vpp_stderr_deque = deque()
+ if not cls.debug_attach:
+ cls.pump_thread_stop_flag = Event()
+ cls.pump_thread_wakeup_pipe = os.pipe()
+ cls.pump_thread = Thread(target=pump_output, args=(cls,))
+ cls.pump_thread.daemon = True
+ cls.pump_thread.start()
+ if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
+ cls.vapi_response_timeout = 0
+ cls.vapi = VppPapiProvider(cls.__name__, cls, cls.vapi_response_timeout)
+ if cls.step:
+ hook = hookmodule.StepHook(cls)
+ else:
+ hook = hookmodule.PollHook(cls)
+ cls.vapi.register_hook(hook)
+ cls.statistics = VPPStats(socketname=cls.get_stats_sock_path())
+ try:
+ hook.poll_vpp()
+ except VppDiedError:
+ cls.wait_for_coredump()
+ cls.vpp_startup_failed = True
+ cls.logger.critical(
+ "VPP died shortly after startup, check the"
+ " output to standard error for possible cause"
+ )
+ raise
+ try:
+ cls.vapi.connect()
+ except (vpp_papi.VPPIOError, Exception) as e:
+ cls.logger.debug("Exception connecting to vapi: %s" % e)
+ cls.vapi.disconnect()
+
+ if cls.debug_gdbserver:
+ print(
+ colorize(
+ "You're running VPP inside gdbserver but "
+ "VPP-API connection failed, did you forget "
+ "to 'continue' VPP from within gdb?",
+ RED,
+ )
+ )
+ raise e
+ if cls.debug_attach:
+ last_line = cls.vapi.cli("show thread").split("\n")[-2]
+ cls.vpp_worker_count = int(last_line.split(" ")[0])
+ print("Detected VPP with %s workers." % cls.vpp_worker_count)
+ except vpp_papi.VPPRuntimeError as e:
+ cls.logger.debug("%s" % e)
+ cls.quit()
+ raise e
+ except Exception as e:
+ cls.logger.debug("Exception connecting to VPP: %s" % e)
+ cls.quit()
+ raise e
+ cls.logger.debug(f"--- END setUpClass() {cls.__name__} ---")
+
+ @classmethod
+ def _debug_quit(cls):
+ if cls.debug_gdbserver or cls.debug_gdb:
+ try:
+ cls.vpp.poll()
+
+ if cls.vpp.returncode is None:
+ print()
+ print(double_line_delim)
+ print("VPP or GDB server is still running")
+ print(single_line_delim)
+ input(
+ "When done debugging, press ENTER to kill the "
+ "process and finish running the testcase..."
+ )
+ except AttributeError:
+ pass
+
+ @classmethod
+ def quit(cls):
+ """
+ Disconnect vpp-api, kill vpp and cleanup shared memory files
+ """
+ cls._debug_quit()
+
+ # first signal that we want to stop the pump thread, then wake it up
+ if hasattr(cls, "pump_thread_stop_flag"):
+ cls.pump_thread_stop_flag.set()
+ if hasattr(cls, "pump_thread_wakeup_pipe"):
+ os.write(cls.pump_thread_wakeup_pipe[1], b"ding dong wake up")
+ if hasattr(cls, "pump_thread"):
+ cls.logger.debug("Waiting for pump thread to stop")
+ cls.pump_thread.join()
+ if hasattr(cls, "vpp_stderr_reader_thread"):
+ cls.logger.debug("Waiting for stderr pump to stop")
+ cls.vpp_stderr_reader_thread.join()
+
+ if hasattr(cls, "vpp"):
+ if hasattr(cls, "vapi"):
+ cls.logger.debug(cls.vapi.vpp.get_stats())
+ cls.logger.debug("Disconnecting class vapi client on %s", cls.__name__)
+ cls.vapi.disconnect()
+ cls.logger.debug("Deleting class vapi attribute on %s", cls.__name__)
+ del cls.vapi
+ cls.vpp.poll()
+ if not cls.debug_attach and cls.vpp.returncode is None:
+ cls.wait_for_coredump()
+ cls.logger.debug("Sending TERM to vpp")
+ cls.vpp.terminate()
+ cls.logger.debug("Waiting for vpp to die")
+ try:
+ outs, errs = cls.vpp.communicate(timeout=5)
+ except subprocess.TimeoutExpired:
+ cls.vpp.kill()
+ outs, errs = cls.vpp.communicate()
+ cls.logger.debug("Deleting class vpp attribute on %s", cls.__name__)
+ if not cls.debug_attach:
+ cls.vpp.stdout.close()
+ cls.vpp.stderr.close()
+ del cls.vpp
+
+ if cls.vpp_startup_failed:
+ stdout_log = cls.logger.info
+ stderr_log = cls.logger.critical
+ else:
+ stdout_log = cls.logger.info
+ stderr_log = cls.logger.info
+
+ if hasattr(cls, "vpp_stdout_deque"):
+ stdout_log(single_line_delim)
+ stdout_log("VPP output to stdout while running %s:", cls.__name__)
+ stdout_log(single_line_delim)
+ vpp_output = "".join(cls.vpp_stdout_deque)
+ with open(cls.tempdir + "/vpp_stdout.txt", "w") as f:
+ f.write(vpp_output)
+ stdout_log("\n%s", vpp_output)
+ stdout_log(single_line_delim)
+
+ if hasattr(cls, "vpp_stderr_deque"):
+ stderr_log(single_line_delim)
+ stderr_log("VPP output to stderr while running %s:", cls.__name__)
+ stderr_log(single_line_delim)
+ vpp_output = "".join(cls.vpp_stderr_deque)
+ with open(cls.tempdir + "/vpp_stderr.txt", "w") as f:
+ f.write(vpp_output)
+ stderr_log("\n%s", vpp_output)
+ stderr_log(single_line_delim)
+
+ @classmethod
+ def tearDownClass(cls):
+ """Perform final cleanup after running all tests in this test-case"""
+ cls.logger.debug(f"--- START tearDownClass() {cls.__name__} ---")
+ cls.reporter.send_keep_alive(cls, "tearDownClass")
+ cls.quit()
+ cls.file_handler.close()
+ if config.debug_framework:
+ debug_internal.on_tear_down_class(cls)
+ cls.logger.debug(f"--- END tearDownClass() {cls.__name__} ---")
+
+ def show_commands_at_teardown(self):
+ """Allow subclass specific teardown logging additions."""
+ self.logger.info("--- No test specific show commands provided. ---")
+
+ def unlink_testcase_file(self, path):
+ MAX_ATTEMPTS = 9
+ retries = MAX_ATTEMPTS
+ while retries > 0:
+ retries = retries - 1
+ self.logger.debug(f"Unlinking {path}")
+ try:
+ path.unlink()
+ # Loop until unlink() fails with FileNotFoundError to ensure file is removed
+ except FileNotFoundError:
+ break
+ except OSError:
+ self.logger.debug(f"OSError: unlinking {path}")
+ self.sleep(0.25, f"{retries} retries left")
+ if retries == 0 and os.path.isfile(path):
+ self.logger.error(
+ f"Unable to delete testcase file in {MAX_ATTEMPTS} attempts: {path}"
+ )
+ raise OSError
+
+ def tearDown(self):
+ """Show various debug prints after each test"""
+ self.logger.debug(
+ f"--- START tearDown() {self.__class__.__name__}.{self._testMethodName}({self._testMethodDoc}) ---"
+ )
+
+ try:
+ if not self.vpp_dead:
+ self.logger.debug(self.vapi.cli("show trace max 1000"))
+ self.logger.info(self.vapi.ppcli("show interface"))
+ self.logger.info(self.vapi.ppcli("show hardware"))
+ self.logger.info(self.statistics.set_errors_str())
+ self.logger.info(self.vapi.ppcli("show run"))
+ self.logger.info(self.vapi.ppcli("show log"))
+ self.logger.info(self.vapi.ppcli("show bihash"))
+ self.logger.info("Logging testcase specific show commands.")
+ self.show_commands_at_teardown()
+ if self.remove_configured_vpp_objects_on_tear_down:
+ self.registry.remove_vpp_config(self.logger)
+ # Save/Dump VPP api trace log
+ m = self._testMethodName
+ api_trace = "vpp_api_trace.%s.%d.log" % (m, self.vpp.pid)
+ tmp_api_trace = "/tmp/%s" % api_trace
+ vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
+ self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
+ self.logger.info("Moving %s to %s\n" % (tmp_api_trace, vpp_api_trace_log))
+ shutil.move(tmp_api_trace, vpp_api_trace_log)
+ except VppTransportSocketIOError:
+ self.logger.debug(
+ "VppTransportSocketIOError: Vpp dead. Cannot log show commands."
+ )
+ self.vpp_dead = True
+ else:
+ self.registry.unregister_all(self.logger)
+ # Remove any leftover pcap files
+ if hasattr(self, "pg_interfaces") and len(self.pg_interfaces) > 0:
+ testcase_dir = os.path.dirname(self.pg_interfaces[0].out_path)
+ for p in Path(testcase_dir).glob("pg*.pcap"):
+ self.unlink_testcase_file(p)
+ self.logger.debug(
+ f"--- END tearDown() {self.__class__.__name__}.{self._testMethodName}('{self._testMethodDoc}') ---"
+ )
+
+ def setUp(self):
+ """Clear trace before running each test"""
+ super(VppAsfTestCase, self).setUp()
+ self.logger.debug(
+ f"--- START setUp() {self.__class__.__name__}.{self._testMethodName}('{self._testMethodDoc}') ---"
+ )
+ # Save testname include in pcap history filenames
+ if hasattr(self, "pg_interfaces"):
+ for i in self.pg_interfaces:
+ i.test_name = self._testMethodName
+ self.reporter.send_keep_alive(self)
+ if self.vpp_dead:
+ self.wait_for_coredump()
+ raise VppDiedError(
+ rv=None,
+ testcase=self.__class__.__name__,
+ method_name=self._testMethodName,
+ )
+ self.sleep(0.1, "during setUp")
+ self.vpp_stdout_deque.append(
+ "--- test setUp() for %s.%s(%s) starts here ---\n"
+ % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
+ )
+ self.vpp_stderr_deque.append(
+ "--- test setUp() for %s.%s(%s) starts here ---\n"
+ % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
+ )
+ self.vapi.cli("clear trace")
+ # store the test instance inside the test class - so that objects
+ # holding the class can access instance methods (like assertEqual)
+ type(self).test_instance = self
+ self.logger.debug(
+ f"--- END setUp() {self.__class__.__name__}.{self._testMethodName}('{self._testMethodDoc}') ---"
+ )
+
+ @classmethod
+ def get_vpp_time(cls):
+ # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
+ # returns float("2.190522")
+ timestr = cls.vapi.cli("show clock")
+ head, sep, tail = timestr.partition(",")
+ head, sep, tail = head.partition("Time now")
+ return float(tail)
+
+ @classmethod
+ def sleep_on_vpp_time(cls, sec):
+ """Sleep according to time in VPP world"""
+ # On a busy system with many processes
+ # we might end up with VPP time being slower than real world
+ # So take that into account when waiting for VPP to do something
+ start_time = cls.get_vpp_time()
+ while cls.get_vpp_time() - start_time < sec:
+ cls.sleep(0.1)
+
+ @classmethod
+ def create_loopback_interfaces(cls, count):
+ """
+ Create loopback interfaces.
+
+ :param count: number of interfaces created.
+ :returns: List of created interfaces.
+ """
+ result = [VppLoInterface(cls) for i in range(count)]
+ for intf in result:
+ setattr(cls, intf.name, intf)
+ cls.lo_interfaces = result
+ return result
+
+ def assert_equal(self, real_value, expected_value, name_or_class=None):
+ if name_or_class is None:
+ self.assertEqual(real_value, expected_value)
+ return
+ try:
+ msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
+ msg = msg % (
+ getdoc(name_or_class).strip(),
+ real_value,
+ str(name_or_class(real_value)),
+ expected_value,
+ str(name_or_class(expected_value)),
+ )
+ except Exception:
+ msg = "Invalid %s: %s does not match expected value %s" % (
+ name_or_class,
+ real_value,
+ expected_value,
+ )
+
+ self.assertEqual(real_value, expected_value, msg)
+
+ def assert_in_range(self, real_value, expected_min, expected_max, name=None):
+ if name is None:
+ msg = None
+ else:
+ msg = "Invalid %s: %s out of range <%s,%s>" % (
+ name,
+ real_value,
+ expected_min,
+ expected_max,
+ )
+ self.assertTrue(expected_min <= real_value <= expected_max, msg)
+
+ def get_counter(self, counter):
+ if counter.startswith("/"):
+ counter_value = self.statistics.get_counter(counter)
+ else:
+ counters = self.vapi.cli("sh errors").split("\n")
+ counter_value = 0
+ for i in range(1, len(counters) - 1):
+ results = counters[i].split()
+ if results[1] == counter:
+ counter_value = int(results[0])
+ break
+ return counter_value
+
+ def assert_counter_equal(self, counter, expected_value, thread=None, index=0):
+ c = self.get_counter(counter)
+ if thread is not None:
+ c = c[thread][index]
+ else:
+ c = sum(x[index] for x in c)
+ self.logger.debug(
+ "validate counter `%s[%s]', expected: %s, real value: %s"
+ % (counter, index, expected_value, c)
+ )
+ self.assert_equal(c, expected_value, "counter `%s[%s]'" % (counter, index))
+
+ def assert_error_counter_equal(self, counter, expected_value):
+ counter_value = self.statistics[counter].sum()
+ self.assert_equal(counter_value, expected_value, "error counter `%s'" % counter)
+
+ @classmethod
+ def sleep(cls, timeout, remark=None):
+ # /* Allow sleep(0) to maintain win32 semantics, and as decreed
+ # * by Guido, only the main thread can be interrupted.
+ # */
+ # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
+ if timeout == 0:
+ # yield quantum
+ if hasattr(os, "sched_yield"):
+ os.sched_yield()
+ else:
+ time.sleep(0)
+ return
+
+ cls.logger.debug("Starting sleep for %es (%s)", timeout, remark)
+ before = time.time()
+ time.sleep(timeout)
+ after = time.time()
+ if after - before > 2 * timeout:
+ cls.logger.error(
+ "unexpected self.sleep() result - slept for %es instead of ~%es!",
+ after - before,
+ timeout,
+ )
+
+ cls.logger.debug(
+ "Finished sleep (%s) - slept %es (wanted %es)",
+ remark,
+ after - before,
+ timeout,
+ )
+
+ def virtual_sleep(self, timeout, remark=None):
+ self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
+ self.vapi.cli("set clock adjust %s" % timeout)
+
+ def snapshot_stats(self, stats_diff):
+ """Return snapshot of interesting stats based on diff dictionary."""
+ stats_snapshot = {}
+ for sw_if_index in stats_diff:
+ for counter in stats_diff[sw_if_index]:
+ stats_snapshot[counter] = self.statistics[counter]
+ self.logger.debug(f"Took statistics stats_snapshot: {stats_snapshot}")
+ return stats_snapshot
+
+ def compare_stats_with_snapshot(self, stats_diff, stats_snapshot):
+ """Assert appropriate difference between current stats and snapshot."""
+ for sw_if_index in stats_diff:
+ for cntr, diff in stats_diff[sw_if_index].items():
+ if sw_if_index == "err":
+ self.assert_equal(
+ self.statistics[cntr].sum(),
+ stats_snapshot[cntr].sum() + diff,
+ f"'{cntr}' counter value (previous value: "
+ f"{stats_snapshot[cntr].sum()}, "
+ f"expected diff: {diff})",
+ )
+ else:
+ try:
+ self.assert_equal(
+ self.statistics[cntr][:, sw_if_index].sum(),
+ stats_snapshot[cntr][:, sw_if_index].sum() + diff,
+ f"'{cntr}' counter value (previous value: "
+ f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
+ f"expected diff: {diff})",
+ )
+ except IndexError as e:
+ # if diff is 0, then this most probably a case where
+ # test declares multiple interfaces but traffic hasn't
+ # passed through this one yet - which means the counter
+ # value is 0 and can be ignored
+ if 0 != diff:
+ raise Exception(
+ f"Couldn't sum counter: {cntr} on sw_if_index: {sw_if_index}"
+ ) from e
+
+
+def get_testcase_doc_name(test):
+ return getdoc(test.__class__).splitlines()[0]
+
+
+def get_test_description(descriptions, test):
+ short_description = test.shortDescription()
+ if descriptions and short_description:
+ return short_description
+ else:
+ return str(test)
+
+
+def get_failed_testcase_linkname(failed_dir, testcase_dirname):
+ return os.path.join(failed_dir, f"{testcase_dirname}-FAILED")
+
+
+def get_testcase_dirname(testcase_class_name):
+ return f"vpp-unittest-{testcase_class_name}"
+
+
+class TestCaseInfo(object):
+ def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
+ self.logger = logger
+ self.tempdir = tempdir
+ self.vpp_pid = vpp_pid
+ self.vpp_bin_path = vpp_bin_path
+ self.core_crash_test = None
+
+
+class VppTestResult(unittest.TestResult):
+ """
+ @property result_string
+ String variable to store the test case result string.
+ @property errors
+ List variable containing 2-tuples of TestCase instances and strings
+ holding formatted tracebacks. Each tuple represents a test which
+ raised an unexpected exception.
+ @property failures
+ List variable containing 2-tuples of TestCase instances and strings
+ holding formatted tracebacks. Each tuple represents a test where
+ a failure was explicitly signalled using the TestCase.assert*()
+ methods.
+ """
+
+ failed_test_cases_info = set()
+ core_crash_test_cases_info = set()
+ current_test_case_info = None
+
+ def __init__(self, stream=None, descriptions=None, verbosity=None, runner=None):
+ """
+ :param stream File descriptor to store where to report test results.
+ Set to the standard error stream by default.
+ :param descriptions Boolean variable to store information if to use
+ test case descriptions.
+ :param verbosity Integer variable to store required verbosity level.
+ """
+ super(VppTestResult, self).__init__(stream, descriptions, verbosity)
+ self.stream = stream
+ self.descriptions = descriptions
+ self.verbosity = verbosity
+ self.result_code = TestResultCode.TEST_RUN
+ self.result_string = None
+ self.runner = runner
+ self.printed = []
+
+ def decodePcapFiles(self, test, when_configured=False):
+ if when_configured == False or config.decode_pcaps == True:
+ if hasattr(test, "pg_interfaces") and len(test.pg_interfaces) > 0:
+ testcase_dir = os.path.dirname(test.pg_interfaces[0].out_path)
+ test.pg_interfaces[0].decode_pcap_files(
+ testcase_dir, f"suite{test.__class__.__name__}"
+ )
+ test.pg_interfaces[0].decode_pcap_files(
+ testcase_dir, test._testMethodName
+ )
+
+ def addSuccess(self, test):
+ """
+ Record a test succeeded result
+
+ :param test:
+
+ """
+ self.log_result("addSuccess", test)
+ self.decodePcapFiles(test, when_configured=True)
+ unittest.TestResult.addSuccess(self, test)
+ self.result_string = colorize("OK", GREEN)
+ self.result_code = TestResultCode.PASS
+ self.send_result_through_pipe(test, self.result_code)
+
+ def addExpectedFailure(self, test, err):
+ self.log_result("addExpectedFailure", test, err)
+ self.decodePcapFiles(test)
+ super().addExpectedFailure(test, err)
+ self.result_string = colorize("FAIL", GREEN)
+ self.result_code = TestResultCode.EXPECTED_FAIL
+ self.send_result_through_pipe(test, self.result_code)
+
+ def addUnexpectedSuccess(self, test):
+ self.log_result("addUnexpectedSuccess", test)
+ self.decodePcapFiles(test, when_configured=True)
+ super().addUnexpectedSuccess(test)
+ self.result_string = colorize("OK", RED)
+ self.result_code = TestResultCode.UNEXPECTED_PASS
+ self.send_result_through_pipe(test, self.result_code)
+
+ def addSkip(self, test, reason):
+ """
+ Record a test skipped.
+
+ :param test:
+ :param reason:
+
+ """
+ self.log_result("addSkip", test, reason=reason)
+ unittest.TestResult.addSkip(self, test, reason)
+ self.result_string = colorize("SKIP", YELLOW)
+
+ if reason == "not enough cpus":
+ self.result_code = TestResultCode.SKIP_CPU_SHORTAGE
+ else:
+ self.result_code = TestResultCode.SKIP
+ self.send_result_through_pipe(test, self.result_code)
+
+ def symlink_failed(self):
+ if self.current_test_case_info:
+ try:
+ failed_dir = config.failed_dir
+ link_path = get_failed_testcase_linkname(
+ failed_dir, os.path.basename(self.current_test_case_info.tempdir)
+ )
+
+ self.current_test_case_info.logger.debug(
+ "creating a link to the failed test"
+ )
+ self.current_test_case_info.logger.debug(
+ "os.symlink(%s, %s)"
+ % (self.current_test_case_info.tempdir, link_path)
+ )
+ if os.path.exists(link_path):
+ self.current_test_case_info.logger.debug("symlink already exists")
+ else:
+ os.symlink(self.current_test_case_info.tempdir, link_path)
+
+ except Exception as e:
+ self.current_test_case_info.logger.error(e)
+
+ def send_result_through_pipe(self, test, result):
+ if hasattr(self, "test_framework_result_pipe"):
+ pipe = self.test_framework_result_pipe
+ if pipe:
+ pipe.send((test.id(), result))
+
+ def log_result(self, fn, test, err=None, reason=None):
+ if self.current_test_case_info:
+ if isinstance(test, unittest.suite._ErrorHolder):
+ test_name = test.description
+ else:
+ test_name = "%s.%s(%s)" % (
+ test.__class__.__name__,
+ test._testMethodName,
+ test._testMethodDoc,
+ )
+ extra_msg = ""
+ if err:
+ extra_msg += f", error is {err}"
+ if reason:
+ extra_msg += f", reason is {reason}"
+ self.current_test_case_info.logger.debug(
+ f"--- {fn}() {test_name} called{extra_msg}"
+ )
+ if err:
+ self.current_test_case_info.logger.debug(
+ "formatted exception is:\n%s" % "".join(format_exception(*err))
+ )
+
+ def add_error(self, test, err, unittest_fn, result_code):
+ self.result_code = result_code
+ if result_code == TestResultCode.FAIL:
+ self.log_result("addFailure", test, err=err)
+ error_type_str = colorize("FAIL", RED)
+ elif result_code == TestResultCode.ERROR:
+ self.log_result("addError", test, err=err)
+ error_type_str = colorize("ERROR", RED)
+ else:
+ raise Exception(f"Unexpected result code {result_code}")
+ self.decodePcapFiles(test)
+
+ unittest_fn(self, test, err)
+ if self.current_test_case_info:
+ self.result_string = "%s [ temp dir used by test case: %s ]" % (
+ error_type_str,
+ self.current_test_case_info.tempdir,
+ )
+ self.symlink_failed()
+ self.failed_test_cases_info.add(self.current_test_case_info)
+ if is_core_present(self.current_test_case_info.tempdir):
+ if not self.current_test_case_info.core_crash_test:
+ if isinstance(test, unittest.suite._ErrorHolder):
+ test_name = str(test)
+ else:
+ test_name = "'{!s}' ({!s})".format(
+ get_testcase_doc_name(test), test.id()
+ )
+ self.current_test_case_info.core_crash_test = test_name
+ self.core_crash_test_cases_info.add(self.current_test_case_info)
+ else:
+ self.result_string = "%s [no temp dir]" % error_type_str
+
+ self.send_result_through_pipe(test, result_code)
+
+ def addFailure(self, test, err):
+ """
+ Record a test failed result
+
+ :param test:
+ :param err: error message
+
+ """
+ self.add_error(test, err, unittest.TestResult.addFailure, TestResultCode.FAIL)
+
+ def addError(self, test, err):
+ """
+ Record a test error result
+
+ :param test:
+ :param err: error message
+
+ """
+ self.add_error(test, err, unittest.TestResult.addError, TestResultCode.ERROR)
+
+ def getDescription(self, test):
+ """
+ Get test description
+
+ :param test:
+ :returns: test description
+
+ """
+ return get_test_description(self.descriptions, test)
+
+ def startTest(self, test):
+ """
+ Start a test
+
+ :param test:
+
+ """
+
+ def print_header(test):
+ if test.__class__ in self.printed:
+ return
+
+ test_doc = getdoc(test)
+ if not test_doc:
+ raise Exception("No doc string for test '%s'" % test.id())
+
+ test_title = test_doc.splitlines()[0].rstrip()
+ test_title = colorize(test_title, GREEN)
+ if test.is_tagged_run_solo():
+ test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
+
+ # This block may overwrite the colorized title above,
+ # but we want this to stand out and be fixed
+ if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
+ test_title = colorize(f"FIXME with VPP workers: {test_title}", RED)
+
+ if test.has_tag(TestCaseTag.FIXME_ASAN):
+ test_title = colorize(f"FIXME with ASAN: {test_title}", RED)
+ test.skip_fixme_asan()
+
+ if hasattr(test, "vpp_worker_count"):
+ if test.vpp_worker_count == 0:
+ test_title += " [main thread only]"
+ elif test.vpp_worker_count == 1:
+ test_title += " [1 worker thread]"
+ else:
+ test_title += f" [{test.vpp_worker_count} worker threads]"
+
+ if test.__class__.skipped_due_to_cpu_lack:
+ test_title = colorize(
+ f"{test_title} [skipped - not enough cpus, "
+ f"required={test.__class__.get_cpus_required()}, "
+ f"available={max_vpp_cpus}]",
+ YELLOW,
+ )
+
+ print(double_line_delim)
+ print(test_title)
+ print(double_line_delim)
+ self.printed.append(test.__class__)
+
+ print_header(test)
+ self.start_test = time.time()
+ unittest.TestResult.startTest(self, test)
+ if self.verbosity > 0:
+ self.stream.writeln("Starting " + self.getDescription(test) + " ...")
+ self.stream.writeln(single_line_delim)
+
+ def stopTest(self, test):
+ """
+ Called when the given test has been run
+
+ :param test:
+
+ """
+ unittest.TestResult.stopTest(self, test)
+
+ result_code_to_suffix = {
+ TestResultCode.PASS: "",
+ TestResultCode.FAIL: "",
+ TestResultCode.ERROR: "",
+ TestResultCode.SKIP: "",
+ TestResultCode.TEST_RUN: "",
+ TestResultCode.SKIP_CPU_SHORTAGE: "",
+ TestResultCode.EXPECTED_FAIL: " [EXPECTED FAIL]",
+ TestResultCode.UNEXPECTED_PASS: " [UNEXPECTED PASS]",
+ }
+
+ if self.verbosity > 0:
+ self.stream.writeln(single_line_delim)
+ self.stream.writeln(
+ "%-72s%s%s"
+ % (
+ self.getDescription(test),
+ self.result_string,
+ result_code_to_suffix[self.result_code],
+ )
+ )
+ self.stream.writeln(single_line_delim)
+ else:
+ self.stream.writeln(
+ "%-67s %4.2f %s%s"
+ % (
+ self.getDescription(test),
+ time.time() - self.start_test,
+ self.result_string,
+ result_code_to_suffix[self.result_code],
+ )
+ )
+
+ self.send_result_through_pipe(test, TestResultCode.TEST_RUN)
+
+ def printErrors(self):
+ """
+ Print errors from running the test case
+ """
+ if len(self.errors) > 0 or len(self.failures) > 0:
+ self.stream.writeln()
+ self.printErrorList("ERROR", self.errors)
+ self.printErrorList("FAIL", self.failures)
+
+ # ^^ that is the last output from unittest before summary
+ if not self.runner.print_summary:
+ devnull = unittest.runner._WritelnDecorator(open(os.devnull, "w"))
+ self.stream = devnull
+ self.runner.stream = devnull
+
+ def printErrorList(self, flavour, errors):
+ """
+ Print error list to the output stream together with error type
+ and test case description.
+
+ :param flavour: error type
+ :param errors: iterable errors
+
+ """
+ for test, err in errors:
+ self.stream.writeln(double_line_delim)
+ self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
+ self.stream.writeln(single_line_delim)
+ self.stream.writeln("%s" % err)
+
+
+class VppTestRunner(unittest.TextTestRunner):
+ """
+ A basic test runner implementation which prints results to standard error.
+ """
+
+ @property
+ def resultclass(self):
+ """Class maintaining the results of the tests"""
+ return VppTestResult
+
+ def __init__(
+ self,
+ keep_alive_pipe=None,
+ descriptions=True,
+ verbosity=1,
+ result_pipe=None,
+ failfast=False,
+ buffer=False,
+ resultclass=None,
+ print_summary=True,
+ **kwargs,
+ ):
+ # ignore stream setting here, use hard-coded stdout to be in sync
+ # with prints from VppAsfTestCase methods ...
+ super(VppTestRunner, self).__init__(
+ sys.stdout, descriptions, verbosity, failfast, buffer, resultclass, **kwargs
+ )
+ KeepAliveReporter.pipe = keep_alive_pipe
+
+ self.orig_stream = self.stream
+ self.resultclass.test_framework_result_pipe = result_pipe
+
+ self.print_summary = print_summary
+
+ def _makeResult(self):
+ return self.resultclass(self.stream, self.descriptions, self.verbosity, self)
+
+ def run(self, test):
+ """
+ Run the tests
+
+ :param test:
+
+ """
+ faulthandler.enable() # emit stack trace to stderr if killed by signal
+
+ result = super(VppTestRunner, self).run(test)
+ if not self.print_summary:
+ self.stream = self.orig_stream
+ result.stream = self.orig_stream
+ return result
+
+
+class Worker(Thread):
+ def __init__(self, executable_args, logger, env=None, *args, **kwargs):
+ super(Worker, self).__init__(*args, **kwargs)
+ self.logger = logger
+ self.args = executable_args
+ if hasattr(self, "testcase") and self.testcase.debug_all:
+ if self.testcase.debug_gdbserver:
+ self.args = [
+ "/usr/bin/gdbserver",
+ "localhost:{port}".format(port=self.testcase.gdbserver_port),
+ ] + args
+ elif self.testcase.debug_gdb and hasattr(self, "wait_for_gdb"):
+ self.args.append(self.wait_for_gdb)
+ self.app_bin = executable_args[0]
+ self.app_name = os.path.basename(self.app_bin)
+ if hasattr(self, "role"):
+ self.app_name += " {role}".format(role=self.role)
+ self.process = None
+ self.result = None
+ env = {} if env is None else env
+ self.env = copy.deepcopy(env)
+
+ def wait_for_enter(self):
+ if not hasattr(self, "testcase"):
+ return
+ if self.testcase.debug_all and self.testcase.debug_gdbserver:
+ print()
+ print(double_line_delim)
+ print(
+ "Spawned GDB Server for '{app}' with PID: {pid}".format(
+ app=self.app_name, pid=self.process.pid
+ )
+ )
+ elif self.testcase.debug_all and self.testcase.debug_gdb:
+ print()
+ print(double_line_delim)
+ print(
+ "Spawned '{app}' with PID: {pid}".format(
+ app=self.app_name, pid=self.process.pid
+ )
+ )
+ else:
+ return
+ print(single_line_delim)
+ print("You can debug '{app}' using:".format(app=self.app_name))
+ if self.testcase.debug_gdbserver:
+ print(
+ "sudo gdb "
+ + self.app_bin
+ + " -ex 'target remote localhost:{port}'".format(
+ port=self.testcase.gdbserver_port
+ )
+ )
+ print(
+ "Now is the time to attach gdb by running the above "
+ "command, set up breakpoints etc., then resume from "
+ "within gdb by issuing the 'continue' command"
+ )
+ self.testcase.gdbserver_port += 1
+ elif self.testcase.debug_gdb:
+ print(
+ "sudo gdb "
+ + self.app_bin
+ + " -ex 'attach {pid}'".format(pid=self.process.pid)
+ )
+ print(
+ "Now is the time to attach gdb by running the above "
+ "command and set up breakpoints etc., then resume from"
+ " within gdb by issuing the 'continue' command"
+ )
+ print(single_line_delim)
+ input("Press ENTER to continue running the testcase...")
+
+ def run(self):
+ executable = self.args[0]
+ if not os.path.exists(executable) or not os.access(
+ executable, os.F_OK | os.X_OK
+ ):
+ # Exit code that means some system file did not exist,
+ # could not be opened, or had some other kind of error.
+ self.result = os.EX_OSFILE
+ raise EnvironmentError(
+ "executable '%s' is not found or executable." % executable
+ )
+ self.logger.debug(
+ "Running executable '{app}': '{cmd}'".format(
+ app=self.app_name, cmd=" ".join(self.args)
+ )
+ )
+ env = os.environ.copy()
+ env.update(self.env)
+ env["CK_LOG_FILE_NAME"] = "-"
+ self.process = subprocess.Popen(
+ ["stdbuf", "-o0", "-e0"] + self.args,
+ shell=False,
+ env=env,
+ preexec_fn=os.setpgrp,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
+ self.wait_for_enter()
+ out, err = self.process.communicate()
+ self.logger.debug("Finished running `{app}'".format(app=self.app_name))
+ self.logger.info("Return code is `%s'" % self.process.returncode)
+ self.logger.info(single_line_delim)
+ self.logger.info(
+ "Executable `{app}' wrote to stdout:".format(app=self.app_name)
+ )
+ self.logger.info(single_line_delim)
+ self.logger.info(out.decode("utf-8"))
+ self.logger.info(single_line_delim)
+ self.logger.info(
+ "Executable `{app}' wrote to stderr:".format(app=self.app_name)
+ )
+ self.logger.info(single_line_delim)
+ self.logger.info(err.decode("utf-8"))
+ self.logger.info(single_line_delim)
+ self.result = self.process.returncode
+
+
+if __name__ == "__main__":
+ pass
diff --git a/test/asf/debug_internal.py b/test/asf/debug_internal.py
new file mode 100644
index 00000000000..fe10db7ee7f
--- /dev/null
+++ b/test/asf/debug_internal.py
@@ -0,0 +1,40 @@
+import gc
+import pprint
+import vpp_papi
+from vpp_papi_provider import VppPapiProvider
+import objgraph
+from pympler import tracker
+
+tr = tracker.SummaryTracker()
+
+"""
+ Internal debug module
+
+ The module provides functions for debugging test framework
+"""
+
+
+def on_tear_down_class(cls):
+ gc.collect()
+ tr.print_diff()
+ objects = gc.get_objects()
+ counter = 0
+ with open(cls.tempdir + "/python_objects.txt", "w") as f:
+ interesting = [
+ o for o in objects if isinstance(o, (VppPapiProvider, vpp_papi.VPP))
+ ]
+ del objects
+ gc.collect()
+ for o in interesting:
+ objgraph.show_backrefs(
+ [o], max_depth=5, filename="%s/%s.png" % (cls.tempdir, counter)
+ )
+ counter += 1
+ refs = gc.get_referrers(o)
+ pp = pprint.PrettyPrinter(indent=2)
+ f.write("%s\n" % pp.pformat(o))
+ for r in refs:
+ try:
+ f.write("%s\n" % pp.pformat(r))
+ except:
+ f.write("%s\n" % type(r))
diff --git a/test/asf/test_adl.py b/test/asf/test_adl.py
new file mode 100644
index 00000000000..7e5ca8dcbe3
--- /dev/null
+++ b/test/asf/test_adl.py
@@ -0,0 +1,108 @@
+#!/usr/bin/env python3
+
+import unittest
+
+from asfframework import VppAsfTestCase, VppTestRunner
+
+
+class TestAdl(VppAsfTestCase):
+ """Allow/Deny Plugin Unit Test Cases"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestAdl, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestAdl, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestAdl, self).setUp()
+
+ def tearDown(self):
+ super(TestAdl, self).tearDown()
+
+ def test_adl1_unittest(self):
+ """Plugin API Test"""
+ cmds = [
+ "loop create\n",
+ "set int ip address loop0 192.168.1.1/24\n",
+ "set int ip6 table loop0 0\n",
+ "set int ip address loop0 2001:db01::1/64\n",
+ "set int state loop0 up\n",
+ "packet-generator new {\n"
+ " name ip4\n"
+ " limit 100\n"
+ " rate 0\n"
+ " size 128-128\n"
+ " interface loop0\n"
+ " node adl-input\n"
+ " data { IP4: 1.2.40 -> 3cfd.fed0.b6c8\n"
+ " UDP: 192.168.1.2-192.168.1.10 -> 192.168.2.1\n"
+ " UDP: 1234 -> 2345\n"
+ " incrementing 114\n"
+ " }\n"
+ " }\n",
+ "packet-generator new {\n"
+ " name ip6-allow\n"
+ " limit 50\n"
+ " rate 0\n"
+ " size 128-128\n"
+ " interface loop0\n"
+ " node adl-input\n"
+ " data { IP6: 1.2.40 -> 3cfd.fed0.b6c8\n"
+ " UDP: 2001:db01::2 -> 2001:db01::1\n"
+ " UDP: 1234 -> 2345\n"
+ " incrementing 80\n"
+ " }\n"
+ " }\n",
+ "packet-generator new {\n"
+ " name ip6-drop\n"
+ " limit 50\n"
+ " rate 0\n"
+ " size 128-128\n"
+ " interface loop0\n"
+ " node adl-input\n"
+ " data { IP6: 1.2.40 -> 3cfd.fed0.b6c8\n"
+ " UDP: 2001:db01::3 -> 2001:db01::1\n"
+ " UDP: 1234 -> 2345\n"
+ " incrementing 80\n"
+ " }\n"
+ " }\n",
+ "ip table 1\n",
+ "ip route add 192.168.2.1/32 via drop\n",
+ "ip route add table 1 192.168.1.2/32 via local\n",
+ "ip6 table 1\n",
+ "ip route add 2001:db01::1/128 via drop\n",
+ "ip route add table 1 2001:db01::2/128 via local\n",
+ "bin adl_interface_enable_disable loop0\n",
+ "bin adl_allowlist_enable_disable loop0 fib-id 1 ip4 ip6\n",
+ "pa en\n",
+ ]
+
+ for cmd in cmds:
+ r = self.vapi.cli_return_response(cmd)
+ if r.retval != 0:
+ if hasattr(r, "reply"):
+ self.logger.info(cmd + " FAIL reply " + r.reply)
+ else:
+ self.logger.info(cmd + " FAIL retval " + str(r.retval))
+
+ total_pkts = self.statistics.get_err_counter(
+ "/err/adl-input/Allow/Deny packets processed"
+ )
+
+ self.assertEqual(total_pkts, 200)
+
+ ip4_allow = self.statistics.get_err_counter(
+ "/err/ip4-adl-allowlist/ip4 allowlist allowed"
+ )
+ self.assertEqual(ip4_allow, 12)
+ ip6_allow = self.statistics.get_err_counter(
+ "/err/ip6-adl-allowlist/ip6 allowlist allowed"
+ )
+ self.assertEqual(ip6_allow, 50)
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_api_client.py b/test/asf/test_api_client.py
new file mode 100644
index 00000000000..3f0fc8a1020
--- /dev/null
+++ b/test/asf/test_api_client.py
@@ -0,0 +1,20 @@
+#!/usr/bin/env python3
+
+import unittest
+
+from asfframework import VppAsfTestCase, VppTestRunner
+
+
+class TestAPIClient(VppAsfTestCase):
+ """API Internal client Test Cases"""
+
+ def test_client_unittest(self):
+ """Internal API client"""
+ error = self.vapi.cli("test api internal")
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_api_trace.py b/test/asf/test_api_trace.py
new file mode 100644
index 00000000000..8776a79f0ac
--- /dev/null
+++ b/test/asf/test_api_trace.py
@@ -0,0 +1,61 @@
+import unittest
+from asfframework import VppAsfTestCase, VppTestRunner
+import json
+import shutil
+
+
+class TestJsonApiTrace(VppAsfTestCase):
+ """JSON API trace related tests"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestJsonApiTrace, cls).setUpClass()
+
+ def setUp(self):
+ self.vapi.cli("api trace free")
+ self.vapi.cli("api trace on")
+ self.vapi.cli("api trace tx on")
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestJsonApiTrace, cls).tearDownClass()
+
+ def test_json_api_trace_save(self):
+ self.vapi.show_version()
+
+ fname = "test_api_trace-%d.json" % self.vpp.pid
+ tmp_api_trace = "/tmp/%s" % fname
+ fpath = "%s/%s" % (self.tempdir, fname)
+ self.vapi.cli("api trace save-json {}".format(fname))
+ shutil.move(tmp_api_trace, fpath)
+ with open(fpath, encoding="utf-8") as f:
+ s = f.read()
+ trace = json.loads(s)
+ found = False
+ for o in trace:
+ if o["_msgname"] == "show_version":
+ found = True
+ break
+ self.assertTrue(found)
+ self.assertEquals(o["_msgname"], "show_version")
+
+ def test_json_api_trace_replay(self):
+ fname = "/tmp/create_loop.json"
+ req = """
+[
+{
+ "_msgname": "create_loopback",
+ "_crc": "42bb5d22",
+ "mac_address": "00:00:00:00:00:00"
+}]
+"""
+ with open(fname, "w") as f:
+ f.write(req)
+ self.vapi.cli("api trace replay-json {}".format(fname))
+ r = self.vapi.sw_interface_dump(name_filter="loop", name_filter_valid=True)
+ self.assertEqual(len(r), 1)
+ self.assertEqual(r[0].interface_name, "loop0")
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_bihash.py b/test/asf/test_bihash.py
new file mode 100644
index 00000000000..b7df894be05
--- /dev/null
+++ b/test/asf/test_bihash.py
@@ -0,0 +1,80 @@
+#!/usr/bin/env python3
+
+import unittest
+
+from config import config
+from asfframework import VppAsfTestCase, VppTestRunner
+
+
+class TestBihash(VppAsfTestCase):
+ """Bihash Test Cases"""
+
+ @classmethod
+ def setUpClass(cls):
+ # increase vapi timeout, to avoid spurious "test bihash ..."
+ # failures reported on aarch64 w/ test-debug
+ cls.vapi_response_timeout = 20
+ super(TestBihash, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestBihash, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestBihash, self).setUp()
+
+ def tearDown(self):
+ super(TestBihash, self).tearDown()
+
+ def test_bihash_unittest(self):
+ """Bihash Add/Del Test"""
+ error = self.vapi.cli("test bihash careful 0 verbose 0")
+
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
+
+ def test_bihash_thread(self):
+ """Bihash Thread Test"""
+
+ error = self.vapi.cli(
+ "test bihash threads 2 nbuckets" + " 64000 careful 0 verbose 0"
+ )
+
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
+
+ def test_bihash_vec64(self):
+ """Bihash vec64 Test"""
+
+ error = self.vapi.cli("test bihash vec64")
+
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
+
+ @unittest.skipUnless(config.gcov, "part of code coverage tests")
+ def test_bihash_coverage(self):
+ """Improve Code Coverage"""
+
+ error = self.vapi.cli(
+ "test bihash nitems 10 ncycles 3"
+ + "search 2 careful 1 verbose 2 non-random-keys"
+ )
+
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
+
+ error = self.vapi.cli(
+ "test bihash nitems 10 nbuckets 1 ncycles 3"
+ + "search 2 careful 1 verbose 2 non-random-keys"
+ )
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_buffers.py b/test/asf/test_buffers.py
new file mode 100644
index 00000000000..d22326f172a
--- /dev/null
+++ b/test/asf/test_buffers.py
@@ -0,0 +1,29 @@
+#!/usr/bin/env python3
+
+from asfframework import VppAsfTestCase
+
+
+class TestBuffers(VppAsfTestCase):
+ """Buffer C Unit Tests"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestBuffers, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestBuffers, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestBuffers, self).setUp()
+
+ def tearDown(self):
+ super(TestBuffers, self).tearDown()
+
+ def test_linearize(self):
+ """Chained Buffer Linearization"""
+ error = self.vapi.cli("test chained-buffer-linearization")
+
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
diff --git a/test/asf/test_cli.py b/test/asf/test_cli.py
new file mode 100644
index 00000000000..25ce3330d54
--- /dev/null
+++ b/test/asf/test_cli.py
@@ -0,0 +1,91 @@
+#!/usr/bin/env python3
+"""CLI functional tests"""
+
+import unittest
+
+from vpp_papi import VPPIOError
+
+from asfframework import VppAsfTestCase, VppTestRunner
+
+
+class TestCLI(VppAsfTestCase):
+ """CLI Test Case"""
+
+ maxDiff = None
+
+ @classmethod
+ def setUpClass(cls):
+ # using the framework default
+ cls.vapi_response_timeout = 5
+ super(TestCLI, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestCLI, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestCLI, self).setUp()
+
+ def tearDown(self):
+ super(TestCLI, self).tearDown()
+
+ def test_cli_retval(self):
+ """CLI inband retval"""
+ rv = self.vapi.papi.cli_inband(cmd="this command does not exist")
+ self.assertNotEqual(rv.retval, 0)
+
+ rv = self.vapi.papi.cli_inband(cmd="show version")
+ self.assertEqual(rv.retval, 0)
+
+ def test_long_cli_delay(self):
+ """Test that VppApiClient raises VppIOError if timeout.""" # noqa
+ with self.assertRaises(VPPIOError) as ctx:
+ rv = self.vapi.papi.cli_inband(cmd="wait 10")
+
+ def test_long_cli_delay_override(self):
+ """Test per-command _timeout option.""" # noqa
+ rv = self.vapi.papi.cli_inband(cmd="wait 10", _timeout=15)
+ self.assertEqual(rv.retval, 0)
+
+
+class TestCLIExtendedVapiTimeout(VppAsfTestCase):
+ maxDiff = None
+
+ @classmethod
+ def setUpClass(cls):
+ cls.vapi_response_timeout = 15
+ cls.__doc__ = (
+ " CLI Test Case w/ Extended (%ssec) Vapi Timeout "
+ % cls.vapi_response_timeout
+ )
+ super(TestCLIExtendedVapiTimeout, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestCLIExtendedVapiTimeout, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestCLIExtendedVapiTimeout, self).setUp()
+
+ def tearDown(self):
+ super(TestCLIExtendedVapiTimeout, self).tearDown()
+
+ def test_long_cli_delay(self):
+ """Test that delayed result returns with extended timeout."""
+ wait_secs = self.vapi_response_timeout - 1
+
+ # get vpp time as float
+ start = self.vapi.papi.show_vpe_system_time(
+ _no_type_conversion=True
+ ).vpe_system_time
+ rv = self.vapi.papi.cli_inband(cmd="wait %s" % wait_secs)
+ now = self.vapi.papi.show_vpe_system_time(
+ _no_type_conversion=True
+ ).vpe_system_time
+
+ # assume that the overhead of the measurement is not more that .5 sec.
+ self.assertEqual(round(now - start), wait_secs)
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_counters.py b/test/asf/test_counters.py
new file mode 100644
index 00000000000..086189ae517
--- /dev/null
+++ b/test/asf/test_counters.py
@@ -0,0 +1,38 @@
+#!/usr/bin/env python3
+
+from asfframework import VppAsfTestCase, tag_fixme_vpp_workers
+
+
+@tag_fixme_vpp_workers
+class TestCounters(VppAsfTestCase):
+ """Counters C Unit Tests"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestCounters, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestCounters, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestCounters, self).setUp()
+
+ def tearDown(self):
+ super(TestCounters, self).tearDown()
+
+ def test_counter_simple_expand(self):
+ """Simple Counter Expand"""
+ error = self.vapi.cli("test counter simple expand")
+
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
+
+ def test_counter_combined_expand(self):
+ """Combined Counter Expand"""
+ error = self.vapi.cli("test counter combined expand")
+
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
diff --git a/test/asf/test_crypto.py b/test/asf/test_crypto.py
new file mode 100644
index 00000000000..56c96b69575
--- /dev/null
+++ b/test/asf/test_crypto.py
@@ -0,0 +1,29 @@
+#!/usr/bin/env python3
+
+import unittest
+
+from asfframework import VppAsfTestCase, VppTestRunner
+
+
+class TestCrypto(VppAsfTestCase):
+ """Crypto Test Case"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestCrypto, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestCrypto, cls).tearDownClass()
+
+ def test_crypto(self):
+ """Crypto Unit Tests"""
+ error = self.vapi.cli("test crypto")
+
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("FAIL", error)
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_endian.py b/test/asf/test_endian.py
new file mode 100644
index 00000000000..9caed0efc4a
--- /dev/null
+++ b/test/asf/test_endian.py
@@ -0,0 +1,43 @@
+# Copyright (c) 2019. Vinci Consulting Corp. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from asfframework import VppAsfTestCase
+import vpp_papi_provider
+
+F64_ONE = 1.0
+
+
+class TestEndian(VppAsfTestCase):
+ """TestEndian"""
+
+ def test_f64_endian_value(self):
+ try:
+ rv = self.vapi.get_f64_endian_value(f64_one=F64_ONE)
+ self.assertEqual(
+ rv.f64_one_result,
+ F64_ONE,
+ "client incorrectly deserializes f64 values. "
+ "Expected: %r. Received: %r." % (F64_ONE, rv.f64_one_result),
+ )
+ except vpp_papi_provider.UnexpectedApiReturnValueError:
+ self.fail("client incorrectly serializes f64 values.")
+
+ def test_get_f64_increment_by_one(self):
+ expected = 43.0
+ rv = self.vapi.get_f64_increment_by_one(f64_value=42.0)
+ self.assertEqual(
+ rv.f64_value,
+ expected,
+ "Expected %r, received:%r." % (expected, rv.f64_value),
+ )
diff --git a/test/asf/test_fib.py b/test/asf/test_fib.py
new file mode 100644
index 00000000000..9d391f57ed1
--- /dev/null
+++ b/test/asf/test_fib.py
@@ -0,0 +1,48 @@
+#!/usr/bin/env python3
+
+import unittest
+
+from asfframework import VppAsfTestCase, VppTestRunner, tag_fixme_vpp_workers
+
+
+@tag_fixme_vpp_workers
+class TestFIB(VppAsfTestCase):
+ """FIB Test Case"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestFIB, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestFIB, cls).tearDownClass()
+
+ def test_fib(self):
+ """FIB Unit Tests"""
+ error = self.vapi.cli("test fib")
+
+ # shameless test of CLIs to bump lcov results...
+ # no i mean to ensure they don't crash
+ self.logger.info(self.vapi.cli("sh fib source"))
+ self.logger.info(self.vapi.cli("sh fib source prio"))
+ self.logger.info(self.vapi.cli("sh fib memory"))
+ self.logger.info(self.vapi.cli("sh fib entry"))
+ self.logger.info(self.vapi.cli("sh fib entry 0"))
+ self.logger.info(self.vapi.cli("sh fib entry 10000"))
+ self.logger.info(self.vapi.cli("sh fib entry-delegate"))
+ self.logger.info(self.vapi.cli("sh fib paths"))
+ self.logger.info(self.vapi.cli("sh fib paths 0"))
+ self.logger.info(self.vapi.cli("sh fib paths 10000"))
+ self.logger.info(self.vapi.cli("sh fib path-list"))
+ self.logger.info(self.vapi.cli("sh fib path-list 0"))
+ self.logger.info(self.vapi.cli("sh fib path-list 10000"))
+ self.logger.info(self.vapi.cli("sh fib walk"))
+ self.logger.info(self.vapi.cli("sh fib uRPF"))
+
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("Failed", error)
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_http.py b/test/asf/test_http.py
new file mode 100644
index 00000000000..64f911c7bfa
--- /dev/null
+++ b/test/asf/test_http.py
@@ -0,0 +1,39 @@
+#!/usr/bin/env python3
+""" Vpp HTTP tests """
+
+import unittest
+import http.client
+from asfframework import VppAsfTestCase, VppTestRunner
+
+
+@unittest.skip("Requires root")
+class TestHttpTps(VppAsfTestCase):
+ """HTTP test class"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestHttpTps, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestHttpTps, cls).tearDownClass()
+
+ def setUp(self):
+ self.client_ip4 = "172.0.0.2"
+ self.server_ip4 = "172.0.0.1"
+ self.vapi.cli(f"create tap id 0 host-ip4-addr {self.client_ip4}/24")
+ self.vapi.cli(f"set int ip addr tap0 {self.server_ip4}/24")
+ self.vapi.cli("set int state tap0 up")
+ self.vapi.session_enable_disable(is_enable=1)
+
+ def test_http_tps(self):
+ fname = "test_file_1M"
+ self.vapi.cli("http tps uri tcp://0.0.0.0/8080")
+ con = http.client.HTTPConnection(f"{self.server_ip4}", 8080)
+ con.request("GET", f"/{fname}")
+ r = con.getresponse()
+ self.assertEqual(len(r.read()), 1 << 20)
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_http_static.py b/test/asf/test_http_static.py
new file mode 100644
index 00000000000..18e8ba56a1e
--- /dev/null
+++ b/test/asf/test_http_static.py
@@ -0,0 +1,164 @@
+from config import config
+from asfframework import VppAsfTestCase, VppTestRunner
+import unittest
+import subprocess
+import tempfile
+from vpp_qemu_utils import (
+ create_host_interface,
+ delete_host_interfaces,
+ create_namespace,
+ delete_namespace,
+)
+
+
+@unittest.skipIf(
+ "http_static" in config.excluded_plugins, "Exclude HTTP Static Server plugin tests"
+)
+@unittest.skipIf(config.skip_netns_tests, "netns not available or disabled from cli")
+class TestHttpStaticVapi(VppAsfTestCase):
+ """enable the http static server and send requests [VAPI]"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestHttpStaticVapi, cls).setUpClass()
+ # 2 temp files to improve coverage of http_cache.c
+ cls.temp = tempfile.NamedTemporaryFile()
+ cls.temp.write(b"Hello world")
+
+ cls.temp2 = tempfile.NamedTemporaryFile()
+ cls.temp2.write(b"Hello world2")
+
+ try:
+ create_namespace("HttpStatic")
+ except Exception:
+ cls.logger.warning("Unable to create a namespace, retrying.")
+ delete_namespace("HttpStatic")
+ create_namespace("HttpStatic")
+
+ create_host_interface("vppHost", "vppOut", "HttpStatic", "10.10.1.1/24")
+
+ cls.vapi.cli("create host-interface name vppOut")
+ cls.vapi.cli("set int state host-vppOut up")
+ cls.vapi.cli("set int ip address host-vppOut 10.10.1.2/24")
+
+ @classmethod
+ def tearDownClass(cls):
+ delete_namespace("HttpStatic")
+ delete_host_interfaces("vppHost")
+ cls.temp.close()
+ cls.temp2.close()
+ super(TestHttpStaticVapi, cls).tearDownClass()
+
+ def test_http_static_vapi(self):
+ self.vapi.http_static_enable(
+ www_root="/tmp",
+ uri="tcp://0.0.0.0/80",
+ )
+ # move file pointer to the beginning
+ self.temp.seek(0)
+ process = subprocess.run(
+ [
+ "ip",
+ "netns",
+ "exec",
+ "HttpStatic",
+ "curl",
+ f"10.10.1.2/{self.temp.name[5:]}",
+ ],
+ capture_output=True,
+ )
+ self.assertIn(b"Hello world", process.stdout)
+
+ self.temp2.seek(0)
+ process = subprocess.run(
+ [
+ "ip",
+ "netns",
+ "exec",
+ "HttpStatic",
+ "curl",
+ f"10.10.1.2/{self.temp2.name[5:]}",
+ ],
+ capture_output=True,
+ )
+ self.assertIn(b"Hello world2", process.stdout)
+
+
+@unittest.skipIf(
+ "http_static" in config.excluded_plugins, "Exclude HTTP Static Server plugin tests"
+)
+@unittest.skipIf(config.skip_netns_tests, "netns not available or disabled from cli")
+class TestHttpStaticCli(VppAsfTestCase):
+ """enable the static http server and send requests [CLI]"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestHttpStaticCli, cls).setUpClass()
+ # 2 temp files to improve coverage of http_cache.c
+ cls.temp = tempfile.NamedTemporaryFile()
+ cls.temp.write(b"Hello world")
+
+ cls.temp2 = tempfile.NamedTemporaryFile()
+ cls.temp2.write(b"Hello world2")
+
+ try:
+ create_namespace("HttpStatic2")
+ except Exception:
+ cls.logger.warning("Unable to create namespace, retrying.")
+ delete_namespace("HttpStatic2")
+ create_namespace("HttpStatic2")
+
+ create_host_interface("vppHost2", "vppOut2", "HttpStatic2", "10.10.1.1/24")
+
+ cls.vapi.cli("create host-interface name vppOut2")
+ cls.vapi.cli("set int state host-vppOut2 up")
+ cls.vapi.cli("set int ip address host-vppOut2 10.10.1.2/24")
+
+ @classmethod
+ def tearDownClass(cls):
+ delete_namespace("HttpStatic2")
+ delete_host_interfaces("vppHost2")
+ cls.temp.close()
+ cls.temp2.close()
+ super(TestHttpStaticCli, cls).tearDownClass()
+
+ def test_http_static_cli(self):
+ self.vapi.cli(
+ "http static server www-root /tmp uri tcp://0.0.0.0/80 cache-size 2m"
+ )
+ # move file pointer to the beginning
+ self.temp.seek(0)
+ process = subprocess.run(
+ [
+ "ip",
+ "netns",
+ "exec",
+ "HttpStatic2",
+ "curl",
+ f"10.10.1.2/{self.temp.name[5:]}",
+ ],
+ capture_output=True,
+ )
+ self.assertIn(b"Hello world", process.stdout)
+
+ self.temp2.seek(0)
+ process = subprocess.run(
+ [
+ "ip",
+ "netns",
+ "exec",
+ "HttpStatic2",
+ "curl",
+ f"10.10.1.2/{self.temp2.name[5:]}",
+ ],
+ capture_output=True,
+ )
+ self.assertIn(b"Hello world2", process.stdout)
+
+ self.vapi.cli("show http static server cache")
+ self.vapi.cli("clear http static cache")
+ self.vapi.cli("show http static server sessions")
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_lb_api.py b/test/asf/test_lb_api.py
new file mode 100644
index 00000000000..9608d0473a6
--- /dev/null
+++ b/test/asf/test_lb_api.py
@@ -0,0 +1,82 @@
+# Copyright (c) 2019. Vinci Consulting Corp. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from asfframework import VppAsfTestCase
+
+DEFAULT_VIP = "lb_vip_details(_0=978, context=12, vip=vl_api_lb_ip_addr_t(pfx=IPv6Network(u'::/0'), protocol=<vl_api_ip_proto_t.IP_API_PROTO_RESERVED: 255>, port=0), encap=<vl_api_lb_encap_type_t.LB_API_ENCAP_TYPE_GRE4: 0>, dscp=<vl_api_ip_dscp_t.IP_API_DSCP_CS0: 0>, srv_type=<vl_api_lb_srv_type_t.LB_API_SRV_TYPE_CLUSTERIP: 0>, target_port=0, flow_table_length=0)" # noqa
+
+
+class TestLbEmptyApi(VppAsfTestCase):
+ """TestLbEmptyApi"""
+
+ def test_lb_empty_vip_dump(self):
+ # no records should normally return [], but
+ # lb initializes with a default VIP
+ rv = self.vapi.lb_vip_dump()
+ # print(rv)
+ self.assertEqual(rv, [], "Expected: [] Received: %r." % rv)
+
+ def test_lb_empty_as_dump(self):
+ # no records should return []
+ rv = self.vapi.lb_as_dump()
+ # print(rv)
+ self.assertEqual(rv, [], "Expected: [] Received: %r." % rv)
+
+
+class TestLbApi(VppAsfTestCase):
+ """TestLbApi"""
+
+ def test_lb_vip_dump(self):
+ # add some vips
+ # rv = self.vapi.lb_add_del_vip(pfx=ipaddress.IPv4Network(u'1.2.3.0/24'), # noqa
+ # protocol=17,
+ # encap=0)
+ # print(rv)
+ self.vapi.cli("lb vip 2001::/16 encap gre6")
+ rv = self.vapi.lb_vip_dump()
+ # print(rv)
+ self.assertEqual(
+ str(rv[-1].vip.pfx),
+ "2001::/16",
+ "Expected: 2001::/16 Received: %r." % rv[-1].vip.pfx,
+ )
+
+ self.vapi.cli("lb vip 2001::/16 del")
+
+
+class TestLbAsApi(VppAsfTestCase):
+ """TestLbAsApi"""
+
+ def test_lb_as_dump(self):
+ # add some vips
+ self.vapi.cli("lb vip 2001::/16 encap gre6")
+ self.vapi.cli("lb as 2001::/16 2000::1")
+ # add some as's for the vips
+ # rv = self.vapi.lb_add_del_as(
+ # pfx=ipaddress.IPv4Network(u"10.0.0.0/24"),
+ # as_address=ipaddress.IPv4Address(u"192.168.1.1"))
+
+ # print(rv)
+ rv = self.vapi.lb_as_dump()
+ # print(rv)
+ self.assertEqual(
+ str(rv[0].vip.pfx),
+ "2001::/16",
+ 'Expected: "2001::/16" Received: %r.' % rv[0].vip.pfx,
+ )
+ self.assertEqual(
+ str(rv[0].app_srv),
+ "2000::1",
+ 'Expected: "2000::1" Received: %r.' % rv[0].app_srv,
+ )
diff --git a/test/asf/test_mactime.py b/test/asf/test_mactime.py
new file mode 100644
index 00000000000..215bd132cf0
--- /dev/null
+++ b/test/asf/test_mactime.py
@@ -0,0 +1,163 @@
+#!/usr/bin/env python3
+
+import unittest
+
+from config import config
+from asfframework import VppAsfTestCase, VppTestRunner
+
+
+class TestMactime(VppAsfTestCase):
+ """Mactime Unit Test Cases"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestMactime, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestMactime, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestMactime, self).setUp()
+
+ def tearDown(self):
+ super(TestMactime, self).tearDown()
+
+ def test_mactime_range_unittest(self):
+ """Time Range Test"""
+ error = self.vapi.cli("test time-range")
+
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("FAILED", error)
+
+ @unittest.skipUnless(config.gcov, "part of code coverage tests")
+ def test_mactime_unittest(self):
+ """Mactime Plugin Code Coverage Test"""
+ cmds = [
+ "loopback create",
+ "mactime enable-disable disable",
+ "mactime enable-disable loop0",
+ "mactime enable-disable loop0 disable",
+ "mactime enable-disable sw_if_index 9999",
+ "bin mactime_enable_disable loop0",
+ "bin mactime_enable_disable loop0 disable",
+ "bin mactime_enable_disable sw_if_index 1",
+ "set interface state loop0 up",
+ "clear mactime",
+ "set ip neighbor loop0 192.168.1.1 00:d0:2d:5e:86:85",
+ "bin mactime_add_del_range name sallow "
+ "mac 00:d0:2d:5e:86:85 allow-static del",
+ "bin mactime_add_del_range name sallow "
+ "mac 00:d0:2d:5e:86:85 allow-static",
+ "bin mactime_add_del_range name sallow "
+ "mac 00:d0:2d:5e:86:85 allow-static del",
+ "bin mactime_add_del_range name sallow "
+ "mac 00:d0:2d:5e:86:85 allow-static",
+ "bin mactime_add_del_range name sblock "
+ "mac 01:00:5e:7f:ff:fa drop-static",
+ "bin mactime_add_del_range name ddrop "
+ "mac c8:bc:c8:5a:ba:f3 drop-range Sun - Sat "
+ "00:00 - 23:59",
+ "bin mactime_add_del_range name dallow "
+ "mac c8:bc:c8:5a:ba:f4 allow-range Sun - Sat "
+ "00:00 - 23:59",
+ "bin mactime_add_del_range name multi "
+ "mac c8:bc:c8:f0:f0:f0 allow-range Sun - Mon "
+ "00:00 - 23:59 Tue - Sat 00:00 - 23:59",
+ "bin mactime_add_del_range bogus",
+ "bin mactime_add_del_range mac 01:00:5e:7f:f0:f0 allow-static",
+ "bin mactime_add_del_range "
+ "name tooloooooooooooooooooooooooooooooooooooooooooooooooo"
+ "nnnnnnnnnnnnnnnnnnnnnnnnnnnng mac 00:00:de:ad:be:ef "
+ "allow-static",
+ "packet-generator new {\n"
+ " name allow\n"
+ " limit 15\n"
+ " size 128-128\n"
+ " interface loop0\n"
+ " node ethernet-input\n"
+ " data {\n"
+ " IP6: 00:d0:2d:5e:86:85 -> 00:0d:ea:d0:00:00\n"
+ " ICMP: db00::1 -> db00::2\n"
+ " incrementing 30\n"
+ " }\n",
+ "}\n",
+ "packet-generator new {\n"
+ " name deny\n"
+ " limit 15\n"
+ " size 128-128\n"
+ " interface loop0\n"
+ " node ethernet-input\n"
+ " data {\n"
+ " IP6: 01:00:5e:7f:ff:fa -> 00:0d:ea:d0:00:00\n"
+ " ICMP: db00::1 -> db00::2\n"
+ " incrementing 30\n"
+ " }\n",
+ "}\n",
+ "packet-generator new {\n"
+ " name ddrop\n"
+ " limit 15\n"
+ " size 128-128\n"
+ " interface loop0\n"
+ " node ethernet-input\n"
+ " data {\n"
+ " IP6: c8:bc:c8:5a:ba:f3 -> 00:0d:ea:d0:00:00\n"
+ " ICMP: db00::1 -> db00::2\n"
+ " incrementing 30\n"
+ " }\n",
+ "}\n",
+ "packet-generator new {\n"
+ " name dallow\n"
+ " limit 15\n"
+ " size 128-128\n"
+ " interface loop0\n"
+ " node ethernet-input\n"
+ " data {\n"
+ " IP6: c8:bc:c8:5a:ba:f4 -> 00:0d:ea:d0:00:00\n"
+ " ICMP: db00::1 -> db00::2\n"
+ " incrementing 30\n"
+ " }\n"
+ "}\n"
+ "packet-generator new {\n"
+ " name makeentry\n"
+ " limit 15\n"
+ " size 128-128\n"
+ " interface loop0\n"
+ " node ethernet-input\n"
+ " data {\n"
+ " IP6: c8:bc:c8:5a:b0:0b -> 00:0d:ea:d0:00:00\n"
+ " ICMP: db00::1 -> db00::2\n"
+ " incrementing 30\n"
+ " }\n"
+ "}\n"
+ "packet-generator new {\n"
+ " name tx\n"
+ " limit 15\n"
+ " size 128-128\n"
+ " interface local0\n"
+ " tx-interface loop0\n"
+ " node loop0-output\n"
+ " data {\n"
+ " hex 0x01005e7ffffa000dead000000800"
+ "0102030405060708090a0b0c0d0e0f0102030405\n"
+ " }\n"
+ "}\n"
+ "trace add pg-input 2",
+ "pa en",
+ "show mactime verbose 2",
+ "show trace",
+ "show error",
+ ]
+
+ for cmd in cmds:
+ r = self.vapi.cli_return_response(cmd)
+ if r.retval != 0:
+ if hasattr(r, "reply"):
+ self.logger.info(cmd + " FAIL reply " + r.reply)
+ else:
+ self.logger.info(cmd + " FAIL retval " + str(r.retval))
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_mpcap.py b/test/asf/test_mpcap.py
new file mode 100644
index 00000000000..ed8ce1e0ea9
--- /dev/null
+++ b/test/asf/test_mpcap.py
@@ -0,0 +1,57 @@
+#!/usr/bin/env python3
+
+import unittest
+
+from asfframework import VppAsfTestCase, VppTestRunner
+import os
+
+
+class TestMpcap(VppAsfTestCase):
+ """Mpcap Unit Test Cases"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestMpcap, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestMpcap, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestMpcap, self).setUp()
+
+ def tearDown(self):
+ super(TestMpcap, self).tearDown()
+
+ def test_mpcap_unittest(self):
+ """Mapped pcap file test"""
+ cmds = [
+ "packet-generator new {\n"
+ " name mpcap\n"
+ " limit 15\n"
+ " size 128-128\n"
+ " interface local0\n"
+ " node mpcap-unittest\n"
+ " data {\n"
+ " IP6: 00:d0:2d:5e:86:85 -> 00:0d:ea:d0:00:00\n"
+ " ICMP: db00::1 -> db00::2\n"
+ " incrementing 30\n"
+ " }\n",
+ "trace add pg-input 15",
+ "pa en",
+ "show trace",
+ "show error",
+ ]
+
+ for cmd in cmds:
+ self.logger.info(self.vapi.cli(cmd))
+
+ size = os.path.getsize("/tmp/mpcap_unittest.pcap")
+ os.remove("/tmp/mpcap_unittest.pcap")
+ if size != 2184:
+ self.logger.critical("BUG: file size %d not 2184" % size)
+ self.assertNotIn("WrongMPCAPFileSize", "WrongMPCAPFileSize")
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_node_variants.py b/test/asf/test_node_variants.py
new file mode 100644
index 00000000000..80a18eb055a
--- /dev/null
+++ b/test/asf/test_node_variants.py
@@ -0,0 +1,148 @@
+#!/usr/bin/env python3
+import re
+import unittest
+import platform
+from asfframework import VppAsfTestCase
+
+
+def checkX86():
+ return platform.machine() in ["x86_64", "AMD64", "amd64"]
+
+
+def skipVariant(variant):
+ # TODO: We don't have cpu feature detection on FreeBSD yet, so always return
+ # that we don't have the requested variant.
+ if platform.uname().system == "FreeBSD":
+ return False
+
+ with open("/proc/cpuinfo") as f:
+ cpuinfo = f.read()
+
+ exp = re.compile(r"(?:flags\s+:)(?:\s\w+)+(?:\s(" + variant + r"))(?:\s\w+)+")
+ match = exp.search(cpuinfo, re.DOTALL | re.MULTILINE)
+
+ return checkX86() and match is not None
+
+
+class TestNodeVariant(VppAsfTestCase):
+ """Test Node Variants"""
+
+ @classmethod
+ def setUpConstants(cls, variant):
+ super(TestNodeVariant, cls).setUpConstants()
+ # find the position of node_variants in the cmdline args.
+
+ if checkX86():
+ node_variants = cls.vpp_cmdline.index("node { ") + 1
+ cls.vpp_cmdline[node_variants] = (
+ "default { variant default } "
+ "ip4-rewrite { variant " + variant + " } "
+ )
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestNodeVariant, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestNodeVariant, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestNodeVariant, self).setUp()
+
+ def tearDown(self):
+ super(TestNodeVariant, self).tearDown()
+
+ def getActiveVariant(self, node):
+ node_desc = self.vapi.cli("show node " + node)
+ self.logger.info(node_desc)
+
+ match = re.search(
+ r"\s+(\S+)\s+(\d+)\s+(:?yes)", node_desc, re.DOTALL | re.MULTILINE
+ )
+
+ return match.groups(0)
+
+ def checkVariant(self, variant):
+ """Test node variants defaults"""
+
+ variant_info = self.getActiveVariant("ip4-lookup")
+ self.assertEqual(variant_info[0], "default")
+
+ variant_info = self.getActiveVariant("ip4-rewrite")
+ self.assertEqual(variant_info[0], variant)
+
+
+class TestICLVariant(TestNodeVariant):
+ """Test icl Node Variants"""
+
+ VARIANT = "icl"
+ LINUX_VARIANT = "avx512_bitalg"
+
+ @classmethod
+ def setUpConstants(cls):
+ super(TestICLVariant, cls).setUpConstants(cls.VARIANT)
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestICLVariant, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestICLVariant, cls).tearDownClass()
+
+ @unittest.skipUnless(
+ skipVariant(LINUX_VARIANT), VARIANT + " not a supported variant, skip."
+ )
+ def test_icl(self):
+ self.checkVariant(self.VARIANT)
+
+
+class TestSKXVariant(TestNodeVariant):
+ """Test skx Node Variants"""
+
+ VARIANT = "skx"
+ LINUX_VARIANT = "avx512f"
+
+ @classmethod
+ def setUpConstants(cls):
+ super(TestSKXVariant, cls).setUpConstants(cls.VARIANT)
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestSKXVariant, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestSKXVariant, cls).tearDownClass()
+
+ @unittest.skipUnless(
+ skipVariant(LINUX_VARIANT), VARIANT + " not a supported variant, skip."
+ )
+ def test_skx(self):
+ self.checkVariant(self.VARIANT)
+
+
+class TestHSWVariant(TestNodeVariant):
+ """Test avx2 Node Variants"""
+
+ VARIANT = "hsw"
+ LINUX_VARIANT = "avx2"
+
+ @classmethod
+ def setUpConstants(cls):
+ super(TestHSWVariant, cls).setUpConstants(cls.VARIANT)
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestHSWVariant, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestHSWVariant, cls).tearDownClass()
+
+ @unittest.skipUnless(
+ skipVariant(LINUX_VARIANT), VARIANT + " not a supported variant, skip."
+ )
+ def test_hsw(self):
+ self.checkVariant(self.VARIANT)
diff --git a/test/asf/test_offload.py b/test/asf/test_offload.py
new file mode 100644
index 00000000000..4c800129094
--- /dev/null
+++ b/test/asf/test_offload.py
@@ -0,0 +1,77 @@
+#!/usr/bin/env python3
+
+import unittest
+
+from asfframework import VppAsfTestCase, VppTestRunner
+
+
+class TestOffload(VppAsfTestCase):
+ """Offload Unit Test Cases"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestOffload, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestOffload, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestOffload, self).setUp()
+
+ def tearDown(self):
+ super(TestOffload, self).tearDown()
+
+ def test_offload_unittest(self):
+ """Checksum Offload Test"""
+ cmds = [
+ "loop create",
+ "set int ip address loop0 11.22.33.1/24",
+ "set int state loop0 up",
+ "loop create",
+ "set int ip address loop1 11.22.34.1/24",
+ "set int state loop1 up",
+ "set ip neighbor loop1 11.22.34.44 03:00:11:22:34:44",
+ "packet-generator new {\n"
+ " name s0\n"
+ " limit 100\n"
+ " size 128-128\n"
+ " interface loop0\n"
+ " tx-interface loop1\n"
+ " node loop1-output\n"
+ " buffer-flags ip4 offload\n"
+ " buffer-offload-flags offload-ip-cksum offload-udp-cksum\n"
+ " data {\n"
+ " IP4: 1.2.3 -> dead.0000.0001\n"
+ " UDP: 11.22.33.44 -> 11.22.34.44\n"
+ " ttl 2 checksum 13\n"
+ " UDP: 1234 -> 2345\n"
+ " checksum 11\n"
+ " incrementing 114\n"
+ " }\n"
+ "}",
+ "trace add pg-input 1",
+ "pa en",
+ "show error",
+ ]
+
+ for cmd in cmds:
+ r = self.vapi.cli_return_response(cmd)
+ if r.retval != 0:
+ if hasattr(r, "reply"):
+ self.logger.info(cmd + " FAIL reply " + r.reply)
+ else:
+ self.logger.info(cmd + " FAIL retval " + str(r.retval))
+
+ r = self.vapi.cli_return_response("show trace")
+ self.assertTrue(r.retval == 0)
+ self.assertTrue(hasattr(r, "reply"))
+ rv = r.reply
+ look_here = rv.find("ethernet-input")
+ self.assertFalse(look_here == -1)
+ bad_checksum_index = rv[look_here:].find("should be")
+ self.assertTrue(bad_checksum_index == -1)
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_perfmon.py b/test/asf/test_perfmon.py
new file mode 100644
index 00000000000..611746ff203
--- /dev/null
+++ b/test/asf/test_perfmon.py
@@ -0,0 +1,48 @@
+from asfframework import VppAsfTestCase, VppTestRunner
+from vpp_qemu_utils import can_create_namespaces
+from config import config
+import unittest
+
+
+@unittest.skipIf(
+ not can_create_namespaces("perfmon_chk"), "Test is not running with root privileges"
+)
+@unittest.skipIf("perfmon" in config.excluded_plugins, "Exclude Perfmon plugin tests")
+class TestPerfmon(VppAsfTestCase):
+ """Simple perfmon test"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestPerfmon, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestPerfmon, cls).tearDownClass()
+
+ def test_perfmon(self):
+ reply = self.vapi.cli("show perfmon active-bundle")
+ self.assertNotIn("context-switches", reply)
+
+ reply = self.vapi.cli("show perfmon bundle")
+ self.assertIn("context-switches", reply)
+
+ self.vapi.cli("perfmon start bundle context-switches type thread")
+ reply = self.vapi.cli("show perfmon active-bundle")
+ self.assertIn("name: context-switches", reply)
+
+ reply = self.vapi.cli("show perfmon statistics")
+ self.assertIn("per-thread context switches", reply)
+
+ reply = self.vapi.cli("show perfmon source linux verbose")
+ self.assertIn("description: Linux kernel performance counters", reply)
+ self.vapi.cli("perfmon reset")
+
+ reply = self.vapi.cli("show perfmon active-bundle")
+ self.assertNotIn("context-switches", reply)
+
+ self.vapi.cli("perfmon start bundle context-switches type thread")
+ self.vapi.cli("perfmon stop")
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_policer.py b/test/asf/test_policer.py
new file mode 100644
index 00000000000..9c01bf0fc1c
--- /dev/null
+++ b/test/asf/test_policer.py
@@ -0,0 +1,126 @@
+#!/usr/bin/env python3
+# Copyright (c) 2021 Graphiant, Inc.
+
+import unittest
+
+from asfframework import VppAsfTestCase, VppTestRunner
+from vpp_policer import VppPolicer
+
+# Default for the tests is 10s of "Green" packets at 8Mbps, ie. 10M bytes.
+# The policer helper CLI "sends" 500 byte packets, so default is 20000.
+
+TEST_RATE = 8000 # kbps
+TEST_BURST = 10000 # ms
+
+CIR_OK = 8500 # CIR in kbps, above test rate
+CIR_LOW = 7000 # CIR in kbps, below test rate
+EIR_OK = 9000 # EIR in kbps, above test rate
+EIR_LOW = 7500 # EIR in kbps, below test rate
+
+NUM_PKTS = 20000
+
+CBURST = 100000 # Committed burst in bytes
+EBURST = 200000 # Excess burst in bytes
+
+
+class TestPolicer(VppAsfTestCase):
+ """Policer Test Case"""
+
+ def run_policer_test(
+ self, type, cir, cb, eir, eb, rate=8000, burst=10000, colour=0
+ ):
+ """
+ Configure a Policer and push traffic through it.
+ """
+ types = {
+ "1R2C": 0,
+ "1R3C": 1,
+ "2R3C": 3,
+ }
+
+ pol_type = types.get(type)
+ policer = VppPolicer(
+ self,
+ "pol1",
+ cir,
+ eir,
+ cb,
+ eb,
+ rate_type=0,
+ type=pol_type,
+ color_aware=colour,
+ )
+ policer.add_vpp_config()
+
+ error = self.vapi.cli(
+ f"test policing index {policer.policer_index} rate {rate} "
+ f"burst {burst} colour {colour}"
+ )
+
+ stats = policer.get_stats()
+ policer.remove_vpp_config()
+
+ return stats
+
+ def test_policer_1r2c(self):
+ """Single rate, 2 colour policer"""
+ stats = self.run_policer_test("1R2C", CIR_OK, CBURST, 0, 0)
+ self.assertEqual(stats["conform_packets"], NUM_PKTS)
+
+ stats = self.run_policer_test("1R2C", CIR_LOW, CBURST, 0, 0)
+ self.assertLess(stats["conform_packets"], NUM_PKTS)
+ self.assertEqual(stats["exceed_packets"], 0)
+ self.assertGreater(stats["violate_packets"], 0)
+
+ stats = self.run_policer_test("1R2C", CIR_LOW, CBURST, 0, 0, colour=2)
+ self.assertEqual(stats["violate_packets"], NUM_PKTS)
+
+ def test_policer_1r3c(self):
+ """Single rate, 3 colour policer"""
+ stats = self.run_policer_test("1R3C", CIR_OK, CBURST, 0, 0)
+ self.assertEqual(stats["conform_packets"], NUM_PKTS)
+
+ stats = self.run_policer_test("1R3C", CIR_LOW, CBURST, 0, EBURST)
+ self.assertLess(stats["conform_packets"], NUM_PKTS)
+ self.assertGreater(stats["exceed_packets"], 0)
+ self.assertGreater(stats["violate_packets"], 0)
+
+ stats = self.run_policer_test("1R3C", CIR_LOW, CBURST, 0, EBURST, colour=1)
+ self.assertEqual(stats["conform_packets"], 0)
+ self.assertGreater(stats["exceed_packets"], 0)
+ self.assertGreater(stats["violate_packets"], 0)
+
+ stats = self.run_policer_test("1R3C", CIR_LOW, CBURST, 0, EBURST, colour=2)
+ self.assertEqual(stats["violate_packets"], NUM_PKTS)
+
+ def test_policer_2r3c(self):
+ """Dual rate, 3 colour policer"""
+ stats = self.run_policer_test("2R3C", CIR_OK, CBURST, EIR_OK, EBURST)
+ self.assertEqual(stats["conform_packets"], NUM_PKTS)
+
+ stats = self.run_policer_test("2R3C", CIR_LOW, CBURST, EIR_OK, EBURST)
+ self.assertLess(stats["conform_packets"], NUM_PKTS)
+ self.assertGreater(stats["exceed_packets"], 0)
+ self.assertEqual(stats["violate_packets"], 0)
+
+ stats = self.run_policer_test("2R3C", CIR_LOW, CBURST, EIR_LOW, EBURST)
+ self.assertLess(stats["conform_packets"], NUM_PKTS)
+ self.assertGreater(stats["exceed_packets"], 0)
+ self.assertGreater(stats["violate_packets"], 0)
+
+ stats = self.run_policer_test("2R3C", CIR_LOW, CBURST, EIR_OK, EBURST, colour=1)
+ self.assertEqual(stats["exceed_packets"], NUM_PKTS)
+
+ stats = self.run_policer_test(
+ "2R3C", CIR_LOW, CBURST, EIR_LOW, EBURST, colour=1
+ )
+ self.assertEqual(stats["conform_packets"], 0)
+ self.assertGreater(stats["exceed_packets"], 0)
+ self.assertGreater(stats["violate_packets"], 0)
+
+ stats = self.run_policer_test("2R3C", CIR_LOW, CBURST, EIR_OK, EBURST, colour=2)
+ self.assertEqual(stats["violate_packets"], NUM_PKTS)
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_prom.py b/test/asf/test_prom.py
new file mode 100644
index 00000000000..3f8fb4c7a44
--- /dev/null
+++ b/test/asf/test_prom.py
@@ -0,0 +1,58 @@
+from config import config
+from asfframework import VppAsfTestCase, VppTestRunner
+import unittest
+import subprocess
+from vpp_qemu_utils import (
+ create_host_interface,
+ delete_host_interfaces,
+ create_namespace,
+ delete_namespace,
+)
+
+
+@unittest.skipIf(
+ "http_static" in config.excluded_plugins, "Exclude HTTP Static Server plugin tests"
+)
+@unittest.skipIf("prom" in config.excluded_plugins, "Exclude Prometheus plugin tests")
+@unittest.skipIf(config.skip_netns_tests, "netns not available or disabled from cli")
+class TestProm(VppAsfTestCase):
+ """Prometheus plugin test"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestProm, cls).setUpClass()
+
+ create_namespace("HttpStaticProm")
+ create_host_interface("vppHost", "vppOut", "HttpStaticProm", "10.10.1.1/24")
+
+ cls.vapi.cli("create host-interface name vppOut")
+ cls.vapi.cli("set int state host-vppOut up")
+ cls.vapi.cli("set int ip address host-vppOut 10.10.1.2/24")
+
+ @classmethod
+ def tearDownClass(cls):
+ delete_namespace(["HttpStaticProm"])
+ delete_host_interfaces("vppHost")
+ super(TestProm, cls).tearDownClass()
+
+ def test_prom(self):
+ """Enable HTTP Static server and prometheus exporter, get stats"""
+ self.vapi.cli("http static server uri tcp://0.0.0.0/80 url-handlers")
+ self.vapi.cli("prom enable")
+
+ process = subprocess.run(
+ [
+ "ip",
+ "netns",
+ "exec",
+ "HttpStaticProm",
+ "curl",
+ f"10.10.1.2/stats.prom",
+ ],
+ capture_output=True,
+ )
+ self.assertIn(b"TYPE", process.stdout)
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_quic.py b/test/asf/test_quic.py
new file mode 100644
index 00000000000..e453bd5b3e5
--- /dev/null
+++ b/test/asf/test_quic.py
@@ -0,0 +1,601 @@
+#!/usr/bin/env python3
+""" Vpp QUIC tests """
+
+import unittest
+import os
+import signal
+from config import config
+from asfframework import VppAsfTestCase, VppTestRunner, Worker, tag_fixme_vpp_workers
+from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
+
+
+class QUICAppWorker(Worker):
+ """QUIC Test Application Worker"""
+
+ process = None
+
+ def __init__(
+ self,
+ appname,
+ executable_args,
+ logger,
+ role,
+ testcase,
+ env=None,
+ *args,
+ **kwargs,
+ ):
+ if env is None:
+ env = {}
+ app = f"{config.vpp_build_dir}/vpp/bin/{appname}"
+ self.args = [app] + executable_args
+ self.role = role
+ self.wait_for_gdb = "wait-for-gdb"
+ self.testcase = testcase
+ super(QUICAppWorker, self).__init__(self.args, logger, env, *args, **kwargs)
+
+ def run(self):
+ super(QUICAppWorker, self).run()
+
+ def teardown(self, logger, timeout):
+ if self.process is None:
+ return False
+ try:
+ logger.debug(f"Killing worker process (pid {self.process.pid})")
+ os.killpg(os.getpgid(self.process.pid), signal.SIGKILL)
+ self.join(timeout)
+ except OSError as e:
+ logger.debug("Couldn't kill worker process")
+ return True
+ return False
+
+
+@unittest.skipIf("quic" in config.excluded_plugins, "Exclude QUIC plugin tests")
+class QUICTestCase(VppAsfTestCase):
+ """QUIC Test Case"""
+
+ timeout = 20
+ pre_test_sleep = 0.3
+ post_test_sleep = 0.3
+ server_appns = "server"
+ server_appns_secret = None
+ client_appns = "client"
+ client_appns_secret = None
+
+ @classmethod
+ def setUpClass(cls):
+ cls.extra_vpp_plugin_config.append("plugin quic_plugin.so { enable }")
+ super(QUICTestCase, cls).setUpClass()
+
+ def setUp(self):
+ super(QUICTestCase, self).setUp()
+ self.vppDebug = "vpp_debug" in config.vpp_build_dir
+
+ self.create_loopback_interfaces(2)
+ self.uri = f"quic://{self.loop0.local_ip4}/1234"
+ table_id = 1
+ for i in self.lo_interfaces:
+ i.admin_up()
+
+ if table_id != 0:
+ tbl = VppIpTable(self, table_id)
+ tbl.add_vpp_config()
+
+ i.set_table_ip4(table_id)
+ i.config_ip4()
+ table_id += 1
+
+ # Configure namespaces
+ self.vapi.app_namespace_add_del_v4(
+ namespace_id=self.server_appns,
+ secret=self.server_appns_secret,
+ sw_if_index=self.loop0.sw_if_index,
+ )
+ self.vapi.app_namespace_add_del_v4(
+ namespace_id=self.client_appns,
+ secret=self.client_appns_secret,
+ sw_if_index=self.loop1.sw_if_index,
+ )
+
+ # Add inter-table routes
+ self.ip_t01 = VppIpRoute(
+ self,
+ self.loop1.local_ip4,
+ 32,
+ [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=2)],
+ table_id=1,
+ )
+ self.ip_t10 = VppIpRoute(
+ self,
+ self.loop0.local_ip4,
+ 32,
+ [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=1)],
+ table_id=2,
+ )
+ self.ip_t01.add_vpp_config()
+ self.ip_t10.add_vpp_config()
+ self.logger.debug(self.vapi.cli("show ip fib"))
+
+ def tearDown(self):
+ # Delete inter-table routes
+ self.ip_t01.remove_vpp_config()
+ self.ip_t10.remove_vpp_config()
+
+ for i in self.lo_interfaces:
+ i.unconfig_ip4()
+ i.set_table_ip4(0)
+ i.admin_down()
+ super(QUICTestCase, self).tearDown()
+
+
+class QUICEchoIntTestCase(QUICTestCase):
+ """QUIC Echo Internal Test Case"""
+
+ test_bytes = " test-bytes"
+ extra_vpp_config = ["session", "{", "enable", "poll-main", "}"]
+
+ def setUp(self):
+ super(QUICEchoIntTestCase, self).setUp()
+ self.client_args = (
+ f"uri {self.uri} fifo-size 64k{self.test_bytes} appns {self.client_appns} "
+ )
+ self.server_args = f"uri {self.uri} fifo-size 64k appns {self.server_appns} "
+
+ def tearDown(self):
+ super(QUICEchoIntTestCase, self).tearDown()
+
+ def server(self, *args):
+ _args = self.server_args + " ".join(args)
+ error = self.vapi.cli(f"test echo server {_args}")
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
+
+ def client(self, *args):
+ _args = self.client_args + " ".join(args)
+ error = self.vapi.cli(f"test echo client {_args}")
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
+
+
+@tag_fixme_vpp_workers
+class QUICEchoIntTransferTestCase(QUICEchoIntTestCase):
+ """QUIC Echo Internal Transfer Test Case"""
+
+ def test_quic_int_transfer(self):
+ """QUIC internal transfer"""
+ self.server()
+ self.client("mbytes", "2")
+
+
+@tag_fixme_vpp_workers
+class QUICEchoIntSerialTestCase(QUICEchoIntTestCase):
+ """QUIC Echo Internal Serial Transfer Test Case"""
+
+ def test_quic_serial_int_transfer(self):
+ """QUIC serial internal transfer"""
+ self.server()
+ self.client("mbytes", "2")
+ self.client("mbytes", "2")
+ self.client("mbytes", "2")
+ self.client("mbytes", "2")
+ self.client("mbytes", "2")
+
+
+@tag_fixme_vpp_workers
+class QUICEchoIntMStreamTestCase(QUICEchoIntTestCase):
+ """QUIC Echo Internal MultiStream Test Case"""
+
+ def test_quic_int_multistream_transfer(self):
+ """QUIC internal multi-stream transfer"""
+ self.server()
+ self.client("nclients", "10", "mbytes", "1")
+
+
+class QUICEchoExtTestCase(QUICTestCase):
+ quic_setup = "default"
+ test_bytes = "test-bytes:assert"
+ pre_test_sleep = 1
+ post_test_sleep = 1
+ app = "vpp_echo"
+ evt_q_len = 16384
+ vpp_worker_count = 1
+ server_fifo_size = "1M"
+ client_fifo_size = "4M"
+ extra_vpp_config = [
+ "session",
+ "{",
+ "enable",
+ "poll-main",
+ "use-app-socket-api",
+ "wrk-mqs-segment-size",
+ "64M",
+ "event-queue-length",
+ f"{evt_q_len}",
+ "preallocated-sessions",
+ "1024",
+ "v4-session-table-buckets",
+ "20000",
+ "v4-session-table-memory",
+ "64M",
+ "v4-halfopen-table-buckets",
+ "20000",
+ "v4-halfopen-table-memory",
+ "64M",
+ "local-endpoints-table-buckets",
+ "250000",
+ "local-endpoints-table-memory",
+ "512M",
+ "}",
+ ]
+
+ def setUp(self):
+ self.server_appns_secret = 1234
+ self.client_appns_secret = 5678
+ super(QUICEchoExtTestCase, self).setUp()
+ common_args = [
+ "uri",
+ self.uri,
+ "json",
+ self.test_bytes,
+ "quic-setup",
+ self.quic_setup,
+ "nthreads",
+ "1",
+ "mq-size",
+ f"{self.evt_q_len}",
+ "use-app-socket-api",
+ ]
+ self.server_echo_test_args = common_args + [
+ "server",
+ "appns",
+ f"{self.server_appns}",
+ "fifo-size",
+ f"{self.server_fifo_size}",
+ "socket-name",
+ f"{self.tempdir}/app_ns_sockets/{self.server_appns}",
+ ]
+ self.client_echo_test_args = common_args + [
+ "client",
+ "appns",
+ f"{self.client_appns}",
+ "fifo-size",
+ f"{self.client_fifo_size}",
+ "socket-name",
+ f"{self.tempdir}/app_ns_sockets/{self.client_appns}",
+ ]
+ error = self.vapi.cli("quic set fifo-size 2M")
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
+
+ def server(self, *args):
+ _args = self.server_echo_test_args + list(args)
+ self.worker_server = QUICAppWorker(
+ self.app, _args, self.logger, self.server_appns, self
+ )
+ self.worker_server.start()
+ self.sleep(self.pre_test_sleep)
+
+ def client(self, *args):
+ _args = self.client_echo_test_args + list(args)
+ self.worker_client = QUICAppWorker(
+ self.app, _args, self.logger, self.client_appns, self
+ )
+ self.worker_client.start()
+ timeout = None if self.debug_all else self.timeout
+ self.worker_client.join(timeout)
+ if self.worker_client.is_alive():
+ error = f"Client failed to complete in {timeout} seconds!"
+ self.logger.critical(error)
+ return
+ self.worker_server.join(timeout)
+ if self.worker_server.is_alive():
+ error = f"Server failed to complete in {timeout} seconds!"
+ self.logger.critical(error)
+ self.sleep(self.post_test_sleep)
+
+ def validate_ext_test_results(self):
+ server_result = self.worker_server.result
+ self.logger.debug(self.vapi.cli(f"show session verbose 2"))
+ client_result = self.worker_client.result
+ self.logger.info(f"Server worker result is `{server_result}'")
+ self.logger.info(f"Client worker result is `{client_result}'")
+ server_kill_error = False
+ if self.worker_server.result is None:
+ server_kill_error = self.worker_server.teardown(self.logger, self.timeout)
+ if self.worker_client.result is None:
+ self.worker_client.teardown(self.logger, self.timeout)
+ err_msg = f"Wrong server worker return code ({server_result})"
+ self.assertEqual(server_result, 0, err_msg)
+ self.assertIsNotNone(
+ client_result, f"Timeout! Client worker did not finish in {self.timeout}s"
+ )
+ err_msg = f"Wrong client worker return code ({client_result})"
+ self.assertEqual(client_result, 0, err_msg)
+ self.assertFalse(server_kill_error, "Server kill errored")
+
+
+class QUICEchoExtTransferTestCase(QUICEchoExtTestCase):
+ """QUIC Echo External Transfer Test Case"""
+
+ timeout = 60
+
+ def test_quic_ext_transfer(self):
+ """QUIC external transfer"""
+ self.server()
+ self.client()
+ self.validate_ext_test_results()
+
+
+class QUICEchoExtTransferBigTestCase(QUICEchoExtTestCase):
+ """QUIC Echo External Transfer Big Test Case"""
+
+ server_fifo_size = "4M"
+ client_fifo_size = "4M"
+ test_bytes = ""
+ timeout = 60
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ def test_quic_ext_transfer_big(self):
+ """QUIC external transfer, big stream"""
+ self.server("TX=0", "RX=2G")
+ self.client("TX=2G", "RX=0")
+ self.validate_ext_test_results()
+
+
+class QUICEchoExtQcloseRxTestCase(QUICEchoExtTestCase):
+ """QUIC Echo External Transfer Qclose Rx Test Case"""
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ @unittest.skip("testcase under development")
+ def test_quic_ext_qclose_rx(self):
+ """QUIC external transfer, rx close"""
+ self.server("TX=0", "RX=10M", "qclose=Y", "sclose=N")
+ self.client("TX=10M", "RX=0", "qclose=W", "sclose=W")
+ self.validate_ext_test_results()
+
+
+class QUICEchoExtQcloseTxTestCase(QUICEchoExtTestCase):
+ """QUIC Echo External Transfer Qclose Tx Test Case"""
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ @unittest.skip("testcase under development")
+ def test_quic_ext_qclose_tx(self):
+ """QUIC external transfer, tx close"""
+ self.server("TX=0", "RX=10M", "qclose=W", "sclose=W", "rx-results-diff")
+ self.client("TX=10M", "RX=0", "qclose=Y", "sclose=N")
+ self.validate_ext_test_results()
+
+
+class QUICEchoExtEarlyQcloseRxTestCase(QUICEchoExtTestCase):
+ """QUIC Echo External Transfer Early Qclose Rx Test Case"""
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ @unittest.skip("testcase under development")
+ def test_quic_ext_early_qclose_rx(self):
+ """QUIC external transfer, early rx close"""
+ self.server("TX=0", "RX=10M", "qclose=Y", "sclose=N")
+ self.client("TX=20M", "RX=0", "qclose=W", "sclose=W", "tx-results-diff")
+ self.validate_ext_test_results()
+
+
+class QUICEchoExtEarlyQcloseTxTestCase(QUICEchoExtTestCase):
+ """QUIC Echo External Transfer Early Qclose Tx Test Case"""
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ @unittest.skip("testcase under development")
+ def test_quic_ext_early_qclose_tx(self):
+ """QUIC external transfer, early tx close"""
+ self.server("TX=0", "RX=20M", "qclose=W", "sclose=W", "rx-results-diff")
+ self.client("TX=10M", "RX=0", "qclose=Y", "sclose=N")
+ self.validate_ext_test_results()
+
+
+class QUICEchoExtScloseRxTestCase(QUICEchoExtTestCase):
+ """QUIC Echo External Transfer Sclose Rx Test Case"""
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ @unittest.skip("testcase under development")
+ def test_quic_ext_sclose_rx(self):
+ """QUIC external transfer, rx stream close"""
+ self.server("TX=0", "RX=10M", "qclose=N", "sclose=Y")
+ self.client("TX=10M", "RX=0", "qclose=W", "sclose=W")
+ self.validate_ext_test_results()
+
+
+class QUICEchoExtScloseTxTestCase(QUICEchoExtTestCase):
+ """QUIC Echo External Transfer Sclose Tx Test Case"""
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ @unittest.skip("testcase under development")
+ def test_quic_ext_sclose_tx(self):
+ """QUIC external transfer, tx stream close"""
+ self.server("TX=0", "RX=10M", "qclose=W", "sclose=W")
+ self.client("TX=10M", "RX=0", "qclose=Y", "sclose=Y")
+ self.validate_ext_test_results()
+
+
+class QUICEchoExtEarlyScloseRxTestCase(QUICEchoExtTestCase):
+ """QUIC Echo External Transfer Early Sclose Rx Test Case"""
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ @unittest.skip("testcase under development")
+ def test_quic_ext_early_sclose_rx(self):
+ """QUIC external transfer, early rx stream close"""
+ self.server("TX=0", "RX=10M", "qclose=N", "sclose=Y")
+ self.client("TX=20M", "RX=0", "qclose=W", "sclose=W", "tx-results-diff")
+ self.validate_ext_test_results()
+
+
+class QUICEchoExtEarlyScloseTxTestCase(QUICEchoExtTestCase):
+ """QUIC Echo External Transfer Early Sclose Tx Test Case"""
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ @unittest.skip("testcase under development")
+ def test_quic_ext_early_sclose_tx(self):
+ """QUIC external transfer, early tx stream close"""
+ self.server("TX=0", "RX=20M", "qclose=W", "sclose=W", "rx-results-diff")
+ self.client("TX=10M", "RX=0", "qclose=Y", "sclose=Y")
+ self.validate_ext_test_results()
+
+
+class QUICEchoExtServerStreamTestCase(QUICEchoExtTestCase):
+ """QUIC Echo External Transfer Server Stream Test Case"""
+
+ quic_setup = "serverstream"
+ timeout = 60
+
+ def test_quic_ext_transfer_server_stream(self):
+ """QUIC external server transfer"""
+ self.server("TX=10M", "RX=0")
+ self.client("TX=0", "RX=10M")
+ self.validate_ext_test_results()
+
+
+class QUICEchoExtServerStreamBigTestCase(QUICEchoExtTestCase):
+ """QUIC Echo External Transfer Server Stream Big Test Case"""
+
+ quic_setup = "serverstream"
+ server_fifo_size = "4M"
+ client_fifo_size = "4M"
+ test_bytes = ""
+ timeout = 60
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ def test_quic_ext_transfer_server_stream_big(self):
+ """QUIC external server transfer, big stream"""
+ self.server("TX=2G", "RX=0")
+ self.client("TX=0", "RX=2G")
+ self.validate_ext_test_results()
+
+
+class QUICEchoExtServerStreamQcloseRxTestCase(QUICEchoExtTestCase):
+ """QUIC Echo External Transfer Server Stream Qclose Rx Test Case"""
+
+ quic_setup = "serverstream"
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ @unittest.skip("testcase under development")
+ def test_quic_ext_server_stream_qclose_rx(self):
+ """QUIC external server transfer, rx close"""
+ self.server("TX=10M", "RX=0", "qclose=W", "sclose=W")
+ self.client("TX=0", "RX=10M", "qclose=Y", "sclose=N")
+ self.validate_ext_test_results()
+
+
+class QUICEchoExtServerStreamQcloseTxTestCase(QUICEchoExtTestCase):
+ """QUIC Echo External Transfer Server Stream Qclose Tx Test Case"""
+
+ quic_setup = "serverstream"
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ @unittest.skip("testcase under development")
+ def test_quic_ext_server_stream_qclose_tx(self):
+ """QUIC external server transfer, tx close"""
+ self.server("TX=10M", "RX=0", "qclose=Y", "sclose=N")
+ self.client("TX=0", "RX=10M", "qclose=W", "sclose=W", "rx-results-diff")
+ self.validate_ext_test_results()
+
+
+class QUICEchoExtServerStreamEarlyQcloseRxTestCase(QUICEchoExtTestCase):
+ """QUIC Echo External Transfer Server Stream Early Qclose Rx Test Case"""
+
+ quic_setup = "serverstream"
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ @unittest.skip("testcase under development")
+ def test_quic_ext_server_stream_early_qclose_rx(self):
+ """QUIC external server transfer, early rx close"""
+ self.server("TX=20M", "RX=0", "qclose=W", "sclose=W", "tx-results-diff")
+ self.client("TX=0", "RX=10M", "qclose=Y", "sclose=N")
+ self.validate_ext_test_results()
+
+
+class QUICEchoExtServerStreamEarlyQcloseTxTestCase(QUICEchoExtTestCase):
+ """QUIC Echo External Transfer Server Stream Early Qclose Tx Test Case"""
+
+ quic_setup = "serverstream"
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ @unittest.skip("testcase under development")
+ def test_quic_ext_server_stream_early_qclose_tx(self):
+ """QUIC external server transfer, early tx close"""
+ self.server("TX=10M", "RX=0", "qclose=Y", "sclose=N")
+ self.client("TX=0", "RX=20M", "qclose=W", "sclose=W", "rx-results-diff")
+ self.validate_ext_test_results()
+
+
+class QUICEchoExtServerStreamScloseRxTestCase(QUICEchoExtTestCase):
+ """QUIC Echo External Transfer Server Stream Sclose Rx Test Case"""
+
+ quic_setup = "serverstream"
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ @unittest.skip("testcase under development")
+ def test_quic_ext_server_stream_sclose_rx(self):
+ """QUIC external server transfer, rx stream close"""
+ self.server("TX=10M", "RX=0", "qclose=W", "sclose=W")
+ self.client("TX=0", "RX=10M", "qclose=N", "sclose=Y")
+ self.validate_ext_test_results()
+
+
+class QUICEchoExtServerStreamScloseTxTestCase(QUICEchoExtTestCase):
+ """QUIC Echo External Transfer Server Stream Sclose Tx Test Case"""
+
+ quic_setup = "serverstream"
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ @unittest.skip("testcase under development")
+ def test_quic_ext_server_stream_sclose_tx(self):
+ """QUIC external server transfer, tx stream close"""
+ self.server("TX=10M", "RX=0", "qclose=Y", "sclose=Y")
+ self.client("TX=0", "RX=10M", "qclose=W", "sclose=W")
+ self.validate_ext_test_results()
+
+
+class QUICEchoExtServerStreamEarlyScloseRxTestCase(QUICEchoExtTestCase):
+ """QUIC Echo External Transfer Server Stream Early Sclose Rx Test Case"""
+
+ quic_setup = "serverstream"
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ @unittest.skip("testcase under development")
+ def test_quic_ext_server_stream_early_sclose_rx(self):
+ """QUIC external server transfer, early rx stream close"""
+ self.server("TX=20M", "RX=0", "qclose=W", "sclose=W", "tx-results-diff")
+ self.client("TX=0", "RX=10M", "qclose=N", "sclose=Y")
+ self.validate_ext_test_results()
+
+
+class QUICEchoExtServerStreamEarlyScloseTxTestCase(QUICEchoExtTestCase):
+ """QUIC Echo Ext Transfer Server Stream Early Sclose Tx Test Case"""
+
+ quic_setup = "serverstream"
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ @unittest.skip("testcase under development")
+ def test_quic_ext_server_stream_early_sclose_tx(self):
+ """QUIC external server transfer, early tx stream close"""
+ self.server("TX=10M", "RX=0", "qclose=Y", "sclose=Y")
+ self.client("TX=0", "RX=20M", "qclose=W", "sclose=W", "rx-results-diff")
+ self.validate_ext_test_results()
+
+
+class QUICEchoExtServerStreamWorkersTestCase(QUICEchoExtTestCase):
+ """QUIC Echo External Transfer Server Stream MultiWorker Test Case"""
+
+ quic_setup = "serverstream"
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ @unittest.skip("testcase under development")
+ def test_quic_ext_transfer_server_stream_multi_workers(self):
+ """QUIC external server transfer, multi-worker"""
+ self.server("nclients", "4", "quic-streams", "4", "TX=10M", "RX=0")
+ self.client("nclients", "4", "quic-streams", "4", "TX=0", "RX=10M")
+ self.validate_ext_test_results()
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_session.py b/test/asf/test_session.py
new file mode 100644
index 00000000000..64f59df5758
--- /dev/null
+++ b/test/asf/test_session.py
@@ -0,0 +1,195 @@
+#!/usr/bin/env python3
+
+import unittest
+
+from asfframework import (
+ VppAsfTestCase,
+ VppTestRunner,
+ tag_fixme_vpp_workers,
+ tag_run_solo,
+)
+from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
+
+
+@tag_fixme_vpp_workers
+class TestSession(VppAsfTestCase):
+ """Session Test Case"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestSession, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestSession, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestSession, self).setUp()
+
+ self.vapi.session_enable_disable(is_enable=1)
+ self.create_loopback_interfaces(2)
+
+ table_id = 0
+
+ for i in self.lo_interfaces:
+ i.admin_up()
+
+ if table_id != 0:
+ tbl = VppIpTable(self, table_id)
+ tbl.add_vpp_config()
+
+ i.set_table_ip4(table_id)
+ i.config_ip4()
+ table_id += 1
+
+ # Configure namespaces
+ self.vapi.app_namespace_add_del_v4(
+ namespace_id="0", sw_if_index=self.loop0.sw_if_index
+ )
+ self.vapi.app_namespace_add_del_v4(
+ namespace_id="1", sw_if_index=self.loop1.sw_if_index
+ )
+
+ def tearDown(self):
+ for i in self.lo_interfaces:
+ i.unconfig_ip4()
+ i.set_table_ip4(0)
+ i.admin_down()
+
+ super(TestSession, self).tearDown()
+ self.vapi.session_enable_disable(is_enable=1)
+
+ def test_segment_manager_alloc(self):
+ """Session Segment Manager Multiple Segment Allocation"""
+
+ # Add inter-table routes
+ ip_t01 = VppIpRoute(
+ self,
+ self.loop1.local_ip4,
+ 32,
+ [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=1)],
+ )
+ ip_t10 = VppIpRoute(
+ self,
+ self.loop0.local_ip4,
+ 32,
+ [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=0)],
+ table_id=1,
+ )
+ ip_t01.add_vpp_config()
+ ip_t10.add_vpp_config()
+
+ # Start builtin server and client with small private segments
+ uri = "tcp://" + self.loop0.local_ip4 + "/1234"
+ error = self.vapi.cli(
+ "test echo server appns 0 fifo-size 64k "
+ + "private-segment-size 1m uri "
+ + uri
+ )
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
+
+ error = self.vapi.cli(
+ "test echo client nclients 100 appns 1 "
+ + "fifo-size 64k syn-timeout 2 "
+ + "private-segment-size 1m uri "
+ + uri
+ )
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
+
+ if self.vpp_dead:
+ self.assert_equal(0)
+
+ # Delete inter-table routes
+ ip_t01.remove_vpp_config()
+ ip_t10.remove_vpp_config()
+
+
+@tag_fixme_vpp_workers
+class TestSessionUnitTests(VppAsfTestCase):
+ """Session Unit Tests Case"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestSessionUnitTests, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestSessionUnitTests, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestSessionUnitTests, self).setUp()
+ self.vapi.session_enable_disable(is_enable=1)
+
+ def test_session(self):
+ """Session Unit Tests"""
+ error = self.vapi.cli("test session all")
+
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
+
+ def tearDown(self):
+ super(TestSessionUnitTests, self).tearDown()
+ self.vapi.session_enable_disable(is_enable=0)
+
+
+@tag_run_solo
+class TestSegmentManagerTests(VppAsfTestCase):
+ """SVM Fifo Unit Tests Case"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestSegmentManagerTests, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestSegmentManagerTests, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestSegmentManagerTests, self).setUp()
+
+ def test_segment_manager(self):
+ """Segment manager Tests"""
+ error = self.vapi.cli("test segment-manager all")
+
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
+
+ def tearDown(self):
+ super(TestSegmentManagerTests, self).tearDown()
+
+
+@tag_run_solo
+class TestSvmFifoUnitTests(VppAsfTestCase):
+ """SVM Fifo Unit Tests Case"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestSvmFifoUnitTests, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestSvmFifoUnitTests, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestSvmFifoUnitTests, self).setUp()
+
+ def test_svm_fifo(self):
+ """SVM Fifo Unit Tests"""
+ error = self.vapi.cli("test svm fifo all")
+
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
+
+ def tearDown(self):
+ super(TestSvmFifoUnitTests, self).tearDown()
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_sparse_vec.py b/test/asf/test_sparse_vec.py
new file mode 100644
index 00000000000..cf0afd8aaf3
--- /dev/null
+++ b/test/asf/test_sparse_vec.py
@@ -0,0 +1,34 @@
+#!/usr/bin/env python3
+
+import unittest
+
+from asfframework import VppAsfTestCase, VppTestRunner
+
+
+class TestSparseVec(VppAsfTestCase):
+ """SparseVec Test Cases"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestSparseVec, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestSparseVec, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestSparseVec, self).setUp()
+
+ def tearDown(self):
+ super(TestSparseVec, self).tearDown()
+
+ def test_string_unittest(self):
+ """SparseVec unit tests"""
+ error = self.vapi.cli("test sparse_vec")
+ if error.find("failed") != -1:
+ self.logger.critical("FAILURE in the sparse_vec test")
+ self.assertNotIn("failed", error)
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_string.py b/test/asf/test_string.py
new file mode 100644
index 00000000000..2eeecd7dfd8
--- /dev/null
+++ b/test/asf/test_string.py
@@ -0,0 +1,59 @@
+#!/usr/bin/env python3
+
+import unittest
+
+from asfframework import VppAsfTestCase, VppTestRunner
+
+
+class TestString(VppAsfTestCase):
+ """String Test Cases"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestString, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestString, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestString, self).setUp()
+
+ def tearDown(self):
+ super(TestString, self).tearDown()
+
+ def test_string_unittest(self):
+ """String unit tests"""
+ names = [
+ "memcpy_s",
+ "clib_memcmp",
+ "clib_memcpy",
+ "clib_memset",
+ "clib_strcmp",
+ "clib_strncmp",
+ "clib_strncpy",
+ "clib_strnlen",
+ "clib_strtok",
+ "memcmp_s",
+ "memcpy_s",
+ "memset_s ",
+ "strcat_s",
+ "strcmp_s",
+ "strcpy_s",
+ "strncat_s",
+ "strncmp_s",
+ "strncpy_s",
+ "strnlen_s",
+ "strstr_s",
+ "strtok_s",
+ ]
+
+ for name in names:
+ error = self.vapi.cli("test string " + name)
+ if error.find("failed") != -1:
+ self.logger.critical("FAILURE in the " + name + " test")
+ self.assertNotIn("failed", error)
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_tap.py b/test/asf/test_tap.py
new file mode 100644
index 00000000000..c436ec6b6ae
--- /dev/null
+++ b/test/asf/test_tap.py
@@ -0,0 +1,37 @@
+import unittest
+import os
+
+from asfframework import VppAsfTestCase, VppTestRunner
+from vpp_devices import VppTAPInterface
+
+
+def check_tuntap_driver_access():
+ return os.access("/dev/net/tun", os.R_OK and os.W_OK)
+
+
+@unittest.skip("Requires root")
+class TestTAP(VppAsfTestCase):
+ """TAP Test Case"""
+
+ def test_tap_add_del(self):
+ """Create TAP interface"""
+ tap0 = VppTAPInterface(self, tap_id=0)
+ tap0.add_vpp_config()
+ self.assertTrue(tap0.query_vpp_config())
+
+ def test_tap_dump(self):
+ """Test api dump w/ and w/o sw_if_index filtering"""
+ MAX_INSTANCES = 10
+ tap_instances = []
+ for instance in range(MAX_INSTANCES):
+ i = VppTAPInterface(self, tap_id=instance)
+ i.add_vpp_config()
+ tap_instances.append(i)
+ details = self.vapi.sw_interface_tap_v2_dump()
+ self.assertEqual(MAX_INSTANCES, len(details))
+ details = self.vapi.sw_interface_tap_v2_dump(tap_instances[5].sw_if_index)
+ self.assertEqual(1, len(details))
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_tcp.py b/test/asf/test_tcp.py
new file mode 100644
index 00000000000..69fc5c472a5
--- /dev/null
+++ b/test/asf/test_tcp.py
@@ -0,0 +1,125 @@
+#!/usr/bin/env python3
+
+import unittest
+
+from asfframework import VppAsfTestCase, VppTestRunner
+from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
+
+
+class TestTCP(VppAsfTestCase):
+ """TCP Test Case"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestTCP, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestTCP, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestTCP, self).setUp()
+ self.vapi.session_enable_disable(is_enable=1)
+ self.create_loopback_interfaces(2)
+
+ table_id = 0
+
+ for i in self.lo_interfaces:
+ i.admin_up()
+
+ if table_id != 0:
+ tbl = VppIpTable(self, table_id)
+ tbl.add_vpp_config()
+
+ i.set_table_ip4(table_id)
+ i.config_ip4()
+ table_id += 1
+
+ # Configure namespaces
+ self.vapi.app_namespace_add_del_v4(
+ namespace_id="0", sw_if_index=self.loop0.sw_if_index
+ )
+ self.vapi.app_namespace_add_del_v4(
+ namespace_id="1", sw_if_index=self.loop1.sw_if_index
+ )
+
+ def tearDown(self):
+ for i in self.lo_interfaces:
+ i.unconfig_ip4()
+ i.set_table_ip4(0)
+ i.admin_down()
+ self.vapi.session_enable_disable(is_enable=0)
+ super(TestTCP, self).tearDown()
+
+ def test_tcp_transfer(self):
+ """TCP echo client/server transfer"""
+
+ # Add inter-table routes
+ ip_t01 = VppIpRoute(
+ self,
+ self.loop1.local_ip4,
+ 32,
+ [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=1)],
+ )
+ ip_t10 = VppIpRoute(
+ self,
+ self.loop0.local_ip4,
+ 32,
+ [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=0)],
+ table_id=1,
+ )
+ ip_t01.add_vpp_config()
+ ip_t10.add_vpp_config()
+
+ # Start builtin server and client
+ uri = "tcp://" + self.loop0.local_ip4 + "/1234"
+ error = self.vapi.cli("test echo server appns 0 fifo-size 4k uri " + uri)
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
+
+ error = self.vapi.cli(
+ "test echo client mbytes 10 appns 1 "
+ + "fifo-size 4k test-bytes "
+ + "syn-timeout 2 uri "
+ + uri
+ )
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
+
+ # Delete inter-table routes
+ ip_t01.remove_vpp_config()
+ ip_t10.remove_vpp_config()
+
+
+class TestTCPUnitTests(VppAsfTestCase):
+ "TCP Unit Tests"
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestTCPUnitTests, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestTCPUnitTests, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestTCPUnitTests, self).setUp()
+ self.vapi.session_enable_disable(is_enable=1)
+
+ def tearDown(self):
+ super(TestTCPUnitTests, self).tearDown()
+ self.vapi.session_enable_disable(is_enable=0)
+
+ def test_tcp_unittest(self):
+ """TCP Unit Tests"""
+ error = self.vapi.cli("test tcp all")
+
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_tls.py b/test/asf/test_tls.py
new file mode 100644
index 00000000000..d2d1d9a4747
--- /dev/null
+++ b/test/asf/test_tls.py
@@ -0,0 +1,154 @@
+#!/usr/bin/env python3
+
+import unittest
+import os
+import re
+import subprocess
+
+from asfframework import VppAsfTestCase, VppTestRunner
+from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
+
+
+def checkQat():
+ r = os.path.exists("/dev/qat_dev_processes")
+ if r:
+ return True
+ else:
+ # print("NO QAT! EXIT!")
+ return False
+
+
+def checkOpenSSLVersion():
+ ret = False
+ r = "OPENSSL_ROOT_DIR" in os.environ
+ if r:
+ ssl = os.environ["OPENSSL_ROOT_DIR"] + "/bin/openssl version"
+ p = subprocess.Popen(
+ ssl, stdin=subprocess.PIPE, stdout=subprocess.PIPE, shell=True
+ )
+ p.wait()
+ output = p.stdout.read()
+ status = p.returncode
+
+ if status:
+ pass
+ # print("openssl version error!")
+ else:
+ ssl_ver_src = re.findall(r"(\d+)\.+\d+.+\d+", output)
+ ssl_ver = int(ssl_ver_src[0])
+ if ssl_ver < 3:
+ ret = False
+ else:
+ ret = True
+ else:
+ # print("NO OPENSSL_ROOT_DIR!")
+ pass
+
+ return ret
+
+
+def checkAll():
+ ret = checkQat() & checkOpenSSLVersion()
+ return ret
+
+
+class TestTLS(VppAsfTestCase):
+ """TLS Qat Test Case."""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestTLS, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestTLS, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestTLS, self).setUp()
+
+ self.vapi.session_enable_disable(is_enable=1)
+ self.create_loopback_interfaces(2)
+
+ table_id = 0
+
+ for i in self.lo_interfaces:
+ i.admin_up()
+
+ if table_id != 0:
+ tbl = VppIpTable(self, table_id)
+ tbl.add_vpp_config()
+
+ i.set_table_ip4(table_id)
+ i.config_ip4()
+ table_id += 1
+
+ # Configure namespaces
+ self.vapi.app_namespace_add_del_v4(
+ namespace_id="0", sw_if_index=self.loop0.sw_if_index
+ )
+ self.vapi.app_namespace_add_del_v4(
+ namespace_id="1", sw_if_index=self.loop1.sw_if_index
+ )
+
+ def tearDown(self):
+ for i in self.lo_interfaces:
+ i.unconfig_ip4()
+ i.set_table_ip4(0)
+ i.admin_down()
+ self.vapi.session_enable_disable(is_enable=0)
+ super(TestTLS, self).tearDown()
+
+ @unittest.skipUnless(checkAll(), "QAT or OpenSSL not satisfied,skip.")
+ def test_tls_transfer(self):
+ """TLS qat echo client/server transfer"""
+
+ # Add inter-table routes
+ ip_t01 = VppIpRoute(
+ self,
+ self.loop1.local_ip4,
+ 32,
+ [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=1)],
+ )
+
+ ip_t10 = VppIpRoute(
+ self,
+ self.loop0.local_ip4,
+ 32,
+ [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=0)],
+ table_id=1,
+ )
+ ip_t01.add_vpp_config()
+ ip_t10.add_vpp_config()
+
+ # Enable QAT engine and TLS async
+ r = self.vapi.tls_openssl_set_engine(
+ async_enable=1, engine="qat", algorithm="RSA,PKEY_CRYPTO", ciphers="RSA"
+ )
+ self.assertIsNotNone(r, "No response msg ")
+
+ # Start builtin server and client
+ uri = "tls://" + self.loop0.local_ip4 + "/1234"
+ error = self.vapi.cli(
+ "test echo server appns 0 fifo-size 4k tls-engine 1 uri " + uri
+ )
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
+
+ error = self.vapi.cli(
+ "test echo client mbytes 10 appns 1 "
+ "fifo-size 4k test-bytes "
+ "tls-engine 1 "
+ "syn-timeout 2 uri " + uri
+ )
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
+
+ # Delete inter-table routes
+ ip_t01.remove_vpp_config()
+ ip_t10.remove_vpp_config()
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_util.py b/test/asf/test_util.py
new file mode 100644
index 00000000000..57279f7934c
--- /dev/null
+++ b/test/asf/test_util.py
@@ -0,0 +1,40 @@
+#!/usr/bin/env python3
+"""Test framework utility functions tests"""
+
+import unittest
+from asfframework import VppTestRunner, CPUInterface
+from vpp_papi import mac_pton, mac_ntop
+
+
+class TestUtil(CPUInterface, unittest.TestCase):
+ """Test framework utility tests"""
+
+ @classmethod
+ def is_tagged_run_solo(cls):
+ """if the test case class is timing-sensitive - return true"""
+ return False
+
+ @classmethod
+ def has_tag(cls, tag):
+ """if the test case has a given tag - return true"""
+ try:
+ return tag in cls.test_tags
+ except AttributeError:
+ pass
+ return False
+
+ @classmethod
+ def get_cpus_required(cls):
+ return 0
+
+ def test_mac_to_binary(self):
+ """MAC to binary and back"""
+ mac = "aa:bb:cc:dd:ee:ff"
+ b = mac_pton(mac)
+ mac2 = mac_ntop(b)
+ self.assertEqual(type(mac), type(mac2))
+ self.assertEqual(mac2, mac)
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_vapi.py b/test/asf/test_vapi.py
new file mode 100644
index 00000000000..a5635ff3567
--- /dev/null
+++ b/test/asf/test_vapi.py
@@ -0,0 +1,82 @@
+#!/usr/bin/env python3
+""" VAPI test """
+
+import unittest
+import os
+import signal
+from config import config
+from asfframework import VppAsfTestCase, VppTestRunner, Worker
+
+
+class VAPITestCase(VppAsfTestCase):
+ """VAPI test"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(VAPITestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(VAPITestCase, cls).tearDownClass()
+
+ def run_vapi_c(self, path, transport):
+ executable = f"{config.vpp_build_dir}/vpp/bin/vapi_c_test"
+ worker = Worker([executable, "vapi client", path, transport], self.logger)
+ worker.start()
+ timeout = 60
+ worker.join(timeout)
+ self.logger.info("Worker result is `%s'" % worker.result)
+ error = False
+ if worker.result is None:
+ try:
+ error = True
+ self.logger.error("Timeout! Worker did not finish in %ss" % timeout)
+ os.killpg(os.getpgid(worker.process.pid), signal.SIGTERM)
+ worker.join()
+ except:
+ self.logger.debug("Couldn't kill worker-spawned process")
+ raise
+ if error:
+ raise Exception("Timeout! Worker did not finish in %ss" % timeout)
+ self.assert_equal(worker.result, 0, "Binary test return code")
+
+ def test_vapi_c_shm(self):
+ """run C VAPI tests (over shared memory)"""
+ self.run_vapi_c(self.get_api_segment_prefix(), "shm")
+
+ def test_vapi_c_uds(self):
+ """run C VAPI tests (over unix domain socket)"""
+ self.run_vapi_c(self.get_api_sock_path(), "uds")
+
+ def run_vapi_cpp(self, path, transport):
+ """run C++ VAPI tests"""
+ executable = f"{config.vpp_build_dir}/vpp/bin/vapi_cpp_test"
+ worker = Worker([executable, "vapi client", path, transport], self.logger)
+ worker.start()
+ timeout = 120
+ worker.join(timeout)
+ self.logger.info("Worker result is `%s'" % worker.result)
+ error = False
+ if worker.result is None:
+ try:
+ error = True
+ self.logger.error("Timeout! Worker did not finish in %ss" % timeout)
+ os.killpg(os.getpgid(worker.process.pid), signal.SIGTERM)
+ worker.join()
+ except:
+ raise Exception("Couldn't kill worker-spawned process")
+ if error:
+ raise Exception("Timeout! Worker did not finish in %ss" % timeout)
+ self.assert_equal(worker.result, 0, "Binary test return code")
+
+ def test_vapi_cpp_shm(self):
+ """run C++ VAPI tests (over shared memory)"""
+ self.run_vapi_cpp(self.get_api_segment_prefix(), "shm")
+
+ def test_vapi_cpp_uds(self):
+ """run C++ VAPI tests (over unix domain socket)"""
+ self.run_vapi_cpp(self.get_api_sock_path(), "uds")
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_vcl.py b/test/asf/test_vcl.py
new file mode 100644
index 00000000000..a1113b863e8
--- /dev/null
+++ b/test/asf/test_vcl.py
@@ -0,0 +1,1263 @@
+#!/usr/bin/env python3
+""" Vpp VCL tests """
+
+import unittest
+import os
+import subprocess
+import signal
+import glob
+from config import config
+from asfframework import VppAsfTestCase, VppTestRunner, Worker
+from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
+
+iperf3 = "/usr/bin/iperf3"
+
+
+def have_app(app):
+ try:
+ subprocess.check_output([app, "-v"])
+ except (subprocess.CalledProcessError, OSError):
+ return False
+ return True
+
+
+_have_iperf3 = have_app(iperf3)
+
+
+class VCLAppWorker(Worker):
+ """VCL Test Application Worker"""
+
+ libname = "libvcl_ldpreload.so"
+
+ class LibraryNotFound(Exception):
+ pass
+
+ def __init__(
+ self, appname, executable_args, logger, env=None, role=None, *args, **kwargs
+ ):
+ self.role = role
+ vcl_ldpreload_glob = f"{config.vpp_install_dir}/**/{self.libname}"
+ vcl_ldpreload_so = glob.glob(vcl_ldpreload_glob, recursive=True)
+
+ if len(vcl_ldpreload_so) < 1:
+ raise LibraryNotFound("cannot locate library: {}".format(self.libname))
+
+ vcl_ldpreload_so = vcl_ldpreload_so[0]
+
+ if env is None:
+ env = {}
+ if "iperf" in appname:
+ app = appname
+ env.update({"LD_PRELOAD": vcl_ldpreload_so})
+ elif "sock" in appname:
+ app = f"{config.vpp_build_dir}/vpp/bin/{appname}"
+ env.update({"LD_PRELOAD": vcl_ldpreload_so})
+ else:
+ app = f"{config.vpp_build_dir}/vpp/bin/{appname}"
+ self.args = [app] + executable_args
+ super(VCLAppWorker, self).__init__(self.args, logger, env, *args, **kwargs)
+
+
+class VCLTestCase(VppAsfTestCase):
+ """VCL Test Class"""
+
+ session_startup = ["poll-main"]
+
+ @classmethod
+ def setUpClass(cls):
+ if cls.session_startup:
+ conf = "session {" + " ".join(cls.session_startup) + "}"
+ cls.extra_vpp_config = [conf]
+ super(VCLTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(VCLTestCase, cls).tearDownClass()
+
+ def setUp(self):
+ self.vppDebug = "vpp_debug" in config.vpp_install_dir
+ self.server_addr = "127.0.0.1"
+ self.server_port = "22000"
+ self.server_args = [self.server_port]
+ self.server_ipv6_addr = "::1"
+ self.server_ipv6_args = ["-6", self.server_port]
+ self.timeout = 20
+ self.echo_phrase = "Hello, world! Jenny is a friend of mine."
+ self.pre_test_sleep = 0.3
+ self.post_test_sleep = 1
+ self.sapi_client_sock = ""
+ self.sapi_server_sock = ""
+
+ if os.path.isfile("/tmp/ldp_server_af_unix_socket"):
+ os.remove("/tmp/ldp_server_af_unix_socket")
+
+ super(VCLTestCase, self).setUp()
+
+ def update_vcl_app_env(self, ns_id, ns_secret, attach_sock):
+ if not ns_id:
+ if "VCL_APP_NAMESPACE_ID" in self.vcl_app_env:
+ del self.vcl_app_env["VCL_APP_NAMESPACE_ID"]
+ else:
+ self.vcl_app_env["VCL_APP_NAMESPACE_ID"] = ns_id
+
+ if not ns_secret:
+ if "VCL_APP_NAMESPACE_SECRET" in self.vcl_app_env:
+ del self.vcl_app_env["VCL_APP_NAMESPACE_SECRET"]
+ else:
+ self.vcl_app_env["VCL_APP_NAMESPACE_SECRET"] = ns_secret
+
+ if not attach_sock:
+ self.vcl_app_env["VCL_VPP_API_SOCKET"] = self.get_api_sock_path()
+ if "VCL_VPP_SAPI_SOCKET" in self.vcl_app_env:
+ del self.vcl_app_env["VCL_VPP_SAPI_SOCKET"]
+ else:
+ sapi_sock = "%s/app_ns_sockets/%s" % (self.tempdir, attach_sock)
+ self.vcl_app_env["VCL_VPP_SAPI_SOCKET"] = sapi_sock
+ if "VCL_VPP_API_SOCKET" in self.vcl_app_env:
+ del self.vcl_app_env["VCL_VPP_API_SOCKET"]
+
+ def cut_thru_setup(self):
+ self.vapi.session_enable_disable(is_enable=1)
+
+ def cut_thru_tear_down(self):
+ self.vapi.session_enable_disable(is_enable=0)
+
+ def cut_thru_test(self, server_app, server_args, client_app, client_args):
+ self.vcl_app_env = {"VCL_APP_SCOPE_LOCAL": "true"}
+
+ self.update_vcl_app_env("", "", self.sapi_server_sock)
+ worker_server = VCLAppWorker(
+ server_app, server_args, self.logger, self.vcl_app_env, "server"
+ )
+ worker_server.start()
+ self.sleep(self.pre_test_sleep)
+
+ self.update_vcl_app_env("", "", self.sapi_client_sock)
+ worker_client = VCLAppWorker(
+ client_app, client_args, self.logger, self.vcl_app_env, "client"
+ )
+ worker_client.start()
+ worker_client.join(self.timeout)
+ try:
+ self.validateResults(worker_client, worker_server, self.timeout)
+ except Exception as error:
+ self.fail("Failed with %s" % error)
+ self.sleep(self.post_test_sleep)
+
+ def thru_host_stack_setup(self):
+ self.vapi.session_enable_disable(is_enable=1)
+ self.create_loopback_interfaces(2)
+
+ table_id = 1
+
+ for i in self.lo_interfaces:
+ i.admin_up()
+
+ if table_id != 0:
+ tbl = VppIpTable(self, table_id)
+ tbl.add_vpp_config()
+
+ i.set_table_ip4(table_id)
+ i.config_ip4()
+ table_id += 1
+
+ # Configure namespaces
+ self.vapi.app_namespace_add_del_v4(
+ namespace_id="1", secret=1234, sw_if_index=self.loop0.sw_if_index
+ )
+ self.vapi.app_namespace_add_del_v4(
+ namespace_id="2", secret=5678, sw_if_index=self.loop1.sw_if_index
+ )
+
+ # Add inter-table routes
+ ip_t01 = VppIpRoute(
+ self,
+ self.loop1.local_ip4,
+ 32,
+ [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=2)],
+ table_id=1,
+ )
+ ip_t10 = VppIpRoute(
+ self,
+ self.loop0.local_ip4,
+ 32,
+ [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=1)],
+ table_id=2,
+ )
+ ip_t01.add_vpp_config()
+ ip_t10.add_vpp_config()
+ self.logger.debug(self.vapi.cli("show ip fib"))
+
+ def thru_host_stack_tear_down(self):
+ for i in self.lo_interfaces:
+ i.unconfig_ip4()
+ i.set_table_ip4(0)
+ i.admin_down()
+ i.remove_vpp_config()
+
+ def thru_host_stack_ipv6_setup(self):
+ self.vapi.session_enable_disable(is_enable=1)
+ self.create_loopback_interfaces(2)
+
+ table_id = 1
+
+ for i in self.lo_interfaces:
+ i.admin_up()
+
+ tbl = VppIpTable(self, table_id, is_ip6=1)
+ tbl.add_vpp_config()
+
+ i.set_table_ip6(table_id)
+ i.config_ip6()
+ table_id += 1
+
+ # Configure namespaces
+ self.vapi.app_namespace_add_del_v4(
+ namespace_id="1", secret=1234, sw_if_index=self.loop0.sw_if_index
+ )
+ self.vapi.app_namespace_add_del_v4(
+ namespace_id="2", secret=5678, sw_if_index=self.loop1.sw_if_index
+ )
+
+ # Add inter-table routes
+ ip_t01 = VppIpRoute(
+ self,
+ self.loop1.local_ip6,
+ 128,
+ [VppRoutePath("::0", 0xFFFFFFFF, nh_table_id=2)],
+ table_id=1,
+ )
+ ip_t10 = VppIpRoute(
+ self,
+ self.loop0.local_ip6,
+ 128,
+ [VppRoutePath("::0", 0xFFFFFFFF, nh_table_id=1)],
+ table_id=2,
+ )
+ ip_t01.add_vpp_config()
+ ip_t10.add_vpp_config()
+ self.logger.debug(self.vapi.cli("show interface addr"))
+ self.logger.debug(self.vapi.cli("show ip6 fib"))
+
+ def thru_host_stack_ipv6_tear_down(self):
+ for i in self.lo_interfaces:
+ i.unconfig_ip6()
+ i.set_table_ip6(0)
+ i.admin_down()
+
+ self.vapi.session_enable_disable(is_enable=0)
+
+ @unittest.skipUnless(_have_iperf3, "'%s' not found, Skipping.")
+ def thru_host_stack_test(self, server_app, server_args, client_app, client_args):
+ self.vcl_app_env = {"VCL_APP_SCOPE_GLOBAL": "true"}
+
+ self.update_vcl_app_env("1", "1234", self.sapi_server_sock)
+ worker_server = VCLAppWorker(
+ server_app, server_args, self.logger, self.vcl_app_env, "server"
+ )
+ worker_server.start()
+ self.sleep(self.pre_test_sleep)
+
+ self.update_vcl_app_env("2", "5678", self.sapi_client_sock)
+ worker_client = VCLAppWorker(
+ client_app, client_args, self.logger, self.vcl_app_env, "client"
+ )
+ worker_client.start()
+ worker_client.join(self.timeout)
+
+ try:
+ self.validateResults(worker_client, worker_server, self.timeout)
+ except Exception as error:
+ self.fail("Failed with %s" % error)
+ self.sleep(self.post_test_sleep)
+
+ def validateResults(self, worker_client, worker_server, timeout):
+ if worker_server.process is None:
+ raise RuntimeError("worker_server is not running.")
+ if os.path.isdir("/proc/{}".format(worker_server.process.pid)):
+ self.logger.info(
+ "Killing server worker process (pid %d)" % worker_server.process.pid
+ )
+ os.killpg(os.getpgid(worker_server.process.pid), signal.SIGTERM)
+ worker_server.join()
+ self.logger.info("Client worker result is `%s'" % worker_client.result)
+ error = False
+ if worker_client.result is None:
+ try:
+ error = True
+ self.logger.error(
+ "Timeout: %ss! Killing client worker process (pid %d)"
+ % (timeout, worker_client.process.pid)
+ )
+ os.killpg(os.getpgid(worker_client.process.pid), signal.SIGKILL)
+ worker_client.join()
+ except OSError:
+ self.logger.debug("Couldn't kill client worker process")
+ raise
+ if error:
+ raise RuntimeError("Timeout! Client worker did not finish in %ss" % timeout)
+ self.assert_equal(worker_client.result, 0, "Binary test return code")
+
+
+class LDPCutThruTestCase(VCLTestCase):
+ """LDP Cut Thru Tests"""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.session_startup = ["poll-main", "use-app-socket-api"]
+ super(LDPCutThruTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(LDPCutThruTestCase, cls).tearDownClass()
+
+ def setUp(self):
+ super(LDPCutThruTestCase, self).setUp()
+
+ self.cut_thru_setup()
+ self.client_echo_test_args = [
+ "-E",
+ self.echo_phrase,
+ "-X",
+ self.server_addr,
+ self.server_port,
+ ]
+ self.client_iperf3_timeout = 20
+ self.client_iperf3_args = ["-4", "-t 2", "-c", self.server_addr]
+ self.server_iperf3_args = ["-4", "-s"]
+ self.client_uni_dir_nsock_timeout = 20
+ self.client_uni_dir_nsock_test_args = [
+ "-N",
+ "1000",
+ "-U",
+ "-X",
+ "-I",
+ "2",
+ self.server_addr,
+ self.server_port,
+ ]
+ self.client_bi_dir_nsock_timeout = 20
+ self.client_bi_dir_nsock_test_args = [
+ "-N",
+ "1000",
+ "-B",
+ "-X",
+ "-I",
+ "2",
+ self.server_addr,
+ self.server_port,
+ ]
+ self.sapi_client_sock = "default"
+ self.sapi_server_sock = "default"
+
+ def tearDown(self):
+ super(LDPCutThruTestCase, self).tearDown()
+ self.cut_thru_tear_down()
+
+ def show_commands_at_teardown(self):
+ self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ def test_ldp_cut_thru_echo(self):
+ """run LDP cut thru echo test"""
+
+ self.cut_thru_test(
+ "sock_test_server",
+ self.server_args,
+ "sock_test_client",
+ self.client_echo_test_args,
+ )
+
+ def test_ldp_cut_thru_iperf3(self):
+ """run LDP cut thru iperf3 test"""
+
+ self.timeout = self.client_iperf3_timeout
+ self.cut_thru_test(
+ iperf3, self.server_iperf3_args, iperf3, self.client_iperf3_args
+ )
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ def test_ldp_cut_thru_uni_dir_nsock(self):
+ """run LDP cut thru uni-directional (multiple sockets) test"""
+
+ self.timeout = self.client_uni_dir_nsock_timeout
+ self.cut_thru_test(
+ "sock_test_server",
+ self.server_args,
+ "sock_test_client",
+ self.client_uni_dir_nsock_test_args,
+ )
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ @unittest.skip("sock test apps need to be improved")
+ def test_ldp_cut_thru_bi_dir_nsock(self):
+ """run LDP cut thru bi-directional (multiple sockets) test"""
+
+ self.timeout = self.client_bi_dir_nsock_timeout
+ self.cut_thru_test(
+ "sock_test_server",
+ self.server_args,
+ "sock_test_client",
+ self.client_bi_dir_nsock_test_args,
+ )
+
+
+class VCLCutThruTestCase(VCLTestCase):
+ """VCL Cut Thru Tests"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(VCLCutThruTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(VCLCutThruTestCase, cls).tearDownClass()
+
+ def setUp(self):
+ super(VCLCutThruTestCase, self).setUp()
+
+ self.cut_thru_setup()
+ self.client_echo_test_args = [
+ "-E",
+ self.echo_phrase,
+ "-X",
+ self.server_addr,
+ self.server_port,
+ ]
+
+ self.client_uni_dir_nsock_timeout = 20
+ self.client_uni_dir_nsock_test_args = [
+ "-N",
+ "1000",
+ "-U",
+ "-X",
+ "-I",
+ "2",
+ self.server_addr,
+ self.server_port,
+ ]
+ self.client_bi_dir_nsock_timeout = 20
+ self.client_bi_dir_nsock_test_args = [
+ "-N",
+ "1000",
+ "-B",
+ "-X",
+ "-I",
+ "2",
+ self.server_addr,
+ self.server_port,
+ ]
+
+ def tearDown(self):
+ super(VCLCutThruTestCase, self).tearDown()
+
+ def show_commands_at_teardown(self):
+ self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
+
+ def test_vcl_cut_thru_echo(self):
+ """run VCL cut thru echo test"""
+
+ self.cut_thru_test(
+ "vcl_test_server",
+ self.server_args,
+ "vcl_test_client",
+ self.client_echo_test_args,
+ )
+
+ def test_vcl_cut_thru_uni_dir_nsock(self):
+ """run VCL cut thru uni-directional (multiple sockets) test"""
+
+ self.timeout = self.client_uni_dir_nsock_timeout
+ self.cut_thru_test(
+ "vcl_test_server",
+ self.server_args,
+ "vcl_test_client",
+ self.client_uni_dir_nsock_test_args,
+ )
+
+ def test_vcl_cut_thru_bi_dir_nsock(self):
+ """run VCL cut thru bi-directional (multiple sockets) test"""
+
+ self.timeout = self.client_bi_dir_nsock_timeout
+ self.cut_thru_test(
+ "vcl_test_server",
+ self.server_args,
+ "vcl_test_client",
+ self.client_bi_dir_nsock_test_args,
+ )
+
+
+class VCLThruHostStackEcho(VCLTestCase):
+ """VCL Thru Host Stack Echo"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(VCLThruHostStackEcho, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(VCLThruHostStackEcho, cls).tearDownClass()
+
+ def setUp(self):
+ super(VCLThruHostStackEcho, self).setUp()
+
+ self.thru_host_stack_setup()
+ self.client_bi_dir_nsock_timeout = 20
+ self.client_bi_dir_nsock_test_args = [
+ "-N",
+ "1000",
+ "-B",
+ "-X",
+ "-I",
+ "2",
+ self.loop0.local_ip4,
+ self.server_port,
+ ]
+ self.client_echo_test_args = [
+ "-E",
+ self.echo_phrase,
+ "-X",
+ self.loop0.local_ip4,
+ self.server_port,
+ ]
+
+ def tearDown(self):
+ self.thru_host_stack_tear_down()
+ super(VCLThruHostStackEcho, self).tearDown()
+
+ def test_vcl_thru_host_stack_echo(self):
+ """run VCL IPv4 thru host stack echo test"""
+
+ self.thru_host_stack_test(
+ "vcl_test_server",
+ self.server_args,
+ "vcl_test_client",
+ self.client_echo_test_args,
+ )
+
+ def show_commands_at_teardown(self):
+ self.logger.debug(self.vapi.cli("show app server"))
+ self.logger.debug(self.vapi.cli("show session verbose"))
+ self.logger.debug(self.vapi.cli("show app mq"))
+
+
+class VCLThruHostStackTLS(VCLTestCase):
+ """VCL Thru Host Stack TLS"""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.session_startup = ["poll-main", "use-app-socket-api"]
+ super(VCLThruHostStackTLS, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(VCLThruHostStackTLS, cls).tearDownClass()
+
+ def setUp(self):
+ super(VCLThruHostStackTLS, self).setUp()
+
+ self.thru_host_stack_setup()
+ self.client_uni_dir_tls_timeout = 20
+ self.server_tls_args = ["-L", self.server_port]
+ self.client_uni_dir_tls_test_args = [
+ "-N",
+ "1000",
+ "-U",
+ "-X",
+ "-L",
+ self.loop0.local_ip4,
+ self.server_port,
+ ]
+ self.sapi_server_sock = "1"
+ self.sapi_client_sock = "2"
+
+ def test_vcl_thru_host_stack_tls_uni_dir(self):
+ """run VCL thru host stack uni-directional TLS test"""
+
+ self.timeout = self.client_uni_dir_tls_timeout
+ self.thru_host_stack_test(
+ "vcl_test_server",
+ self.server_tls_args,
+ "vcl_test_client",
+ self.client_uni_dir_tls_test_args,
+ )
+
+ def tearDown(self):
+ self.thru_host_stack_tear_down()
+ super(VCLThruHostStackTLS, self).tearDown()
+
+ def show_commands_at_teardown(self):
+ self.logger.debug(self.vapi.cli("show app server"))
+ self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
+
+
+class VCLThruHostStackEchoInterruptMode(VCLThruHostStackEcho):
+ """VCL Thru Host Stack Echo interrupt mode"""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.session_startup = ["use-private-rx-mqs", "use-app-socket-api"]
+ super(VCLThruHostStackEcho, cls).setUpClass()
+
+ def test_vcl_thru_host_stack_echo(self):
+ """run VCL IPv4 thru host stack echo test interrupt mode"""
+
+ self.sapi_server_sock = "1"
+ self.sapi_client_sock = "2"
+
+ self.thru_host_stack_test(
+ "vcl_test_server",
+ self.server_args,
+ "vcl_test_client",
+ self.client_echo_test_args,
+ )
+
+
+class VCLThruHostStackTLSInterruptMode(VCLThruHostStackTLS):
+ """VCL Thru Host Stack TLS interrupt mode"""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.session_startup = ["poll-main", "use-app-socket-api", "use-private-rx-mqs"]
+ super(VCLThruHostStackTLS, cls).setUpClass()
+
+
+class VCLThruHostStackDTLS(VCLTestCase):
+ """VCL Thru Host Stack DTLS"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(VCLThruHostStackDTLS, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(VCLThruHostStackDTLS, cls).tearDownClass()
+
+ def setUp(self):
+ super(VCLThruHostStackDTLS, self).setUp()
+
+ self.thru_host_stack_setup()
+ self.client_uni_dir_dtls_timeout = 20
+ self.server_dtls_args = ["-p", "dtls", self.server_port]
+ self.client_uni_dir_dtls_test_args = [
+ "-N",
+ "1000",
+ "-U",
+ "-X",
+ "-p",
+ "dtls",
+ "-T 1400",
+ self.loop0.local_ip4,
+ self.server_port,
+ ]
+
+ def test_vcl_thru_host_stack_dtls_uni_dir(self):
+ """run VCL thru host stack uni-directional DTLS test"""
+
+ self.timeout = self.client_uni_dir_dtls_timeout
+ self.thru_host_stack_test(
+ "vcl_test_server",
+ self.server_dtls_args,
+ "vcl_test_client",
+ self.client_uni_dir_dtls_test_args,
+ )
+
+ def tearDown(self):
+ self.thru_host_stack_tear_down()
+ super(VCLThruHostStackDTLS, self).tearDown()
+
+ def show_commands_at_teardown(self):
+ self.logger.debug(self.vapi.cli("show app server"))
+ self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
+
+
+class VCLThruHostStackQUIC(VCLTestCase):
+ """VCL Thru Host Stack QUIC"""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.extra_vpp_plugin_config.append("plugin quic_plugin.so { enable }")
+ super(VCLThruHostStackQUIC, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(VCLThruHostStackQUIC, cls).tearDownClass()
+
+ def setUp(self):
+ super(VCLThruHostStackQUIC, self).setUp()
+
+ self.thru_host_stack_setup()
+ self.client_uni_dir_quic_timeout = 20
+ self.server_quic_args = ["-p", "quic", self.server_port]
+ self.client_uni_dir_quic_test_args = [
+ "-N",
+ "1000",
+ "-U",
+ "-X",
+ "-p",
+ "quic",
+ self.loop0.local_ip4,
+ self.server_port,
+ ]
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ def test_vcl_thru_host_stack_quic_uni_dir(self):
+ """run VCL thru host stack uni-directional QUIC test"""
+
+ self.timeout = self.client_uni_dir_quic_timeout
+ self.thru_host_stack_test(
+ "vcl_test_server",
+ self.server_quic_args,
+ "vcl_test_client",
+ self.client_uni_dir_quic_test_args,
+ )
+
+ def tearDown(self):
+ self.thru_host_stack_tear_down()
+ super(VCLThruHostStackQUIC, self).tearDown()
+
+ def show_commands_at_teardown(self):
+ self.logger.debug(self.vapi.cli("show app server"))
+ self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
+
+
+class VCLThruHostStackBidirNsock(VCLTestCase):
+ """VCL Thru Host Stack Bidir Nsock"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(VCLThruHostStackBidirNsock, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(VCLThruHostStackBidirNsock, cls).tearDownClass()
+
+ def setUp(self):
+ super(VCLThruHostStackBidirNsock, self).setUp()
+
+ self.thru_host_stack_setup()
+ self.client_bi_dir_nsock_timeout = 20
+ self.client_bi_dir_nsock_test_args = [
+ "-N",
+ "1000",
+ "-B",
+ "-X",
+ "-I",
+ "2",
+ self.loop0.local_ip4,
+ self.server_port,
+ ]
+ self.client_echo_test_args = [
+ "-E",
+ self.echo_phrase,
+ "-X",
+ self.loop0.local_ip4,
+ self.server_port,
+ ]
+
+ def tearDown(self):
+ self.thru_host_stack_tear_down()
+ super(VCLThruHostStackBidirNsock, self).tearDown()
+
+ def show_commands_at_teardown(self):
+ self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
+
+ def test_vcl_thru_host_stack_bi_dir_nsock(self):
+ """run VCL thru host stack bi-directional (multiple sockets) test"""
+
+ self.timeout = self.client_bi_dir_nsock_timeout
+ self.thru_host_stack_test(
+ "vcl_test_server",
+ self.server_args,
+ "vcl_test_client",
+ self.client_bi_dir_nsock_test_args,
+ )
+
+
+class LDPThruHostStackBidirNsock(VCLTestCase):
+ """LDP Thru Host Stack Bidir Nsock"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(LDPThruHostStackBidirNsock, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(LDPThruHostStackBidirNsock, cls).tearDownClass()
+
+ def setUp(self):
+ super(LDPThruHostStackBidirNsock, self).setUp()
+
+ self.thru_host_stack_setup()
+ self.client_bi_dir_nsock_timeout = 20
+ self.client_bi_dir_nsock_test_args = [
+ "-N",
+ "1000",
+ "-B",
+ "-X",
+ # OUCH! Host Stack Bug?
+ # Only fails when running
+ # 'make test TEST_JOBS=auto'
+ # or TEST_JOBS > 1
+ # "-I", "2",
+ self.loop0.local_ip4,
+ self.server_port,
+ ]
+
+ def tearDown(self):
+ self.thru_host_stack_tear_down()
+ super(LDPThruHostStackBidirNsock, self).tearDown()
+
+ def show_commands_at_teardown(self):
+ self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
+
+ def test_ldp_thru_host_stack_bi_dir_nsock(self):
+ """run LDP thru host stack bi-directional (multiple sockets) test"""
+
+ self.timeout = self.client_bi_dir_nsock_timeout
+ self.thru_host_stack_test(
+ "sock_test_server",
+ self.server_args,
+ "sock_test_client",
+ self.client_bi_dir_nsock_test_args,
+ )
+
+
+class LDPThruHostStackNsock(VCLTestCase):
+ """LDP Thru Host Stack Nsock"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(LDPThruHostStackNsock, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(LDPThruHostStackNsock, cls).tearDownClass()
+
+ def setUp(self):
+ super(LDPThruHostStackNsock, self).setUp()
+
+ self.thru_host_stack_setup()
+ if self.vppDebug:
+ self.client_uni_dir_nsock_timeout = 20
+ self.numSockets = "2"
+ else:
+ self.client_uni_dir_nsock_timeout = 20
+ self.numSockets = "5"
+
+ self.client_uni_dir_nsock_test_args = [
+ "-N",
+ "1000",
+ "-U",
+ "-X",
+ "-I",
+ self.numSockets,
+ self.loop0.local_ip4,
+ self.server_port,
+ ]
+
+ def tearDown(self):
+ self.thru_host_stack_tear_down()
+ super(LDPThruHostStackNsock, self).tearDown()
+
+ def test_ldp_thru_host_stack_uni_dir_nsock(self):
+ """run LDP thru host stack uni-directional (multiple sockets) test"""
+
+ self.timeout = self.client_uni_dir_nsock_timeout
+ self.thru_host_stack_test(
+ "sock_test_server",
+ self.server_args,
+ "sock_test_client",
+ self.client_uni_dir_nsock_test_args,
+ )
+
+
+class VCLThruHostStackNsock(VCLTestCase):
+ """VCL Thru Host Stack Nsock"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(VCLThruHostStackNsock, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(VCLThruHostStackNsock, cls).tearDownClass()
+
+ def setUp(self):
+ super(VCLThruHostStackNsock, self).setUp()
+
+ self.thru_host_stack_setup()
+ if self.vppDebug:
+ self.client_uni_dir_nsock_timeout = 20
+ self.numSockets = "2"
+ else:
+ self.client_uni_dir_nsock_timeout = 20
+ self.numSockets = "5"
+
+ self.client_uni_dir_nsock_test_args = [
+ "-N",
+ "1000",
+ "-U",
+ "-X",
+ "-I",
+ self.numSockets,
+ self.loop0.local_ip4,
+ self.server_port,
+ ]
+
+ def tearDown(self):
+ self.thru_host_stack_tear_down()
+ super(VCLThruHostStackNsock, self).tearDown()
+
+ def test_vcl_thru_host_stack_uni_dir_nsock(self):
+ """run VCL thru host stack uni-directional (multiple sockets) test"""
+
+ self.timeout = self.client_uni_dir_nsock_timeout
+ self.thru_host_stack_test(
+ "vcl_test_server",
+ self.server_args,
+ "vcl_test_client",
+ self.client_uni_dir_nsock_test_args,
+ )
+
+
+class LDPThruHostStackIperf(VCLTestCase):
+ """LDP Thru Host Stack Iperf"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(LDPThruHostStackIperf, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(LDPThruHostStackIperf, cls).tearDownClass()
+
+ def setUp(self):
+ super(LDPThruHostStackIperf, self).setUp()
+
+ self.thru_host_stack_setup()
+ self.client_iperf3_timeout = 20
+ self.client_iperf3_args = ["-4", "-t 2", "-c", self.loop0.local_ip4]
+ self.server_iperf3_args = ["-4", "-s"]
+
+ def tearDown(self):
+ self.thru_host_stack_tear_down()
+ super(LDPThruHostStackIperf, self).tearDown()
+
+ def show_commands_at_teardown(self):
+ self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
+
+ @unittest.skipUnless(_have_iperf3, "'%s' not found, Skipping.")
+ def test_ldp_thru_host_stack_iperf3(self):
+ """run LDP thru host stack iperf3 test"""
+
+ self.timeout = self.client_iperf3_timeout
+ self.thru_host_stack_test(
+ iperf3, self.server_iperf3_args, iperf3, self.client_iperf3_args
+ )
+
+ @unittest.skipUnless(_have_iperf3, "'%s' not found, Skipping.")
+ def test_ldp_thru_host_stack_iperf3_mss(self):
+ """run LDP thru host stack iperf3 test with mss option"""
+
+ self.timeout = self.client_iperf3_timeout
+ self.client_iperf3_args.append("-M 1000")
+ self.thru_host_stack_test(
+ iperf3, self.server_iperf3_args, iperf3, self.client_iperf3_args
+ )
+
+
+class LDPThruHostStackIperfUdp(VCLTestCase):
+ """LDP Thru Host Stack Iperf UDP"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(LDPThruHostStackIperfUdp, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(LDPThruHostStackIperfUdp, cls).tearDownClass()
+
+ def setUp(self):
+ super(LDPThruHostStackIperfUdp, self).setUp()
+
+ self.thru_host_stack_setup()
+ self.client_iperf3_timeout = 20
+ self.client_iperf3_args = [
+ "-4",
+ "-t 2",
+ "-u",
+ "-l 1400",
+ "-P 2",
+ "-c",
+ self.loop0.local_ip4,
+ ]
+ self.server_iperf3_args = ["-4", "-s"]
+
+ def tearDown(self):
+ self.thru_host_stack_tear_down()
+ super(LDPThruHostStackIperfUdp, self).tearDown()
+
+ def show_commands_at_teardown(self):
+ self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
+
+ @unittest.skipUnless(_have_iperf3, "'%s' not found, Skipping.")
+ def test_ldp_thru_host_stack_iperf3_udp(self):
+ """run LDP thru host stack iperf3 UDP test"""
+
+ self.timeout = self.client_iperf3_timeout
+ self.thru_host_stack_test(
+ iperf3, self.server_iperf3_args, iperf3, self.client_iperf3_args
+ )
+
+
+class LDPIpv6CutThruTestCase(VCLTestCase):
+ """LDP IPv6 Cut Thru Tests"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(LDPIpv6CutThruTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(LDPIpv6CutThruTestCase, cls).tearDownClass()
+
+ def show_commands_at_teardown(self):
+ self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
+
+ def setUp(self):
+ super(LDPIpv6CutThruTestCase, self).setUp()
+
+ self.cut_thru_setup()
+ self.client_iperf3_timeout = 20
+ self.client_uni_dir_nsock_timeout = 20
+ self.client_bi_dir_nsock_timeout = 20
+ self.client_ipv6_echo_test_args = [
+ "-6",
+ "-E",
+ self.echo_phrase,
+ "-X",
+ self.server_ipv6_addr,
+ self.server_port,
+ ]
+ self.client_ipv6_iperf3_args = ["-6", "-t 2", "-c", self.server_ipv6_addr]
+ self.server_ipv6_iperf3_args = ["-6", "-s"]
+ self.client_ipv6_uni_dir_nsock_test_args = [
+ "-N",
+ "1000",
+ "-U",
+ "-X",
+ "-6",
+ "-I",
+ "2",
+ self.server_ipv6_addr,
+ self.server_port,
+ ]
+ self.client_ipv6_bi_dir_nsock_test_args = [
+ "-N",
+ "1000",
+ "-B",
+ "-X",
+ "-6",
+ "-I",
+ "2",
+ self.server_ipv6_addr,
+ self.server_port,
+ ]
+
+ def tearDown(self):
+ super(LDPIpv6CutThruTestCase, self).tearDown()
+ self.cut_thru_tear_down()
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ def test_ldp_ipv6_cut_thru_echo(self):
+ """run LDP IPv6 cut thru echo test"""
+
+ self.cut_thru_test(
+ "sock_test_server",
+ self.server_ipv6_args,
+ "sock_test_client",
+ self.client_ipv6_echo_test_args,
+ )
+
+ @unittest.skipUnless(_have_iperf3, "'%s' not found, Skipping.")
+ def test_ldp_ipv6_cut_thru_iperf3(self):
+ """run LDP IPv6 cut thru iperf3 test"""
+
+ self.timeout = self.client_iperf3_timeout
+ self.cut_thru_test(
+ iperf3, self.server_ipv6_iperf3_args, iperf3, self.client_ipv6_iperf3_args
+ )
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ def test_ldp_ipv6_cut_thru_uni_dir_nsock(self):
+ """run LDP IPv6 cut thru uni-directional (multiple sockets) test"""
+
+ self.timeout = self.client_uni_dir_nsock_timeout
+ self.cut_thru_test(
+ "sock_test_server",
+ self.server_ipv6_args,
+ "sock_test_client",
+ self.client_ipv6_uni_dir_nsock_test_args,
+ )
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ @unittest.skip("sock test apps need to be improved")
+ def test_ldp_ipv6_cut_thru_bi_dir_nsock(self):
+ """run LDP IPv6 cut thru bi-directional (multiple sockets) test"""
+
+ self.timeout = self.client_bi_dir_nsock_timeout
+ self.cut_thru_test(
+ "sock_test_server",
+ self.server_ipv6_args,
+ "sock_test_client",
+ self.client_ipv6_bi_dir_nsock_test_args,
+ )
+
+
+class VCLIpv6CutThruTestCase(VCLTestCase):
+ """VCL IPv6 Cut Thru Tests"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(VCLIpv6CutThruTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(VCLIpv6CutThruTestCase, cls).tearDownClass()
+
+ def show_commands_at_teardown(self):
+ self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
+
+ def setUp(self):
+ super(VCLIpv6CutThruTestCase, self).setUp()
+
+ self.cut_thru_setup()
+ self.client_uni_dir_nsock_timeout = 20
+ self.client_bi_dir_nsock_timeout = 20
+ self.client_ipv6_echo_test_args = [
+ "-6",
+ "-E",
+ self.echo_phrase,
+ "-X",
+ self.server_ipv6_addr,
+ self.server_port,
+ ]
+ self.client_ipv6_uni_dir_nsock_test_args = [
+ "-N",
+ "1000",
+ "-U",
+ "-X",
+ "-6",
+ "-I",
+ "2",
+ self.server_ipv6_addr,
+ self.server_port,
+ ]
+ self.client_ipv6_bi_dir_nsock_test_args = [
+ "-N",
+ "1000",
+ "-B",
+ "-X",
+ "-6",
+ "-I",
+ "2",
+ self.server_ipv6_addr,
+ self.server_port,
+ ]
+
+ def tearDown(self):
+ super(VCLIpv6CutThruTestCase, self).tearDown()
+ self.cut_thru_tear_down()
+
+ def show_commands_at_teardown(self):
+ self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
+
+ def test_vcl_ipv6_cut_thru_echo(self):
+ """run VCL IPv6 cut thru echo test"""
+
+ self.cut_thru_test(
+ "vcl_test_server",
+ self.server_ipv6_args,
+ "vcl_test_client",
+ self.client_ipv6_echo_test_args,
+ )
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ def test_vcl_ipv6_cut_thru_uni_dir_nsock(self):
+ """run VCL IPv6 cut thru uni-directional (multiple sockets) test"""
+
+ self.timeout = self.client_uni_dir_nsock_timeout
+ self.cut_thru_test(
+ "vcl_test_server",
+ self.server_ipv6_args,
+ "vcl_test_client",
+ self.client_ipv6_uni_dir_nsock_test_args,
+ )
+
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ def test_vcl_ipv6_cut_thru_bi_dir_nsock(self):
+ """run VCL IPv6 cut thru bi-directional (multiple sockets) test"""
+
+ self.timeout = self.client_bi_dir_nsock_timeout
+ self.cut_thru_test(
+ "vcl_test_server",
+ self.server_ipv6_args,
+ "vcl_test_client",
+ self.client_ipv6_bi_dir_nsock_test_args,
+ )
+
+
+class VCLIpv6ThruHostStackEcho(VCLTestCase):
+ """VCL IPv6 Thru Host Stack Echo"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(VCLIpv6ThruHostStackEcho, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(VCLIpv6ThruHostStackEcho, cls).tearDownClass()
+
+ def setUp(self):
+ super(VCLIpv6ThruHostStackEcho, self).setUp()
+
+ self.thru_host_stack_ipv6_setup()
+ self.client_ipv6_echo_test_args = [
+ "-6",
+ "-E",
+ self.echo_phrase,
+ "-X",
+ self.loop0.local_ip6,
+ self.server_port,
+ ]
+
+ def tearDown(self):
+ self.thru_host_stack_ipv6_tear_down()
+ super(VCLIpv6ThruHostStackEcho, self).tearDown()
+
+ def test_vcl_ipv6_thru_host_stack_echo(self):
+ """run VCL IPv6 thru host stack echo test"""
+
+ self.thru_host_stack_test(
+ "vcl_test_server",
+ self.server_ipv6_args,
+ "vcl_test_client",
+ self.client_ipv6_echo_test_args,
+ )
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_vhost.py b/test/asf/test_vhost.py
new file mode 100644
index 00000000000..622716cafe3
--- /dev/null
+++ b/test/asf/test_vhost.py
@@ -0,0 +1,145 @@
+#!/usr/bin/env python3
+
+import unittest
+
+from asfframework import VppAsfTestCase, VppTestRunner
+
+from vpp_vhost_interface import VppVhostInterface
+
+
+class TesVhostInterface(VppAsfTestCase):
+ """Vhost User Test Case"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TesVhostInterface, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TesVhostInterface, cls).tearDownClass()
+
+ def tearDown(self):
+ super(TesVhostInterface, self).tearDown()
+ if not self.vpp_dead:
+ if_dump = self.vapi.sw_interface_vhost_user_dump()
+ for ifc in if_dump:
+ self.vapi.delete_vhost_user_if(ifc.sw_if_index)
+
+ def test_vhost(self):
+ """Vhost User add/delete interface test"""
+ self.logger.info("Vhost User add interfaces")
+
+ # create interface 1 (VirtualEthernet0/0/0)
+ vhost_if1 = VppVhostInterface(self, sock_filename="/tmp/sock1")
+ vhost_if1.add_vpp_config()
+ vhost_if1.admin_up()
+
+ # create interface 2 (VirtualEthernet0/0/1)
+ vhost_if2 = VppVhostInterface(self, sock_filename="/tmp/sock2")
+ vhost_if2.add_vpp_config()
+ vhost_if2.admin_up()
+
+ # verify both interfaces in the show
+ ifs = self.vapi.cli("show interface")
+ self.assertIn("VirtualEthernet0/0/0", ifs)
+ self.assertIn("VirtualEthernet0/0/1", ifs)
+
+ # verify they are in the dump also
+ if_dump = self.vapi.sw_interface_vhost_user_dump()
+ self.assertTrue(vhost_if1.is_interface_config_in_dump(if_dump))
+ self.assertTrue(vhost_if2.is_interface_config_in_dump(if_dump))
+
+ # delete VirtualEthernet0/0/1
+ self.logger.info("Deleting VirtualEthernet0/0/1")
+ vhost_if2.remove_vpp_config()
+
+ self.logger.info("Verifying VirtualEthernet0/0/1 is deleted")
+
+ ifs = self.vapi.cli("show interface")
+ # verify VirtualEthernet0/0/0 still in the show
+ self.assertIn("VirtualEthernet0/0/0", ifs)
+
+ # verify VirtualEthernet0/0/1 not in the show
+ self.assertNotIn("VirtualEthernet0/0/1", ifs)
+
+ # verify VirtualEthernet0/0/1 is not in the dump
+ if_dump = self.vapi.sw_interface_vhost_user_dump()
+ self.assertFalse(vhost_if2.is_interface_config_in_dump(if_dump))
+
+ # verify VirtualEthernet0/0/0 is still in the dump
+ self.assertTrue(vhost_if1.is_interface_config_in_dump(if_dump))
+
+ # delete VirtualEthernet0/0/0
+ self.logger.info("Deleting VirtualEthernet0/0/0")
+ vhost_if1.remove_vpp_config()
+
+ self.logger.info("Verifying VirtualEthernet0/0/0 is deleted")
+
+ # verify VirtualEthernet0/0/0 not in the show
+ ifs = self.vapi.cli("show interface")
+ self.assertNotIn("VirtualEthernet0/0/0", ifs)
+
+ # verify VirtualEthernet0/0/0 is not in the dump
+ if_dump = self.vapi.sw_interface_vhost_user_dump()
+ self.assertFalse(vhost_if1.is_interface_config_in_dump(if_dump))
+
+ def test_vhost_interface_state(self):
+ """Vhost User interface states and events test"""
+
+ self.vapi.want_interface_events()
+
+ # clear outstanding events
+ # (like delete interface events from other tests)
+ self.vapi.collect_events()
+
+ vhost_if = VppVhostInterface(self, sock_filename="/tmp/sock1")
+
+ # create vhost interface
+ vhost_if.add_vpp_config()
+ self.sleep(0.1)
+ events = self.vapi.collect_events()
+ # creating interface does now create events
+ self.assert_equal(len(events), 1, "number of events")
+
+ vhost_if.admin_up()
+ vhost_if.assert_interface_state(1, 0, expect_event=True)
+
+ vhost_if.admin_down()
+ vhost_if.assert_interface_state(0, 0, expect_event=True)
+
+ # delete vhost interface
+ vhost_if.remove_vpp_config()
+ event = self.vapi.wait_for_event(timeout=1)
+ self.assert_equal(event.sw_if_index, vhost_if.sw_if_index, "sw_if_index")
+ self.assert_equal(event.deleted, 1, "deleted flag")
+
+ # verify there are no more events
+ events = self.vapi.collect_events()
+ self.assert_equal(len(events), 0, "number of events")
+
+ def test_vhost_interface_custom_mac_addr(self):
+ """Vhost User interface custom mac address test"""
+
+ mac_addr = "aa:bb:cc:dd:ee:ff"
+ vhost_if = VppVhostInterface(
+ self, sock_filename="/tmp/sock1", use_custom_mac=1, mac_address=mac_addr
+ )
+
+ # create vhost interface
+ vhost_if.add_vpp_config()
+ self.sleep(0.1)
+
+ # verify mac in the dump
+ if_dump_list = self.vapi.sw_interface_dump(sw_if_index=vhost_if.sw_if_index)
+ self.assert_equal(len(if_dump_list), 1, "if dump length")
+
+ [if_dump] = if_dump_list
+ self.assert_equal(if_dump.l2_address.mac_string, mac_addr, "MAC Address")
+
+ # delete VirtualEthernet
+ self.logger.info("Deleting VirtualEthernet")
+ vhost_if.remove_vpp_config()
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_vpe_api.py b/test/asf/test_vpe_api.py
new file mode 100644
index 00000000000..4d866ec906a
--- /dev/null
+++ b/test/asf/test_vpe_api.py
@@ -0,0 +1,54 @@
+# Copyright (c) 2019. Vinci Consulting Corp. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import datetime
+import time
+from asfframework import VppAsfTestCase
+
+enable_print = False
+
+
+class TestVpeApi(VppAsfTestCase):
+ """TestVpeApi"""
+
+ def test_log_dump_default(self):
+ rv = self.vapi.cli("test log notice fib entry this is a test")
+ rv = self.vapi.log_dump()
+ if enable_print:
+ print("\n".join([str(v) for v in rv]))
+ self.assertTrue(rv)
+
+ def test_log_dump_timestamp_0(self):
+ rv = self.vapi.cli("test log notice fib entry this is a test")
+ rv = self.vapi.log_dump(start_timestamp=0.0)
+ if enable_print:
+ print("\n".join([str(v) for v in rv]))
+ self.assertTrue(rv)
+
+ def test_log_dump_timestamp_future(self):
+ rv = self.vapi.cli("test log debug fib entry test")
+ rv = self.vapi.log_dump(start_timestamp=time.time() + 60.0)
+ if enable_print:
+ print("\n".join([str(v) for v in rv]))
+ self.assertFalse(rv)
+
+ def test_show_vpe_system_time(self):
+ local_start_time = datetime.datetime.now()
+ rv = self.vapi.show_vpe_system_time()
+ self.assertTrue(
+ rv.vpe_system_time > local_start_time - datetime.timedelta(hours=1.0),
+ "system times differ by more than an hour.",
+ )
+ if enable_print:
+ print("\n".join([str(v) for v in rv]))
+ print("%r %s" % (rv.vpe_system_time, rv.vpe_system_time))
diff --git a/test/asf/test_vppinfra.py b/test/asf/test_vppinfra.py
new file mode 100644
index 00000000000..56391bfb13c
--- /dev/null
+++ b/test/asf/test_vppinfra.py
@@ -0,0 +1,38 @@
+#!/usr/bin/env python3
+
+import unittest
+
+from asfframework import VppAsfTestCase, VppTestRunner
+
+
+class TestVppinfra(VppAsfTestCase):
+ """Vppinfra Unit Test Cases"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestVppinfra, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestVppinfra, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestVppinfra, self).setUp()
+
+ def tearDown(self):
+ super(TestVppinfra, self).tearDown()
+
+ def test_bitmap_unittest(self):
+ """Bitmap unit tests"""
+
+ cmds = ["test bitmap"]
+
+ for cmd in cmds:
+ error = self.vapi.cli(cmd)
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)