from framework import VppTestCase from asfframework import VppTestRunner from config import config import unittest import re from scapy.layers.l2 import Ether from scapy.layers.inet import IP, UDP from scapy.packet import Raw from random import randint from util import ppp def create_stream(self, src_if, dst_if, count): packets = [] for i in range(count): # create packet info stored in the test case instance info = self.create_packet_info(src_if, dst_if) # convert the info into packet payload payload = self.info_to_payload(info) # create the packet itself p = ( Ether(dst=src_if.local_mac, src=src_if.remote_mac) / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) / UDP(sport=randint(49152, 65535), dport=5678) / Raw(payload) ) # store a copy of the packet in the packet info info.data = p.copy() # append the packet to the list packets.append(p) # return the created packet list return packets def verify_capture(self, src_if, dst_if, capture, reply): packet_info = None for packet in capture: try: ip = packet[IP] udp = packet[UDP] # convert the payload to packet info object payload_info = self.payload_to_info(packet[Raw]) # make sure the indexes match self.assert_equal( payload_info.src, src_if.sw_if_index, "source sw_if_index" ) self.assert_equal( payload_info.dst, dst_if.sw_if_index, "destination sw_if_index" ) packet_info = self.get_next_packet_info_for_interface2( src_if.sw_if_index, dst_if.sw_if_index, packet_info ) # make sure we didn't run out of saved packets self.assertIsNotNone(packet_info) self.assert_equal( payload_info.index, packet_info.index, "packet info index" ) saved_packet = packet_info.data # fetch the saved packet # assert the values match self.assert_equal(ip.src, saved_packet[IP].src, "IP source address") # ... more assertions here self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port") except Exception: self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise remaining_packet = self.get_next_packet_info_for_interface2( src_if.sw_if_index, dst_if.sw_if_index, packet_info ) self.assertIsNone( remaining_packet, "Interface %s: Packet expected from interface " "%s didn't arrive" % (dst_if.name, src_if.name), ) # find timestamps and get actual delay pattern = r"\d{2}:\d{2}:\d{2}:\d{6}" timestamps = re.findall(pattern, reply) actual_delay = int(timestamps[2][9:]) - int(timestamps[0][9:]) self.assertTrue( actual_delay >= 100000, f"Delay is lower than expected: {actual_delay} < 100000" ) @unittest.skipIf("nsim" in config.excluded_plugins, "Exclude NSIM plugin tests") class TestNsimCli(VppTestCase): """NSIM plugin tests [CLI]""" @classmethod def setUpClass(cls): super(TestNsimCli, cls).setUpClass() try: cls.create_pg_interfaces(range(2)) for i in cls.pg_interfaces: i.config_ip4() i.resolve_arp() i.admin_up() except Exception: cls.tearDownClass() raise @classmethod def tearDownClass(cls): cls.vapi.cli("nsim cross-connect enable-disable pg0 pg1 disable") cls.vapi.cli("nsim output-feature enable-disable pg0 disable") for i in cls.pg_interfaces: i.unconfig_ip4() i.admin_down() super(TestNsimCli, cls).tearDownClass() def test_nsim_delay(self): """Add 100ms delay""" packets = create_stream(self, self.pg0, self.pg1, 5) self.pg0.add_stream(packets) self.pg0.enable_capture() self.pg1.enable_capture() self.vapi.cli( "set nsim delay 100.0 ms bandwidth 1 gbit packet-size 128 drop-fraction 0.0" ) self.vapi.cli("nsim cross-connect enable-disable pg0 pg1") self.vapi.cli("nsim output-feature enable-disable pg0") self.pg_start() capture = self.pg1.get_capture() self.pg0.assert_nothing_captured() reply = self.vapi.cli("show trace") verify_capture(self, self.pg0, self.pg1, capture, reply) self.assertIn("nsim", reply) reply = self.vapi.cli("show nsim") self.assertIn("delay: 100.0 ms", reply) def test_nsim_drop(self): """Drop all packets""" packets = create_stream(self, self.pg0, self.pg1, 5) self.pg0.add_stream(packets) self.vapi.cli("clear trace") # test fails if running test-debug and no delay is set ("invalid delay 0.00") self.vapi.cli( "set nsim delay 1 us bandwidth 1 gbit packet-size 128 drop-fraction 1.0 packets-per-drop 0" ) self.pg_start() self.pg1.assert_nothing_captured() reply = self.vapi.cli("show nsim") self.assertIn("drop fraction: 1.0", reply) reply = self.vapi.cli("show trace") self.assertIn("sw_if_index -1", reply) @unittest.skipIf("nsim" in config.excluded_plugins, "Exclude NSIM plugin tests") class TestNsimApi(VppTestCase): """NSIM plugin tests [API]""" @classmethod def setUpClass(cls): super(TestNsimApi, cls).setUpClass() try: cls.create_pg_interfaces(range(2)) for i in cls.pg_interfaces: i.config_ip4() i.resolve_arp() i.admin_up() except Exception: cls.tearDownClass() raise @classmethod def tearDownClass(cls): cls.vapi.nsim_cross_connect_enable_disable( enable_disable=False, sw_if_index0=1, sw_if_index1=2 ) cls.vapi.nsim_output_feature_enable_disable(enable_disable=False, sw_if_index=1) for i in cls.pg_interfaces: i.unconfig_ip4() i.admin_down() super(TestNsimApi, cls).tearDownClass() def test_nsim_delay(self): """Add 100ms delay""" packets = create_stream(self, self.pg0, self.pg1, 5) self.pg0.add_stream(packets) self.pg0.enable_capture() self.pg1.enable_capture() # "show nsim" shows 99.9ms if delay is exactly 100000 self.vapi.nsim_configure2( delay_in_usec=100001, average_packet_size=128, bandwidth_in_bits_per_second=100000000000, packets_per_drop=0, packets_per_reorder=0, ) self.vapi.nsim_cross_connect_enable_disable( enable_disable=True, sw_if_index0=1, sw_if_index1=2 ) self.vapi.nsim_output_feature_enable_disable(enable_disable=True, sw_if_index=1) self.pg_start() capture = self.pg1.get_capture() reply = self.vapi.cli("show trace") verify_capture(self, self.pg0, self.pg1, capture, reply) self.assertIn("nsim", reply) reply = self.vapi.cli("show nsim") self.assertIn("delay: 100.0 ms", reply) # has to be separated, otherwise we get "VPP API client: read failed" # when configuring NSIM (nsim_configure2) and then VPP crashes on teardown @unittest.skipIf("nsim" in config.excluded_plugins, "Exclude NSIM plugin tests") class TestNsimApi2(VppTestCase): """NSIM plugin tests [API]""" @classmethod def setUpClass(cls): super(TestNsimApi2, cls).setUpClass() try: cls.create_pg_interfaces(range(2)) for i in cls.pg_interfaces: i.config_ip4() i.resolve_arp() i.admin_up() except Exception: cls.tearDownClass() raise @classmethod def tearDownClass(cls): cls.vapi.nsim_cross_connect_enable_disable( enable_disable=False, sw_if_index0=1, sw_if_index1=2 ) cls.vapi.nsim_output_feature_enable_disable(enable_disable=False, sw_if_index=1) for i in cls.pg_interfaces: i.unconfig_ip4() i.admin_down() super(TestNsimApi2, cls).tearDownClass() def test_nsim_drop(self): """Drop all packets""" packets = create_stream(self, self.pg0, self.pg1, 5) self.pg0.add_stream(packets) self.pg0.enable_capture() self.pg1.enable_capture() self.vapi.cli("clear trace") self.vapi.nsim_configure2( delay_in_usec=10, average_packet_size=128, bandwidth_in_bits_per_second=100000000, packets_per_drop=1, packets_per_reorder=0, ) self.vapi.nsim_cross_connect_enable_disable( enable_disable=True, sw_if_index0=1, sw_if_index1=2 ) self.vapi.nsim_output_feature_enable_disable(enable_disable=True, sw_if_index=1) self.pg_start() self.pg1.assert_nothing_captured() reply = self.vapi.cli("show nsim") self.assertIn("drop fraction: 1.0", reply) reply = self.vapi.cli("show trace") self.assertIn("sw_if_index -1", reply) if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)