from asfframework import VppTestRunner from framework import VppTestCase import unittest from config import config from scapy.layers.l2 import Ether from scapy.packet import Raw from scapy.layers.inet import IP, UDP from random import randint from util import ppp @unittest.skipIf("mdata" in config.excluded_plugins, "Exclude mdata plugin tests") class TestMdataCli(VppTestCase): """mdata plugin test""" @classmethod def setUpClass(cls): super(TestMdataCli, cls).setUpClass() try: cls.create_pg_interfaces(range(2)) for i in cls.pg_interfaces: i.config_ip4() i.resolve_arp() i.admin_up() except Exception: cls.tearDownClass() raise @classmethod def tearDownClass(cls): for i in cls.pg_interfaces: i.unconfig_ip4() i.admin_down() super(TestMdataCli, cls).tearDownClass() # https://fd.io/docs/vpp/master/developer/tests/overview.html#example-how-to-add-a-new-test def create_stream(self, src_if, dst_if, count): packets = [] for i in range(count): info = self.create_packet_info(src_if, dst_if) payload = self.info_to_payload(info) p = ( Ether(dst=src_if.local_mac, src=src_if.remote_mac) / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) / UDP(sport=randint(49152, 65535), dport=5678) / Raw(payload) ) info.data = p.copy() packets.append(p) return packets def verify_capture(self, src_if, dst_if, capture): packet_info = None for packet in capture: try: ip = packet[IP] udp = packet[UDP] self.logger.debug(f"Converting payload to info for {packet[Raw]}") # convert the payload to packet info object payload_info = self.payload_to_info(packet[Raw]) # make sure the indexes match self.assert_equal( payload_info.src, src_if.sw_if_index, "source sw_if_index" ) self.assert_equal( payload_info.dst, dst_if.sw_if_index, "destination sw_if_index" ) packet_info = self.get_next_packet_info_for_interface2( src_if.sw_if_index, dst_if.sw_if_index, packet_info ) # make sure we didn't run out of saved packets self.assertIsNotNone(packet_info) self.assert_equal( payload_info.index, packet_info.index, "packet info index" ) saved_packet = packet_info.data # fetch the saved packet # assert the values match self.assert_equal(ip.src, saved_packet[IP].src, "IP source address") # ... more assertions here self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port") except Exception: self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise remaining_packet = self.get_next_packet_info_for_interface2( src_if.sw_if_index, dst_if.sw_if_index, packet_info ) self.assertIsNone( remaining_packet, "Interface %s: Packet expected from interface " "%s didn't arrive" % (dst_if.name, src_if.name), ) def test_mdata_cli(self): """turn on mdata tracking, send packets, verify, check CLI output""" self.vapi.cli("buffer metadata tracking on") packets = self.create_stream(self.pg0, self.pg1, 5) self.pg0.add_stream(packets) self.pg0.enable_capture() self.pg1.enable_capture() self.pg_start() capture = self.pg1.get_capture() self.pg0.assert_nothing_captured() self.verify_capture(self.pg0, self.pg1, capture) result = self.vapi.cli("show buffer metadata") expected = [ "ip4-input", "ip4-rewrite", "ip4-lookup", "ethernet-input", "pg1-tx", "pg1-output", ] for entry in expected: self.assertIn(entry, result) self.vapi.cli("buffer metadata tracking off") if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)