diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/asf/asfframework.py | 27 | ||||
-rw-r--r-- | test/test_ipsec_api.py | 63 | ||||
-rw-r--r-- | test/test_ipsec_spd_fp_input.py | 46 | ||||
-rw-r--r-- | test/test_sflow.py | 212 | ||||
-rw-r--r-- | test/test_snort.py | 54 |
5 files changed, 349 insertions, 53 deletions
diff --git a/test/asf/asfframework.py b/test/asf/asfframework.py index 841a923dc79..7670a0753d1 100644 --- a/test/asf/asfframework.py +++ b/test/asf/asfframework.py @@ -155,17 +155,6 @@ def _is_platform_aarch64(): is_platform_aarch64 = _is_platform_aarch64() -def _is_distro_ubuntu2404(): - with open("/etc/os-release") as f: - for line in f.readlines(): - if "noble" in line: - return True - return False - - -is_distro_ubuntu2404 = _is_distro_ubuntu2404() - - def _is_distro_debian11(): with open("/etc/os-release") as f: for line in f.readlines(): @@ -226,8 +215,6 @@ class TestCaseTag(Enum): FIXME_DEBIAN11 = 4 # marks suites broken on debug vpp image FIXME_VPP_DEBUG = 5 - # marks suites broken on Ubuntu-24.04 - FIXME_UBUNTU2404 = 6 def create_tag_decorator(e): @@ -246,7 +233,6 @@ tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS) tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN) tag_fixme_debian11 = create_tag_decorator(TestCaseTag.FIXME_DEBIAN11) tag_fixme_vpp_debug = create_tag_decorator(TestCaseTag.FIXME_VPP_DEBUG) -tag_fixme_ubuntu2404 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2404) class DummyVpp: @@ -309,12 +295,6 @@ class VppAsfTestCase(CPUInterface, unittest.TestCase): cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls) @classmethod - def skip_fixme_ubuntu2404(cls): - """if @tag_fixme_ubuntu2404 & is Ubuntu24.04 - mark for skip""" - if cls.has_tag(TestCaseTag.FIXME_UBUNTU2404) and is_distro_ubuntu2404 == True: - cls = unittest.skip("Skipping @tag_fixme_ubuntu2404 tests")(cls) - - @classmethod def instance(cls): """Return the instance of this testcase""" return cls.test_instance @@ -1366,13 +1346,6 @@ class VppTestResult(unittest.TestResult): test_title = colorize(f"FIXME with ASAN: {test_title}", RED) test.skip_fixme_asan() - if ( - test.has_tag(TestCaseTag.FIXME_UBUNTU2404) - and is_distro_ubuntu2404 == True - ): - test_title = colorize(f"FIXME with Ubuntu 24.04: {test_title}", RED) - test.skip_fixme_ubuntu2404() - if hasattr(test, "vpp_worker_count"): if test.vpp_worker_count == 0: test_title += " [main thread only]" diff --git a/test/test_ipsec_api.py b/test/test_ipsec_api.py index 7208d2887b5..158cb6b9df5 100644 --- a/test/test_ipsec_api.py +++ b/test/test_ipsec_api.py @@ -4,6 +4,7 @@ from framework import VppTestCase from asfframework import VppTestRunner from template_ipsec import IPsecIPv4Params from vpp_papi import VppEnum +from ipaddress import IPv4Address from vpp_ipsec import VppIpsecSA @@ -120,20 +121,15 @@ class IpsecApiTestCase(VppTestCase): ) self.vapi.ipsec_select_backend(protocol=self.vpp_ah_protocol, index=0) - def __check_sa_binding(self, sa_id, thread_index): - found_sa = False + def __sa_dump(self, sa): sa_dumps = self.vapi.ipsec_sa_v5_dump() for dump in sa_dumps: - if dump.entry.sad_id == sa_id: - self.assertEqual(dump.thread_index, thread_index) - found_sa = True - break + if dump.entry.sad_id == sa.id: + return dump + self.fail("SA not found in VPP") - if not found_sa: - self.fail("SA not found in VPP") - - def test_sa_worker_bind(self): - """Bind an SA to a worker""" + def test_sa_basic(self): + """basic SA API tests""" sa = VppIpsecSA( self, self.ipv4_params.scapy_tun_sa_id, @@ -143,14 +139,51 @@ class IpsecApiTestCase(VppTestCase): self.ipv4_params.crypt_algo_vpp_id, self.ipv4_params.crypt_key, VppEnum.vl_api_ipsec_proto_t.IPSEC_API_PROTO_ESP, + flags=VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY + | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND, ) sa.add_vpp_config() - self.__check_sa_binding(sa.id, 0xFFFF) - + # check general SA dump + dump = self.__sa_dump(sa) + self.assertEqual(dump.entry.sad_id, sa.id) + self.assertEqual(dump.entry.spi, sa.spi) + self.assertEqual(dump.entry.protocol, sa.proto) + self.assertEqual(dump.entry.crypto_algorithm, sa.crypto_alg) + self.assertEqual( + dump.entry.crypto_key.data[: dump.entry.crypto_key.length], sa.crypto_key + ) + self.assertEqual(dump.entry.integrity_algorithm, sa.integ_alg) + self.assertEqual( + dump.entry.integrity_key.data[: dump.entry.integrity_key.length], + sa.integ_key, + ) + self.assertEqual(dump.entry.flags, sa.flags) + self.assertEqual(dump.entry.tunnel.instance, 0) + self.assertEqual(dump.entry.tunnel.src, IPv4Address("0.0.0.0")) + self.assertEqual(dump.entry.tunnel.dst, IPv4Address("0.0.0.0")) + self.assertEqual(dump.entry.tunnel.sw_if_index, 0) + self.assertEqual(dump.entry.tunnel.table_id, sa.table_id) + self.assertEqual(dump.entry.tunnel.encap_decap_flags, sa.tun_flags) + self.assertEqual(dump.entry.tunnel.mode, 0) + self.assertEqual(dump.entry.tunnel.flags, 0) + self.assertEqual(dump.entry.tunnel.dscp, 0) + self.assertEqual(dump.entry.tunnel.hop_limit, 0) + self.assertEqual(dump.entry.salt, 0) + self.assertEqual(dump.entry.udp_src_port, 0) + self.assertEqual(dump.entry.udp_dst_port, 0) + self.assertEqual(dump.entry.anti_replay_window_size, 64) + self.assertEqual(dump.sw_if_index, 0xFFFFFFFF) + self.assertEqual(dump.seq_outbound, 0) + self.assertEqual(dump.last_seq_inbound, 0) + self.assertEqual(dump.replay_window, 0xFFFFFFFFFFFFFFFF) + self.assertEqual(dump.thread_index, 0xFFFF) + self.assertEqual(dump.stat_index, 0) + + # check SA binding API self.vapi.ipsec_sad_bind(sa_id=sa.id, worker=1) - - self.__check_sa_binding(sa.id, 2) + dump = self.__sa_dump(sa) + self.assertEqual(dump.thread_index, 2) sa.remove_vpp_config() diff --git a/test/test_ipsec_spd_fp_input.py b/test/test_ipsec_spd_fp_input.py index eb04df49244..1953bbe5eaf 100644 --- a/test/test_ipsec_spd_fp_input.py +++ b/test/test_ipsec_spd_fp_input.py @@ -9,6 +9,7 @@ from template_ipsec import IPSecIPv6Fwd from test_ipsec_esp import TemplateIpsecEsp from template_ipsec import SpdFastPathTemplate from config import config +import pdb def debug_signal_handler(signal, frame): @@ -888,5 +889,50 @@ class IPSec6SpdTestCaseProtect(SpdFastPathIPv6InboundProtect): self.assertEqual(p.tra_sa_in.get_err("lost"), 0) +class IPSec6SpdTestCaseTunProtect(SpdFastPathIPv6InboundProtect): + """IPSec/IPv6 inbound: Policy mode test case with fast path""" + + # In this test sa_in defines a tunnel. Matching should be + # done based on the sa tunnel header. + + @classmethod + def setUpClass(cls): + super(IPSec6SpdTestCaseTunProtect, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(IPSec6SpdTestCaseTunProtect, cls).tearDownClass() + + def setUp(self): + super(IPSec6SpdTestCaseTunProtect, self).setUp() + + def tearDown(self): + super(IPSec6SpdTestCaseTunProtect, self).tearDown() + + def test_ipsec6_spd_inbound_tun_protect(self): + pkt_count = 5 + payload_size = 64 + p = self.params[socket.AF_INET6] + send_pkts = self.gen_encrypt_pkts6( + p, + p.scapy_tun_sa, + self.tun_if, + src=p.remote_tun_if_host, + dst=self.pg1.remote_ip6, + count=pkt_count, + payload_size=payload_size, + ) + recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1) + self.logger.info(self.vapi.ppcli("show error")) + self.logger.info(self.vapi.ppcli("show ipsec all")) + pkts = p.tun_sa_in.get_stats()["packets"] + self.assertEqual( + pkts, + pkt_count, + "incorrect SA in counts: expected %d != %d" % (pkt_count, pkts), + ) + self.assertEqual(p.tun_sa_in.get_err("lost"), 0) + + if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) diff --git a/test/test_sflow.py b/test/test_sflow.py new file mode 100644 index 00000000000..d37ed84f252 --- /dev/null +++ b/test/test_sflow.py @@ -0,0 +1,212 @@ +#!/usr/bin/env python3 + +import unittest +from framework import VppTestCase +from asfframework import VppTestRunner +from scapy.layers.l2 import Ether +from scapy.packet import Raw +from scapy.layers.inet import IP, UDP +from random import randint +import re # for finding counters in "sh errors" output + + +class SFlowTestCase(VppTestCase): + """sFlow test case""" + + @classmethod + def setUpClass(self): + super(SFlowTestCase, self).setUpClass() + + @classmethod + def teadDownClass(cls): + super(SFlowTestCase, cls).tearDownClass() + + def setUp(self): + self.create_pg_interfaces(range(2)) # create pg0 and pg1 + for i in self.pg_interfaces: + i.admin_up() # put the interface up + i.config_ip4() # configure IPv4 address on the interface + i.resolve_arp() # resolve ARP, so that we know VPP MAC + + def tearDown(self): + for i in self.pg_interfaces: + i.admin_down() + i.unconfig() + i.set_table_ip4(0) + i.set_table_ip6(0) + + def is_hw_interface_in_dump(self, dump, hw_if_index): + for i in dump: + if i.hw_if_index == hw_if_index: + return True + else: + return False + + def enable_sflow_via_api(self): + ## TEST: Enable one interface + ret = self.vapi.sflow_enable_disable(hw_if_index=1, enable_disable=True) + self.assertEqual(ret.retval, 0) + + ## TEST: interface dump all + ret = self.vapi.sflow_interface_dump() + self.assertTrue(self.is_hw_interface_in_dump(ret, 1)) + + ## TEST: Disable one interface + ret = self.vapi.sflow_enable_disable(hw_if_index=1, enable_disable=False) + self.assertEqual(ret.retval, 0) + + ## TEST: interface dump all after enable + disable + ret = self.vapi.sflow_interface_dump() + self.assertEqual(len(ret), 0) + + ## TEST: Enable both interfaces + ret = self.vapi.sflow_enable_disable(hw_if_index=1, enable_disable=True) + self.assertEqual(ret.retval, 0) + ret = self.vapi.sflow_enable_disable(hw_if_index=2, enable_disable=True) + self.assertEqual(ret.retval, 0) + + ## TEST: interface dump all + ret = self.vapi.sflow_interface_dump() + self.assertTrue(self.is_hw_interface_in_dump(ret, 1)) + self.assertTrue(self.is_hw_interface_in_dump(ret, 2)) + + ## TEST: the default sampling rate + ret = self.vapi.sflow_sampling_rate_get() + self.assert_equal(ret.sampling_N, 10000) + + ## TEST: sflow_sampling_rate_set() + self.vapi.sflow_sampling_rate_set(sampling_N=1) + ret = self.vapi.sflow_sampling_rate_get() + self.assert_equal(ret.sampling_N, 1) + + ## TEST: the default polling interval + ret = self.vapi.sflow_polling_interval_get() + self.assert_equal(ret.polling_S, 20) + + ## TEST: sflow_polling_interval_set() + self.vapi.sflow_polling_interval_set(polling_S=10) + ret = self.vapi.sflow_polling_interval_get() + self.assert_equal(ret.polling_S, 10) + + ## TEST: the default header bytes + ret = self.vapi.sflow_header_bytes_get() + self.assert_equal(ret.header_B, 128) + + ## TEST: sflow_header_bytes_set() + self.vapi.sflow_header_bytes_set(header_B=96) + ret = self.vapi.sflow_header_bytes_get() + self.assert_equal(ret.header_B, 96) + + def create_stream(self, src_if, dst_if, count): + packets = [] + for i in range(count): + # create packet info stored in the test case instance + info = self.create_packet_info(src_if, dst_if) + # convert the info into packet payload + payload = self.info_to_payload(info) + # create the packet itself + p = ( + Ether(dst=src_if.local_mac, src=src_if.remote_mac) + / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) + / UDP(sport=randint(49152, 65535), dport=5678) + / Raw(payload) + ) + # store a copy of the packet in the packet info + info.data = p.copy() + # append the packet to the list + packets.append(p) + # return the created packet list + return packets + + def verify_capture(self, src_if, dst_if, capture): + packet_info = None + for packet in capture: + try: + ip = packet[IP] + udp = packet[UDP] + # convert the payload to packet info object + payload_info = self.payload_to_info(packet[Raw]) + # make sure the indexes match + self.assert_equal( + payload_info.src, src_if.sw_if_index, "source sw_if_index" + ) + self.assert_equal( + payload_info.dst, dst_if.sw_if_index, "destination sw_if_index" + ) + packet_info = self.get_next_packet_info_for_interface2( + src_if.sw_if_index, dst_if.sw_if_index, packet_info + ) + # make sure we didn't run out of saved packets + self.assertIsNotNone(packet_info) + self.assert_equal( + payload_info.index, packet_info.index, "packet info index" + ) + saved_packet = packet_info.data # fetch the saved packet + # assert the values match + self.assert_equal(ip.src, saved_packet[IP].src, "IP source address") + self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port") + except: + self.logger.error("Unexpected or invalid packet:", packet) + raise + remaining_packet = self.get_next_packet_info_for_interface2( + src_if.sw_if_index, dst_if.sw_if_index, packet_info + ) + self.assertIsNone( + remaining_packet, + "Interface %s: Packet expected from interface " + "%s didn't arrive" % (dst_if.name, src_if.name), + ) + + def get_sflow_counter(self, counter): + counters = self.vapi.cli("sh errors").split("\n") + for i in range(1, len(counters) - 1): + results = counters[i].split() + if results[1] == "sflow": + if re.search(counter, counters[i]) is not None: + return int(results[0]) + return None + + def verify_sflow(self, count): + ctr_processed = "sflow packets processed" + ctr_sampled = "sflow packets sampled" + ctr_dropped = "sflow packets dropped" + ctr_ps_sent = "sflow PSAMPLE sent" + ctr_ps_fail = "sflow PSAMPLE send failed" + processed = self.get_sflow_counter(ctr_processed) + sampled = self.get_sflow_counter(ctr_sampled) + dropped = self.get_sflow_counter(ctr_dropped) + ps_sent = self.get_sflow_counter(ctr_ps_sent) + ps_fail = self.get_sflow_counter(ctr_ps_fail) + self.assert_equal(processed, count, ctr_processed) + self.assert_equal(sampled, count, ctr_sampled) + self.assert_equal(dropped, None, ctr_dropped) + # TODO decide how to warn if PSAMPLE is not working + # It requires a prior "sudo modprobe psample", but + # that should probably be done at system boot time + # or maybe in a systemctl startup script, so we + # should only warn here. + self.logger.info(ctr_ps_sent + "=" + str(ps_sent)) + self.logger.info(ctr_ps_fail + "=" + str(ps_fail)) + + def test_basic(self): + self.enable_sflow_via_api() + count = 7 + # create the packet stream + packets = self.create_stream(self.pg0, self.pg1, count) + # add the stream to the source interface + self.pg0.add_stream(packets) + # enable capture on both interfaces + self.pg0.enable_capture() + self.pg1.enable_capture() + # start the packet generator + self.pg_start() + # get capture - the proper count of packets was saved by + # create_packet_info() based on dst_if parameter + capture = self.pg1.get_capture() + # assert nothing captured on pg0 (always do this last, so that + # some time has already passed since pg_start()) + self.pg0.assert_nothing_captured() + # verify capture + self.verify_capture(self.pg0, self.pg1, capture) + # verify sflow counters + self.verify_sflow(count) diff --git a/test/test_snort.py b/test/test_snort.py index 19401cb7b85..5335091dba7 100644 --- a/test/test_snort.py +++ b/test/test_snort.py @@ -12,10 +12,10 @@ class TestSnort(VppTestCase): def setUpClass(cls): super(TestSnort, cls).setUpClass() try: - cls.create_pg_interfaces(range(2)) + cls.create_pg_interfaces(range(4)) for i in cls.pg_interfaces: i.config_ip4().resolve_arp() - i.admin_up() + i.admin_down() except Exception: cls.tearDownClass() raise @@ -24,26 +24,28 @@ class TestSnort(VppTestCase): def tearDownClass(cls): for i in cls.pg_interfaces: i.unconfig_ip4() - i.admin_down() super(TestSnort, cls).tearDownClass() def test_snort_cli(self): # TODO: add a test with packets # { cli command : part of the expected reply } - print("TEST SNORT CLI") commands_replies = { "snort create-instance name snortTest queue-size 16 on-disconnect drop": "", "snort create-instance name snortTest2 queue-size 16 on-disconnect pass": "", "snort attach instance snortTest interface pg0 output": "", "snort attach instance snortTest2 interface pg1 input": "", + "snort attach all-instances interface pg2 inout": "", + "snort attach instance snortTest instance snortTest2 interface pg3 inout": "", "show snort instances": "snortTest", "show snort interfaces": "pg0", "show snort clients": "number of clients", "show snort mode": "input mode: interrupt", "snort mode polling": "", "snort mode interrupt": "", - "snort detach interface pg0": "", - "snort detach interface pg1": "", + "snort detach instance snortTest interface pg0": "", + "snort detach instance snortTest2 interface pg1": "", + "snort detach all-instances interface pg2": "", + "snort detach instance snortTest instance snortTest2 interface pg3": "", "snort delete instance snortTest": "", } @@ -64,7 +66,7 @@ class TestSnortVapi(VppTestCase): for i in cls.pg_interfaces: i.config_ip4() i.resolve_arp() - i.admin_up() + i.admin_down() except Exception: cls.tearDownClass() raise @@ -73,7 +75,6 @@ class TestSnortVapi(VppTestCase): def tearDownClass(cls): for i in cls.pg_interfaces: i.unconfig_ip4() - i.admin_down() super(TestSnortVapi, cls).tearDownClass() def test_snort_01_modes_set_interrupt(self): @@ -109,14 +110,20 @@ class TestSnortVapi(VppTestCase): reply = self.vapi.snort_interface_attach( instance_index=0, sw_if_index=1, snort_dir=1 ) + reply = self.vapi.snort_interface_attach( + instance_index=0, sw_if_index=2, snort_dir=2 + ) + # verify attaching with an invalid direction is rejected try: reply = self.vapi.snort_interface_attach( - instance_index=1, sw_if_index=1, snort_dir=1 + instance_index=1, sw_if_index=2, snort_dir=4 ) except: pass else: self.assertNotEqual(reply.retval, 0) + reply = self.vapi.cli("show snort interfaces") + self.assertNotIn("snortTest1", reply) reply = self.vapi.snort_interface_attach( instance_index=1, sw_if_index=2, snort_dir=3 @@ -124,6 +131,31 @@ class TestSnortVapi(VppTestCase): reply = self.vapi.cli("show snort interfaces") self.assertIn("snortTest0", reply) self.assertIn("snortTest1", reply) + self.assertIn("input", reply) + self.assertIn("inout", reply) + self.assertIn("output", reply) + + # verify attaching a previously attached interface is rejected + try: + reply = self.vapi.snort_interface_attach( + instance_index=1, sw_if_index=2, snort_dir=2 + ) + except: + pass + else: + self.assertNotEqual(reply.retval, 0) + + # verify attaching an invalid sw_if_index is rejected + try: + reply = self.vapi.snort_interface_attach( + instance_index=1, sw_if_index=3, snort_dir=2 + ) + except: + pass + else: + self.assertNotEqual(reply.retval, 0) + reply = self.vapi.cli("show snort interfaces") + self.assertIn("snortTest1", reply) def test_snort_05_delete_instance(self): """Instances can be deleted""" @@ -131,14 +163,14 @@ class TestSnortVapi(VppTestCase): reply = self.vapi.cli("show snort interfaces") self.assertNotIn("snortTest0", reply) self.assertIn("snortTest1", reply) - reply = self.vapi.cli("show snort interfaces") self.assertNotIn("pg0", reply) self.assertIn("pg1", reply) def test_snort_06_detach_if(self): """Interfaces can be detached""" + # verify detaching an invalid sw_if_index is rejected try: - reply = self.vapi.snort_interface_detach(sw_if_index=1) + reply = self.vapi.snort_interface_detach(sw_if_index=3) except: pass else: |