From f56b77a0764222cc45a9df572df901067a273356 Mon Sep 17 00:00:00 2001 From: Damjan Marion Date: Mon, 3 Oct 2016 19:44:57 +0200 Subject: test: new test infrastructure Change-Id: I73ca19c431743f6b39669c583d9222a6559346ef Signed-off-by: Jan Gelety Signed-off-by: Juraj Sloboda Signed-off-by: Stefan Kobza Signed-off-by: Matej Klotton Signed-off-by: Maciek Konstantynowicz Signed-off-by: Damjan Marion --- test/framework.py | 621 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 621 insertions(+) create mode 100644 test/framework.py (limited to 'test/framework.py') diff --git a/test/framework.py b/test/framework.py new file mode 100644 index 00000000..67f8a058 --- /dev/null +++ b/test/framework.py @@ -0,0 +1,621 @@ +#!/usr/bin/env python +## @package framework +# Module to handle test case execution. +# +# The module provides a set of tools for constructing and running tests and +# representing the results. + +import logging +logging.getLogger("scapy.runtime").setLevel(logging.ERROR) + +import os +import subprocess +import unittest +from inspect import getdoc + +from scapy.utils import wrpcap, rdpcap +from scapy.packet import Raw + +## Static variables to store color formatting strings. +# +# These variables (RED, GREEN, YELLOW and LPURPLE) are used to configure +# the color of the text to be printed in the terminal. Variable END is used +# to revert the text color to the default one. +RED = '\033[91m' +GREEN = '\033[92m' +YELLOW = '\033[93m' +LPURPLE = '\033[94m' +END = '\033[0m' + +## Private class to create packet info object. +# +# Help process information about the next packet. +# Set variables to default values. +class _PacketInfo(object): + index = -1 + src = -1 + dst = -1 + data = None + ## @var index + # Integer variable to store the index of the packet. + ## @var src + # Integer variable to store the index of the source packet generator + # interface of the packet. + ## @var dst + # Integer variable to store the index of the destination packet generator + # interface of the packet. + ## @var data + # Object variable to store the copy of the former packet. + +## Subclass of the python unittest.TestCase class. +# +# This subclass is a base class for test cases that are implemented as classes. +# It provides methods to create and run test case. +class VppTestCase(unittest.TestCase): + + ## Class method to set class constants necessary to run test case. + # @param cls The class pointer. + @classmethod + def setUpConstants(cls): + cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp") + cls.vpp_api_test_bin = os.getenv("VPP_TEST_API_TEST_BIN", + "vpp-api-test") + cls.vpp_cmdline = [cls.vpp_bin, "unix", "nodaemon", "api-segment", "{", + "prefix", "unittest", "}"] + cls.vpp_api_test_cmdline = [cls.vpp_api_test_bin, "chroot", "prefix", + "unittest"] + try: + cls.verbose = int(os.getenv("V", 0)) + except: + cls.verbose = 0 + + ## @var vpp_bin + # String variable to store the path to vpp (vector packet processor). + ## @var vpp_api_test_bin + # String variable to store the path to vpp_api_test (vpp API test tool). + ## @var vpp_cmdline + # List of command line attributes for vpp. + ## @var vpp_api_test_cmdline + # List of command line attributes for vpp_api_test. + ## @var verbose + # Integer variable to store required verbosity level. + + ## Class method to start the test case. + # 1. Initiate test case constants and set test case variables to default + # values. + # 2. Remove files from the shared memory. + # 3. Start vpp as a subprocess. + # @param cls The class pointer. + @classmethod + def setUpClass(cls): + cls.setUpConstants() + cls.pg_streams = [] + cls.MY_MACS = {} + cls.MY_IP4S = {} + cls.MY_IP6S = {} + cls.VPP_MACS = {} + cls.VPP_IP4S = {} + cls.VPP_IP6S = {} + cls.packet_infos = {} + print "==================================================================" + print YELLOW + getdoc(cls) + END + print "==================================================================" + os.system("rm -f /dev/shm/unittest-global_vm") + os.system("rm -f /dev/shm/unittest-vpe-api") + os.system("rm -f /dev/shm/unittest-db") + cls.vpp = subprocess.Popen(cls.vpp_cmdline, stderr=subprocess.PIPE) + ## @var pg_streams + # List variable to store packet-generator streams for interfaces. + ## @var MY_MACS + # Dictionary variable to store host MAC addresses connected to packet + # generator interfaces. + ## @var MY_IP4S + # Dictionary variable to store host IPv4 addresses connected to packet + # generator interfaces. + ## @var MY_IP6S + # Dictionary variable to store host IPv6 addresses connected to packet + # generator interfaces. + ## @var VPP_MACS + # Dictionary variable to store VPP MAC addresses of the packet + # generator interfaces. + ## @var VPP_IP4S + # Dictionary variable to store VPP IPv4 addresses of the packet + # generator interfaces. + ## @var VPP_IP6S + # Dictionary variable to store VPP IPv6 addresses of the packet + # generator interfaces. + ## @var vpp + # Test case object variable to store file descriptor of running vpp + # subprocess with open pipe to the standard error stream per + # VppTestCase object. + + ## Class method to do cleaning when all tests (test_) defined for + # VppTestCase class are finished. + # 1. Terminate vpp and kill all vpp instances. + # 2. Remove files from the shared memory. + # @param cls The class pointer. + @classmethod + def quit(cls): + cls.vpp.terminate() + cls.vpp = None + os.system("rm -f /dev/shm/unittest-global_vm") + os.system("rm -f /dev/shm/unittest-vpe-api") + os.system("rm -f /dev/shm/unittest-db") + + ## Class method to define tear down action of the VppTestCase class. + # @param cls The class pointer. + @classmethod + def tearDownClass(cls): + cls.quit() + + ## Method to define tear down VPP actions of the test case. + # @param self The object pointer. + def tearDown(self): + self.cli(2, "show int") + self.cli(2, "show trace") + self.cli(2, "show hardware") + self.cli(2, "show ip arp") + self.cli(2, "show ip fib") + self.cli(2, "show error") + self.cli(2, "show run") + + ## Method to define setup action of the test case. + # @param self The object pointer. + def setUp(self): + self.cli(2, "clear trace") + + ## Class method to print logs. + # Based on set level of verbosity print text in the terminal. + # @param cls The class pointer. + # @param s String variable to store text to be printed. + # @param v Integer variable to store required level of verbosity. + @classmethod + def log(cls, s, v=1): + if cls.verbose >= v: + print "LOG: " + LPURPLE + s + END + + ## Class method to execute api commands. + # Based on set level of verbosity print the output of the api command in + # the terminal. + # @param cls The class pointer. + # @param s String variable to store api command string. + @classmethod + def api(cls, s): + p = subprocess.Popen(cls.vpp_api_test_cmdline, + stdout=subprocess.PIPE, + stdin=subprocess.PIPE, + stderr=subprocess.PIPE) + if cls.verbose > 0: + print "API: " + RED + s + END + p.stdin.write(s) + out = p.communicate()[0] + out = out.replace("vat# ", "", 2) + if cls.verbose > 0: + if len(out) > 1: + print YELLOW + out + END + ## @var p + # Object variable to store file descriptor of vpp_api_test subprocess + # with open pipes to the standard output, inputs and error streams. + ## @var out + # Tuple variable to store standard output of vpp_api_test subprocess + # where the string "vat# " is replaced by empty string later. + + ## Class method to execute cli commands. + # Based on set level of verbosity of the log and verbosity defined by + # environmental variable execute the cli command and print the output in + # the terminal. + # CLI command is executed via vpp API test tool (exec + cli_command) + # @param cls The class pointer. + # @param v Integer variable to store required level of verbosity. + # @param s String variable to store cli command string. + @classmethod + def cli(cls, v, s): + if cls.verbose < v: + return + p = subprocess.Popen(cls.vpp_api_test_cmdline, + stdout=subprocess.PIPE, + stdin=subprocess.PIPE, + stderr=subprocess.PIPE) + if cls.verbose > 0: + print "CLI: " + RED + s + END + p.stdin.write('exec ' + s) + out = p.communicate()[0] + out = out.replace("vat# ", "", 2) + if cls.verbose > 0: + if len(out) > 1: + print YELLOW + out + END + ## @var p + # Object variable to store file descriptor of vpp_api_test subprocess + # with open pipes to the standard output, inputs and error streams. + ## @var out + # Tuple variable to store standard output of vpp_api_test subprocess + # where the string "vat# " is replaced by empty string later. + + ## Class method to create incoming packet stream for the packet-generator + # interface. + # Delete old /tmp/pgX_in.pcap file if exists and create the empty one and + # fill it with provided packets and add it to pg_streams list. + # @param cls The class pointer. + # @param i Integer variable to store the index of the packet-generator + # interface to create packet stream for. + # @param pkts List variable to store packets to be added to the stream. + @classmethod + def pg_add_stream(cls, i, pkts): + os.system("sudo rm -f /tmp/pg%u_in.pcap" % i) + wrpcap("/tmp/pg%u_in.pcap" % i, pkts) + # no equivalent API command + cls.cli(0, "packet-generator new pcap /tmp/pg%u_in.pcap source pg%u" + " name pcap%u" % (i, i, i)) + cls.pg_streams.append('pcap%u' % i) + + ## Class method to enable packet capturing for the packet-generator + # interface. + # Delete old /tmp/pgX_out.pcap file if exists and set the packet-generator + # to capture outgoing packets to /tmp/pgX_out.pcap file. + # @param cls The class pointer. + # @param args List variable to store the indexes of the packet-generator + # interfaces to start packet capturing for. + @classmethod + def pg_enable_capture(cls, args): + for i in args: + os.system("sudo rm -f /tmp/pg%u_out.pcap" % i) + cls.cli(0, "packet-generator capture pg%u pcap /tmp/pg%u_out.pcap" + % (i, i)) + + ## Class method to start packet sending. + # Start to send packets for all defined pg streams. Delete every stream + # from the stream list when sent and clear the pg_streams list. + # @param cls The class pointer. + @classmethod + def pg_start(cls): + cls.cli(2, "trace add pg-input 50") # 50 is maximum + cls.cli(0, 'packet-generator enable') + for stream in cls.pg_streams: + cls.cli(0, 'packet-generator delete %s' % stream) + cls.pg_streams = [] + + ## Class method to return captured packets. + # Return packet captured for the defined packet-generator interface. Open + # the corresponding pcap file (/tmp/pgX_out.pcap), read the content and + # store captured packets to output variable. + # @param cls The class pointer. + # @param o Integer variable to store the index of the packet-generator + # interface. + # @return output List of packets captured on the defined packet-generator + # interface. If the corresponding pcap file (/tmp/pgX_out.pcap) does not + # exist return empty list. + @classmethod + def pg_get_capture(cls, o): + pcap_filename = "/tmp/pg%u_out.pcap" % o + try: + output = rdpcap(pcap_filename) + except IOError: # TODO + cls.log("WARNING: File %s does not exist, probably because no" + " packets arrived" % pcap_filename) + return [] + return output + ## @var pcap_filename + # File descriptor to the corresponding pcap file. + + ## Class method to create packet-generator interfaces. + # Create packet-generator interfaces and add host MAC addresses connected + # to these packet-generator interfaces to the MY_MACS dictionary. + # @param cls The class pointer. + # @param args List variable to store the indexes of the packet-generator + # interfaces to be created. + @classmethod + def create_interfaces(cls, args): + for i in args: + cls.MY_MACS[i] = "02:00:00:00:ff:%02x" % i + cls.log("My MAC address is %s" % (cls.MY_MACS[i])) + cls.api("pg_create_interface if_id %u" % i) + cls.api("sw_interface_set_flags pg%u admin-up" % i) + + ## Static method to extend packet to specified size + # Extend provided packet to the specified size (including Ethernet FCS). + # The packet is extended by adding corresponding number of spaces to the + # packet payload. + # NOTE: Currently works only when Raw layer is present. + # @param packet Variable to store packet object. + # @param size Integer variable to store the required size of the packet. + @staticmethod + def extend_packet(packet, size): + packet_len = len(packet) + 4 + extend = size - packet_len + if extend > 0: + packet[Raw].load += ' ' * extend + ## @var packet_len + # Integer variable to store the current packet length including + # Ethernet FCS. + ## @var extend + # Integer variable to store the size of the packet extension. + + ## Method to add packet info object to the packet_infos list. + # Extend the existing packet_infos list with the given information from + # the packet. + # @param self The object pointer. + # @param info Object to store required information from the packet. + def add_packet_info_to_list(self, info): + info.index = len(self.packet_infos) + self.packet_infos[info.index] = info + ## @var info.index + # Info object attribute to store the packet order in the stream. + ## @var packet_infos + # List variable to store required information from packets. + + ## Method to create packet info object. + # Create the existing packet_infos list with the given information from + # the packet. + # @param self The object pointer. + # @param pg_id Integer variable to store the index of the packet-generator + # interface. + def create_packet_info(self, pg_id, target_id): + info = _PacketInfo() + self.add_packet_info_to_list(info) + info.src = pg_id + info.dst = target_id + return info + ## @var info + # Object to store required information from packet. + ## @var info.src + # Info object attribute to store the index of the source packet + # generator interface of the packet. + ## @var info.dst + # Info object attribute to store the index of the destination packet + # generator interface of the packet. + + ## Static method to return packet info string. + # Create packet info string from the provided info object that will be put + # to the packet payload. + # @param info Object to store required information from the packet. + # @return String of information about packet's order in the stream, source + # and destination packet generator interface. + @staticmethod + def info_to_payload(info): + return "%d %d %d" % (info.index, info.src, info.dst) + + ## Static method to create packet info object from the packet payload. + # Create packet info object and set its attribute values based on data + # gained from the packet payload. + # @param payload String variable to store packet payload. + # @return info Object to store required information about the packet. + @staticmethod + def payload_to_info(payload): + numbers = payload.split() + info = _PacketInfo() + info.index = int(numbers[0]) + info.src = int(numbers[1]) + info.dst = int(numbers[2]) + return info + ## @var info.index + # Info object attribute to store the packet order in the stream. + ## @var info.src + # Info object attribute to store the index of the source packet + # generator interface of the packet. + ## @var info.dst + # Info object attribute to store the index of the destination packet + # generator interface of the packet. + + ## Method to return packet info object of the next packet in + # the packet_infos list. + # Get the next packet info object from the packet_infos list by increasing + # the packet_infos list index by one. + # @param self The object pointer. + # @param info Object to store required information about the packet. + # @return packet_infos[next_index] Next info object from the packet_infos + # list with stored information about packets. Return None if the end of + # the list is reached. + def get_next_packet_info(self, info): + if info is None: + next_index = 0 + else: + next_index = info.index + 1 + if next_index == len(self.packet_infos): + return None + else: + return self.packet_infos[next_index] + ## @var next_index + # Integer variable to store the index of the next info object. + + ## Method to return packet info object of the next packet with the required + # source packet generator interface. + # Iterate over the packet_infos list and search for the next packet info + # object with the required source packet generator interface. + # @param self The object pointer. + # @param src_pg Integer variable to store index of requested source packet + # generator interface. + # @param info Object to store required information about the packet. + # @return packet_infos[next_index] Next info object from the packet_infos + # list with stored information about packets. Return None if the end of + # the list is reached. + def get_next_packet_info_for_interface(self, src_pg, info): + while True: + info = self.get_next_packet_info(info) + if info is None: + return None + if info.src == src_pg: + return info + ## @var info.src + # Info object attribute to store the index of the source packet + # generator interface of the packet. + + ## Method to return packet info object of the next packet with required + # source and destination packet generator interfaces. + # Search for the next packet info object with the required source and + # destination packet generator interfaces. + # @param self The object pointer. + # @param src_pg Integer variable to store the index of the requested source + # packet generator interface. + # @param dst_pg Integer variable to store the index of the requested source + # packet generator interface. + # @param info Object to store required information about the packet. + # @return info Object with the info about the next packet with with + # required source and destination packet generator interfaces. Return None + # if there is no other packet with required data. + def get_next_packet_info_for_interface2(self, src_pg, dst_pg, info): + while True: + info = self.get_next_packet_info_for_interface(src_pg, info) + if info is None: + return None + if info.dst == dst_pg: + return info + ## @var info.dst + # Info object attribute to store the index of the destination packet + # generator interface of the packet. + + +## Subclass of the python unittest.TestResult class. +# +# This subclass provides methods to compile information about which tests have +# succeeded and which have failed. +class VppTestResult(unittest.TestResult): + ## The constructor. + # @param stream File descriptor to store where to report test results. Set + # to the standard error stream by default. + # @param descriptions Boolean variable to store information if to use test + # case descriptions. + # @param verbosity Integer variable to store required verbosity level. + def __init__(self, stream, descriptions, verbosity): + unittest.TestResult.__init__(self, stream, descriptions, verbosity) + self.stream = stream + self.descriptions = descriptions + self.verbosity = verbosity + self.result_string = None + ## @var result_string + # String variable to store the test case result string. + + + ## Method called when the test case succeeds. + # Run the default implementation (that does nothing) and set the result + # string in case of test case success. + # @param self The object pointer. + # @param test Object variable to store the test case instance. + def addSuccess(self, test): + unittest.TestResult.addSuccess(self, test) + self.result_string = GREEN + "OK" + END + ## @var result_string + # String variable to store the test case result string. + + ## Method called when the test case signals a failure. + # Run the default implementation that appends a tuple (test, formatted_err) + # to the instance's failures attribute, where formatted_err is a formatted + # traceback derived from err and set the result string in case of test case + # success. + # @param self The object pointer. + # @param test Object variable to store the test case instance. + # @param err Tuple variable to store the error data: + # (type, value, traceback). + def addFailure(self, test, err): + unittest.TestResult.addFailure(self, test, err) + self.result_string = RED + "FAIL" + END + ## @var result_string + # String variable to store the test case result string. + + ## Method called when the test case raises an unexpected exception. + # Run the default implementation that appends a tuple (test, formatted_err) + # to the instance's error attribute, where formatted_err is a formatted + # traceback derived from err and set the result string in case of test case + # unexpected failure. + # @param self The object pointer. + # @param test Object variable to store the test case instance. + # @param err Tuple variable to store the error data: + # (type, value, traceback). + def addError(self, test, err): + unittest.TestResult.addError(self, test, err) + self.result_string = RED + "ERROR" + END + ## @var result_string + # String variable to store the test case result string. + + ## Method to get the description of the test case. + # Used to get the description string from the test case object. + # @param self The object pointer. + # @param test Object variable to store the test case instance. + # @return String of the short description if exist otherwise return test + # case name string. + def getDescription(self, test): + # TODO: if none print warning not raise exception + short_description = test.shortDescription() + if self.descriptions and short_description: + return short_description + else: + return str(test) + ## @var short_description + # String variable to store the short description of the test case. + + ## Method called when the test case is about to be run. + # Run the default implementation and based on the set verbosity level write + # the starting string to the output stream. + # @param self The object pointer. + # @param test Object variable to store the test case instance. + def startTest(self, test): + unittest.TestResult.startTest(self, test) + if self.verbosity > 0: + self.stream.writeln("Starting " + self.getDescription(test) + " ...") + self.stream.writeln("------------------------------------------------------------------") + + ## Method called after the test case has been executed. + # Run the default implementation and based on the set verbosity level write + # the result string to the output stream. + # @param self The object pointer. + # @param test Object variable to store the test case instance. + def stopTest(self, test): + unittest.TestResult.stopTest(self, test) + if self.verbosity > 0: + self.stream.writeln("------------------------------------------------------------------") + self.stream.writeln("%-60s%s" % (self.getDescription(test), self.result_string)) + self.stream.writeln("------------------------------------------------------------------") + else: + self.stream.writeln("%-60s%s" % (self.getDescription(test), self.result_string)) + + ## Method to write errors and failures information to the output stream. + # Write content of errors and failures lists to the output stream. + # @param self The object pointer. + def printErrors(self): + self.stream.writeln() + self.printErrorList('ERROR', self.errors) + self.printErrorList('FAIL', self.failures) + ## @var errors + # List variable containing 2-tuples of TestCase instances and strings + # holding formatted tracebacks. Each tuple represents a test which + # raised an unexpected exception. + ## @var failures + # List variable containing 2-tuples of TestCase instances and strings + # holding formatted tracebacks. Each tuple represents a test where + # a failure was explicitly signalled using the TestCase.assert*() + # methods. + + ## Method to write the error information to the output stream. + # Write content of error lists to the output stream together with error + # type and test case description. + # @param self The object pointer. + # @param flavour String variable to store error type. + # @param errors List variable to store 2-tuples of TestCase instances and + # strings holding formatted tracebacks. + def printErrorList(self, flavour, errors): + for test, err in errors: + self.stream.writeln('=' * 70) + self.stream.writeln("%s: %s" % (flavour, self.getDescription(test))) + self.stream.writeln('-' * 70) + self.stream.writeln("%s" % err) + ## @var test + # Object variable to store the test case instance. + ## @var err + # String variable to store formatted tracebacks. + + +## Subclass of the python unittest.TextTestRunner class. +# +# A basic test runner implementation which prints results on standard error. +class VppTestRunner(unittest.TextTestRunner): + ## Class object variable to store the results of a set of tests. + resultclass = VppTestResult + + ## Method to run the test. + # Print debug message in the terminal and run the standard run() method + # of the test runner collecting the result into the test result object. + # @param self The object pointer. + # @param test Object variable to store the test case instance. + # @return Test result object of the VppTestRunner. + def run(self, test): + print "Running tests using custom test runner" # debug message + return super(VppTestRunner, self).run(test) -- cgit 1.2.3-korg