aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKlement Sekera <ksekera@cisco.com>2021-04-08 19:37:41 +0200
committerAndrew Yourtchenko <ayourtch@gmail.com>2021-04-16 09:26:33 +0000
commit558ceabc6c08f275ce6c6e7ff295e74272eb851a (patch)
tree778bb375cdd8f3b11537b6b638a4b00f53211ba5
parentf70cf2337683a97b06fe30ea56fab9ebab556ae7 (diff)
tests: cpus awareness
Introduce MAX_CPUS parameters to control maximum number of CPUs used by VPP(s) during testing, with default value 'auto' corresponding to all CPUs available. Calculate test CPU requirements by taking into account the number of workers, so a test requires 1 (main thread) + # of worker CPUs. When running tests, keep track of both running test jobs (controlled by TEST_JOBS parameter) and free CPUs. This then causes two limits in the system - to not exceed number of jobs in parallel but also to not exceed number of CPUs available. Skip tests which require more CPUs than are available in system (or more than MAX_CPUS) and print a warning message. Type: improvement Change-Id: Ib8fda54e4c6a36179d64160bb87fbd3a0011762d Signed-off-by: Klement Sekera <ksekera@cisco.com>
-rw-r--r--src/plugins/memif/test/test_memif.py15
-rw-r--r--test/Makefile3
-rw-r--r--test/cpu_config.py21
-rw-r--r--test/framework.py135
-rw-r--r--test/log.py2
-rw-r--r--test/run_tests.py270
-rw-r--r--test/sanity_run_vpp.py2
-rwxr-xr-xtest/test_util.py8
8 files changed, 296 insertions, 160 deletions
diff --git a/src/plugins/memif/test/test_memif.py b/src/plugins/memif/test/test_memif.py
index caaab87f1e8..fc7cf9b2e7e 100644
--- a/src/plugins/memif/test/test_memif.py
+++ b/src/plugins/memif/test/test_memif.py
@@ -16,11 +16,24 @@ from vpp_papi import VppEnum
@tag_run_solo
class TestMemif(VppTestCase):
""" Memif Test Case """
+ remote_class = RemoteVppTestCase
+
+ @classmethod
+ def get_cpus_required(cls):
+ return (super().get_cpus_required() +
+ cls.remote_class.get_cpus_required())
+
+ @classmethod
+ def assign_cpus(cls, cpus):
+ remote_cpus = cpus[:cls.remote_class.get_cpus_required()]
+ my_cpus = cpus[cls.remote_class.get_cpus_required():]
+ cls.remote_class.assign_cpus(remote_cpus)
+ super().assign_cpus(my_cpus)
@classmethod
def setUpClass(cls):
# fork new process before client connects to VPP
- cls.remote_test = RemoteClass(RemoteVppTestCase)
+ cls.remote_test = RemoteClass(cls.remote_class)
cls.remote_test.start_remote()
cls.remote_test.set_request_timeout(10)
super(TestMemif, cls).setUpClass()
diff --git a/test/Makefile b/test/Makefile
index dc6aa096dd0..0ee61a23c0b 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -366,7 +366,8 @@ help:
@echo "Arguments controlling test runs:"
@echo " V=[0|1|2] - set test verbosity level"
@echo " 0=ERROR, 1=INFO, 2=DEBUG"
- @echo " TEST_JOBS=[<n>|auto] - use <n> parallel processes for test execution or automatic discovery of maximum acceptable processes (default: 1)"
+ @echo " TEST_JOBS=[<n>|auto] - use at most <n> parallel python processes for test execution, if auto, set to number of available cpus (default: 1)"
+ @echo " MAX_VPP_CPUS=[<n>|auto]- use at most <n> cpus for running vpp main and worker threads, if auto, set to number of available cpus (default: auto)"
@echo " CACHE_OUTPUT=[0|1] - cache VPP stdout/stderr and log as one block after test finishes (default: 1)"
@echo " FAILFAST=[0|1] - fail fast if 1, complete all tests if 0"
@echo " TIMEOUT=<timeout> - fail test suite if any single test takes longer than <timeout> (in seconds) to finish (default: 600)"
diff --git a/test/cpu_config.py b/test/cpu_config.py
new file mode 100644
index 00000000000..b4e5d1ab767
--- /dev/null
+++ b/test/cpu_config.py
@@ -0,0 +1,21 @@
+import os
+import psutil
+
+available_cpus = psutil.Process().cpu_affinity()
+num_cpus = len(available_cpus)
+
+max_vpp_cpus = os.getenv("MAX_VPP_CPUS", "auto").lower()
+
+if max_vpp_cpus == "auto":
+ max_vpp_cpus = num_cpus
+else:
+ try:
+ max_vpp_cpus = int(max_vpp_cpus)
+ except ValueError as e:
+ raise ValueError("Invalid MAX_VPP_CPUS value specified, valid "
+ "values are a positive integer or 'auto'") from e
+ if max_vpp_cpus <= 0:
+ raise ValueError("Invalid MAX_VPP_CPUS value specified, valid "
+ "values are a positive integer or 'auto'")
+ if max_vpp_cpus > num_cpus:
+ max_vpp_cpus = num_cpus
diff --git a/test/framework.py b/test/framework.py
index 67ac495547c..1cbd814aa8d 100644
--- a/test/framework.py
+++ b/test/framework.py
@@ -22,6 +22,7 @@ from inspect import getdoc, isclass
from traceback import format_exception
from logging import FileHandler, DEBUG, Formatter
from enum import Enum
+from abc import ABC, abstractmethod
import scapy.compat
from scapy.packet import Raw
@@ -42,6 +43,8 @@ from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
from scapy.layers.inet6 import ICMPv6EchoReply
+from cpu_config import available_cpus, num_cpus, max_vpp_cpus
+
logger = logging.getLogger(__name__)
# Set up an empty logger for the testcase that can be overridden as necessary
@@ -53,6 +56,7 @@ FAIL = 1
ERROR = 2
SKIP = 3
TEST_RUN = 4
+SKIP_CPU_SHORTAGE = 5
class BoolEnvironmentVariable(object):
@@ -223,6 +227,21 @@ def _running_gcov_tests():
running_gcov_tests = _running_gcov_tests()
+def get_environ_vpp_worker_count():
+ worker_config = os.getenv("VPP_WORKER_CONFIG", None)
+ if worker_config:
+ elems = worker_config.split(" ")
+ if elems[0] != "workers" or len(elems) != 2:
+ raise ValueError("Wrong VPP_WORKER_CONFIG == '%s' value." %
+ worker_config)
+ return int(elems[1])
+ else:
+ return 0
+
+
+environ_vpp_worker_count = get_environ_vpp_worker_count()
+
+
class KeepAliveReporter(object):
"""
Singleton object which reports test start to parent process
@@ -292,7 +311,21 @@ class DummyVpp:
pass
-class VppTestCase(unittest.TestCase):
+class CPUInterface(ABC):
+ cpus = []
+ skipped_due_to_cpu_lack = False
+
+ @classmethod
+ @abstractmethod
+ def get_cpus_required(cls):
+ pass
+
+ @classmethod
+ def assign_cpus(cls, cpus):
+ cls.cpus = cpus
+
+
+class VppTestCase(CPUInterface, unittest.TestCase):
"""This subclass is a base class for VPP test cases that are implemented as
classes. It provides methods to create and run test case.
"""
@@ -358,34 +391,18 @@ class VppTestCase(unittest.TestCase):
if dl == "gdb-all" or dl == "gdbserver-all":
cls.debug_all = True
- @staticmethod
- def get_least_used_cpu():
- cpu_usage_list = [set(range(psutil.cpu_count()))]
- vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
- if 'vpp_main' == p.info['name']]
- for vpp_process in vpp_processes:
- for cpu_usage_set in cpu_usage_list:
- try:
- cpu_num = vpp_process.cpu_num()
- if cpu_num in cpu_usage_set:
- cpu_usage_set_index = cpu_usage_list.index(
- cpu_usage_set)
- if cpu_usage_set_index == len(cpu_usage_list) - 1:
- cpu_usage_list.append({cpu_num})
- else:
- cpu_usage_list[cpu_usage_set_index + 1].add(
- cpu_num)
- cpu_usage_set.remove(cpu_num)
- break
- except psutil.NoSuchProcess:
- pass
-
- for cpu_usage_set in cpu_usage_list:
- if len(cpu_usage_set) > 0:
- min_usage_set = cpu_usage_set
- break
+ @classmethod
+ def get_vpp_worker_count(cls):
+ if not hasattr(cls, "vpp_worker_count"):
+ if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
+ cls.vpp_worker_count = 0
+ else:
+ cls.vpp_worker_count = environ_vpp_worker_count
+ return cls.vpp_worker_count
- return random.choice(tuple(min_usage_set))
+ @classmethod
+ def get_cpus_required(cls):
+ return 1 + cls.get_vpp_worker_count()
@classmethod
def setUpConstants(cls):
@@ -417,20 +434,6 @@ class VppTestCase(unittest.TestCase):
if coredump_size is None:
coredump_size = "coredump-size unlimited"
- cpu_core_number = cls.get_least_used_cpu()
- if not hasattr(cls, "vpp_worker_count"):
- cls.vpp_worker_count = 0
- worker_config = os.getenv("VPP_WORKER_CONFIG", "")
- if worker_config:
- elems = worker_config.split(" ")
- if elems[0] != "workers" or len(elems) != 2:
- raise ValueError("Wrong VPP_WORKER_CONFIG == '%s' value." %
- worker_config)
- cls.vpp_worker_count = int(elems[1])
- if cls.vpp_worker_count > 0 and\
- cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
- cls.vpp_worker_count = 0
-
default_variant = os.getenv("VARIANT")
if default_variant is not None:
default_variant = "defaults { %s 100 }" % default_variant
@@ -447,9 +450,10 @@ class VppTestCase(unittest.TestCase):
coredump_size, "runtime-dir", cls.tempdir, "}",
"api-trace", "{", "on", "}",
"api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}",
- "cpu", "{", "main-core", str(cpu_core_number), ]
- if cls.vpp_worker_count:
- cls.vpp_cmdline.extend(["workers", str(cls.vpp_worker_count)])
+ "cpu", "{", "main-core", str(cls.cpus[0]), ]
+ if cls.get_vpp_worker_count():
+ cls.vpp_cmdline.extend([
+ "corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])])
cls.vpp_cmdline.extend([
"}",
"physmem", "{", "max-size", "32m", "}",
@@ -509,11 +513,12 @@ class VppTestCase(unittest.TestCase):
@classmethod
def run_vpp(cls):
+ cls.logger.debug(f"Assigned cpus: {cls.cpus}")
cmdline = cls.vpp_cmdline
if cls.debug_gdbserver:
gdbserver = '/usr/bin/gdbserver'
- if not os.path.isfile(gdbserver) or \
+ if not os.path.isfile(gdbserver) or\
not os.access(gdbserver, os.X_OK):
raise Exception("gdbserver binary '%s' does not exist or is "
"not executable" % gdbserver)
@@ -1349,6 +1354,7 @@ class VppTestResult(unittest.TestResult):
self.verbosity = verbosity
self.result_string = None
self.runner = runner
+ self.printed = []
def addSuccess(self, test):
"""
@@ -1383,7 +1389,10 @@ class VppTestResult(unittest.TestResult):
unittest.TestResult.addSkip(self, test, reason)
self.result_string = colorize("SKIP", YELLOW)
- self.send_result_through_pipe(test, SKIP)
+ if reason == "not enough cpus":
+ self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
+ else:
+ self.send_result_through_pipe(test, SKIP)
def symlink_failed(self):
if self.current_test_case_info:
@@ -1501,28 +1510,34 @@ class VppTestResult(unittest.TestResult):
"""
def print_header(test):
+ if test.__class__ in self.printed:
+ return
+
test_doc = getdoc(test)
if not test_doc:
raise Exception("No doc string for test '%s'" % test.id())
+
test_title = test_doc.splitlines()[0]
- test_title_colored = colorize(test_title, GREEN)
+ test_title = colorize(test_title, GREEN)
if test.is_tagged_run_solo():
- # long live PEP-8 and 80 char width limitation...
- c = YELLOW
- test_title_colored = colorize("SOLO RUN: " + test_title, c)
+ test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
# This block may overwrite the colorized title above,
# but we want this to stand out and be fixed
if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
- c = RED
- w = "FIXME with VPP workers: "
- test_title_colored = colorize(w + test_title, c)
-
- if not hasattr(test.__class__, '_header_printed'):
- print(double_line_delim)
- print(test_title_colored)
- print(double_line_delim)
- test.__class__._header_printed = True
+ test_title = colorize(
+ f"FIXME with VPP workers: {test_title}", RED)
+
+ if test.__class__.skipped_due_to_cpu_lack:
+ test_title = colorize(
+ f"{test_title} [skipped - not enough cpus, "
+ f"required={test.__class__.get_cpus_required()}, "
+ f"available={max_vpp_cpus}]", YELLOW)
+
+ print(double_line_delim)
+ print(test_title)
+ print(double_line_delim)
+ self.printed.append(test.__class__)
print_header(test)
self.start_test = time.time()
diff --git a/test/log.py b/test/log.py
index b0fe037e1ba..5fd91c794e8 100644
--- a/test/log.py
+++ b/test/log.py
@@ -11,7 +11,7 @@ single_line_delim = '-' * 78
def colorize(msg, color):
- return color + msg + COLOR_RESET
+ return f"{color}{msg}{COLOR_RESET}"
class ColorFormatter(logging.Formatter):
diff --git a/test/run_tests.py b/test/run_tests.py
index 008828c1e86..5d091ad253f 100644
--- a/test/run_tests.py
+++ b/test/run_tests.py
@@ -9,22 +9,21 @@ import argparse
import time
import threading
import signal
-import psutil
import re
-import multiprocessing
-from multiprocessing import Process, Pipe, cpu_count
+from multiprocessing import Process, Pipe, get_context
from multiprocessing.queues import Queue
from multiprocessing.managers import BaseManager
import framework
from framework import VppTestRunner, running_extended_tests, VppTestCase, \
get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
- TEST_RUN
+ TEST_RUN, SKIP_CPU_SHORTAGE
from debug import spawn_gdb, start_vpp_in_gdb
from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
colorize, single_line_delim
from discover_tests import discover_tests
from subprocess import check_output, CalledProcessError
from util import check_core_path, get_core_path, is_core_present
+from cpu_config import num_cpus, max_vpp_cpus, available_cpus
# timeout which controls how long the child has to finish after seeing
# a core dump in test temporary directory. If this is exceeded, parent assumes
@@ -59,6 +58,7 @@ class TestResult(dict):
self[FAIL] = []
self[ERROR] = []
self[SKIP] = []
+ self[SKIP_CPU_SHORTAGE] = []
self[TEST_RUN] = []
self.crashed = False
self.testcase_suite = testcase_suite
@@ -67,8 +67,8 @@ class TestResult(dict):
def was_successful(self):
return 0 == len(self[FAIL]) == len(self[ERROR]) \
- and len(self[PASS] + self[SKIP]) \
- == self.testcase_suite.countTestCases() == len(self[TEST_RUN])
+ and len(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]) \
+ == self.testcase_suite.countTestCases()
def no_tests_run(self):
return 0 == len(self[TEST_RUN])
@@ -80,7 +80,7 @@ class TestResult(dict):
rerun_ids = set([])
for testcase in self.testcase_suite:
tc_id = testcase.id()
- if tc_id not in self[PASS] and tc_id not in self[SKIP]:
+ if tc_id not in self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]:
rerun_ids.add(tc_id)
if rerun_ids:
return suite_from_failed(self.testcase_suite, rerun_ids)
@@ -142,11 +142,7 @@ class TestCaseWrapper(object):
self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
self.result_parent_end, self.result_child_end = Pipe(duplex=False)
self.testcase_suite = testcase_suite
- if sys.version[0] == '2':
- self.stdouterr_queue = manager.StreamQueue()
- else:
- from multiprocessing import get_context
- self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
+ self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
self.logger = get_parallel_logger(self.stdouterr_queue)
self.child = Process(target=test_runner_wrapper,
args=(testcase_suite,
@@ -213,6 +209,13 @@ class TestCaseWrapper(object):
def was_successful(self):
return self.result.was_successful()
+ @property
+ def cpus_used(self):
+ return self.testcase_suite.cpus_used
+
+ def get_assigned_cpus(self):
+ return self.testcase_suite.get_assigned_cpus()
+
def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
read_testcases):
@@ -324,7 +327,6 @@ def process_finished_testsuite(wrapped_testcase_suite,
def run_forked(testcase_suites):
wrapped_testcase_suites = set()
solo_testcase_suites = []
- total_test_runners = 0
# suites are unhashable, need to use list
results = []
@@ -332,31 +334,53 @@ def run_forked(testcase_suites):
finished_unread_testcases = set()
manager = StreamQueueManager()
manager.start()
- total_test_runners = 0
- while total_test_runners < concurrent_tests:
- if testcase_suites:
+ tests_running = 0
+ free_cpus = list(available_cpus)
+
+ def on_suite_start(tc):
+ nonlocal tests_running
+ nonlocal free_cpus
+ tests_running = tests_running + 1
+
+ def on_suite_finish(tc):
+ nonlocal tests_running
+ nonlocal free_cpus
+ tests_running = tests_running - 1
+ assert tests_running >= 0
+ free_cpus.extend(tc.get_assigned_cpus())
+
+ def run_suite(suite):
+ nonlocal manager
+ nonlocal wrapped_testcase_suites
+ nonlocal unread_testcases
+ nonlocal free_cpus
+ suite.assign_cpus(free_cpus[:suite.cpus_used])
+ free_cpus = free_cpus[suite.cpus_used:]
+ wrapper = TestCaseWrapper(suite, manager)
+ wrapped_testcase_suites.add(wrapper)
+ unread_testcases.add(wrapper)
+ on_suite_start(suite)
+
+ def can_run_suite(suite):
+ return (tests_running < max_concurrent_tests and
+ (suite.cpus_used <= len(free_cpus) or
+ suite.cpus_used > max_vpp_cpus))
+
+ while free_cpus and testcase_suites:
+ a_suite = testcase_suites[0]
+ if a_suite.is_tagged_run_solo:
+ a_suite = testcase_suites.pop(0)
+ solo_testcase_suites.append(a_suite)
+ continue
+ if can_run_suite(a_suite):
a_suite = testcase_suites.pop(0)
- if a_suite.is_tagged_run_solo:
- solo_testcase_suites.append(a_suite)
- continue
- wrapped_testcase_suite = TestCaseWrapper(a_suite,
- manager)
- wrapped_testcase_suites.add(wrapped_testcase_suite)
- unread_testcases.add(wrapped_testcase_suite)
- total_test_runners = total_test_runners + 1
+ run_suite(a_suite)
else:
break
- while total_test_runners < 1 and solo_testcase_suites:
- if solo_testcase_suites:
- a_suite = solo_testcase_suites.pop(0)
- wrapped_testcase_suite = TestCaseWrapper(a_suite,
- manager)
- wrapped_testcase_suites.add(wrapped_testcase_suite)
- unread_testcases.add(wrapped_testcase_suite)
- total_test_runners = total_test_runners + 1
- else:
- break
+ if tests_running == 0 and solo_testcase_suites:
+ a_suite = solo_testcase_suites.pop(0)
+ run_suite(a_suite)
read_from_testcases = threading.Event()
read_from_testcases.set()
@@ -466,31 +490,23 @@ def run_forked(testcase_suites):
wrapped_testcase_suites.remove(finished_testcase)
finished_unread_testcases.add(finished_testcase)
finished_testcase.stdouterr_queue.put(None)
- total_test_runners = total_test_runners - 1
+ on_suite_finish(finished_testcase)
if stop_run:
while testcase_suites:
results.append(TestResult(testcase_suites.pop(0)))
elif testcase_suites:
- a_testcase = testcase_suites.pop(0)
- while a_testcase and a_testcase.is_tagged_run_solo:
- solo_testcase_suites.append(a_testcase)
+ a_suite = testcase_suites.pop(0)
+ while a_suite and a_suite.is_tagged_run_solo:
+ solo_testcase_suites.append(a_suite)
if testcase_suites:
- a_testcase = testcase_suites.pop(0)
+ a_suite = testcase_suites.pop(0)
else:
- a_testcase = None
- if a_testcase:
- new_testcase = TestCaseWrapper(a_testcase,
- manager)
- wrapped_testcase_suites.add(new_testcase)
- total_test_runners = total_test_runners + 1
- unread_testcases.add(new_testcase)
- if solo_testcase_suites and total_test_runners == 0:
- a_testcase = solo_testcase_suites.pop(0)
- new_testcase = TestCaseWrapper(a_testcase,
- manager)
- wrapped_testcase_suites.add(new_testcase)
- total_test_runners = total_test_runners + 1
- unread_testcases.add(new_testcase)
+ a_suite = None
+ if a_suite and can_run_suite(a_suite):
+ run_suite(a_suite)
+ if solo_testcase_suites and tests_running == 0:
+ a_suite = solo_testcase_suites.pop(0)
+ run_suite(a_suite)
time.sleep(0.1)
except Exception:
for wrapped_testcase_suite in wrapped_testcase_suites:
@@ -506,19 +522,41 @@ def run_forked(testcase_suites):
return results
+class TestSuiteWrapper(unittest.TestSuite):
+ cpus_used = 0
+
+ def __init__(self):
+ return super().__init__()
+
+ def addTest(self, test):
+ self.cpus_used = max(self.cpus_used, test.get_cpus_required())
+ super().addTest(test)
+
+ def assign_cpus(self, cpus):
+ self.cpus = cpus
+
+ def _handleClassSetUp(self, test, result):
+ if not test.__class__.skipped_due_to_cpu_lack:
+ test.assign_cpus(self.cpus)
+ super()._handleClassSetUp(test, result)
+
+ def get_assigned_cpus(self):
+ return self.cpus
+
+
class SplitToSuitesCallback:
def __init__(self, filter_callback):
self.suites = {}
self.suite_name = 'default'
self.filter_callback = filter_callback
- self.filtered = unittest.TestSuite()
+ self.filtered = TestSuiteWrapper()
def __call__(self, file_name, cls, method):
test_method = cls(method)
if self.filter_callback(file_name, cls.__name__, method):
self.suite_name = file_name + cls.__name__
if self.suite_name not in self.suites:
- self.suites[self.suite_name] = unittest.TestSuite()
+ self.suites[self.suite_name] = TestSuiteWrapper()
self.suites[self.suite_name].is_tagged_run_solo = False
self.suites[self.suite_name].addTest(test_method)
if test_method.is_tagged_run_solo():
@@ -563,7 +601,7 @@ def parse_test_option():
def filter_tests(tests, filter_cb):
- result = unittest.suite.TestSuite()
+ result = TestSuiteWrapper()
for t in tests:
if isinstance(t, unittest.suite.TestSuite):
# this is a bunch of tests, recursively filter...
@@ -628,13 +666,14 @@ class AllResults(dict):
self[FAIL] = 0
self[ERROR] = 0
self[SKIP] = 0
+ self[SKIP_CPU_SHORTAGE] = 0
self[TEST_RUN] = 0
self.rerun = []
self.testsuites_no_tests_run = []
def add_results(self, result):
self.results_per_suite.append(result)
- result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN]
+ result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN, SKIP_CPU_SHORTAGE]
for result_type in result_types:
self[result_type] += len(result[result_type])
@@ -661,22 +700,29 @@ class AllResults(dict):
print('')
print(double_line_delim)
print('TEST RESULTS:')
- print(' Scheduled tests: {}'.format(self.all_testcases))
- print(' Executed tests: {}'.format(self[TEST_RUN]))
- print(' Passed tests: {}'.format(
- colorize(str(self[PASS]), GREEN)))
- if self[SKIP] > 0:
- print(' Skipped tests: {}'.format(
- colorize(str(self[SKIP]), YELLOW)))
- if self.not_executed > 0:
- print(' Not Executed tests: {}'.format(
- colorize(str(self.not_executed), RED)))
- if self[FAIL] > 0:
- print(' Failures: {}'.format(
- colorize(str(self[FAIL]), RED)))
- if self[ERROR] > 0:
- print(' Errors: {}'.format(
- colorize(str(self[ERROR]), RED)))
+
+ def indent_results(lines):
+ lines = list(filter(None, lines))
+ maximum = max(lines, key=lambda x: x.index(":"))
+ maximum = 4 + maximum.index(":")
+ for l in lines:
+ padding = " " * (maximum - l.index(":"))
+ print(f"{padding}{l}")
+
+ indent_results([
+ f'Scheduled tests: {self.all_testcases}',
+ f'Executed tests: {self[TEST_RUN]}',
+ f'Passed tests: {colorize(self[PASS], GREEN)}',
+ f'Skipped tests: {colorize(self[SKIP], YELLOW)}'
+ if self[SKIP] else None,
+ f'Not Executed tests: {colorize(self.not_executed, RED)}'
+ if self.not_executed else None,
+ f'Failures: {colorize(self[FAIL], RED)}' if self[FAIL] else None,
+ f'Errors: {colorize(self[ERROR], RED)}' if self[ERROR] else None,
+ 'Tests skipped due to lack of CPUS: '
+ f'{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}'
+ if self[SKIP_CPU_SHORTAGE] else None
+ ])
if self.all_failed > 0:
print('FAILURES AND ERRORS IN TESTS:')
@@ -713,6 +759,10 @@ class AllResults(dict):
for tc_class in tc_classes:
print(' {}'.format(colorize(tc_class, RED)))
+ if self[SKIP_CPU_SHORTAGE]:
+ print()
+ print(colorize(' SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT'
+ ' ENOUGH CPUS AVAILABLE', YELLOW))
print(double_line_delim)
print('')
@@ -793,31 +843,34 @@ if __name__ == '__main__':
run_interactive = debug or step or force_foreground
- try:
- num_cpus = len(os.sched_getaffinity(0))
- except AttributeError:
- num_cpus = multiprocessing.cpu_count()
-
- print("OS reports %s available cpu(s)." % num_cpus)
+ max_concurrent_tests = 0
+ print(f"OS reports {num_cpus} available cpu(s).")
test_jobs = os.getenv("TEST_JOBS", "1").lower() # default = 1 process
if test_jobs == 'auto':
if run_interactive:
- concurrent_tests = 1
- print('Interactive mode required, running on one core')
+ max_concurrent_tests = 1
+ print('Interactive mode required, running tests consecutively.')
else:
- concurrent_tests = num_cpus
- print('Found enough resources to run tests with %s cores'
- % concurrent_tests)
- elif test_jobs.isdigit():
- concurrent_tests = int(test_jobs)
- print("Running on %s core(s) as set by 'TEST_JOBS'." %
- concurrent_tests)
+ max_concurrent_tests = num_cpus
+ print(f"Running at most {max_concurrent_tests} python test "
+ "processes concurrently.")
else:
- concurrent_tests = 1
- print('Running on one core.')
-
- if run_interactive and concurrent_tests > 1:
+ try:
+ test_jobs = int(test_jobs)
+ except ValueError as e:
+ raise ValueError("Invalid TEST_JOBS value specified, valid "
+ "values are a positive integer or 'auto'") from e
+ if test_jobs <= 0:
+ raise ValueError("Invalid TEST_JOBS value specified, valid "
+ "values are a positive integer or 'auto'")
+ max_concurrent_tests = int(test_jobs)
+ print(f"Running at most {max_concurrent_tests} python test processes "
+ "concurrently as set by 'TEST_JOBS'.")
+
+ print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")
+
+ if run_interactive and max_concurrent_tests > 1:
raise NotImplementedError(
'Running tests interactively (DEBUG is gdb[server] or ATTACH or '
'STEP is set) in parallel (TEST_JOBS is more than 1) is not '
@@ -833,7 +886,7 @@ if __name__ == '__main__':
failfast = args.failfast
descriptions = True
- print("Running tests using custom test runner") # debug message
+ print("Running tests using custom test runner.")
filter_file, filter_class, filter_func = parse_test_option()
print("Active filters: file=%s, class=%s, function=%s" % (
@@ -852,6 +905,21 @@ if __name__ == '__main__':
tests_amount = 0
for testcase_suite in cb.suites.values():
tests_amount += testcase_suite.countTestCases()
+ if testcase_suite.cpus_used > max_vpp_cpus:
+ # here we replace test functions with lambdas to just skip them
+ # but we also replace setUp/tearDown functions to do nothing
+ # so that the test can be "started" and "stopped", so that we can
+ # still keep those prints (test description - SKIP), which are done
+ # in stopTest() (for that to trigger, test function must run)
+ for t in testcase_suite:
+ for m in dir(t):
+ if m.startswith('test_'):
+ setattr(t, m, lambda: t.skipTest("not enough cpus"))
+ setattr(t.__class__, 'setUpClass', lambda: None)
+ setattr(t.__class__, 'tearDownClass', lambda: None)
+ setattr(t, 'setUp', lambda: None)
+ setattr(t, 'tearDown', lambda: None)
+ t.__class__.skipped_due_to_cpu_lack = True
suites.append(testcase_suite)
print("%s out of %s tests match specified filters" % (
@@ -868,6 +936,14 @@ if __name__ == '__main__':
# don't fork if requiring interactive terminal
print('Running tests in foreground in the current process')
full_suite = unittest.TestSuite()
+ free_cpus = list(available_cpus)
+ cpu_shortage = False
+ for suite in suites:
+ if suite.cpus_used <= max_vpp_cpus:
+ suite.assign_cpus(free_cpus[:suite.cpus_used])
+ else:
+ suite.assign_cpus([])
+ cpu_shortage = True
full_suite.addTests(suites)
result = VppTestRunner(verbosity=verbose,
failfast=failfast,
@@ -883,10 +959,16 @@ if __name__ == '__main__':
test_case_info.tempdir,
test_case_info.core_crash_test)
+ if cpu_shortage:
+ print()
+ print(colorize('SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT'
+ ' ENOUGH CPUS AVAILABLE', YELLOW))
+ print()
sys.exit(not was_successful)
else:
print('Running each VPPTestCase in a separate background process'
- ' with {} parallel process(es)'.format(concurrent_tests))
+ f' with at most {max_concurrent_tests} parallel python test '
+ 'process(es)')
exit_code = 0
while suites and attempts > 0:
results = run_forked(suites)
diff --git a/test/sanity_run_vpp.py b/test/sanity_run_vpp.py
index f91dc833eeb..4b21de12dfe 100644
--- a/test/sanity_run_vpp.py
+++ b/test/sanity_run_vpp.py
@@ -9,7 +9,7 @@ from framework import VppDiedError, VppTestCase, KeepAliveReporter
class SanityTestCase(VppTestCase):
""" Sanity test case - verify whether VPP is able to start """
- pass
+ cpus = [0]
# don't ask to debug SanityTestCase
@classmethod
diff --git a/test/test_util.py b/test/test_util.py
index 421afce4d5d..3a61d64b28f 100755
--- a/test/test_util.py
+++ b/test/test_util.py
@@ -2,11 +2,11 @@
"""Test framework utility functions tests"""
import unittest
-from framework import VppTestRunner
+from framework import VppTestRunner, CPUInterface
from vpp_papi import mac_pton, mac_ntop
-class TestUtil (unittest.TestCase):
+class TestUtil (CPUInterface, unittest.TestCase):
""" Test framework utility tests """
@classmethod
@@ -23,6 +23,10 @@ class TestUtil (unittest.TestCase):
pass
return False
+ @classmethod
+ def get_cpus_required(cls):
+ return 0
+
def test_mac_to_binary(self):
""" MAC to binary and back """
mac = 'aa:bb:cc:dd:ee:ff'