diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/test_mdata.py | 123 |
1 files changed, 123 insertions, 0 deletions
diff --git a/test/test_mdata.py b/test/test_mdata.py new file mode 100644 index 00000000000..8f7b3d2a837 --- /dev/null +++ b/test/test_mdata.py @@ -0,0 +1,123 @@ +from framework import VppTestCase, VppTestRunner +import unittest +from config import config +from scapy.layers.l2 import Ether +from scapy.packet import Raw +from scapy.layers.inet import IP, UDP +from random import randint +from util import ppp + + +@unittest.skipIf("mdata" in config.excluded_plugins, "Exclude mdata plugin tests") +class TestMdataCli(VppTestCase): + """mdata plugin test""" + + @classmethod + def setUpClass(cls): + super(TestMdataCli, cls).setUpClass() + try: + cls.create_pg_interfaces(range(2)) + for i in cls.pg_interfaces: + i.config_ip4() + i.resolve_arp() + i.admin_up() + except Exception: + cls.tearDownClass() + raise + + @classmethod + def tearDownClass(cls): + for i in cls.pg_interfaces: + i.unconfig_ip4() + i.admin_down() + super(TestMdataCli, cls).tearDownClass() + + # https://fd.io/docs/vpp/master/developer/tests/overview.html#example-how-to-add-a-new-test + def create_stream(self, src_if, dst_if, count): + packets = [] + for i in range(count): + info = self.create_packet_info(src_if, dst_if) + payload = self.info_to_payload(info) + + p = ( + Ether(dst=src_if.local_mac, src=src_if.remote_mac) + / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) + / UDP(sport=randint(1000, 2000), dport=5678) + / Raw(payload) + ) + + info.data = p.copy() + packets.append(p) + + return packets + + def verify_capture(self, src_if, dst_if, capture): + packet_info = None + for packet in capture: + try: + ip = packet[IP] + udp = packet[UDP] + # convert the payload to packet info object + payload_info = self.payload_to_info(packet[Raw]) + # make sure the indexes match + self.assert_equal( + payload_info.src, src_if.sw_if_index, "source sw_if_index" + ) + self.assert_equal( + payload_info.dst, dst_if.sw_if_index, "destination sw_if_index" + ) + packet_info = self.get_next_packet_info_for_interface2( + src_if.sw_if_index, dst_if.sw_if_index, packet_info + ) + # make sure we didn't run out of saved packets + self.assertIsNotNone(packet_info) + self.assert_equal( + payload_info.index, packet_info.index, "packet info index" + ) + saved_packet = packet_info.data # fetch the saved packet + # assert the values match + self.assert_equal(ip.src, saved_packet[IP].src, "IP source address") + # ... more assertions here + self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port") + except Exception: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise + remaining_packet = self.get_next_packet_info_for_interface2( + src_if.sw_if_index, dst_if.sw_if_index, packet_info + ) + self.assertIsNone( + remaining_packet, + "Interface %s: Packet expected from interface " + "%s didn't arrive" % (dst_if.name, src_if.name), + ) + + def test_mdata_cli(self): + """turn on mdata tracking, send packets, verify, check CLI output""" + self.vapi.cli("buffer metadata tracking on") + + packets = self.create_stream(self.pg0, self.pg1, 5) + self.pg0.add_stream(packets) + self.pg0.enable_capture() + self.pg1.enable_capture() + self.pg_start() + + capture = self.pg1.get_capture() + self.pg0.assert_nothing_captured() + self.verify_capture(self.pg0, self.pg1, capture) + + result = self.vapi.cli("show buffer metadata") + expected = [ + "ip4-input", + "ip4-rewrite", + "ip4-lookup", + "ethernet-input", + "pg1-tx", + "pg1-output", + ] + for entry in expected: + self.assertIn(entry, result) + self.vapi.cli("buffer metadata tracking off") + + +if __name__ == "__main__": + unittest.main(testRunner=VppTestRunner) |