diff options
author | Klement Sekera <ksekera@cisco.com> | 2021-04-08 19:37:41 +0200 |
---|---|---|
committer | Andrew Yourtchenko <ayourtch@gmail.com> | 2021-04-16 09:26:33 +0000 |
commit | 558ceabc6c08f275ce6c6e7ff295e74272eb851a (patch) | |
tree | 778bb375cdd8f3b11537b6b638a4b00f53211ba5 | |
parent | f70cf2337683a97b06fe30ea56fab9ebab556ae7 (diff) |
tests: cpus awareness
Introduce MAX_CPUS parameters to control maximum number of CPUs used by
VPP(s) during testing, with default value 'auto' corresponding to all
CPUs available.
Calculate test CPU requirements by taking into account the number of
workers, so a test requires 1 (main thread) + # of worker CPUs.
When running tests, keep track of both running test jobs (controlled by
TEST_JOBS parameter) and free CPUs. This then causes two limits in the
system - to not exceed number of jobs in parallel but also to not exceed
number of CPUs available.
Skip tests which require more CPUs than are available in system (or more
than MAX_CPUS) and print a warning message.
Type: improvement
Change-Id: Ib8fda54e4c6a36179d64160bb87fbd3a0011762d
Signed-off-by: Klement Sekera <ksekera@cisco.com>
-rw-r--r-- | src/plugins/memif/test/test_memif.py | 15 | ||||
-rw-r--r-- | test/Makefile | 3 | ||||
-rw-r--r-- | test/cpu_config.py | 21 | ||||
-rw-r--r-- | test/framework.py | 135 | ||||
-rw-r--r-- | test/log.py | 2 | ||||
-rw-r--r-- | test/run_tests.py | 270 | ||||
-rw-r--r-- | test/sanity_run_vpp.py | 2 | ||||
-rwxr-xr-x | test/test_util.py | 8 |
8 files changed, 296 insertions, 160 deletions
diff --git a/src/plugins/memif/test/test_memif.py b/src/plugins/memif/test/test_memif.py index caaab87f1e8..fc7cf9b2e7e 100644 --- a/src/plugins/memif/test/test_memif.py +++ b/src/plugins/memif/test/test_memif.py @@ -16,11 +16,24 @@ from vpp_papi import VppEnum @tag_run_solo class TestMemif(VppTestCase): """ Memif Test Case """ + remote_class = RemoteVppTestCase + + @classmethod + def get_cpus_required(cls): + return (super().get_cpus_required() + + cls.remote_class.get_cpus_required()) + + @classmethod + def assign_cpus(cls, cpus): + remote_cpus = cpus[:cls.remote_class.get_cpus_required()] + my_cpus = cpus[cls.remote_class.get_cpus_required():] + cls.remote_class.assign_cpus(remote_cpus) + super().assign_cpus(my_cpus) @classmethod def setUpClass(cls): # fork new process before client connects to VPP - cls.remote_test = RemoteClass(RemoteVppTestCase) + cls.remote_test = RemoteClass(cls.remote_class) cls.remote_test.start_remote() cls.remote_test.set_request_timeout(10) super(TestMemif, cls).setUpClass() diff --git a/test/Makefile b/test/Makefile index dc6aa096dd0..0ee61a23c0b 100644 --- a/test/Makefile +++ b/test/Makefile @@ -366,7 +366,8 @@ help: @echo "Arguments controlling test runs:" @echo " V=[0|1|2] - set test verbosity level" @echo " 0=ERROR, 1=INFO, 2=DEBUG" - @echo " TEST_JOBS=[<n>|auto] - use <n> parallel processes for test execution or automatic discovery of maximum acceptable processes (default: 1)" + @echo " TEST_JOBS=[<n>|auto] - use at most <n> parallel python processes for test execution, if auto, set to number of available cpus (default: 1)" + @echo " MAX_VPP_CPUS=[<n>|auto]- use at most <n> cpus for running vpp main and worker threads, if auto, set to number of available cpus (default: auto)" @echo " CACHE_OUTPUT=[0|1] - cache VPP stdout/stderr and log as one block after test finishes (default: 1)" @echo " FAILFAST=[0|1] - fail fast if 1, complete all tests if 0" @echo " TIMEOUT=<timeout> - fail test suite if any single test takes longer than <timeout> (in seconds) to finish (default: 600)" diff --git a/test/cpu_config.py b/test/cpu_config.py new file mode 100644 index 00000000000..b4e5d1ab767 --- /dev/null +++ b/test/cpu_config.py @@ -0,0 +1,21 @@ +import os +import psutil + +available_cpus = psutil.Process().cpu_affinity() +num_cpus = len(available_cpus) + +max_vpp_cpus = os.getenv("MAX_VPP_CPUS", "auto").lower() + +if max_vpp_cpus == "auto": + max_vpp_cpus = num_cpus +else: + try: + max_vpp_cpus = int(max_vpp_cpus) + except ValueError as e: + raise ValueError("Invalid MAX_VPP_CPUS value specified, valid " + "values are a positive integer or 'auto'") from e + if max_vpp_cpus <= 0: + raise ValueError("Invalid MAX_VPP_CPUS value specified, valid " + "values are a positive integer or 'auto'") + if max_vpp_cpus > num_cpus: + max_vpp_cpus = num_cpus diff --git a/test/framework.py b/test/framework.py index 67ac495547c..1cbd814aa8d 100644 --- a/test/framework.py +++ b/test/framework.py @@ -22,6 +22,7 @@ from inspect import getdoc, isclass from traceback import format_exception from logging import FileHandler, DEBUG, Formatter from enum import Enum +from abc import ABC, abstractmethod import scapy.compat from scapy.packet import Raw @@ -42,6 +43,8 @@ from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest from scapy.layers.inet6 import ICMPv6EchoReply +from cpu_config import available_cpus, num_cpus, max_vpp_cpus + logger = logging.getLogger(__name__) # Set up an empty logger for the testcase that can be overridden as necessary @@ -53,6 +56,7 @@ FAIL = 1 ERROR = 2 SKIP = 3 TEST_RUN = 4 +SKIP_CPU_SHORTAGE = 5 class BoolEnvironmentVariable(object): @@ -223,6 +227,21 @@ def _running_gcov_tests(): running_gcov_tests = _running_gcov_tests() +def get_environ_vpp_worker_count(): + worker_config = os.getenv("VPP_WORKER_CONFIG", None) + if worker_config: + elems = worker_config.split(" ") + if elems[0] != "workers" or len(elems) != 2: + raise ValueError("Wrong VPP_WORKER_CONFIG == '%s' value." % + worker_config) + return int(elems[1]) + else: + return 0 + + +environ_vpp_worker_count = get_environ_vpp_worker_count() + + class KeepAliveReporter(object): """ Singleton object which reports test start to parent process @@ -292,7 +311,21 @@ class DummyVpp: pass -class VppTestCase(unittest.TestCase): +class CPUInterface(ABC): + cpus = [] + skipped_due_to_cpu_lack = False + + @classmethod + @abstractmethod + def get_cpus_required(cls): + pass + + @classmethod + def assign_cpus(cls, cpus): + cls.cpus = cpus + + +class VppTestCase(CPUInterface, unittest.TestCase): """This subclass is a base class for VPP test cases that are implemented as classes. It provides methods to create and run test case. """ @@ -358,34 +391,18 @@ class VppTestCase(unittest.TestCase): if dl == "gdb-all" or dl == "gdbserver-all": cls.debug_all = True - @staticmethod - def get_least_used_cpu(): - cpu_usage_list = [set(range(psutil.cpu_count()))] - vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name']) - if 'vpp_main' == p.info['name']] - for vpp_process in vpp_processes: - for cpu_usage_set in cpu_usage_list: - try: - cpu_num = vpp_process.cpu_num() - if cpu_num in cpu_usage_set: - cpu_usage_set_index = cpu_usage_list.index( - cpu_usage_set) - if cpu_usage_set_index == len(cpu_usage_list) - 1: - cpu_usage_list.append({cpu_num}) - else: - cpu_usage_list[cpu_usage_set_index + 1].add( - cpu_num) - cpu_usage_set.remove(cpu_num) - break - except psutil.NoSuchProcess: - pass - - for cpu_usage_set in cpu_usage_list: - if len(cpu_usage_set) > 0: - min_usage_set = cpu_usage_set - break + @classmethod + def get_vpp_worker_count(cls): + if not hasattr(cls, "vpp_worker_count"): + if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS): + cls.vpp_worker_count = 0 + else: + cls.vpp_worker_count = environ_vpp_worker_count + return cls.vpp_worker_count - return random.choice(tuple(min_usage_set)) + @classmethod + def get_cpus_required(cls): + return 1 + cls.get_vpp_worker_count() @classmethod def setUpConstants(cls): @@ -417,20 +434,6 @@ class VppTestCase(unittest.TestCase): if coredump_size is None: coredump_size = "coredump-size unlimited" - cpu_core_number = cls.get_least_used_cpu() - if not hasattr(cls, "vpp_worker_count"): - cls.vpp_worker_count = 0 - worker_config = os.getenv("VPP_WORKER_CONFIG", "") - if worker_config: - elems = worker_config.split(" ") - if elems[0] != "workers" or len(elems) != 2: - raise ValueError("Wrong VPP_WORKER_CONFIG == '%s' value." % - worker_config) - cls.vpp_worker_count = int(elems[1]) - if cls.vpp_worker_count > 0 and\ - cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS): - cls.vpp_worker_count = 0 - default_variant = os.getenv("VARIANT") if default_variant is not None: default_variant = "defaults { %s 100 }" % default_variant @@ -447,9 +450,10 @@ class VppTestCase(unittest.TestCase): coredump_size, "runtime-dir", cls.tempdir, "}", "api-trace", "{", "on", "}", "api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}", - "cpu", "{", "main-core", str(cpu_core_number), ] - if cls.vpp_worker_count: - cls.vpp_cmdline.extend(["workers", str(cls.vpp_worker_count)]) + "cpu", "{", "main-core", str(cls.cpus[0]), ] + if cls.get_vpp_worker_count(): + cls.vpp_cmdline.extend([ + "corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])]) cls.vpp_cmdline.extend([ "}", "physmem", "{", "max-size", "32m", "}", @@ -509,11 +513,12 @@ class VppTestCase(unittest.TestCase): @classmethod def run_vpp(cls): + cls.logger.debug(f"Assigned cpus: {cls.cpus}") cmdline = cls.vpp_cmdline if cls.debug_gdbserver: gdbserver = '/usr/bin/gdbserver' - if not os.path.isfile(gdbserver) or \ + if not os.path.isfile(gdbserver) or\ not os.access(gdbserver, os.X_OK): raise Exception("gdbserver binary '%s' does not exist or is " "not executable" % gdbserver) @@ -1349,6 +1354,7 @@ class VppTestResult(unittest.TestResult): self.verbosity = verbosity self.result_string = None self.runner = runner + self.printed = [] def addSuccess(self, test): """ @@ -1383,7 +1389,10 @@ class VppTestResult(unittest.TestResult): unittest.TestResult.addSkip(self, test, reason) self.result_string = colorize("SKIP", YELLOW) - self.send_result_through_pipe(test, SKIP) + if reason == "not enough cpus": + self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE) + else: + self.send_result_through_pipe(test, SKIP) def symlink_failed(self): if self.current_test_case_info: @@ -1501,28 +1510,34 @@ class VppTestResult(unittest.TestResult): """ def print_header(test): + if test.__class__ in self.printed: + return + test_doc = getdoc(test) if not test_doc: raise Exception("No doc string for test '%s'" % test.id()) + test_title = test_doc.splitlines()[0] - test_title_colored = colorize(test_title, GREEN) + test_title = colorize(test_title, GREEN) if test.is_tagged_run_solo(): - # long live PEP-8 and 80 char width limitation... - c = YELLOW - test_title_colored = colorize("SOLO RUN: " + test_title, c) + test_title = colorize(f"SOLO RUN: {test_title}", YELLOW) # This block may overwrite the colorized title above, # but we want this to stand out and be fixed if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS): - c = RED - w = "FIXME with VPP workers: " - test_title_colored = colorize(w + test_title, c) - - if not hasattr(test.__class__, '_header_printed'): - print(double_line_delim) - print(test_title_colored) - print(double_line_delim) - test.__class__._header_printed = True + test_title = colorize( + f"FIXME with VPP workers: {test_title}", RED) + + if test.__class__.skipped_due_to_cpu_lack: + test_title = colorize( + f"{test_title} [skipped - not enough cpus, " + f"required={test.__class__.get_cpus_required()}, " + f"available={max_vpp_cpus}]", YELLOW) + + print(double_line_delim) + print(test_title) + print(double_line_delim) + self.printed.append(test.__class__) print_header(test) self.start_test = time.time() diff --git a/test/log.py b/test/log.py index b0fe037e1ba..5fd91c794e8 100644 --- a/test/log.py +++ b/test/log.py @@ -11,7 +11,7 @@ single_line_delim = '-' * 78 def colorize(msg, color): - return color + msg + COLOR_RESET + return f"{color}{msg}{COLOR_RESET}" class ColorFormatter(logging.Formatter): diff --git a/test/run_tests.py b/test/run_tests.py index 008828c1e86..5d091ad253f 100644 --- a/test/run_tests.py +++ b/test/run_tests.py @@ -9,22 +9,21 @@ import argparse import time import threading import signal -import psutil import re -import multiprocessing -from multiprocessing import Process, Pipe, cpu_count +from multiprocessing import Process, Pipe, get_context from multiprocessing.queues import Queue from multiprocessing.managers import BaseManager import framework from framework import VppTestRunner, running_extended_tests, VppTestCase, \ get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \ - TEST_RUN + TEST_RUN, SKIP_CPU_SHORTAGE from debug import spawn_gdb, start_vpp_in_gdb from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \ colorize, single_line_delim from discover_tests import discover_tests from subprocess import check_output, CalledProcessError from util import check_core_path, get_core_path, is_core_present +from cpu_config import num_cpus, max_vpp_cpus, available_cpus # timeout which controls how long the child has to finish after seeing # a core dump in test temporary directory. If this is exceeded, parent assumes @@ -59,6 +58,7 @@ class TestResult(dict): self[FAIL] = [] self[ERROR] = [] self[SKIP] = [] + self[SKIP_CPU_SHORTAGE] = [] self[TEST_RUN] = [] self.crashed = False self.testcase_suite = testcase_suite @@ -67,8 +67,8 @@ class TestResult(dict): def was_successful(self): return 0 == len(self[FAIL]) == len(self[ERROR]) \ - and len(self[PASS] + self[SKIP]) \ - == self.testcase_suite.countTestCases() == len(self[TEST_RUN]) + and len(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]) \ + == self.testcase_suite.countTestCases() def no_tests_run(self): return 0 == len(self[TEST_RUN]) @@ -80,7 +80,7 @@ class TestResult(dict): rerun_ids = set([]) for testcase in self.testcase_suite: tc_id = testcase.id() - if tc_id not in self[PASS] and tc_id not in self[SKIP]: + if tc_id not in self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]: rerun_ids.add(tc_id) if rerun_ids: return suite_from_failed(self.testcase_suite, rerun_ids) @@ -142,11 +142,7 @@ class TestCaseWrapper(object): self.finished_parent_end, self.finished_child_end = Pipe(duplex=False) self.result_parent_end, self.result_child_end = Pipe(duplex=False) self.testcase_suite = testcase_suite - if sys.version[0] == '2': - self.stdouterr_queue = manager.StreamQueue() - else: - from multiprocessing import get_context - self.stdouterr_queue = manager.StreamQueue(ctx=get_context()) + self.stdouterr_queue = manager.StreamQueue(ctx=get_context()) self.logger = get_parallel_logger(self.stdouterr_queue) self.child = Process(target=test_runner_wrapper, args=(testcase_suite, @@ -213,6 +209,13 @@ class TestCaseWrapper(object): def was_successful(self): return self.result.was_successful() + @property + def cpus_used(self): + return self.testcase_suite.cpus_used + + def get_assigned_cpus(self): + return self.testcase_suite.get_assigned_cpus() + def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases, read_testcases): @@ -324,7 +327,6 @@ def process_finished_testsuite(wrapped_testcase_suite, def run_forked(testcase_suites): wrapped_testcase_suites = set() solo_testcase_suites = [] - total_test_runners = 0 # suites are unhashable, need to use list results = [] @@ -332,31 +334,53 @@ def run_forked(testcase_suites): finished_unread_testcases = set() manager = StreamQueueManager() manager.start() - total_test_runners = 0 - while total_test_runners < concurrent_tests: - if testcase_suites: + tests_running = 0 + free_cpus = list(available_cpus) + + def on_suite_start(tc): + nonlocal tests_running + nonlocal free_cpus + tests_running = tests_running + 1 + + def on_suite_finish(tc): + nonlocal tests_running + nonlocal free_cpus + tests_running = tests_running - 1 + assert tests_running >= 0 + free_cpus.extend(tc.get_assigned_cpus()) + + def run_suite(suite): + nonlocal manager + nonlocal wrapped_testcase_suites + nonlocal unread_testcases + nonlocal free_cpus + suite.assign_cpus(free_cpus[:suite.cpus_used]) + free_cpus = free_cpus[suite.cpus_used:] + wrapper = TestCaseWrapper(suite, manager) + wrapped_testcase_suites.add(wrapper) + unread_testcases.add(wrapper) + on_suite_start(suite) + + def can_run_suite(suite): + return (tests_running < max_concurrent_tests and + (suite.cpus_used <= len(free_cpus) or + suite.cpus_used > max_vpp_cpus)) + + while free_cpus and testcase_suites: + a_suite = testcase_suites[0] + if a_suite.is_tagged_run_solo: + a_suite = testcase_suites.pop(0) + solo_testcase_suites.append(a_suite) + continue + if can_run_suite(a_suite): a_suite = testcase_suites.pop(0) - if a_suite.is_tagged_run_solo: - solo_testcase_suites.append(a_suite) - continue - wrapped_testcase_suite = TestCaseWrapper(a_suite, - manager) - wrapped_testcase_suites.add(wrapped_testcase_suite) - unread_testcases.add(wrapped_testcase_suite) - total_test_runners = total_test_runners + 1 + run_suite(a_suite) else: break - while total_test_runners < 1 and solo_testcase_suites: - if solo_testcase_suites: - a_suite = solo_testcase_suites.pop(0) - wrapped_testcase_suite = TestCaseWrapper(a_suite, - manager) - wrapped_testcase_suites.add(wrapped_testcase_suite) - unread_testcases.add(wrapped_testcase_suite) - total_test_runners = total_test_runners + 1 - else: - break + if tests_running == 0 and solo_testcase_suites: + a_suite = solo_testcase_suites.pop(0) + run_suite(a_suite) read_from_testcases = threading.Event() read_from_testcases.set() @@ -466,31 +490,23 @@ def run_forked(testcase_suites): wrapped_testcase_suites.remove(finished_testcase) finished_unread_testcases.add(finished_testcase) finished_testcase.stdouterr_queue.put(None) - total_test_runners = total_test_runners - 1 + on_suite_finish(finished_testcase) if stop_run: while testcase_suites: results.append(TestResult(testcase_suites.pop(0))) elif testcase_suites: - a_testcase = testcase_suites.pop(0) - while a_testcase and a_testcase.is_tagged_run_solo: - solo_testcase_suites.append(a_testcase) + a_suite = testcase_suites.pop(0) + while a_suite and a_suite.is_tagged_run_solo: + solo_testcase_suites.append(a_suite) if testcase_suites: - a_testcase = testcase_suites.pop(0) + a_suite = testcase_suites.pop(0) else: - a_testcase = None - if a_testcase: - new_testcase = TestCaseWrapper(a_testcase, - manager) - wrapped_testcase_suites.add(new_testcase) - total_test_runners = total_test_runners + 1 - unread_testcases.add(new_testcase) - if solo_testcase_suites and total_test_runners == 0: - a_testcase = solo_testcase_suites.pop(0) - new_testcase = TestCaseWrapper(a_testcase, - manager) - wrapped_testcase_suites.add(new_testcase) - total_test_runners = total_test_runners + 1 - unread_testcases.add(new_testcase) + a_suite = None + if a_suite and can_run_suite(a_suite): + run_suite(a_suite) + if solo_testcase_suites and tests_running == 0: + a_suite = solo_testcase_suites.pop(0) + run_suite(a_suite) time.sleep(0.1) except Exception: for wrapped_testcase_suite in wrapped_testcase_suites: @@ -506,19 +522,41 @@ def run_forked(testcase_suites): return results +class TestSuiteWrapper(unittest.TestSuite): + cpus_used = 0 + + def __init__(self): + return super().__init__() + + def addTest(self, test): + self.cpus_used = max(self.cpus_used, test.get_cpus_required()) + super().addTest(test) + + def assign_cpus(self, cpus): + self.cpus = cpus + + def _handleClassSetUp(self, test, result): + if not test.__class__.skipped_due_to_cpu_lack: + test.assign_cpus(self.cpus) + super()._handleClassSetUp(test, result) + + def get_assigned_cpus(self): + return self.cpus + + class SplitToSuitesCallback: def __init__(self, filter_callback): self.suites = {} self.suite_name = 'default' self.filter_callback = filter_callback - self.filtered = unittest.TestSuite() + self.filtered = TestSuiteWrapper() def __call__(self, file_name, cls, method): test_method = cls(method) if self.filter_callback(file_name, cls.__name__, method): self.suite_name = file_name + cls.__name__ if self.suite_name not in self.suites: - self.suites[self.suite_name] = unittest.TestSuite() + self.suites[self.suite_name] = TestSuiteWrapper() self.suites[self.suite_name].is_tagged_run_solo = False self.suites[self.suite_name].addTest(test_method) if test_method.is_tagged_run_solo(): @@ -563,7 +601,7 @@ def parse_test_option(): def filter_tests(tests, filter_cb): - result = unittest.suite.TestSuite() + result = TestSuiteWrapper() for t in tests: if isinstance(t, unittest.suite.TestSuite): # this is a bunch of tests, recursively filter... @@ -628,13 +666,14 @@ class AllResults(dict): self[FAIL] = 0 self[ERROR] = 0 self[SKIP] = 0 + self[SKIP_CPU_SHORTAGE] = 0 self[TEST_RUN] = 0 self.rerun = [] self.testsuites_no_tests_run = [] def add_results(self, result): self.results_per_suite.append(result) - result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN] + result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN, SKIP_CPU_SHORTAGE] for result_type in result_types: self[result_type] += len(result[result_type]) @@ -661,22 +700,29 @@ class AllResults(dict): print('') print(double_line_delim) print('TEST RESULTS:') - print(' Scheduled tests: {}'.format(self.all_testcases)) - print(' Executed tests: {}'.format(self[TEST_RUN])) - print(' Passed tests: {}'.format( - colorize(str(self[PASS]), GREEN))) - if self[SKIP] > 0: - print(' Skipped tests: {}'.format( - colorize(str(self[SKIP]), YELLOW))) - if self.not_executed > 0: - print(' Not Executed tests: {}'.format( - colorize(str(self.not_executed), RED))) - if self[FAIL] > 0: - print(' Failures: {}'.format( - colorize(str(self[FAIL]), RED))) - if self[ERROR] > 0: - print(' Errors: {}'.format( - colorize(str(self[ERROR]), RED))) + + def indent_results(lines): + lines = list(filter(None, lines)) + maximum = max(lines, key=lambda x: x.index(":")) + maximum = 4 + maximum.index(":") + for l in lines: + padding = " " * (maximum - l.index(":")) + print(f"{padding}{l}") + + indent_results([ + f'Scheduled tests: {self.all_testcases}', + f'Executed tests: {self[TEST_RUN]}', + f'Passed tests: {colorize(self[PASS], GREEN)}', + f'Skipped tests: {colorize(self[SKIP], YELLOW)}' + if self[SKIP] else None, + f'Not Executed tests: {colorize(self.not_executed, RED)}' + if self.not_executed else None, + f'Failures: {colorize(self[FAIL], RED)}' if self[FAIL] else None, + f'Errors: {colorize(self[ERROR], RED)}' if self[ERROR] else None, + 'Tests skipped due to lack of CPUS: ' + f'{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}' + if self[SKIP_CPU_SHORTAGE] else None + ]) if self.all_failed > 0: print('FAILURES AND ERRORS IN TESTS:') @@ -713,6 +759,10 @@ class AllResults(dict): for tc_class in tc_classes: print(' {}'.format(colorize(tc_class, RED))) + if self[SKIP_CPU_SHORTAGE]: + print() + print(colorize(' SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT' + ' ENOUGH CPUS AVAILABLE', YELLOW)) print(double_line_delim) print('') @@ -793,31 +843,34 @@ if __name__ == '__main__': run_interactive = debug or step or force_foreground - try: - num_cpus = len(os.sched_getaffinity(0)) - except AttributeError: - num_cpus = multiprocessing.cpu_count() - - print("OS reports %s available cpu(s)." % num_cpus) + max_concurrent_tests = 0 + print(f"OS reports {num_cpus} available cpu(s).") test_jobs = os.getenv("TEST_JOBS", "1").lower() # default = 1 process if test_jobs == 'auto': if run_interactive: - concurrent_tests = 1 - print('Interactive mode required, running on one core') + max_concurrent_tests = 1 + print('Interactive mode required, running tests consecutively.') else: - concurrent_tests = num_cpus - print('Found enough resources to run tests with %s cores' - % concurrent_tests) - elif test_jobs.isdigit(): - concurrent_tests = int(test_jobs) - print("Running on %s core(s) as set by 'TEST_JOBS'." % - concurrent_tests) + max_concurrent_tests = num_cpus + print(f"Running at most {max_concurrent_tests} python test " + "processes concurrently.") else: - concurrent_tests = 1 - print('Running on one core.') - - if run_interactive and concurrent_tests > 1: + try: + test_jobs = int(test_jobs) + except ValueError as e: + raise ValueError("Invalid TEST_JOBS value specified, valid " + "values are a positive integer or 'auto'") from e + if test_jobs <= 0: + raise ValueError("Invalid TEST_JOBS value specified, valid " + "values are a positive integer or 'auto'") + max_concurrent_tests = int(test_jobs) + print(f"Running at most {max_concurrent_tests} python test processes " + "concurrently as set by 'TEST_JOBS'.") + + print(f"Using at most {max_vpp_cpus} cpus for VPP threads.") + + if run_interactive and max_concurrent_tests > 1: raise NotImplementedError( 'Running tests interactively (DEBUG is gdb[server] or ATTACH or ' 'STEP is set) in parallel (TEST_JOBS is more than 1) is not ' @@ -833,7 +886,7 @@ if __name__ == '__main__': failfast = args.failfast descriptions = True - print("Running tests using custom test runner") # debug message + print("Running tests using custom test runner.") filter_file, filter_class, filter_func = parse_test_option() print("Active filters: file=%s, class=%s, function=%s" % ( @@ -852,6 +905,21 @@ if __name__ == '__main__': tests_amount = 0 for testcase_suite in cb.suites.values(): tests_amount += testcase_suite.countTestCases() + if testcase_suite.cpus_used > max_vpp_cpus: + # here we replace test functions with lambdas to just skip them + # but we also replace setUp/tearDown functions to do nothing + # so that the test can be "started" and "stopped", so that we can + # still keep those prints (test description - SKIP), which are done + # in stopTest() (for that to trigger, test function must run) + for t in testcase_suite: + for m in dir(t): + if m.startswith('test_'): + setattr(t, m, lambda: t.skipTest("not enough cpus")) + setattr(t.__class__, 'setUpClass', lambda: None) + setattr(t.__class__, 'tearDownClass', lambda: None) + setattr(t, 'setUp', lambda: None) + setattr(t, 'tearDown', lambda: None) + t.__class__.skipped_due_to_cpu_lack = True suites.append(testcase_suite) print("%s out of %s tests match specified filters" % ( @@ -868,6 +936,14 @@ if __name__ == '__main__': # don't fork if requiring interactive terminal print('Running tests in foreground in the current process') full_suite = unittest.TestSuite() + free_cpus = list(available_cpus) + cpu_shortage = False + for suite in suites: + if suite.cpus_used <= max_vpp_cpus: + suite.assign_cpus(free_cpus[:suite.cpus_used]) + else: + suite.assign_cpus([]) + cpu_shortage = True full_suite.addTests(suites) result = VppTestRunner(verbosity=verbose, failfast=failfast, @@ -883,10 +959,16 @@ if __name__ == '__main__': test_case_info.tempdir, test_case_info.core_crash_test) + if cpu_shortage: + print() + print(colorize('SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT' + ' ENOUGH CPUS AVAILABLE', YELLOW)) + print() sys.exit(not was_successful) else: print('Running each VPPTestCase in a separate background process' - ' with {} parallel process(es)'.format(concurrent_tests)) + f' with at most {max_concurrent_tests} parallel python test ' + 'process(es)') exit_code = 0 while suites and attempts > 0: results = run_forked(suites) diff --git a/test/sanity_run_vpp.py b/test/sanity_run_vpp.py index f91dc833eeb..4b21de12dfe 100644 --- a/test/sanity_run_vpp.py +++ b/test/sanity_run_vpp.py @@ -9,7 +9,7 @@ from framework import VppDiedError, VppTestCase, KeepAliveReporter class SanityTestCase(VppTestCase): """ Sanity test case - verify whether VPP is able to start """ - pass + cpus = [0] # don't ask to debug SanityTestCase @classmethod diff --git a/test/test_util.py b/test/test_util.py index 421afce4d5d..3a61d64b28f 100755 --- a/test/test_util.py +++ b/test/test_util.py @@ -2,11 +2,11 @@ """Test framework utility functions tests""" import unittest -from framework import VppTestRunner +from framework import VppTestRunner, CPUInterface from vpp_papi import mac_pton, mac_ntop -class TestUtil (unittest.TestCase): +class TestUtil (CPUInterface, unittest.TestCase): """ Test framework utility tests """ @classmethod @@ -23,6 +23,10 @@ class TestUtil (unittest.TestCase): pass return False + @classmethod + def get_cpus_required(cls): + return 0 + def test_mac_to_binary(self): """ MAC to binary and back """ mac = 'aa:bb:cc:dd:ee:ff' |