diff options
Diffstat (limited to 'test/test_l2xc.py')
-rw-r--r-- | test/test_l2xc.py | 243 |
1 files changed, 243 insertions, 0 deletions
diff --git a/test/test_l2xc.py b/test/test_l2xc.py new file mode 100644 index 00000000000..f5fc8743f8e --- /dev/null +++ b/test/test_l2xc.py @@ -0,0 +1,243 @@ +#!/usr/bin/env python +## @file test_l2xc.py +# Module to provide L2 cross-connect test case. +# +# The module provides a set of tools for L2 cross-connect tests. + +import logging +logging.getLogger("scapy.runtime").setLevel(logging.ERROR) + +import unittest +import random +from framework import VppTestCase, VppTestRunner +from scapy.layers.l2 import Ether, Raw +from scapy.layers.inet import IP, UDP + + +## Subclass of the VppTestCase class. +# +# This subclass is a class for L2 cross-connect test cases. It provides methods +# to create interfaces, configuring L2 cross-connects, creating and verifying +# packet streams. +class TestL2xc(VppTestCase): + """ L2XC Test Case """ + + # Test variables + interf_nr = 4 # Number of interfaces + hosts_nr = 10 # Number of hosts + pkts_per_burst = 257 # Number of packets per burst + + ## Class method to start the test case. + # Overrides setUpClass method in VppTestCase class. + # There is used try..except statement to ensure that the tear down of + # the class will be executed even if any exception is raised. + # @param cls The class pointer. + @classmethod + def setUpClass(cls): + super(TestL2xc, cls).setUpClass() + + try: + ## Create interfaces + cls.interfaces = range(TestL2xc.interf_nr) + cls.create_interfaces(cls.interfaces) + + ## Create bi-directional cross-connects between pg0 and pg1 + cls.api("sw_interface_set_l2_xconnect rx pg0 tx pg1 enable") + cls.api("sw_interface_set_l2_xconnect rx pg1 tx pg0 enable") + + ## Create bi-directional cross-connects between pg2 and pg3 + cls.api("sw_interface_set_l2_xconnect rx pg2 tx pg3 enable") + cls.api("sw_interface_set_l2_xconnect rx pg3 tx pg2 enable") + + cls.cli(0, "show l2patch") + + ## Create host MAC and IPv4 lists + cls.create_host_lists(TestL2xc.hosts_nr) + + except Exception as e: + cls.tearDownClass() + raise e + + ## Method to define tear down VPP actions of the test case. + # Overrides tearDown method in VppTestCase class. + # @param self The object pointer. + def tearDown(self): + self.cli(2, "show int") + self.cli(2, "show trace") + self.cli(2, "show hardware") + self.cli(2, "show l2patch") + self.cli(2, "show error") + self.cli(2, "show run") + + ## Class method to create required number of MAC and IPv4 addresses. + # Create required number of host MAC addresses and distribute them among + # interfaces. Create host IPv4 address for every host MAC address too. + # @param cls The class pointer. + # @param count Integer variable to store the number of MAC addresses to be + # created. + @classmethod + def create_host_lists(cls, count): + for i in cls.interfaces: + cls.MY_MACS[i] = [] + cls.MY_IP4S[i] = [] + for j in range(0, count): + cls.MY_MACS[i].append("00:00:00:ff:%02x:%02x" % (i, j)) + cls.MY_IP4S[i].append("172.17.1%02x.%u" % (i, j)) + ## @var MY_MACS + # Dictionary variable to store list of MAC addresses per interface. + ## @var MY_IP4S + # Dictionary variable to store list of IPv4 addresses per interface. + + ## Method to create packet stream for the packet generator interface. + # Create input packet stream for the given packet generator interface with + # packets of different length targeted for all other created packet + # generator interfaces. + # @param self The object pointer. + # @param pg_id Integer variable to store the index of the interface to + # create the input packet stream. + # @return pkts List variable to store created input stream of packets. + def create_stream(self, pg_id): + # TODO: use variables to create lists based on interface number + pg_targets = [None] * 4 + pg_targets[0] = [1] + pg_targets[1] = [0] + pg_targets[2] = [3] + pg_targets[3] = [2] + pkts = [] + for i in range(0, TestL2xc.pkts_per_burst): + target_pg_id = pg_targets[pg_id][0] + target_host_id = random.randrange(len(self.MY_MACS[target_pg_id])) + source_host_id = random.randrange(len(self.MY_MACS[pg_id])) + pkt_info = self.create_packet_info(pg_id, target_pg_id) + payload = self.info_to_payload(pkt_info) + p = (Ether(dst=self.MY_MACS[target_pg_id][target_host_id], + src=self.MY_MACS[pg_id][source_host_id]) / + IP(src=self.MY_IP4S[pg_id][source_host_id], + dst=self.MY_IP4S[target_pg_id][target_host_id]) / + UDP(sport=1234, dport=1234) / + Raw(payload)) + pkt_info.data = p.copy() + packet_sizes = [64, 512, 1518, 9018] + size = packet_sizes[(i / 2) % len(packet_sizes)] + self.extend_packet(p, size) + pkts.append(p) + return pkts + ## @var pg_targets + # List variable to store list of indexes of target packet generator + # interfaces for every source packet generator interface. + ## @var target_pg_id + # Integer variable to store the index of the random target packet + # generator interfaces. + ## @var target_host_id + # Integer variable to store the index of the randomly chosen + # destination host MAC/IPv4 address. + ## @var source_host_id + # Integer variable to store the index of the randomly chosen source + # host MAC/IPv4 address. + ## @var pkt_info + # Object variable to store the information about the generated packet. + ## @var payload + # String variable to store the payload of the packet to be generated. + ## @var p + # Object variable to store the generated packet. + ## @var packet_sizes + # List variable to store required packet sizes. + ## @var size + # List variable to store required packet sizes. + + ## Method to verify packet stream received on the packet generator interface. + # Verify packet-by-packet the output stream captured on a given packet + # generator (pg) interface using following packet payload data - order of + # packet in the stream, index of the source and destination pg interface, + # src and dst host IPv4 addresses and src port and dst port values of UDP + # layer. + # @param self The object pointer. + # @param o Integer variable to store the index of the interface to + # verify the output packet stream. + # @param capture List variable to store the captured output packet stream. + def verify_capture(self, o, capture): + last_info = {} + for i in self.interfaces: + last_info[i] = None + for packet in capture: + try: + ip = packet[IP] + udp = packet[UDP] + payload_info = self.payload_to_info(str(packet[Raw])) + self.assertEqual(payload_info.dst, o) + self.log("Got packet on port %u: src=%u (id=%u)" + % (o, payload_info.src, payload_info.index), 2) + next_info = self.get_next_packet_info_for_interface2( + payload_info.src, payload_info.dst, + last_info[payload_info.src]) + last_info[payload_info.src] = next_info + self.assertTrue(next_info is not None) + self.assertEqual(payload_info.index, next_info.index) + # Check standard fields + self.assertEqual(ip.src, next_info.data[IP].src) + self.assertEqual(ip.dst, next_info.data[IP].dst) + self.assertEqual(udp.sport, next_info.data[UDP].sport) + self.assertEqual(udp.dport, next_info.data[UDP].dport) + except: + self.log("Unexpected or invalid packet:") + packet.show() + raise + for i in self.interfaces: + remaining_packet = self.get_next_packet_info_for_interface2( + i, o, last_info[i]) + self.assertTrue(remaining_packet is None, + "Port %u: Packet expected from source %u didn't" + " arrive" % (o, i)) + ## @var last_info + # Dictionary variable to store verified packets per packet generator + # interface. + ## @var ip + # Object variable to store the IP layer of the packet. + ## @var udp + # Object variable to store the UDP layer of the packet. + ## @var payload_info + # Object variable to store required information about the packet. + ## @var next_info + # Object variable to store information about next packet. + ## @var remaining_packet + # Object variable to store information about remaining packet. + + ## Method defining L2 cross-connect test case. + # Contains steps of the test case. + # @param self The object pointer. + def test_l2xc(self): + """ L2XC test + + Test scenario: + 1.config + 2 pairs of 2 interfaces, l2xconnected + + 2.sending l2 eth packets between 4 interfaces + 64B, 512B, 1518B, 9018B (ether_size) + burst of packets per interface + """ + + ## Create incoming packet streams for packet-generator interfaces + for i in self.interfaces: + pkts = self.create_stream(i) + self.pg_add_stream(i, pkts) + + ## Enable packet capturing and start packet sending + self.pg_enable_capture(self.interfaces) + self.pg_start() + + ## Verify outgoing packet streams per packet-generator interface + for i in self.interfaces: + out = self.pg_get_capture(i) + self.log("Verifying capture %u" % i) + self.verify_capture(i, out) + ## @var pkts + # List variable to store created input stream of packets for the packet + # generator interface. + ## @var out + # List variable to store captured output stream of packets for + # the packet generator interface. + + +if __name__ == '__main__': + unittest.main(testRunner = VppTestRunner) |