summaryrefslogtreecommitdiffstats
path: root/test/test_l2xc.py
diff options
context:
space:
mode:
authorKlement Sekera <ksekera@cisco.com>2016-10-11 11:47:09 +0200
committerMatej Klotton <mklotton@cisco.com>2016-10-26 17:42:19 +0200
commitf62ae1288a776527c7f7ba3951531fbd07bc63da (patch)
treeca877f7e3bb7e4f6309cf711fca9de1fcd4865a2 /test/test_l2xc.py
parentf530a5526a1f501462ff4247a5bb38e80c13678d (diff)
refactor test framework
Change-Id: I31da3b1857b6399f9899276a2d99cdd19436296c Signed-off-by: Klement Sekera <ksekera@cisco.com> Signed-off-by: Matej Klotton <mklotton@cisco.com> Signed-off-by: Jan Gelety <jgelety@cisco.com> Signed-off-by: Juraj Sloboda <jsloboda@cisco.com>
Diffstat (limited to 'test/test_l2xc.py')
-rw-r--r--test/test_l2xc.py276
1 files changed, 106 insertions, 170 deletions
diff --git a/test/test_l2xc.py b/test/test_l2xc.py
index f5fc8743f8e..d448a04d13c 100644
--- a/test/test_l2xc.py
+++ b/test/test_l2xc.py
@@ -1,210 +1,152 @@
#!/usr/bin/env python
-## @file test_l2xc.py
-# Module to provide L2 cross-connect test case.
-#
-# The module provides a set of tools for L2 cross-connect tests.
-
-import logging
-logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
import unittest
import random
-from framework import VppTestCase, VppTestRunner
-from scapy.layers.l2 import Ether, Raw
+
+from scapy.packet import Raw
+from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
+from logging import *
+
+from framework import VppTestCase, VppTestRunner
+from util import TestHost
-## Subclass of the VppTestCase class.
-#
-# This subclass is a class for L2 cross-connect test cases. It provides methods
-# to create interfaces, configuring L2 cross-connects, creating and verifying
-# packet streams.
class TestL2xc(VppTestCase):
""" L2XC Test Case """
# Test variables
- interf_nr = 4 # Number of interfaces
hosts_nr = 10 # Number of hosts
pkts_per_burst = 257 # Number of packets per burst
- ## Class method to start the test case.
- # Overrides setUpClass method in VppTestCase class.
- # There is used try..except statement to ensure that the tear down of
- # the class will be executed even if any exception is raised.
- # @param cls The class pointer.
@classmethod
def setUpClass(cls):
super(TestL2xc, cls).setUpClass()
- try:
- ## Create interfaces
- cls.interfaces = range(TestL2xc.interf_nr)
- cls.create_interfaces(cls.interfaces)
+ def setUp(self):
+ super(TestL2xc, self).setUp()
- ## Create bi-directional cross-connects between pg0 and pg1
- cls.api("sw_interface_set_l2_xconnect rx pg0 tx pg1 enable")
- cls.api("sw_interface_set_l2_xconnect rx pg1 tx pg0 enable")
+ # create 4 pg interfaces
+ self.create_pg_interfaces(range(4))
- ## Create bi-directional cross-connects between pg2 and pg3
- cls.api("sw_interface_set_l2_xconnect rx pg2 tx pg3 enable")
- cls.api("sw_interface_set_l2_xconnect rx pg3 tx pg2 enable")
+ # packet flows mapping pg0 -> pg1, pg2 -> pg3, etc.
+ self.flows = dict()
+ self.flows[self.pg0] = [self.pg1]
+ self.flows[self.pg1] = [self.pg0]
+ self.flows[self.pg2] = [self.pg3]
+ self.flows[self.pg3] = [self.pg2]
- cls.cli(0, "show l2patch")
+ # packet sizes
+ self.pg_if_packet_sizes = [64, 512, 1518, 9018]
- ## Create host MAC and IPv4 lists
- cls.create_host_lists(TestL2xc.hosts_nr)
+ self.interfaces = list(self.pg_interfaces)
- except Exception as e:
- cls.tearDownClass()
- raise e
+ # Create bi-directional cross-connects between pg0 and pg1
+ self.vapi.sw_interface_set_l2_xconnect(
+ self.pg0.sw_if_index, self.pg1.sw_if_index, enable=1)
+ self.vapi.sw_interface_set_l2_xconnect(
+ self.pg1.sw_if_index, self.pg0.sw_if_index, enable=1)
+
+ # Create bi-directional cross-connects between pg2 and pg3
+ self.vapi.sw_interface_set_l2_xconnect(
+ self.pg2.sw_if_index, self.pg3.sw_if_index, enable=1)
+ self.vapi.sw_interface_set_l2_xconnect(
+ self.pg3.sw_if_index, self.pg2.sw_if_index, enable=1)
+
+ info(self.vapi.cli("show l2patch"))
+
+ # mapping between packet-generator index and lists of test hosts
+ self.hosts_by_pg_idx = dict()
+
+ # Create host MAC and IPv4 lists
+ # self.MY_MACS = dict()
+ # self.MY_IP4S = dict()
+ self.create_host_lists(TestL2xc.hosts_nr)
+
+ # setup all interfaces
+ for i in self.interfaces:
+ i.admin_up()
- ## Method to define tear down VPP actions of the test case.
- # Overrides tearDown method in VppTestCase class.
- # @param self The object pointer.
def tearDown(self):
- self.cli(2, "show int")
- self.cli(2, "show trace")
- self.cli(2, "show hardware")
- self.cli(2, "show l2patch")
- self.cli(2, "show error")
- self.cli(2, "show run")
-
- ## Class method to create required number of MAC and IPv4 addresses.
- # Create required number of host MAC addresses and distribute them among
- # interfaces. Create host IPv4 address for every host MAC address too.
- # @param cls The class pointer.
- # @param count Integer variable to store the number of MAC addresses to be
- # created.
- @classmethod
- def create_host_lists(cls, count):
- for i in cls.interfaces:
- cls.MY_MACS[i] = []
- cls.MY_IP4S[i] = []
+ super(TestL2xc, self).tearDown()
+ if not self.vpp_dead:
+ info(self.vapi.cli("show l2patch"))
+
+ def create_host_lists(self, count):
+ """ Method to create required number of MAC and IPv4 addresses.
+ Create required number of host MAC addresses and distribute them among
+ interfaces. Create host IPv4 address for every host MAC address too.
+
+ :param count: Number of hosts to create MAC and IPv4 addresses for.
+ Type: int
+ """
+ for pg_if in self.pg_interfaces:
+ # self.MY_MACS[i.sw_if_index] = []
+ # self.MY_IP4S[i.sw_if_index] = []
+ self.hosts_by_pg_idx[pg_if.sw_if_index] = []
+ hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
for j in range(0, count):
- cls.MY_MACS[i].append("00:00:00:ff:%02x:%02x" % (i, j))
- cls.MY_IP4S[i].append("172.17.1%02x.%u" % (i, j))
- ## @var MY_MACS
- # Dictionary variable to store list of MAC addresses per interface.
- ## @var MY_IP4S
- # Dictionary variable to store list of IPv4 addresses per interface.
-
- ## Method to create packet stream for the packet generator interface.
- # Create input packet stream for the given packet generator interface with
- # packets of different length targeted for all other created packet
- # generator interfaces.
- # @param self The object pointer.
- # @param pg_id Integer variable to store the index of the interface to
- # create the input packet stream.
- # @return pkts List variable to store created input stream of packets.
- def create_stream(self, pg_id):
- # TODO: use variables to create lists based on interface number
- pg_targets = [None] * 4
- pg_targets[0] = [1]
- pg_targets[1] = [0]
- pg_targets[2] = [3]
- pg_targets[3] = [2]
+ host = TestHost(
+ "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
+ "172.17.1%02x.%u" % (pg_if.sw_if_index, j))
+ hosts.append(host)
+
+ def create_stream(self, src_if, packet_sizes):
pkts = []
for i in range(0, TestL2xc.pkts_per_burst):
- target_pg_id = pg_targets[pg_id][0]
- target_host_id = random.randrange(len(self.MY_MACS[target_pg_id]))
- source_host_id = random.randrange(len(self.MY_MACS[pg_id]))
- pkt_info = self.create_packet_info(pg_id, target_pg_id)
+ dst_if = self.flows[src_if][0]
+ dst_host = random.choice(self.hosts_by_pg_idx[dst_if.sw_if_index])
+ src_host = random.choice(self.hosts_by_pg_idx[src_if.sw_if_index])
+ pkt_info = self.create_packet_info(
+ src_if.sw_if_index, dst_if.sw_if_index)
payload = self.info_to_payload(pkt_info)
- p = (Ether(dst=self.MY_MACS[target_pg_id][target_host_id],
- src=self.MY_MACS[pg_id][source_host_id]) /
- IP(src=self.MY_IP4S[pg_id][source_host_id],
- dst=self.MY_IP4S[target_pg_id][target_host_id]) /
+ p = (Ether(dst=dst_host.mac, src=src_host.mac) /
+ IP(src=src_host.ip4, dst=dst_host.ip4) /
UDP(sport=1234, dport=1234) /
Raw(payload))
pkt_info.data = p.copy()
- packet_sizes = [64, 512, 1518, 9018]
size = packet_sizes[(i / 2) % len(packet_sizes)]
self.extend_packet(p, size)
pkts.append(p)
return pkts
- ## @var pg_targets
- # List variable to store list of indexes of target packet generator
- # interfaces for every source packet generator interface.
- ## @var target_pg_id
- # Integer variable to store the index of the random target packet
- # generator interfaces.
- ## @var target_host_id
- # Integer variable to store the index of the randomly chosen
- # destination host MAC/IPv4 address.
- ## @var source_host_id
- # Integer variable to store the index of the randomly chosen source
- # host MAC/IPv4 address.
- ## @var pkt_info
- # Object variable to store the information about the generated packet.
- ## @var payload
- # String variable to store the payload of the packet to be generated.
- ## @var p
- # Object variable to store the generated packet.
- ## @var packet_sizes
- # List variable to store required packet sizes.
- ## @var size
- # List variable to store required packet sizes.
-
- ## Method to verify packet stream received on the packet generator interface.
- # Verify packet-by-packet the output stream captured on a given packet
- # generator (pg) interface using following packet payload data - order of
- # packet in the stream, index of the source and destination pg interface,
- # src and dst host IPv4 addresses and src port and dst port values of UDP
- # layer.
- # @param self The object pointer.
- # @param o Integer variable to store the index of the interface to
- # verify the output packet stream.
- # @param capture List variable to store the captured output packet stream.
- def verify_capture(self, o, capture):
- last_info = {}
+
+ def verify_capture(self, pg_if, capture):
+ last_info = dict()
for i in self.interfaces:
- last_info[i] = None
+ last_info[i.sw_if_index] = None
+ dst_sw_if_index = pg_if.sw_if_index
for packet in capture:
try:
ip = packet[IP]
udp = packet[UDP]
payload_info = self.payload_to_info(str(packet[Raw]))
- self.assertEqual(payload_info.dst, o)
- self.log("Got packet on port %u: src=%u (id=%u)"
- % (o, payload_info.src, payload_info.index), 2)
+ packet_index = payload_info.index
+ self.assertEqual(payload_info.dst, dst_sw_if_index)
+ debug("Got packet on port %s: src=%u (id=%u)" %
+ (pg_if.name, payload_info.src, packet_index))
next_info = self.get_next_packet_info_for_interface2(
- payload_info.src, payload_info.dst,
+ payload_info.src, dst_sw_if_index,
last_info[payload_info.src])
last_info[payload_info.src] = next_info
self.assertTrue(next_info is not None)
- self.assertEqual(payload_info.index, next_info.index)
+ self.assertEqual(packet_index, next_info.index)
+ saved_packet = next_info.data
# Check standard fields
- self.assertEqual(ip.src, next_info.data[IP].src)
- self.assertEqual(ip.dst, next_info.data[IP].dst)
- self.assertEqual(udp.sport, next_info.data[UDP].sport)
- self.assertEqual(udp.dport, next_info.data[UDP].dport)
+ self.assertEqual(ip.src, saved_packet[IP].src)
+ self.assertEqual(ip.dst, saved_packet[IP].dst)
+ self.assertEqual(udp.sport, saved_packet[UDP].sport)
+ self.assertEqual(udp.dport, saved_packet[UDP].dport)
except:
- self.log("Unexpected or invalid packet:")
+ error("Unexpected or invalid packet:")
packet.show()
raise
for i in self.interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
- i, o, last_info[i])
+ i, dst_sw_if_index, last_info[i.sw_if_index])
self.assertTrue(remaining_packet is None,
"Port %u: Packet expected from source %u didn't"
- " arrive" % (o, i))
- ## @var last_info
- # Dictionary variable to store verified packets per packet generator
- # interface.
- ## @var ip
- # Object variable to store the IP layer of the packet.
- ## @var udp
- # Object variable to store the UDP layer of the packet.
- ## @var payload_info
- # Object variable to store required information about the packet.
- ## @var next_info
- # Object variable to store information about next packet.
- ## @var remaining_packet
- # Object variable to store information about remaining packet.
-
- ## Method defining L2 cross-connect test case.
- # Contains steps of the test case.
- # @param self The object pointer.
+ " arrive" % (dst_sw_if_index, i.sw_if_index))
+
def test_l2xc(self):
""" L2XC test
@@ -217,27 +159,21 @@ class TestL2xc(VppTestCase):
burst of packets per interface
"""
- ## Create incoming packet streams for packet-generator interfaces
+ # Create incoming packet streams for packet-generator interfaces
for i in self.interfaces:
- pkts = self.create_stream(i)
- self.pg_add_stream(i, pkts)
+ pkts = self.create_stream(i, self.pg_if_packet_sizes)
+ i.add_stream(pkts)
- ## Enable packet capturing and start packet sending
- self.pg_enable_capture(self.interfaces)
+ # Enable packet capturing and start packet sending
+ self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- ## Verify outgoing packet streams per packet-generator interface
- for i in self.interfaces:
- out = self.pg_get_capture(i)
- self.log("Verifying capture %u" % i)
- self.verify_capture(i, out)
- ## @var pkts
- # List variable to store created input stream of packets for the packet
- # generator interface.
- ## @var out
- # List variable to store captured output stream of packets for
- # the packet generator interface.
+ # Verify outgoing packet streams per packet-generator interface
+ for i in self.pg_interfaces:
+ capture = i.get_capture()
+ info("Verifying capture on interface %s" % i.name)
+ self.verify_capture(i, capture)
if __name__ == '__main__':
- unittest.main(testRunner = VppTestRunner)
+ unittest.main(testRunner=VppTestRunner)