diff options
author | Damjan Marion <damarion@cisco.com> | 2016-10-03 19:44:57 +0200 |
---|---|---|
committer | Dave Barach <openvpp@barachs.net> | 2016-10-03 19:58:19 +0000 |
commit | f56b77a0764222cc45a9df572df901067a273356 (patch) | |
tree | 7f11bcde6219608ad2a433655ea58cd11747025a /test | |
parent | 6c3ebcc2bfd36a5835a99225ad667e4403293ffb (diff) |
test: new test infrastructure
Change-Id: I73ca19c431743f6b39669c583d9222a6559346ef
Signed-off-by: Jan Gelety <jgelety@cisco.com>
Signed-off-by: Juraj Sloboda <jsloboda@cisco.com>
Signed-off-by: Stefan Kobza <skobza@cisco.com>
Signed-off-by: Matej Klotton <mklotton@cisco.com>
Signed-off-by: Maciek Konstantynowicz <mkonstan@cisco.com>
Signed-off-by: Damjan Marion <damarion@cisco.com>
Diffstat (limited to 'test')
-rw-r--r-- | test/Makefile | 3 | ||||
-rw-r--r-- | test/framework.py | 621 | ||||
-rw-r--r-- | test/run_tests.py | 12 | ||||
-rw-r--r-- | test/scapy_handlers/__init__.py | 0 | ||||
-rw-r--r-- | test/scapy_handlers/vxlan.py | 17 | ||||
-rw-r--r-- | test/template_bd.py | 106 | ||||
-rw-r--r-- | test/test_infra.md | 145 | ||||
-rw-r--r-- | test/test_ip.py | 261 | ||||
-rw-r--r-- | test/test_ip6.py | 281 | ||||
-rw-r--r-- | test/test_l2bd.py | 473 | ||||
-rw-r--r-- | test/test_l2xc.py | 243 | ||||
-rw-r--r-- | test/test_vxlan.py | 102 | ||||
-rw-r--r-- | test/util.py | 139 |
13 files changed, 2403 insertions, 0 deletions
diff --git a/test/Makefile b/test/Makefile new file mode 100644 index 00000000000..7cbcf97a21d --- /dev/null +++ b/test/Makefile @@ -0,0 +1,3 @@ + +all: + @python run_tests.py discover -p test_$(TEST)"*.py" diff --git a/test/framework.py b/test/framework.py new file mode 100644 index 00000000000..67f8a058ad4 --- /dev/null +++ b/test/framework.py @@ -0,0 +1,621 @@ +#!/usr/bin/env python +## @package framework +# Module to handle test case execution. +# +# The module provides a set of tools for constructing and running tests and +# representing the results. + +import logging +logging.getLogger("scapy.runtime").setLevel(logging.ERROR) + +import os +import subprocess +import unittest +from inspect import getdoc + +from scapy.utils import wrpcap, rdpcap +from scapy.packet import Raw + +## Static variables to store color formatting strings. +# +# These variables (RED, GREEN, YELLOW and LPURPLE) are used to configure +# the color of the text to be printed in the terminal. Variable END is used +# to revert the text color to the default one. +RED = '\033[91m' +GREEN = '\033[92m' +YELLOW = '\033[93m' +LPURPLE = '\033[94m' +END = '\033[0m' + +## Private class to create packet info object. +# +# Help process information about the next packet. +# Set variables to default values. +class _PacketInfo(object): + index = -1 + src = -1 + dst = -1 + data = None + ## @var index + # Integer variable to store the index of the packet. + ## @var src + # Integer variable to store the index of the source packet generator + # interface of the packet. + ## @var dst + # Integer variable to store the index of the destination packet generator + # interface of the packet. + ## @var data + # Object variable to store the copy of the former packet. + +## Subclass of the python unittest.TestCase class. +# +# This subclass is a base class for test cases that are implemented as classes. +# It provides methods to create and run test case. +class VppTestCase(unittest.TestCase): + + ## Class method to set class constants necessary to run test case. + # @param cls The class pointer. + @classmethod + def setUpConstants(cls): + cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp") + cls.vpp_api_test_bin = os.getenv("VPP_TEST_API_TEST_BIN", + "vpp-api-test") + cls.vpp_cmdline = [cls.vpp_bin, "unix", "nodaemon", "api-segment", "{", + "prefix", "unittest", "}"] + cls.vpp_api_test_cmdline = [cls.vpp_api_test_bin, "chroot", "prefix", + "unittest"] + try: + cls.verbose = int(os.getenv("V", 0)) + except: + cls.verbose = 0 + + ## @var vpp_bin + # String variable to store the path to vpp (vector packet processor). + ## @var vpp_api_test_bin + # String variable to store the path to vpp_api_test (vpp API test tool). + ## @var vpp_cmdline + # List of command line attributes for vpp. + ## @var vpp_api_test_cmdline + # List of command line attributes for vpp_api_test. + ## @var verbose + # Integer variable to store required verbosity level. + + ## Class method to start the test case. + # 1. Initiate test case constants and set test case variables to default + # values. + # 2. Remove files from the shared memory. + # 3. Start vpp as a subprocess. + # @param cls The class pointer. + @classmethod + def setUpClass(cls): + cls.setUpConstants() + cls.pg_streams = [] + cls.MY_MACS = {} + cls.MY_IP4S = {} + cls.MY_IP6S = {} + cls.VPP_MACS = {} + cls.VPP_IP4S = {} + cls.VPP_IP6S = {} + cls.packet_infos = {} + print "==================================================================" + print YELLOW + getdoc(cls) + END + print "==================================================================" + os.system("rm -f /dev/shm/unittest-global_vm") + os.system("rm -f /dev/shm/unittest-vpe-api") + os.system("rm -f /dev/shm/unittest-db") + cls.vpp = subprocess.Popen(cls.vpp_cmdline, stderr=subprocess.PIPE) + ## @var pg_streams + # List variable to store packet-generator streams for interfaces. + ## @var MY_MACS + # Dictionary variable to store host MAC addresses connected to packet + # generator interfaces. + ## @var MY_IP4S + # Dictionary variable to store host IPv4 addresses connected to packet + # generator interfaces. + ## @var MY_IP6S + # Dictionary variable to store host IPv6 addresses connected to packet + # generator interfaces. + ## @var VPP_MACS + # Dictionary variable to store VPP MAC addresses of the packet + # generator interfaces. + ## @var VPP_IP4S + # Dictionary variable to store VPP IPv4 addresses of the packet + # generator interfaces. + ## @var VPP_IP6S + # Dictionary variable to store VPP IPv6 addresses of the packet + # generator interfaces. + ## @var vpp + # Test case object variable to store file descriptor of running vpp + # subprocess with open pipe to the standard error stream per + # VppTestCase object. + + ## Class method to do cleaning when all tests (test_) defined for + # VppTestCase class are finished. + # 1. Terminate vpp and kill all vpp instances. + # 2. Remove files from the shared memory. + # @param cls The class pointer. + @classmethod + def quit(cls): + cls.vpp.terminate() + cls.vpp = None + os.system("rm -f /dev/shm/unittest-global_vm") + os.system("rm -f /dev/shm/unittest-vpe-api") + os.system("rm -f /dev/shm/unittest-db") + + ## Class method to define tear down action of the VppTestCase class. + # @param cls The class pointer. + @classmethod + def tearDownClass(cls): + cls.quit() + + ## Method to define tear down VPP actions of the test case. + # @param self The object pointer. + def tearDown(self): + self.cli(2, "show int") + self.cli(2, "show trace") + self.cli(2, "show hardware") + self.cli(2, "show ip arp") + self.cli(2, "show ip fib") + self.cli(2, "show error") + self.cli(2, "show run") + + ## Method to define setup action of the test case. + # @param self The object pointer. + def setUp(self): + self.cli(2, "clear trace") + + ## Class method to print logs. + # Based on set level of verbosity print text in the terminal. + # @param cls The class pointer. + # @param s String variable to store text to be printed. + # @param v Integer variable to store required level of verbosity. + @classmethod + def log(cls, s, v=1): + if cls.verbose >= v: + print "LOG: " + LPURPLE + s + END + + ## Class method to execute api commands. + # Based on set level of verbosity print the output of the api command in + # the terminal. + # @param cls The class pointer. + # @param s String variable to store api command string. + @classmethod + def api(cls, s): + p = subprocess.Popen(cls.vpp_api_test_cmdline, + stdout=subprocess.PIPE, + stdin=subprocess.PIPE, + stderr=subprocess.PIPE) + if cls.verbose > 0: + print "API: " + RED + s + END + p.stdin.write(s) + out = p.communicate()[0] + out = out.replace("vat# ", "", 2) + if cls.verbose > 0: + if len(out) > 1: + print YELLOW + out + END + ## @var p + # Object variable to store file descriptor of vpp_api_test subprocess + # with open pipes to the standard output, inputs and error streams. + ## @var out + # Tuple variable to store standard output of vpp_api_test subprocess + # where the string "vat# " is replaced by empty string later. + + ## Class method to execute cli commands. + # Based on set level of verbosity of the log and verbosity defined by + # environmental variable execute the cli command and print the output in + # the terminal. + # CLI command is executed via vpp API test tool (exec + cli_command) + # @param cls The class pointer. + # @param v Integer variable to store required level of verbosity. + # @param s String variable to store cli command string. + @classmethod + def cli(cls, v, s): + if cls.verbose < v: + return + p = subprocess.Popen(cls.vpp_api_test_cmdline, + stdout=subprocess.PIPE, + stdin=subprocess.PIPE, + stderr=subprocess.PIPE) + if cls.verbose > 0: + print "CLI: " + RED + s + END + p.stdin.write('exec ' + s) + out = p.communicate()[0] + out = out.replace("vat# ", "", 2) + if cls.verbose > 0: + if len(out) > 1: + print YELLOW + out + END + ## @var p + # Object variable to store file descriptor of vpp_api_test subprocess + # with open pipes to the standard output, inputs and error streams. + ## @var out + # Tuple variable to store standard output of vpp_api_test subprocess + # where the string "vat# " is replaced by empty string later. + + ## Class method to create incoming packet stream for the packet-generator + # interface. + # Delete old /tmp/pgX_in.pcap file if exists and create the empty one and + # fill it with provided packets and add it to pg_streams list. + # @param cls The class pointer. + # @param i Integer variable to store the index of the packet-generator + # interface to create packet stream for. + # @param pkts List variable to store packets to be added to the stream. + @classmethod + def pg_add_stream(cls, i, pkts): + os.system("sudo rm -f /tmp/pg%u_in.pcap" % i) + wrpcap("/tmp/pg%u_in.pcap" % i, pkts) + # no equivalent API command + cls.cli(0, "packet-generator new pcap /tmp/pg%u_in.pcap source pg%u" + " name pcap%u" % (i, i, i)) + cls.pg_streams.append('pcap%u' % i) + + ## Class method to enable packet capturing for the packet-generator + # interface. + # Delete old /tmp/pgX_out.pcap file if exists and set the packet-generator + # to capture outgoing packets to /tmp/pgX_out.pcap file. + # @param cls The class pointer. + # @param args List variable to store the indexes of the packet-generator + # interfaces to start packet capturing for. + @classmethod + def pg_enable_capture(cls, args): + for i in args: + os.system("sudo rm -f /tmp/pg%u_out.pcap" % i) + cls.cli(0, "packet-generator capture pg%u pcap /tmp/pg%u_out.pcap" + % (i, i)) + + ## Class method to start packet sending. + # Start to send packets for all defined pg streams. Delete every stream + # from the stream list when sent and clear the pg_streams list. + # @param cls The class pointer. + @classmethod + def pg_start(cls): + cls.cli(2, "trace add pg-input 50") # 50 is maximum + cls.cli(0, 'packet-generator enable') + for stream in cls.pg_streams: + cls.cli(0, 'packet-generator delete %s' % stream) + cls.pg_streams = [] + + ## Class method to return captured packets. + # Return packet captured for the defined packet-generator interface. Open + # the corresponding pcap file (/tmp/pgX_out.pcap), read the content and + # store captured packets to output variable. + # @param cls The class pointer. + # @param o Integer variable to store the index of the packet-generator + # interface. + # @return output List of packets captured on the defined packet-generator + # interface. If the corresponding pcap file (/tmp/pgX_out.pcap) does not + # exist return empty list. + @classmethod + def pg_get_capture(cls, o): + pcap_filename = "/tmp/pg%u_out.pcap" % o + try: + output = rdpcap(pcap_filename) + except IOError: # TODO + cls.log("WARNING: File %s does not exist, probably because no" + " packets arrived" % pcap_filename) + return [] + return output + ## @var pcap_filename + # File descriptor to the corresponding pcap file. + + ## Class method to create packet-generator interfaces. + # Create packet-generator interfaces and add host MAC addresses connected + # to these packet-generator interfaces to the MY_MACS dictionary. + # @param cls The class pointer. + # @param args List variable to store the indexes of the packet-generator + # interfaces to be created. + @classmethod + def create_interfaces(cls, args): + for i in args: + cls.MY_MACS[i] = "02:00:00:00:ff:%02x" % i + cls.log("My MAC address is %s" % (cls.MY_MACS[i])) + cls.api("pg_create_interface if_id %u" % i) + cls.api("sw_interface_set_flags pg%u admin-up" % i) + + ## Static method to extend packet to specified size + # Extend provided packet to the specified size (including Ethernet FCS). + # The packet is extended by adding corresponding number of spaces to the + # packet payload. + # NOTE: Currently works only when Raw layer is present. + # @param packet Variable to store packet object. + # @param size Integer variable to store the required size of the packet. + @staticmethod + def extend_packet(packet, size): + packet_len = len(packet) + 4 + extend = size - packet_len + if extend > 0: + packet[Raw].load += ' ' * extend + ## @var packet_len + # Integer variable to store the current packet length including + # Ethernet FCS. + ## @var extend + # Integer variable to store the size of the packet extension. + + ## Method to add packet info object to the packet_infos list. + # Extend the existing packet_infos list with the given information from + # the packet. + # @param self The object pointer. + # @param info Object to store required information from the packet. + def add_packet_info_to_list(self, info): + info.index = len(self.packet_infos) + self.packet_infos[info.index] = info + ## @var info.index + # Info object attribute to store the packet order in the stream. + ## @var packet_infos + # List variable to store required information from packets. + + ## Method to create packet info object. + # Create the existing packet_infos list with the given information from + # the packet. + # @param self The object pointer. + # @param pg_id Integer variable to store the index of the packet-generator + # interface. + def create_packet_info(self, pg_id, target_id): + info = _PacketInfo() + self.add_packet_info_to_list(info) + info.src = pg_id + info.dst = target_id + return info + ## @var info + # Object to store required information from packet. + ## @var info.src + # Info object attribute to store the index of the source packet + # generator interface of the packet. + ## @var info.dst + # Info object attribute to store the index of the destination packet + # generator interface of the packet. + + ## Static method to return packet info string. + # Create packet info string from the provided info object that will be put + # to the packet payload. + # @param info Object to store required information from the packet. + # @return String of information about packet's order in the stream, source + # and destination packet generator interface. + @staticmethod + def info_to_payload(info): + return "%d %d %d" % (info.index, info.src, info.dst) + + ## Static method to create packet info object from the packet payload. + # Create packet info object and set its attribute values based on data + # gained from the packet payload. + # @param payload String variable to store packet payload. + # @return info Object to store required information about the packet. + @staticmethod + def payload_to_info(payload): + numbers = payload.split() + info = _PacketInfo() + info.index = int(numbers[0]) + info.src = int(numbers[1]) + info.dst = int(numbers[2]) + return info + ## @var info.index + # Info object attribute to store the packet order in the stream. + ## @var info.src + # Info object attribute to store the index of the source packet + # generator interface of the packet. + ## @var info.dst + # Info object attribute to store the index of the destination packet + # generator interface of the packet. + + ## Method to return packet info object of the next packet in + # the packet_infos list. + # Get the next packet info object from the packet_infos list by increasing + # the packet_infos list index by one. + # @param self The object pointer. + # @param info Object to store required information about the packet. + # @return packet_infos[next_index] Next info object from the packet_infos + # list with stored information about packets. Return None if the end of + # the list is reached. + def get_next_packet_info(self, info): + if info is None: + next_index = 0 + else: + next_index = info.index + 1 + if next_index == len(self.packet_infos): + return None + else: + return self.packet_infos[next_index] + ## @var next_index + # Integer variable to store the index of the next info object. + + ## Method to return packet info object of the next packet with the required + # source packet generator interface. + # Iterate over the packet_infos list and search for the next packet info + # object with the required source packet generator interface. + # @param self The object pointer. + # @param src_pg Integer variable to store index of requested source packet + # generator interface. + # @param info Object to store required information about the packet. + # @return packet_infos[next_index] Next info object from the packet_infos + # list with stored information about packets. Return None if the end of + # the list is reached. + def get_next_packet_info_for_interface(self, src_pg, info): + while True: + info = self.get_next_packet_info(info) + if info is None: + return None + if info.src == src_pg: + return info + ## @var info.src + # Info object attribute to store the index of the source packet + # generator interface of the packet. + + ## Method to return packet info object of the next packet with required + # source and destination packet generator interfaces. + # Search for the next packet info object with the required source and + # destination packet generator interfaces. + # @param self The object pointer. + # @param src_pg Integer variable to store the index of the requested source + # packet generator interface. + # @param dst_pg Integer variable to store the index of the requested source + # packet generator interface. + # @param info Object to store required information about the packet. + # @return info Object with the info about the next packet with with + # required source and destination packet generator interfaces. Return None + # if there is no other packet with required data. + def get_next_packet_info_for_interface2(self, src_pg, dst_pg, info): + while True: + info = self.get_next_packet_info_for_interface(src_pg, info) + if info is None: + return None + if info.dst == dst_pg: + return info + ## @var info.dst + # Info object attribute to store the index of the destination packet + # generator interface of the packet. + + +## Subclass of the python unittest.TestResult class. +# +# This subclass provides methods to compile information about which tests have +# succeeded and which have failed. +class VppTestResult(unittest.TestResult): + ## The constructor. + # @param stream File descriptor to store where to report test results. Set + # to the standard error stream by default. + # @param descriptions Boolean variable to store information if to use test + # case descriptions. + # @param verbosity Integer variable to store required verbosity level. + def __init__(self, stream, descriptions, verbosity): + unittest.TestResult.__init__(self, stream, descriptions, verbosity) + self.stream = stream + self.descriptions = descriptions + self.verbosity = verbosity + self.result_string = None + ## @var result_string + # String variable to store the test case result string. + + + ## Method called when the test case succeeds. + # Run the default implementation (that does nothing) and set the result + # string in case of test case success. + # @param self The object pointer. + # @param test Object variable to store the test case instance. + def addSuccess(self, test): + unittest.TestResult.addSuccess(self, test) + self.result_string = GREEN + "OK" + END + ## @var result_string + # String variable to store the test case result string. + + ## Method called when the test case signals a failure. + # Run the default implementation that appends a tuple (test, formatted_err) + # to the instance's failures attribute, where formatted_err is a formatted + # traceback derived from err and set the result string in case of test case + # success. + # @param self The object pointer. + # @param test Object variable to store the test case instance. + # @param err Tuple variable to store the error data: + # (type, value, traceback). + def addFailure(self, test, err): + unittest.TestResult.addFailure(self, test, err) + self.result_string = RED + "FAIL" + END + ## @var result_string + # String variable to store the test case result string. + + ## Method called when the test case raises an unexpected exception. + # Run the default implementation that appends a tuple (test, formatted_err) + # to the instance's error attribute, where formatted_err is a formatted + # traceback derived from err and set the result string in case of test case + # unexpected failure. + # @param self The object pointer. + # @param test Object variable to store the test case instance. + # @param err Tuple variable to store the error data: + # (type, value, traceback). + def addError(self, test, err): + unittest.TestResult.addError(self, test, err) + self.result_string = RED + "ERROR" + END + ## @var result_string + # String variable to store the test case result string. + + ## Method to get the description of the test case. + # Used to get the description string from the test case object. + # @param self The object pointer. + # @param test Object variable to store the test case instance. + # @return String of the short description if exist otherwise return test + # case name string. + def getDescription(self, test): + # TODO: if none print warning not raise exception + short_description = test.shortDescription() + if self.descriptions and short_description: + return short_description + else: + return str(test) + ## @var short_description + # String variable to store the short description of the test case. + + ## Method called when the test case is about to be run. + # Run the default implementation and based on the set verbosity level write + # the starting string to the output stream. + # @param self The object pointer. + # @param test Object variable to store the test case instance. + def startTest(self, test): + unittest.TestResult.startTest(self, test) + if self.verbosity > 0: + self.stream.writeln("Starting " + self.getDescription(test) + " ...") + self.stream.writeln("------------------------------------------------------------------") + + ## Method called after the test case has been executed. + # Run the default implementation and based on the set verbosity level write + # the result string to the output stream. + # @param self The object pointer. + # @param test Object variable to store the test case instance. + def stopTest(self, test): + unittest.TestResult.stopTest(self, test) + if self.verbosity > 0: + self.stream.writeln("------------------------------------------------------------------") + self.stream.writeln("%-60s%s" % (self.getDescription(test), self.result_string)) + self.stream.writeln("------------------------------------------------------------------") + else: + self.stream.writeln("%-60s%s" % (self.getDescription(test), self.result_string)) + + ## Method to write errors and failures information to the output stream. + # Write content of errors and failures lists to the output stream. + # @param self The object pointer. + def printErrors(self): + self.stream.writeln() + self.printErrorList('ERROR', self.errors) + self.printErrorList('FAIL', self.failures) + ## @var errors + # List variable containing 2-tuples of TestCase instances and strings + # holding formatted tracebacks. Each tuple represents a test which + # raised an unexpected exception. + ## @var failures + # List variable containing 2-tuples of TestCase instances and strings + # holding formatted tracebacks. Each tuple represents a test where + # a failure was explicitly signalled using the TestCase.assert*() + # methods. + + ## Method to write the error information to the output stream. + # Write content of error lists to the output stream together with error + # type and test case description. + # @param self The object pointer. + # @param flavour String variable to store error type. + # @param errors List variable to store 2-tuples of TestCase instances and + # strings holding formatted tracebacks. + def printErrorList(self, flavour, errors): + for test, err in errors: + self.stream.writeln('=' * 70) + self.stream.writeln("%s: %s" % (flavour, self.getDescription(test))) + self.stream.writeln('-' * 70) + self.stream.writeln("%s" % err) + ## @var test + # Object variable to store the test case instance. + ## @var err + # String variable to store formatted tracebacks. + + +## Subclass of the python unittest.TextTestRunner class. +# +# A basic test runner implementation which prints results on standard error. +class VppTestRunner(unittest.TextTestRunner): + ## Class object variable to store the results of a set of tests. + resultclass = VppTestResult + + ## Method to run the test. + # Print debug message in the terminal and run the standard run() method + # of the test runner collecting the result into the test result object. + # @param self The object pointer. + # @param test Object variable to store the test case instance. + # @return Test result object of the VppTestRunner. + def run(self, test): + print "Running tests using custom test runner" # debug message + return super(VppTestRunner, self).run(test) diff --git a/test/run_tests.py b/test/run_tests.py new file mode 100644 index 00000000000..8f2174b1463 --- /dev/null +++ b/test/run_tests.py @@ -0,0 +1,12 @@ +#!/usr/bin/env python + +import os +import unittest +from framework import VppTestRunner + +if __name__ == '__main__': + try: + verbose = int(os.getenv("V", 0)) + except: + verbose = 0 + unittest.main(testRunner=VppTestRunner, module=None, verbosity=verbose) diff --git a/test/scapy_handlers/__init__.py b/test/scapy_handlers/__init__.py new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/test/scapy_handlers/__init__.py diff --git a/test/scapy_handlers/vxlan.py b/test/scapy_handlers/vxlan.py new file mode 100644 index 00000000000..bf86f179a8e --- /dev/null +++ b/test/scapy_handlers/vxlan.py @@ -0,0 +1,17 @@ +from scapy.fields import BitField, XByteField, X3BytesField +from scapy.packet import Packet, bind_layers +from scapy.layers.l2 import Ether +from scapy.layers.inet import UDP + + +class VXLAN(Packet): + name = "VXLAN" + fields_desc = [BitField("flags", 0x08000000, 32), + X3BytesField("vni", 0), + XByteField("reserved", 0x00)] + + def mysummary(self): + return self.sprintf("VXLAN (vni=%VXLAN.vni%)") + +bind_layers(UDP, VXLAN, dport=4789) +bind_layers(VXLAN, Ether) diff --git a/test/template_bd.py b/test/template_bd.py new file mode 100644 index 00000000000..473c42282b9 --- /dev/null +++ b/test/template_bd.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python + +from abc import abstractmethod + +from scapy.layers.l2 import Ether, Raw +from scapy.layers.inet import IP, UDP + + +class BridgeDomain(object): + def __init__(self): + ## Ethernet frame which is send to pg0 interface and is forwarded to pg1 + self.payload_0_1 = ( + Ether(src='00:00:00:00:00:01', dst='00:00:00:00:00:02') / + IP(src='1.2.3.4', dst='4.3.2.1') / + UDP(sport=10000, dport=20000) / + Raw('\xa5' * 100)) + + ## Ethernet frame which is send to pg1 interface and is forwarded to pg0 + self.payload_1_0 = ( + Ether(src='00:00:00:00:00:02', dst='00:00:00:00:00:01') / + IP(src='4.3.2.1', dst='1.2.3.4') / + UDP(sport=20000, dport=10000) / + Raw('\xa5' * 100)) + + ## Test case must implement this method, so template known how to send + # encapsulated frame. + @abstractmethod + def encapsulate(self, pkt): + pass + + ## Test case must implement this method, so template known how to get + # original payload. + @abstractmethod + def decapsulate(self, pkt): + pass + + ## Test case must implement this method, so template known how if the + # received frame is corectly encapsulated. + @abstractmethod + def check_encapsulation(self, pkt): + pass + + ## On pg0 interface are encapsulated frames, on pg1 are testing frames + # without encapsulation + def test_decap(self): + ## Prepare Ethernet frame that will be send encapsulated. + pkt_to_send = self.encapsulate(self.payload_0_1) + + ## Add packet to list of packets. + self.pg_add_stream(0, [pkt_to_send, ]) + + ## Enable Packet Capture on both ports. + self.pg_enable_capture([0, 1]) + + ## Start all streams + self.pg_start() + + ## Pick first received frame and check if is same as non-encapsulated + # frame. + out = self.pg_get_capture(1) + self.assertEqual(len(out), 1, + 'Invalid number of packets on ' + 'output: {}'.format(len(out))) + pkt = out[0] + + # TODO: add error messages + self.assertEqual(pkt[Ether].src, self.payload_0_1[Ether].src) + self.assertEqual(pkt[Ether].dst, self.payload_0_1[Ether].dst) + self.assertEqual(pkt[IP].src, self.payload_0_1[IP].src) + self.assertEqual(pkt[IP].dst, self.payload_0_1[IP].dst) + self.assertEqual(pkt[UDP].sport, self.payload_0_1[UDP].sport) + self.assertEqual(pkt[UDP].dport, self.payload_0_1[UDP].dport) + self.assertEqual(pkt[Raw], self.payload_0_1[Raw]) + + ## Send non-encapsulated Ethernet frame from pg1 interface and expect + # encapsulated frame on pg0. On pg0 interface are encapsulated frames, + # on pg1 are testing frames without encapsulation. + def test_encap(self): + ## Create packet generator stream. + self.pg_add_stream(1, [self.payload_1_0]) + + ## Enable Packet Capture on both ports. + self.pg_enable_capture([0, 1]) + + ## Start all streams. + self.pg_start() + + ## Pick first received frame and check if is corectly encapsulated. + out = self.pg_get_capture(0) + self.assertEqual(len(out), 1, + 'Invalid number of packets on ' + 'output: {}'.format(len(out))) + rcvd = out[0] + self.check_encapsulation(rcvd) + + ## Get original frame from received packet and check if is same as + # sended frame. + rcvd_payload = self.decapsulate(rcvd) + # TODO: add error messages + self.assertEqual(rcvd_payload[Ether].src, self.payload_1_0[Ether].src) + self.assertEqual(rcvd_payload[Ether].dst, self.payload_1_0[Ether].dst) + self.assertEqual(rcvd_payload[IP].src, self.payload_1_0[IP].src) + self.assertEqual(rcvd_payload[IP].dst, self.payload_1_0[IP].dst) + self.assertEqual(rcvd_payload[UDP].sport, self.payload_1_0[UDP].sport) + self.assertEqual(rcvd_payload[UDP].dport, self.payload_1_0[UDP].dport) + self.assertEqual(rcvd_payload[Raw], self.payload_1_0[Raw]) diff --git a/test/test_infra.md b/test/test_infra.md new file mode 100644 index 00000000000..371d48b0dec --- /dev/null +++ b/test/test_infra.md @@ -0,0 +1,145 @@ +# VPP Functional Test Infra + +## Running VPP tests +VPP functional tests are triggered by `make test` command run in the git vpp source directory. Following Linux environment variables are used by the current VPP functional test infrastructure: + +- `TEST=<name>` - run only specific test identified by filename `test/test_<name>.py` +- `V=[0|1|2]` - set verbosity level. `0` for minimal verbosity, `1` for increased verbosity, `2` for maximum verbosity. Default value is 0. + +Example of running tests: + +``` +~/src/vpp-test-infra$ make test V=1 TEST=vxlan +``` + +All tests listed in `test/` directory are run by default. To run selected tests you can set variable TEST when starting tests. + +## Overview +The main functionality of the test framework is defined in [framework.py](test/framework.py) file. The implementation of the test framework uses classes and methods from Python module *unittest*. + +Three main classes are defined to support the overall test automation: + +* **class VppTestCase(unittest.TestCase)** - a sub-class of *unittest.TestCase* class. Provides methods to create and run test case. These methods can be divided into 5 groups: + 1. Methods to control test case setup and tear down: + * *def setUpConstants(cls):* + * *def setUpClass(cls):* + * *def quit(cls):* + * *def tearDownClass(cls):* + * *def tearDown(self):* + * *def setUp(self):* + + 2. Methods to create VPP packet generator interfaces: + * *def create_interfaces(cls, args):* + + 3. Methods to execute VPP commands and print logs in the output (terminal for now): + * *def log(cls, s, v=1):* + * *def api(cls, s):* + * *def cli(cls, v, s):* + + 4. Methods to control packet stream generation and capturing: + * *def pg_add_stream(cls, i, pkts):* + * *def pg_enable_capture(cls, args):* + * *def pg_start(cls):* + * *def pg_get_capture(cls, o):* + + 5. Methods to create and verify packets: + * *def extend_packet(packet, size):* + * *def add_packet_info_to_list(self, info):* + * *def create_packet_info(self, pg_id, target_id):* + * *def info_to_payload(info):* + * *def payload_to_info(payload):* + * *def get_next_packet_info(self, info):* + * *def get_next_packet_info_for_interface(self, src_pg, info):* + * *def get_next_packet_info_for_interface2(self, src_pg, dst_pg, info):* + +* **class VppTestResult(unittest.TestResult)** - a sub-class of *unittest.TestResult* class. Provides methods to compile information about the tests that have succeeded and the ones that have failed. These methods can be divided into 4 groups: + 1. Processing test case result: + * *def addSuccess(self, test):* + * *def addFailure(self, test, err):* + * *def addError(self, test, err):* + + 2. Processing test case description: + * *def getDescription(self, test):* + + 3. Processing test case start and stop: + * *def startTest(self, test):* + * *def stopTest(self, test):* + + 4. Printing error and failure information: + * *def printErrors(self):* + * *def printErrorList(self, flavour, errors):* + +* **class VppTestRunner(unittest.TextTestRunner)** - a sub-class of *unittest.TextTestRunner* class. Provides basic test runner implementation that prints results on standard error stream. Contains one method: + * *def run(self, test):* + +In addition [util.py] (test/util.py) file defines number of common methods useful for many test cases. All of these methods are currently contained in one class: + +* **class Util(object)**: + * *def resolve_arp(cls, args):* + * *def resolve_icmpv6_nd(cls, args):* + * *def config_ip4(cls, args):* + * *def config_ip6(cls, args):* + +## Interaction with VPP +VPP is started from command line as a sub-process during the test case setup phase. Command line attributes to start VPP are stored in class variable *vpp_cmdline*. +To get an overview of VPP command line attributes, visit section [Command-line Arguments](https://wiki.fd.io/view/VPP/Command-line_Arguments) on VPP wiki page. + +Current VPP test infrastructure is using two ways to interact with VPP for configuration, operational status check, tracing and logging. + +### Using API commands +API commands are executed by VPP API test tool that is started from command line as a sub-process. Command line attributes to start VPP API test tool are stored in class variable *vpp_api_test_cmdline*. +When executed, API command and its possible output are printed in the terminal if verbosity level is greater then 0. + +Example: + +``` +cls.api("sw_interface_set_flags pg1 admin-up") +``` + +will print in the terminal + +``` +API: sw_interface_set_flags pg1 admin-up +``` + +### Using CLI commands +CLI commands are executed via VPP API test tool by sending API command "*exec + cli_command*". It is possible to set verbosity level for executing specific CLI commands, so that the CLI command is executed only and only if its associated verbosity level is equal or lower then the verbosity level set in the system. + +Similarly to API commands, when executed, CLI command and its possible output are printed in the terminal if verbosity level is greater then 0. + +Example I - CLI command will be executed always (its verbosity is 0): + +``` +cls.cli(0, "show l2fib") +``` + +Example II - CLI command will be executed only if the verbosity level is set to 2: + +``` +self.cli(2, "show l2fib verbose") +``` + +## Logging +It is possible to log some additional information in the terminal for different verbosity levels. + +Example I - verbosity level of the log is set to default value (0): + +``` +self.log("Verifying capture %u" % i) +``` + +will be always printed in the terminal: + +``` +LOG: Verifying capture 0 +``` + +Example II - the log will be printed in the terminal only if the verbosity level is set to 2: + +``` +self.log("Got packet on port %u: src=%u (id=%u)" + % (o, payload_info.src, payload_info.index), 2) +``` + +--- +***END*** diff --git a/test/test_ip.py b/test/test_ip.py new file mode 100644 index 00000000000..3a2c90115fe --- /dev/null +++ b/test/test_ip.py @@ -0,0 +1,261 @@ +#!/usr/bin/env python + +import logging +logging.getLogger("scapy.runtime").setLevel(logging.ERROR) + +import unittest +from framework import VppTestCase, VppTestRunner +from util import Util + +from scapy.packet import Raw +from scapy.layers.l2 import Ether, ARP, Dot1Q +from scapy.layers.inet import IP, UDP + + +class TestIPv4(Util, VppTestCase): + """ IPv4 Test Case """ + + @classmethod + def setUpClass(cls): + super(TestIPv4, cls).setUpClass() + + try: + cls.create_interfaces_and_subinterfaces() + + # configure IPv4 on hardware interfaces + cls.config_ip4(cls.interfaces) + + cls.config_ip4_on_software_interfaces(cls.interfaces) + + # resolve ARP using hardware interfaces + cls.resolve_arp(cls.interfaces) + + # let VPP know MAC addresses of peer (sub)interfaces + cls.resolve_arp_on_software_interfaces(cls.interfaces) + + # config 2M FIB enries + cls.config_fib_entries(2000000) + + except Exception as e: + super(TestIPv4, cls).tearDownClass() + raise + + def tearDown(self): + self.cli(2, "show int") + self.cli(2, "show trace") + self.cli(2, "show hardware") + self.cli(2, "show ip arp") + # self.cli(2, "show ip fib") # 2M entries + self.cli(2, "show error") + self.cli(2, "show run") + + @classmethod + def create_vlan_subif(cls, pg_index, vlan): + cls.api("create_vlan_subif pg%u vlan %u" % (pg_index, vlan)) + + @classmethod + def create_dot1ad_subif(cls, pg_index, sub_id, outer_vlan_id, inner_vlan_id): + cls.api("create_subif pg%u sub_id %u outer_vlan_id %u inner_vlan_id %u dot1ad" + % (pg_index, sub_id, outer_vlan_id, inner_vlan_id)) + + class SoftInt(object): + pass + + class HardInt(SoftInt): + pass + + class Subint(SoftInt): + def __init__(self, sub_id): + self.sub_id = sub_id + + class Dot1QSubint(Subint): + def __init__(self, sub_id, vlan=None): + if vlan is None: + vlan = sub_id + super(TestIPv4.Dot1QSubint, self).__init__(sub_id) + self.vlan = vlan + + class Dot1ADSubint(Subint): + def __init__(self, sub_id, outer_vlan, inner_vlan): + super(TestIPv4.Dot1ADSubint, self).__init__(sub_id) + self.outer_vlan = outer_vlan + self.inner_vlan = inner_vlan + + @classmethod + def create_interfaces_and_subinterfaces(cls): + cls.interfaces = range(3) + + cls.create_interfaces(cls.interfaces) + + # Make vpp_api_test see interfaces created using debug CLI (in function create_interfaces) + cls.api("sw_interface_dump") + + cls.INT_DETAILS = dict() + + cls.INT_DETAILS[0] = cls.HardInt() + + cls.INT_DETAILS[1] = cls.Dot1QSubint(100) + cls.create_vlan_subif(1, cls.INT_DETAILS[1].vlan) + + # FIXME: Wrong packet format/wrong layer on output of interface 2 + #self.INT_DETAILS[2] = self.Dot1ADSubint(10, 200, 300) + #self.create_dot1ad_subif(2, self.INT_DETAILS[2].sub_id, self.INT_DETAILS[2].outer_vlan, self.INT_DETAILS[2].inner_vlan) + + # Use dor1q for now + cls.INT_DETAILS[2] = cls.Dot1QSubint(200) + cls.create_vlan_subif(2, cls.INT_DETAILS[2].vlan) + + for i in cls.interfaces: + det = cls.INT_DETAILS[i] + if isinstance(det, cls.Subint): + cls.api("sw_interface_set_flags pg%u.%u admin-up" % (i, det.sub_id)) + + # IP adresses on subinterfaces + MY_SOFT_IP4S = {} + VPP_SOFT_IP4S = {} + + @classmethod + def config_ip4_on_software_interfaces(cls, args): + for i in args: + cls.MY_SOFT_IP4S[i] = "172.17.%u.2" % i + cls.VPP_SOFT_IP4S[i] = "172.17.%u.1" % i + if isinstance(cls.INT_DETAILS[i], cls.Subint): + interface = "pg%u.%u" % (i, cls.INT_DETAILS[i].sub_id) + else: + interface = "pg%u" % i + cls.api("sw_interface_add_del_address %s %s/24" % (interface, cls.VPP_SOFT_IP4S[i])) + cls.log("My subinterface IPv4 address is %s" % (cls.MY_SOFT_IP4S[i])) + + # let VPP know MAC addresses of peer (sub)interfaces + @classmethod + def resolve_arp_on_software_interfaces(cls, args): + for i in args: + ip = cls.VPP_SOFT_IP4S[i] + cls.log("Sending ARP request for %s on port %u" % (ip, i)) + packet = (Ether(dst="ff:ff:ff:ff:ff:ff", src=cls.MY_MACS[i]) / + ARP(op=ARP.who_has, pdst=ip, psrc=cls.MY_SOFT_IP4S[i], hwsrc=cls.MY_MACS[i])) + + cls.add_dot1_layers(i, packet) + + cls.pg_add_stream(i, packet) + cls.pg_enable_capture([i]) + + cls.cli(2, "trace add pg-input 1") + cls.pg_start() + + # We don't need to read output + + @classmethod + def config_fib_entries(cls, count): + n_int = len(cls.interfaces) + for i in cls.interfaces: + cls.api("ip_add_del_route 10.0.0.1/32 via %s count %u" % (cls.VPP_SOFT_IP4S[i], count / n_int)) + + @classmethod + def add_dot1_layers(cls, i, packet): + assert(type(packet) is Ether) + payload = packet.payload + det = cls.INT_DETAILS[i] + if isinstance(det, cls.Dot1QSubint): + packet.remove_payload() + packet.add_payload(Dot1Q(vlan=det.sub_id) / payload) + elif isinstance(det, cls.Dot1ADSubint): + packet.remove_payload() + packet.add_payload(Dot1Q(vlan=det.outer_vlan) / Dot1Q(vlan=det.inner_vlan) / payload) + packet.type = 0x88A8 + + def remove_dot1_layers(self, i, packet): + self.assertEqual(type(packet), Ether) + payload = packet.payload + det = self.INT_DETAILS[i] + if isinstance(det, self.Dot1QSubint): + self.assertEqual(type(payload), Dot1Q) + self.assertEqual(payload.vlan, self.INT_DETAILS[i].vlan) + payload = payload.payload + elif isinstance(det, self.Dot1ADSubint): # TODO: change 88A8 type + self.assertEqual(type(payload), Dot1Q) + self.assertEqual(payload.vlan, self.INT_DETAILS[i].outer_vlan) + payload = payload.payload + self.assertEqual(type(payload), Dot1Q) + self.assertEqual(payload.vlan, self.INT_DETAILS[i].inner_vlan) + payload = payload.payload + packet.remove_payload() + packet.add_payload(payload) + + def create_stream(self, pg_id): + pg_targets = [None] * 3 + pg_targets[0] = [1, 2] + pg_targets[1] = [0, 2] + pg_targets[2] = [0, 1] + pkts = [] + for i in range(0, 257): + target_pg_id = pg_targets[pg_id][i % 2] + info = self.create_packet_info(pg_id, target_pg_id) + payload = self.info_to_payload(info) + p = (Ether(dst=self.VPP_MACS[pg_id], src=self.MY_MACS[pg_id]) / + IP(src=self.MY_SOFT_IP4S[pg_id], dst=self.MY_SOFT_IP4S[target_pg_id]) / + UDP(sport=1234, dport=1234) / + Raw(payload)) + info.data = p.copy() + self.add_dot1_layers(pg_id, p) + if not isinstance(self.INT_DETAILS[pg_id], self.Subint): + packet_sizes = [64, 512, 1518, 9018] + else: + packet_sizes = [64, 512, 1518+4, 9018+4] + size = packet_sizes[(i / 2) % len(packet_sizes)] + self.extend_packet(p, size) + pkts.append(p) + return pkts + + def verify_capture(self, o, capture): + last_info = {} + for i in self.interfaces: + last_info[i] = None + for packet in capture: + self.remove_dot1_layers(o, packet) # Check VLAN tags and Ethernet header + self.assertTrue(Dot1Q not in packet) + try: + ip = packet[IP] + udp = packet[UDP] + payload_info = self.payload_to_info(str(packet[Raw])) + packet_index = payload_info.index + src_pg = payload_info.src + dst_pg = payload_info.dst + self.assertEqual(dst_pg, o) + self.log("Got packet on port %u: src=%u (id=%u)" % (o, src_pg, packet_index), 2) + next_info = self.get_next_packet_info_for_interface2(src_pg, dst_pg, last_info[src_pg]) + last_info[src_pg] = next_info + self.assertTrue(next_info is not None) + self.assertEqual(packet_index, next_info.index) + saved_packet = next_info.data + # Check standard fields + self.assertEqual(ip.src, saved_packet[IP].src) + self.assertEqual(ip.dst, saved_packet[IP].dst) + self.assertEqual(udp.sport, saved_packet[UDP].sport) + self.assertEqual(udp.dport, saved_packet[UDP].dport) + except: + self.log("Unexpected or invalid packet:") + packet.show() + raise + for i in self.interfaces: + remaining_packet = self.get_next_packet_info_for_interface2(i, o, last_info[i]) + self.assertTrue(remaining_packet is None, "Port %u: Packet expected from source %u didn't arrive" % (o, i)) + + def test_fib(self): + """ IPv4 FIB test """ + + for i in self.interfaces: + pkts = self.create_stream(i) + self.pg_add_stream(i, pkts) + + self.pg_enable_capture(self.interfaces) + self.pg_start() + + for i in self.interfaces: + out = self.pg_get_capture(i) + self.log("Verifying capture %u" % i) + self.verify_capture(i, out) + + +if __name__ == '__main__': + unittest.main(testRunner = VppTestRunner) diff --git a/test/test_ip6.py b/test/test_ip6.py new file mode 100644 index 00000000000..38808a9e1b7 --- /dev/null +++ b/test/test_ip6.py @@ -0,0 +1,281 @@ +#!/usr/bin/env python + +import logging +logging.getLogger("scapy.runtime").setLevel(logging.ERROR) + +import unittest +from framework import VppTestCase, VppTestRunner +from util import Util + +from scapy.packet import Raw +from scapy.layers.l2 import Ether, Dot1Q +from scapy.layers.inet6 import (IPv6, UDP, + ICMPv6ND_NS, ICMPv6NDOptSrcLLAddr, + ICMPv6ND_NA, ICMPv6NDOptDstLLAddr) + + +@unittest.skip('Not finished yet.\n') +class TestIPv6(Util, VppTestCase): + """ IPv6 Test Case """ + + @classmethod + def setUpClass(cls): + super(TestIPv6, cls).setUpClass() + + try: + cls.create_interfaces_and_subinterfaces() + + # configure IPv6 on hardware interfaces + cls.config_ip6(cls.interfaces) + + cls.config_ip6_on_software_interfaces(cls.interfaces) + + # resolve ICMPv6 ND using hardware interfaces + cls.resolve_icmpv6_nd(cls.interfaces) + + # let VPP know MAC addresses of peer (sub)interfaces + # cls.resolve_icmpv6_nd_on_software_interfaces(cls.interfaces) + cls.send_neighbour_advertisement_on_software_interfaces(cls.interfaces) + + # config 2M FIB enries + #cls.config_fib_entries(2000000) + cls.config_fib_entries(1000000) + + except Exception as e: + super(TestIPv6, cls).tearDownClass() + raise + + def tearDown(self): + self.cli(2, "show int") + self.cli(2, "show trace") + self.cli(2, "show hardware") + self.cli(2, "show ip arp") + # self.cli(2, "show ip fib") # 2M entries + self.cli(2, "show error") + self.cli(2, "show run") + + @classmethod + def create_vlan_subif(cls, pg_index, vlan): + cls.api("create_vlan_subif pg%u vlan %u" % (pg_index, vlan)) + + @classmethod + def create_dot1ad_subif(cls, pg_index, sub_id, outer_vlan_id, inner_vlan_id): + cls.api("create_subif pg%u sub_id %u outer_vlan_id %u inner_vlan_id %u dot1ad" + % (pg_index, sub_id, outer_vlan_id, inner_vlan_id)) + + class SoftInt(object): + pass + + class HardInt(SoftInt): + pass + + class Subint(SoftInt): + def __init__(self, sub_id): + self.sub_id = sub_id + + class Dot1QSubint(Subint): + def __init__(self, sub_id, vlan=None): + if vlan is None: + vlan = sub_id + super(TestIPv6.Dot1QSubint, self).__init__(sub_id) + self.vlan = vlan + + class Dot1ADSubint(Subint): + def __init__(self, sub_id, outer_vlan, inner_vlan): + super(TestIPv6.Dot1ADSubint, self).__init__(sub_id) + self.outer_vlan = outer_vlan + self.inner_vlan = inner_vlan + + @classmethod + def create_interfaces_and_subinterfaces(cls): + cls.interfaces = range(3) + + cls.create_interfaces(cls.interfaces) + + # Make vpp_api_test see interfaces created using debug CLI (in function create_interfaces) + cls.api("sw_interface_dump") + + cls.INT_DETAILS = dict() + + cls.INT_DETAILS[0] = cls.HardInt() + + cls.INT_DETAILS[1] = cls.Dot1QSubint(100) + cls.create_vlan_subif(1, cls.INT_DETAILS[1].vlan) + + # FIXME: Wrong packet format/wrong layer on output of interface 2 + #self.INT_DETAILS[2] = self.Dot1ADSubint(10, 200, 300) + #self.create_dot1ad_subif(2, self.INT_DETAILS[2].sub_id, self.INT_DETAILS[2].outer_vlan, self.INT_DETAILS[2].inner_vlan) + + # Use dor1q for now + cls.INT_DETAILS[2] = cls.Dot1QSubint(200) + cls.create_vlan_subif(2, cls.INT_DETAILS[2].vlan) + + for i in cls.interfaces: + det = cls.INT_DETAILS[i] + if isinstance(det, cls.Subint): + cls.api("sw_interface_set_flags pg%u.%u admin-up" % (i, det.sub_id)) + + # IP adresses on subinterfaces + MY_SOFT_IP6S = {} + VPP_SOFT_IP6S = {} + + @classmethod + def config_ip6_on_software_interfaces(cls, args): + for i in args: + cls.MY_SOFT_IP6S[i] = "fd01:%u::2" % i + cls.VPP_SOFT_IP6S[i] = "fd01:%u::1" % i + if isinstance(cls.INT_DETAILS[i], cls.Subint): + interface = "pg%u.%u" % (i, cls.INT_DETAILS[i].sub_id) + else: + interface = "pg%u" % i + cls.api("sw_interface_add_del_address %s %s/32" % (interface, cls.VPP_SOFT_IP6S[i])) + cls.log("My subinterface IPv6 address is %s" % (cls.MY_SOFT_IP6S[i])) + + # let VPP know MAC addresses of peer (sub)interfaces + @classmethod + def resolve_icmpv6_nd_on_software_interfaces(cls, args): + for i in args: + ip = cls.VPP_SOFT_IP6S[i] + cls.log("Sending ICMPv6ND_NS request for %s on port %u" % (ip, i)) + nd_req = (Ether(dst="ff:ff:ff:ff:ff:ff", src=cls.MY_MACS[i]) / + IPv6(src=cls.MY_SOFT_IP6S[i], dst=ip) / + ICMPv6ND_NS(tgt=ip) / + ICMPv6NDOptSrcLLAddr(lladdr=cls.MY_MACS[i])) + cls.pg_add_stream(i, nd_req) + cls.pg_enable_capture([i]) + + cls.cli(2, "trace add pg-input 1") + cls.pg_start() + + # We don't need to read output + + # let VPP know MAC addresses of peer (sub)interfaces + @classmethod + def send_neighbour_advertisement_on_software_interfaces(cls, args): + for i in args: + ip = cls.VPP_SOFT_IP6S[i] + cls.log("Sending ICMPv6ND_NA message for %s on port %u" % (ip, i)) + pkt = (Ether(dst="ff:ff:ff:ff:ff:ff", src=cls.MY_MACS[i]) / + IPv6(src=cls.MY_SOFT_IP6S[i], dst=ip) / + ICMPv6ND_NA(tgt=ip, R=0, S=0) / + ICMPv6NDOptDstLLAddr(lladdr=cls.MY_MACS[i])) + cls.pg_add_stream(i, pkt) + cls.pg_enable_capture([i]) + + cls.cli(2, "trace add pg-input 1") + cls.pg_start() + + @classmethod + def config_fib_entries(cls, count): + n_int = len(cls.interfaces) + for i in cls.interfaces: + cls.api("ip_add_del_route fd02::1/128 via %s count %u" % (cls.VPP_SOFT_IP6S[i], count / n_int)) + + @classmethod + def add_dot1_layers(cls, i, packet): + assert(type(packet) is Ether) + payload = packet.payload + det = cls.INT_DETAILS[i] + if isinstance(det, cls.Dot1QSubint): + packet.remove_payload() + packet.add_payload(Dot1Q(vlan=det.sub_id) / payload) + elif isinstance(det, cls.Dot1ADSubint): + packet.remove_payload() + packet.add_payload(Dot1Q(vlan=det.outer_vlan) / Dot1Q(vlan=det.inner_vlan) / payload) + packet.type = 0x88A8 + + def remove_dot1_layers(self, i, packet): + self.assertEqual(type(packet), Ether) + payload = packet.payload + det = self.INT_DETAILS[i] + if isinstance(det, self.Dot1QSubint): + self.assertEqual(type(payload), Dot1Q) + self.assertEqual(payload.vlan, self.INT_DETAILS[i].vlan) + payload = payload.payload + elif isinstance(det, self.Dot1ADSubint): # TODO: change 88A8 type + self.assertEqual(type(payload), Dot1Q) + self.assertEqual(payload.vlan, self.INT_DETAILS[i].outer_vlan) + payload = payload.payload + self.assertEqual(type(payload), Dot1Q) + self.assertEqual(payload.vlan, self.INT_DETAILS[i].inner_vlan) + payload = payload.payload + packet.remove_payload() + packet.add_payload(payload) + + def create_stream(self, pg_id): + pg_targets = [None] * 3 + pg_targets[0] = [1, 2] + pg_targets[1] = [0, 2] + pg_targets[2] = [0, 1] + pkts = [] + for i in range(0, 257): + target_pg_id = pg_targets[pg_id][i % 2] + info = self.create_packet_info(pg_id, target_pg_id) + payload = self.info_to_payload(info) + p = (Ether(dst=self.VPP_MACS[pg_id], src=self.MY_MACS[pg_id]) / + IPv6(src=self.MY_SOFT_IP6S[pg_id], dst=self.MY_SOFT_IP6S[target_pg_id]) / + UDP(sport=1234, dport=1234) / + Raw(payload)) + info.data = p.copy() + self.add_dot1_layers(pg_id, p) + if not isinstance(self.INT_DETAILS[pg_id], self.Subint): + packet_sizes = [76, 512, 1518, 9018] + else: + packet_sizes = [76, 512, 1518+4, 9018+4] + size = packet_sizes[(i / 2) % len(packet_sizes)] + self.extend_packet(p, size) + pkts.append(p) + return pkts + + def verify_capture(self, o, capture): + last_info = {} + for i in self.interfaces: + last_info[i] = None + for packet in capture: + self.remove_dot1_layers(o, packet) # Check VLAN tags and Ethernet header + self.assertTrue(Dot1Q not in packet) + try: + ip = packet[IPv6] + udp = packet[UDP] + payload_info = self.payload_to_info(str(packet[Raw])) + packet_index = payload_info.index + src_pg = payload_info.src + dst_pg = payload_info.dst + self.assertEqual(dst_pg, o) + self.log("Got packet on port %u: src=%u (id=%u)" % (o, src_pg, packet_index), 2) + next_info = self.get_next_packet_info_for_interface2(src_pg, dst_pg, last_info[src_pg]) + last_info[src_pg] = next_info + self.assertTrue(next_info is not None) + self.assertEqual(packet_index, next_info.index) + saved_packet = next_info.data + # Check standard fields + self.assertEqual(ip.src, saved_packet[IPv6].src) + self.assertEqual(ip.dst, saved_packet[IPv6].dst) + self.assertEqual(udp.sport, saved_packet[UDP].sport) + self.assertEqual(udp.dport, saved_packet[UDP].dport) + except: + self.log("Unexpected or invalid packet:") + packet.show() + raise + for i in self.interfaces: + remaining_packet = self.get_next_packet_info_for_interface2(i, o, last_info[i]) + self.assertTrue(remaining_packet is None, "Port %u: Packet expected from source %u didn't arrive" % (o, i)) + + def test_fib(self): + """ IPv6 FIB test """ + + for i in self.interfaces: + pkts = self.create_stream(i) + self.pg_add_stream(i, pkts) + + self.pg_enable_capture(self.interfaces) + self.pg_start() + + for i in self.interfaces: + out = self.pg_get_capture(i) + self.log("Verifying capture %u" % i) + self.verify_capture(i, out) + + +if __name__ == '__main__': + unittest.main(testRunner = VppTestRunner) diff --git a/test/test_l2bd.py b/test/test_l2bd.py new file mode 100644 index 00000000000..c2b73a4d1f9 --- /dev/null +++ b/test/test_l2bd.py @@ -0,0 +1,473 @@ +#!/usr/bin/env python +## @file test_l2bd.py +# Module to provide L2 bridge domain test case. +# +# The module provides a set of tools for L2 bridge domain tests. + +import logging +logging.getLogger("scapy.runtime").setLevel(logging.ERROR) + +import unittest +import random + +from framework import * +from scapy.all import * + + +## Subclass of the VppTestCase class. +# +# This subclass is a class for L2 bridge domain test cases. It provides methods +# to create interfaces, configure L2 bridge domain, create and verify packet +# streams. +class TestL2bd(VppTestCase): + """ L2BD Test Case """ + + ## Test variables + interf_nr = 3 # Number of interfaces + bd_id = 1 # Bridge domain ID + mac_entries = 100 # Number of MAC entries for bridge-domain to learn + dot1q_sub_id = 100 # SubID of dot1q sub-interface + dot1q_tag = 100 # VLAN tag for dot1q sub-interface + dot1ad_sub_id = 200 # SubID of dot1ad sub-interface + dot1ad_outer_tag = 200 # VLAN S-tag for dot1ad sub-interface + dot1ad_inner_tag = 300 # VLAN C-tag for dot1ad sub-interface + pkts_per_burst = 257 # Number of packets per burst + + ## Class method to start the test case. + # Overrides setUpClass method in VppTestCase class. + # Python try..except statement is used to ensure that the tear down of + # the class will be executed even if exception is raised. + # @param cls The class pointer. + @classmethod + def setUpClass(cls): + super(TestL2bd, cls).setUpClass() + + try: + ## Create interfaces and sub-interfaces + cls.create_interfaces_and_subinterfaces(TestL2bd.interf_nr) + + ## Create BD with MAC learning enabled and put interfaces and + # sub-interfaces to this BD + cls.api("bridge_domain_add_del bd_id %u learn 1" % TestL2bd.bd_id) + for i in cls.interfaces: + if isinstance(cls.INT_DETAILS[i], cls.Subint): + interface = "pg%u.%u" % (i, cls.INT_DETAILS[i].sub_id) + else: + interface = "pg%u" % i + cls.api("sw_interface_set_l2_bridge %s bd_id %u" + % (interface, TestL2bd.bd_id)) + + ## Make the BD learn a number of MAC entries specified by the test + # variable <mac_entries>. + cls.create_mac_entries(TestL2bd.mac_entries) + cls.cli(0, "show l2fib") + + except Exception as e: + super(TestL2bd, cls).tearDownClass() + raise e + + ## Method to define tear down VPP actions of the test case. + # Overrides tearDown method in VppTestCase class. + # @param self The object pointer. + def tearDown(self): + self.cli(2, "show int") + self.cli(2, "show trace") + self.cli(2, "show hardware") + self.cli(2, "show l2fib verbose") + self.cli(2, "show error") + self.cli(2, "show run") + self.cli(2, "show bridge-domain 1 detail") + + ## Class method to create VLAN sub-interface. + # Uses VPP API command to create VLAN sub-interface. + # @param cls The class pointer. + # @param pg_index Integer variable to store the index of the packet + # generator interface to create VLAN sub-interface on. + # @param vlan_id Integer variable to store required VLAN tag value. + @classmethod + def create_vlan_subif(cls, pg_index, vlan_id): + cls.api("create_vlan_subif pg%u vlan %u" % (pg_index, vlan_id)) + + ## Class method to create dot1ad sub-interface. + # Use VPP API command to create dot1ad sub-interface. + # @param cls The class pointer. + # @param pg_index Integer variable to store the index of the packet + # generator interface to create dot1ad sub-interface on. + # @param outer_vlan_id Integer variable to store required outer VLAN tag + # value (S-TAG). + # @param inner_vlan_id Integer variable to store required inner VLAN tag + # value (C-TAG). + @classmethod + def create_dot1ad_subif(cls, pg_index, sub_id, outer_vlan_id, + inner_vlan_id): + cls.api("create_subif pg%u sub_id %u outer_vlan_id %u inner_vlan_id" + " %u dot1ad" % (pg_index, sub_id, outer_vlan_id, inner_vlan_id)) + + ## Base class for interface. + # To define object representation of the interface. + class Interface(object): + pass + + ## Sub-class of the interface class. + # To define object representation of the HW interface. + class HardInt(Interface): + pass + + ## Sub-class of the interface class. + # To define object representation of the SW interface. + class SoftInt(Interface): + pass + + ## Sub-class of the SW interface class. + # To represent the general sub-interface. + class Subint(SoftInt): + ## The constructor. + # @param sub_id Integer variable to store sub-interface ID. + def __init__(self, sub_id): + self.sub_id = sub_id + + ## Sub-class of the SW interface class. + # To represent dot1q sub-interface. + class Dot1QSubint(Subint): + ## The constructor. + # @param sub_id Integer variable to store sub-interface ID. + # @param vlan Integer variable (optional) to store VLAN tag value. Set + # to sub_id value when VLAN tag value not provided. + def __init__(self, sub_id, vlan=None): + if vlan is None: + vlan = sub_id + super(TestL2bd.Dot1QSubint, self).__init__(sub_id) + self.vlan = vlan + + ## Sub-class of the SW interface class. + # To represent dot1ad sub-interface. + class Dot1ADSubint(Subint): + ## The constructor. + # @param sub_id Integer variable to store sub-interface ID. + # @param outer_vlan Integer variable to store outer VLAN tag value. + # @param inner_vlan Integer variable to store inner VLAN tag value. + def __init__(self, sub_id, outer_vlan, inner_vlan): + super(TestL2bd.Dot1ADSubint, self).__init__(sub_id) + self.outer_vlan = outer_vlan + self.inner_vlan = inner_vlan + + ## Class method to create interfaces and sub-interfaces. + # Current implementation: create three interfaces, then create Dot1Q + # sub-interfaces for the second and the third interface with VLAN tags + # equal to their sub-interface IDs. Set sub-interfaces status to admin-up. + # @param cls The class pointer. + # @param int_nr Integer variable to store the number of interfaces to be + # created. + # TODO: Parametrize required numbers of dot1q and dot1ad to be created. + @classmethod + def create_interfaces_and_subinterfaces(cls, int_nr): + ## A class list variable to store interface indexes. + cls.interfaces = range(int_nr) + + # Create interfaces + cls.create_interfaces(cls.interfaces) + + # Make vpp_api_test see interfaces created using debug CLI (in function + # create_interfaces) + cls.api("sw_interface_dump") + + ## A class dictionary variable to store data about interfaces. + # First create an empty dictionary then store interface data there. + cls.INT_DETAILS = dict() + + # 1st interface is untagged - no sub-interface required + cls.INT_DETAILS[0] = cls.HardInt() + + # 2nd interface is dot1q tagged + cls.INT_DETAILS[1] = cls.Dot1QSubint(TestL2bd.dot1q_sub_id, + TestL2bd.dot1q_tag) + cls.create_vlan_subif(1, cls.INT_DETAILS[1].vlan) + + # 3rd interface is dot1ad tagged + # FIXME: Wrong packet format/wrong layer on output of interface 2 + #self.INT_DETAILS[2] = self.Dot1ADSubint(TestL2bd.dot1ad_sub_id, TestL2bd.dot1ad_outer_tag, TestL2bd.dot1ad_inner_tag) + #self.create_dot1ad_subif(2, self.INT_DETAILS[2].sub_id, self.INT_DETAILS[2].outer_vlan, self.INT_DETAILS[2].inner_vlan) + + # Use dot1q for now. + cls.INT_DETAILS[2] = cls.Dot1QSubint(TestL2bd.dot1ad_sub_id, + TestL2bd.dot1ad_outer_tag) + cls.create_vlan_subif(2, cls.INT_DETAILS[2].vlan) + + for i in cls.interfaces: + if isinstance(cls.INT_DETAILS[i], cls.Subint): + cls.api("sw_interface_set_flags pg%u.%u admin-up" + % (i, cls.INT_DETAILS[i].sub_id)) + ## @var interfaces + # List variable to store interface indexes. + ## @var INT_DETAILS + # Dictionary variable to store data about interfaces. + + ## Class method for bridge-domain to learn defined number of MAC addresses. + # Create required number of host MAC addresses and distribute them among + # interfaces. Create host IPv4 address for every host MAC address. Create + # L2 MAC packet stream with host MAC addresses per interface to let + # the bridge domain learn these MAC addresses. + # @param cls The class pointer. + # @param count Integer variable to store the number of MAC addresses to be + # created. + @classmethod + def create_mac_entries(cls, count): + n_int = len(cls.interfaces) + macs_per_if = count / n_int + for i in cls.interfaces: + start_nr = macs_per_if*i + end_nr = count if i == (n_int - 1) else macs_per_if*(i+1) + cls.MY_MACS[i] = [] + cls.MY_IP4S[i] = [] + packets = [] + for j in range(start_nr, end_nr): + cls.MY_MACS[i].append("00:00:00:ff:%02x:%02x" % (i, j)) + cls.MY_IP4S[i].append("172.17.1%02x.%u" % (i, j)) + packet = (Ether(dst="ff:ff:ff:ff:ff:ff", src=cls.MY_MACS[i])) + packets.append(packet) + cls.pg_add_stream(i, packets) + # Based on the verbosity level set in the system print the log. + cls.log("Sending broadcast eth frames for MAC learning", 1) + cls.pg_start() + # Packet stream capturing is not started as we don't need to read + # the output. + ## @var n_int + # Integer variable to store the number of interfaces. + ## @var macs_per_if + # Integer variable to store the number of MAC addresses per interface. + ## @var start_nr + # Integer variable to store the starting number of the range used to + # generate MAC addresses for the interface. + ## @var end_nr + # Integer variable to store the ending number of the range used to + # generate MAC addresses for the interface. + ## @var MY_MACS + # Dictionary variable to store list of MAC addresses per interface. + ## @var MY_IP4S + # Dictionary variable to store list of IPv4 addresses per interface. + + ## Class method to add dot1q or dot1ad layer to the packet. + # Based on sub-interface data of the defined interface add dot1q or dot1ad + # Ethernet header layer to the packet. + # @param cls The class pointer. + # @param i Integer variable to store the index of the interface. + # @param packet Object variable to store the packet where to add dot1q or + # dot1ad layer. + # TODO: Move this class method to utils.py. + @classmethod + def add_dot1_layers(cls, i, packet): + assert(type(packet) is Ether) + payload = packet.payload + if isinstance(cls.INT_DETAILS[i], cls.Dot1QSubint): + packet.remove_payload() + packet.add_payload(Dot1Q(vlan=cls.INT_DETAILS[i].vlan) / payload) + elif isinstance(cls.INT_DETAILS[i], cls.Dot1ADSubint): + packet.remove_payload() + packet.add_payload(Dot1Q(vlan=cls.INT_DETAILS[i].outer_vlan, + type=0x8100) / + Dot1Q(vlan=cls.INT_DETAILS[i].inner_vlan) / + payload) + packet.type = 0x88A8 + ## @var payload + # Object variable to store payload of the packet. + ## @var INT_DETAILS + # Dictionary variable to store data about interfaces. + ## @var Dot1QSubint + # Class variable representing dot1q sub-interfaces. + ## @var Dot1ADSubint + # Class variable representing dot1ad sub-interfaces. + + ## Method to remove dot1q or dot1ad layer from the packet. + # Based on sub-interface data of the defined interface remove dot1q or + # dot1ad layer from the packet. + # @param cls The class pointer. + # @param i Integer variable to store the index of the interface. + # @param packet Object variable to store the packet where to remove dot1q + # or dot1ad layer. + def remove_dot1_layers(self, i, packet): + self.assertEqual(type(packet), Ether) + payload = packet.payload + if isinstance(self.INT_DETAILS[i], self.Dot1QSubint): + self.assertEqual(type(payload), Dot1Q) + self.assertEqual(payload.vlan, self.INT_DETAILS[i].vlan) + payload = payload.payload + elif isinstance(self.INT_DETAILS[i], self.Dot1ADSubint): # TODO: change 88A8 type + self.assertEqual(type(payload), Dot1Q) + self.assertEqual(payload.vlan, self.INT_DETAILS[i].outer_vlan) + payload = payload.payload + self.assertEqual(type(payload), Dot1Q) + self.assertEqual(payload.vlan, self.INT_DETAILS[i].inner_vlan) + payload = payload.payload + packet.remove_payload() + packet.add_payload(payload) + ## @var payload + # Object variable to store payload of the packet. + ## @var INT_DETAILS + # Dictionary variable to store data about interfaces. + ## @var Dot1QSubint + # Class variable representing dot1q sub-interfaces. + ## @var Dot1ADSubint + # Class variable representing dot1ad sub-interfaces. + + ## Method to create packet stream for the packet generator interface. + # Create input packet stream for the given packet generator interface with + # packets of different length targeted for all other created packet + # generator interfaces. + # @param self The object pointer. + # @param pg_id Integer variable to store the index of the interface to + # create the input packet stream. + # @return pkts List variable to store created input stream of packets. + def create_stream(self, pg_id): + # TODO: use variables to create lists based on interface number + pg_targets = [None] * 3 + pg_targets[0] = [1, 2] + pg_targets[1] = [0, 2] + pg_targets[2] = [0, 1] + pkts = [] + for i in range(0, TestL2bd.pkts_per_burst): + target_pg_id = pg_targets[pg_id][i % 2] + target_host_id = random.randrange(len(self.MY_MACS[target_pg_id])) + source_host_id = random.randrange(len(self.MY_MACS[pg_id])) + pkt_info = self.create_packet_info(pg_id, target_pg_id) + payload = self.info_to_payload(pkt_info) + p = (Ether(dst=self.MY_MACS[target_pg_id][target_host_id], + src=self.MY_MACS[pg_id][source_host_id]) / + IP(src=self.MY_IP4S[pg_id][source_host_id], + dst=self.MY_IP4S[target_pg_id][target_host_id]) / + UDP(sport=1234, dport=1234) / + Raw(payload)) + pkt_info.data = p.copy() + self.add_dot1_layers(pg_id, p) + if not isinstance(self.INT_DETAILS[pg_id], self.Subint): + packet_sizes = [64, 512, 1518, 9018] + else: + packet_sizes = [64, 512, 1518+4, 9018+4] + size = packet_sizes[(i / 2) % len(packet_sizes)] + self.extend_packet(p, size) + pkts.append(p) + return pkts + ## @var pg_targets + # List variable to store list of indexes of target packet generator + # interfaces for every source packet generator interface. + ## @var target_pg_id + # Integer variable to store the index of the random target packet + # generator interfaces. + ## @var target_host_id + # Integer variable to store the index of the randomly chosen + # destination host MAC/IPv4 address. + ## @var source_host_id + # Integer variable to store the index of the randomly chosen source + # host MAC/IPv4 address. + ## @var pkt_info + # Object variable to store the information about the generated packet. + ## @var payload + # String variable to store the payload of the packet to be generated. + ## @var p + # Object variable to store the generated packet. + ## @var packet_sizes + # List variable to store required packet sizes. + ## @var size + # List variable to store required packet sizes. + + ## Method to verify packet stream received on the packet generator interface. + # Verify packet-by-packet the output stream captured on a given packet + # generator (pg) interface using following packet payload data - order of + # packet in the stream, index of the source and destination pg interface, + # src and dst host IPv4 addresses and src port and dst port values of UDP + # layer. + # @param self The object pointer. + # @param o Integer variable to store the index of the interface to + # verify the output packet stream. + # @param capture List variable to store the captured output packet stream. + def verify_capture(self, o, capture): + last_info = {} + for i in self.interfaces: + last_info[i] = None + for packet in capture: + try: + ip = packet[IP] + udp = packet[UDP] + payload_info = self.payload_to_info(str(packet[Raw])) + # Check VLAN tags and Ethernet header + # TODO: Rework to check VLAN tag(s) and do not remove them + self.remove_dot1_layers(payload_info.src, packet) + self.assertTrue(Dot1Q not in packet) + self.assertEqual(payload_info.dst, o) + self.log("Got packet on port %u: src=%u (id=%u)" + % (o, payload_info.src, payload_info.index), 2) + next_info = self.get_next_packet_info_for_interface2( + payload_info.src, payload_info.dst, + last_info[payload_info.src]) + last_info[payload_info.src] = next_info + self.assertTrue(next_info is not None) + self.assertEqual(payload_info.index, next_info.index) + # Check standard fields + self.assertEqual(ip.src, next_info.data[IP].src) + self.assertEqual(ip.dst, next_info.data[IP].dst) + self.assertEqual(udp.sport, next_info.data[UDP].sport) + self.assertEqual(udp.dport, next_info.data[UDP].dport) + except: + self.log("Unexpected or invalid packet:") + packet.show() + raise + for i in self.interfaces: + remaining_packet = self.get_next_packet_info_for_interface2( + i, o, last_info[i]) + self.assertTrue(remaining_packet is None, + "Port %u: Packet expected from source %u didn't" + " arrive" % (o, i)) + ## @var last_info + # Dictionary variable to store verified packets per packet generator + # interface. + ## @var ip + # Object variable to store the IP layer of the packet. + ## @var udp + # Object variable to store the UDP layer of the packet. + ## @var payload_info + # Object variable to store required information about the packet. + ## @var next_info + # Object variable to store information about next packet. + ## @var remaining_packet + # Object variable to store information about remaining packet. + + ## Method defining VPP L2 bridge domain test case. + # Contains execution steps of the test case. + # @param self The object pointer. + def test_l2bd(self): + """ L2BD MAC learning test + + 1.config + MAC learning enabled + learn 100 MAC enries + 3 interfaces: untagged, dot1q, dot1ad (dot1q used instead of dot1ad + in the first version) + + 2.sending l2 eth pkts between 3 interface + 64B, 512B, 1518B, 9200B (ether_size) + burst of 257 pkts per interface + """ + + ## Create incoming packet streams for packet-generator interfaces + for i in self.interfaces: + pkts = self.create_stream(i) + self.pg_add_stream(i, pkts) + + ## Enable packet capture and start packet sending + self.pg_enable_capture(self.interfaces) + self.pg_start() + + ## Verify outgoing packet streams per packet-generator interface + for i in self.interfaces: + out = self.pg_get_capture(i) + self.log("Verifying capture %u" % i) + self.verify_capture(i, out) + ## @var pkts + # List variable to store created input stream of packets for the packet + # generator interface. + ## @var out + # List variable to store captured output stream of packets for + # the packet generator interface. + + +if __name__ == '__main__': + unittest.main(testRunner = VppTestRunner) diff --git a/test/test_l2xc.py b/test/test_l2xc.py new file mode 100644 index 00000000000..f5fc8743f8e --- /dev/null +++ b/test/test_l2xc.py @@ -0,0 +1,243 @@ +#!/usr/bin/env python +## @file test_l2xc.py +# Module to provide L2 cross-connect test case. +# +# The module provides a set of tools for L2 cross-connect tests. + +import logging +logging.getLogger("scapy.runtime").setLevel(logging.ERROR) + +import unittest +import random +from framework import VppTestCase, VppTestRunner +from scapy.layers.l2 import Ether, Raw +from scapy.layers.inet import IP, UDP + + +## Subclass of the VppTestCase class. +# +# This subclass is a class for L2 cross-connect test cases. It provides methods +# to create interfaces, configuring L2 cross-connects, creating and verifying +# packet streams. +class TestL2xc(VppTestCase): + """ L2XC Test Case """ + + # Test variables + interf_nr = 4 # Number of interfaces + hosts_nr = 10 # Number of hosts + pkts_per_burst = 257 # Number of packets per burst + + ## Class method to start the test case. + # Overrides setUpClass method in VppTestCase class. + # There is used try..except statement to ensure that the tear down of + # the class will be executed even if any exception is raised. + # @param cls The class pointer. + @classmethod + def setUpClass(cls): + super(TestL2xc, cls).setUpClass() + + try: + ## Create interfaces + cls.interfaces = range(TestL2xc.interf_nr) + cls.create_interfaces(cls.interfaces) + + ## Create bi-directional cross-connects between pg0 and pg1 + cls.api("sw_interface_set_l2_xconnect rx pg0 tx pg1 enable") + cls.api("sw_interface_set_l2_xconnect rx pg1 tx pg0 enable") + + ## Create bi-directional cross-connects between pg2 and pg3 + cls.api("sw_interface_set_l2_xconnect rx pg2 tx pg3 enable") + cls.api("sw_interface_set_l2_xconnect rx pg3 tx pg2 enable") + + cls.cli(0, "show l2patch") + + ## Create host MAC and IPv4 lists + cls.create_host_lists(TestL2xc.hosts_nr) + + except Exception as e: + cls.tearDownClass() + raise e + + ## Method to define tear down VPP actions of the test case. + # Overrides tearDown method in VppTestCase class. + # @param self The object pointer. + def tearDown(self): + self.cli(2, "show int") + self.cli(2, "show trace") + self.cli(2, "show hardware") + self.cli(2, "show l2patch") + self.cli(2, "show error") + self.cli(2, "show run") + + ## Class method to create required number of MAC and IPv4 addresses. + # Create required number of host MAC addresses and distribute them among + # interfaces. Create host IPv4 address for every host MAC address too. + # @param cls The class pointer. + # @param count Integer variable to store the number of MAC addresses to be + # created. + @classmethod + def create_host_lists(cls, count): + for i in cls.interfaces: + cls.MY_MACS[i] = [] + cls.MY_IP4S[i] = [] + for j in range(0, count): + cls.MY_MACS[i].append("00:00:00:ff:%02x:%02x" % (i, j)) + cls.MY_IP4S[i].append("172.17.1%02x.%u" % (i, j)) + ## @var MY_MACS + # Dictionary variable to store list of MAC addresses per interface. + ## @var MY_IP4S + # Dictionary variable to store list of IPv4 addresses per interface. + + ## Method to create packet stream for the packet generator interface. + # Create input packet stream for the given packet generator interface with + # packets of different length targeted for all other created packet + # generator interfaces. + # @param self The object pointer. + # @param pg_id Integer variable to store the index of the interface to + # create the input packet stream. + # @return pkts List variable to store created input stream of packets. + def create_stream(self, pg_id): + # TODO: use variables to create lists based on interface number + pg_targets = [None] * 4 + pg_targets[0] = [1] + pg_targets[1] = [0] + pg_targets[2] = [3] + pg_targets[3] = [2] + pkts = [] + for i in range(0, TestL2xc.pkts_per_burst): + target_pg_id = pg_targets[pg_id][0] + target_host_id = random.randrange(len(self.MY_MACS[target_pg_id])) + source_host_id = random.randrange(len(self.MY_MACS[pg_id])) + pkt_info = self.create_packet_info(pg_id, target_pg_id) + payload = self.info_to_payload(pkt_info) + p = (Ether(dst=self.MY_MACS[target_pg_id][target_host_id], + src=self.MY_MACS[pg_id][source_host_id]) / + IP(src=self.MY_IP4S[pg_id][source_host_id], + dst=self.MY_IP4S[target_pg_id][target_host_id]) / + UDP(sport=1234, dport=1234) / + Raw(payload)) + pkt_info.data = p.copy() + packet_sizes = [64, 512, 1518, 9018] + size = packet_sizes[(i / 2) % len(packet_sizes)] + self.extend_packet(p, size) + pkts.append(p) + return pkts + ## @var pg_targets + # List variable to store list of indexes of target packet generator + # interfaces for every source packet generator interface. + ## @var target_pg_id + # Integer variable to store the index of the random target packet + # generator interfaces. + ## @var target_host_id + # Integer variable to store the index of the randomly chosen + # destination host MAC/IPv4 address. + ## @var source_host_id + # Integer variable to store the index of the randomly chosen source + # host MAC/IPv4 address. + ## @var pkt_info + # Object variable to store the information about the generated packet. + ## @var payload + # String variable to store the payload of the packet to be generated. + ## @var p + # Object variable to store the generated packet. + ## @var packet_sizes + # List variable to store required packet sizes. + ## @var size + # List variable to store required packet sizes. + + ## Method to verify packet stream received on the packet generator interface. + # Verify packet-by-packet the output stream captured on a given packet + # generator (pg) interface using following packet payload data - order of + # packet in the stream, index of the source and destination pg interface, + # src and dst host IPv4 addresses and src port and dst port values of UDP + # layer. + # @param self The object pointer. + # @param o Integer variable to store the index of the interface to + # verify the output packet stream. + # @param capture List variable to store the captured output packet stream. + def verify_capture(self, o, capture): + last_info = {} + for i in self.interfaces: + last_info[i] = None + for packet in capture: + try: + ip = packet[IP] + udp = packet[UDP] + payload_info = self.payload_to_info(str(packet[Raw])) + self.assertEqual(payload_info.dst, o) + self.log("Got packet on port %u: src=%u (id=%u)" + % (o, payload_info.src, payload_info.index), 2) + next_info = self.get_next_packet_info_for_interface2( + payload_info.src, payload_info.dst, + last_info[payload_info.src]) + last_info[payload_info.src] = next_info + self.assertTrue(next_info is not None) + self.assertEqual(payload_info.index, next_info.index) + # Check standard fields + self.assertEqual(ip.src, next_info.data[IP].src) + self.assertEqual(ip.dst, next_info.data[IP].dst) + self.assertEqual(udp.sport, next_info.data[UDP].sport) + self.assertEqual(udp.dport, next_info.data[UDP].dport) + except: + self.log("Unexpected or invalid packet:") + packet.show() + raise + for i in self.interfaces: + remaining_packet = self.get_next_packet_info_for_interface2( + i, o, last_info[i]) + self.assertTrue(remaining_packet is None, + "Port %u: Packet expected from source %u didn't" + " arrive" % (o, i)) + ## @var last_info + # Dictionary variable to store verified packets per packet generator + # interface. + ## @var ip + # Object variable to store the IP layer of the packet. + ## @var udp + # Object variable to store the UDP layer of the packet. + ## @var payload_info + # Object variable to store required information about the packet. + ## @var next_info + # Object variable to store information about next packet. + ## @var remaining_packet + # Object variable to store information about remaining packet. + + ## Method defining L2 cross-connect test case. + # Contains steps of the test case. + # @param self The object pointer. + def test_l2xc(self): + """ L2XC test + + Test scenario: + 1.config + 2 pairs of 2 interfaces, l2xconnected + + 2.sending l2 eth packets between 4 interfaces + 64B, 512B, 1518B, 9018B (ether_size) + burst of packets per interface + """ + + ## Create incoming packet streams for packet-generator interfaces + for i in self.interfaces: + pkts = self.create_stream(i) + self.pg_add_stream(i, pkts) + + ## Enable packet capturing and start packet sending + self.pg_enable_capture(self.interfaces) + self.pg_start() + + ## Verify outgoing packet streams per packet-generator interface + for i in self.interfaces: + out = self.pg_get_capture(i) + self.log("Verifying capture %u" % i) + self.verify_capture(i, out) + ## @var pkts + # List variable to store created input stream of packets for the packet + # generator interface. + ## @var out + # List variable to store captured output stream of packets for + # the packet generator interface. + + +if __name__ == '__main__': + unittest.main(testRunner = VppTestRunner) diff --git a/test/test_vxlan.py b/test/test_vxlan.py new file mode 100644 index 00000000000..1db34927dd8 --- /dev/null +++ b/test/test_vxlan.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python + +import unittest +from framework import VppTestCase, VppTestRunner +from util import Util +from template_bd import BridgeDomain + +from scapy.layers.l2 import Ether +from scapy.layers.inet import IP, UDP +from scapy_handlers.vxlan import VXLAN + + +## TestVxlan is a subclass of BridgeDomain, Util, VppTestCase classes. +# +# TestVxlan class defines VXLAN test cases for VXLAN encapsulation, +# decapsulation and VXLAN tunnel termination in L2 bridge-domain. +class TestVxlan(BridgeDomain, Util, VppTestCase): + """ VXLAN Test Case """ + + ## Method to initialize all parent classes. + # + # Initialize BridgeDomain objects, set documentation string for inherited + # tests and initialize VppTestCase object which must be called after + # doc strings are set. + def __init__(self, *args): + BridgeDomain.__init__(self) + self.test_decap.__func__.__doc__ = ' VXLAN BD decapsulation ' + self.test_encap.__func__.__doc__ = ' VXLAN BD encapsulation ' + VppTestCase.__init__(self, *args) + + ## Method for VXLAN encapsulate function. + # + # Encapsulate the original payload frame by adding VXLAN header with its + # UDP, IP and Ethernet fields. + def encapsulate(self, pkt): + return (Ether(src=self.MY_MACS[0], dst=self.VPP_MACS[0]) / + IP(src=self.MY_IP4S[0], dst=self.VPP_IP4S[0]) / + UDP(sport=4789, dport=4789, chksum=0) / + VXLAN(vni=1) / + pkt) + + ## Method for VXLAN decapsulate function. + # + # Decapsulate the original payload frame by removing VXLAN header with + # its UDP, IP and Ethernet fields. + def decapsulate(self, pkt): + return pkt[VXLAN].payload + + ## Method for checking VXLAN encapsulation. + # + def check_encapsulation(self, pkt): + # TODO: add error messages + ## Verify source MAC is VPP_MAC and destination MAC is MY_MAC resolved + # by VPP using ARP. + self.assertEqual(pkt[Ether].src, self.VPP_MACS[0]) + self.assertEqual(pkt[Ether].dst, self.MY_MACS[0]) + ## Verify VXLAN tunnel source IP is VPP_IP and destination IP is MY_IP. + self.assertEqual(pkt[IP].src, self.VPP_IP4S[0]) + self.assertEqual(pkt[IP].dst, self.MY_IP4S[0]) + ## Verify UDP destination port is VXLAN 4789, source UDP port could be + # arbitrary. + self.assertEqual(pkt[UDP].dport, 4789) + # TODO: checksum check + ## Verify VNI, based on configuration it must be 1. + self.assertEqual(pkt[VXLAN].vni, 1) + + ## Class method to start the VXLAN test case. + # Overrides setUpClass method in VppTestCase class. + # Python try..except statement is used to ensure that the tear down of + # the class will be executed even if exception is raised. + # @param cls The class pointer. + @classmethod + def setUpClass(cls): + super(TestVxlan, cls).setUpClass() + try: + ## Create 2 pg interfaces. + cls.create_interfaces(range(2)) + ## Configure IPv4 addresses on VPP pg0. + cls.config_ip4([0]) + ## Resolve MAC address for VPP's IP address on pg0. + cls.resolve_arp([0]) + + ## Create VXLAN VTEP on VPP pg0, and put vxlan_tunnel0 and pg1 + # into BD. + cls.api("vxlan_add_del_tunnel src %s dst %s vni 1" % + (cls.VPP_IP4S[0], cls.MY_IP4S[0])) + cls.api("sw_interface_set_l2_bridge vxlan_tunnel0 bd_id 1") + cls.api("sw_interface_set_l2_bridge pg1 bd_id 1") + except: + ## In case setUpClass fails run tear down. + cls.tearDownClass() + raise + + ## Method to define VPP actions before tear down of the test case. + # Overrides tearDown method in VppTestCase class. + # @param self The object pointer. + def tearDown(self): + super(TestVxlan, self).tearDown() + self.cli(2, "show bridge-domain 1 detail") + +if __name__ == '__main__': + unittest.main(testRunner=VppTestRunner) diff --git a/test/util.py b/test/util.py new file mode 100644 index 00000000000..c72a3965d9e --- /dev/null +++ b/test/util.py @@ -0,0 +1,139 @@ +## @package util +# Module with common functions that should be used by the test cases. +# +# The module provides a set of tools for setup the test environment + +from scapy.layers.l2 import Ether, ARP +from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6NDOptSrcLLAddr + + +## Util class +# +# Test cases that want to use methods defined in Util class should +# inherit this class. +# +# class Example(Util, VppTestCase): +# pass +class Util(object): + + ## Class method to send ARP Request for each VPP IPv4 address in + # order to determine VPP interface MAC address to IPv4 bindings. + # + # Resolved MAC address is saved to the VPP_MACS dictionary with interface + # index as a key. ARP Request is sent from MAC in MY_MACS dictionary with + # interface index as a key. + # @param cls The class pointer. + # @param args List variable to store indices of VPP interfaces. + @classmethod + def resolve_arp(cls, args): + for i in args: + ip = cls.VPP_IP4S[i] + cls.log("Sending ARP request for %s on port %u" % (ip, i)) + arp_req = (Ether(dst="ff:ff:ff:ff:ff:ff", src=cls.MY_MACS[i]) / + ARP(op=ARP.who_has, pdst=ip, + psrc=cls.MY_IP4S[i], hwsrc=cls.MY_MACS[i])) + cls.pg_add_stream(i, arp_req) + cls.pg_enable_capture([i]) + + cls.cli(2, "trace add pg-input 1") + cls.pg_start() + arp_reply = cls.pg_get_capture(i)[0] + if arp_reply[ARP].op == ARP.is_at: + cls.log("VPP pg%u MAC address is %s " % (i, arp_reply[ARP].hwsrc)) + cls.VPP_MACS[i] = arp_reply[ARP].hwsrc + else: + cls.log("No ARP received on port %u" % i) + cls.cli(2, "show trace") + ## @var ip + # <TODO add description> + ## @var arp_req + # <TODO add description> + ## @var arp_reply + # <TODO add description> + ## @var VPP_MACS + # <TODO add description> + + ## Class method to send ND request for each VPP IPv6 address in + # order to determine VPP MAC address to IPv6 bindings. + # + # Resolved MAC address is saved to the VPP_MACS dictionary with interface + # index as a key. ND Request is sent from MAC in MY_MACS dictionary with + # interface index as a key. + # @param cls The class pointer. + # @param args List variable to store indices of VPP interfaces. + @classmethod + def resolve_icmpv6_nd(cls, args): + for i in args: + ip = cls.VPP_IP6S[i] + cls.log("Sending ICMPv6ND_NS request for %s on port %u" % (ip, i)) + nd_req = (Ether(dst="ff:ff:ff:ff:ff:ff", src=cls.MY_MACS[i]) / + IPv6(src=cls.MY_IP6S[i], dst=ip) / + ICMPv6ND_NS(tgt=ip) / + ICMPv6NDOptSrcLLAddr(lladdr=cls.MY_MACS[i])) + cls.pg_add_stream(i, nd_req) + cls.pg_enable_capture([i]) + + cls.cli(2, "trace add pg-input 1") + cls.pg_start() + nd_reply = cls.pg_get_capture(i)[0] + icmpv6_na = nd_reply['ICMPv6 Neighbor Discovery - Neighbor Advertisement'] + dst_ll_addr = icmpv6_na['ICMPv6 Neighbor Discovery Option - Destination Link-Layer Address'] + cls.VPP_MACS[i] = dst_ll_addr.lladdr + ## @var ip + # <TODO add description> + ## @var nd_req + # <TODO add description> + ## @var nd_reply + # <TODO add description> + ## @var icmpv6_na + # <TODO add description> + ## @var dst_ll_addr + # <TODO add description> + ## @var VPP_MACS + # <TODO add description> + + ## Class method to configure IPv4 addresses on VPP interfaces. + # + # Set dictionary variables MY_IP4S and VPP_IP4S to IPv4 addresses + # calculated using interface VPP interface index as a parameter. + # /24 IPv4 prefix is used, with VPP interface address host part set + # to .1 and MY address set to .2. + # Used IPv4 prefix scheme: 172.16.{VPP-interface-index}.0/24. + # @param cls The class pointer. + # @param args List variable to store indices of VPP interfaces. + @classmethod + def config_ip4(cls, args): + for i in args: + cls.MY_IP4S[i] = "172.16.%u.2" % i + cls.VPP_IP4S[i] = "172.16.%u.1" % i + cls.api("sw_interface_add_del_address pg%u %s/24" % (i, cls.VPP_IP4S[i])) + cls.log("My IPv4 address is %s" % (cls.MY_IP4S[i])) + ## @var MY_IP4S + # Dictionary variable to store host IPv4 addresses connected to packet + # generator interfaces. + ## @var VPP_IP4S + # Dictionary variable to store VPP IPv4 addresses of the packet + # generator interfaces. + + ## Class method to configure IPv6 addresses on VPP interfaces. + # + # Set dictionary variables MY_IP6S and VPP_IP6S to IPv6 addresses + # calculated using interface VPP interface index as a parameter. + # /64 IPv6 prefix is used, with VPP interface address host part set + # to ::1 and MY address set to ::2. + # Used IPv6 prefix scheme: fd10:{VPP-interface-index}::0/64. + # @param cls The class pointer. + # @param args List variable to store indices of VPP interfaces. + @classmethod + def config_ip6(cls, args): + for i in args: + cls.MY_IP6S[i] = "fd10:%u::2" % i + cls.VPP_IP6S[i] = "fd10:%u::1" % i + cls.api("sw_interface_add_del_address pg%u %s/64" % (i, cls.VPP_IP6S[i])) + cls.log("My IPv6 address is %s" % (cls.MY_IP6S[i])) + ## @var MY_IP6S + # Dictionary variable to store host IPv6 addresses connected to packet + # generator interfaces. + ## @var VPP_IP6S + # Dictionary variable to store VPP IPv6 addresses of the packet + # generator interfaces. |