aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/asf/asfframework.py27
-rw-r--r--test/test_ipsec_api.py63
-rw-r--r--test/test_ipsec_spd_fp_input.py46
-rw-r--r--test/test_sflow.py212
-rw-r--r--test/test_snort.py54
5 files changed, 349 insertions, 53 deletions
diff --git a/test/asf/asfframework.py b/test/asf/asfframework.py
index 841a923dc79..7670a0753d1 100644
--- a/test/asf/asfframework.py
+++ b/test/asf/asfframework.py
@@ -155,17 +155,6 @@ def _is_platform_aarch64():
is_platform_aarch64 = _is_platform_aarch64()
-def _is_distro_ubuntu2404():
- with open("/etc/os-release") as f:
- for line in f.readlines():
- if "noble" in line:
- return True
- return False
-
-
-is_distro_ubuntu2404 = _is_distro_ubuntu2404()
-
-
def _is_distro_debian11():
with open("/etc/os-release") as f:
for line in f.readlines():
@@ -226,8 +215,6 @@ class TestCaseTag(Enum):
FIXME_DEBIAN11 = 4
# marks suites broken on debug vpp image
FIXME_VPP_DEBUG = 5
- # marks suites broken on Ubuntu-24.04
- FIXME_UBUNTU2404 = 6
def create_tag_decorator(e):
@@ -246,7 +233,6 @@ tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
tag_fixme_debian11 = create_tag_decorator(TestCaseTag.FIXME_DEBIAN11)
tag_fixme_vpp_debug = create_tag_decorator(TestCaseTag.FIXME_VPP_DEBUG)
-tag_fixme_ubuntu2404 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2404)
class DummyVpp:
@@ -309,12 +295,6 @@ class VppAsfTestCase(CPUInterface, unittest.TestCase):
cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
@classmethod
- def skip_fixme_ubuntu2404(cls):
- """if @tag_fixme_ubuntu2404 & is Ubuntu24.04 - mark for skip"""
- if cls.has_tag(TestCaseTag.FIXME_UBUNTU2404) and is_distro_ubuntu2404 == True:
- cls = unittest.skip("Skipping @tag_fixme_ubuntu2404 tests")(cls)
-
- @classmethod
def instance(cls):
"""Return the instance of this testcase"""
return cls.test_instance
@@ -1366,13 +1346,6 @@ class VppTestResult(unittest.TestResult):
test_title = colorize(f"FIXME with ASAN: {test_title}", RED)
test.skip_fixme_asan()
- if (
- test.has_tag(TestCaseTag.FIXME_UBUNTU2404)
- and is_distro_ubuntu2404 == True
- ):
- test_title = colorize(f"FIXME with Ubuntu 24.04: {test_title}", RED)
- test.skip_fixme_ubuntu2404()
-
if hasattr(test, "vpp_worker_count"):
if test.vpp_worker_count == 0:
test_title += " [main thread only]"
diff --git a/test/test_ipsec_api.py b/test/test_ipsec_api.py
index 7208d2887b5..158cb6b9df5 100644
--- a/test/test_ipsec_api.py
+++ b/test/test_ipsec_api.py
@@ -4,6 +4,7 @@ from framework import VppTestCase
from asfframework import VppTestRunner
from template_ipsec import IPsecIPv4Params
from vpp_papi import VppEnum
+from ipaddress import IPv4Address
from vpp_ipsec import VppIpsecSA
@@ -120,20 +121,15 @@ class IpsecApiTestCase(VppTestCase):
)
self.vapi.ipsec_select_backend(protocol=self.vpp_ah_protocol, index=0)
- def __check_sa_binding(self, sa_id, thread_index):
- found_sa = False
+ def __sa_dump(self, sa):
sa_dumps = self.vapi.ipsec_sa_v5_dump()
for dump in sa_dumps:
- if dump.entry.sad_id == sa_id:
- self.assertEqual(dump.thread_index, thread_index)
- found_sa = True
- break
+ if dump.entry.sad_id == sa.id:
+ return dump
+ self.fail("SA not found in VPP")
- if not found_sa:
- self.fail("SA not found in VPP")
-
- def test_sa_worker_bind(self):
- """Bind an SA to a worker"""
+ def test_sa_basic(self):
+ """basic SA API tests"""
sa = VppIpsecSA(
self,
self.ipv4_params.scapy_tun_sa_id,
@@ -143,14 +139,51 @@ class IpsecApiTestCase(VppTestCase):
self.ipv4_params.crypt_algo_vpp_id,
self.ipv4_params.crypt_key,
VppEnum.vl_api_ipsec_proto_t.IPSEC_API_PROTO_ESP,
+ flags=VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
+ | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND,
)
sa.add_vpp_config()
- self.__check_sa_binding(sa.id, 0xFFFF)
-
+ # check general SA dump
+ dump = self.__sa_dump(sa)
+ self.assertEqual(dump.entry.sad_id, sa.id)
+ self.assertEqual(dump.entry.spi, sa.spi)
+ self.assertEqual(dump.entry.protocol, sa.proto)
+ self.assertEqual(dump.entry.crypto_algorithm, sa.crypto_alg)
+ self.assertEqual(
+ dump.entry.crypto_key.data[: dump.entry.crypto_key.length], sa.crypto_key
+ )
+ self.assertEqual(dump.entry.integrity_algorithm, sa.integ_alg)
+ self.assertEqual(
+ dump.entry.integrity_key.data[: dump.entry.integrity_key.length],
+ sa.integ_key,
+ )
+ self.assertEqual(dump.entry.flags, sa.flags)
+ self.assertEqual(dump.entry.tunnel.instance, 0)
+ self.assertEqual(dump.entry.tunnel.src, IPv4Address("0.0.0.0"))
+ self.assertEqual(dump.entry.tunnel.dst, IPv4Address("0.0.0.0"))
+ self.assertEqual(dump.entry.tunnel.sw_if_index, 0)
+ self.assertEqual(dump.entry.tunnel.table_id, sa.table_id)
+ self.assertEqual(dump.entry.tunnel.encap_decap_flags, sa.tun_flags)
+ self.assertEqual(dump.entry.tunnel.mode, 0)
+ self.assertEqual(dump.entry.tunnel.flags, 0)
+ self.assertEqual(dump.entry.tunnel.dscp, 0)
+ self.assertEqual(dump.entry.tunnel.hop_limit, 0)
+ self.assertEqual(dump.entry.salt, 0)
+ self.assertEqual(dump.entry.udp_src_port, 0)
+ self.assertEqual(dump.entry.udp_dst_port, 0)
+ self.assertEqual(dump.entry.anti_replay_window_size, 64)
+ self.assertEqual(dump.sw_if_index, 0xFFFFFFFF)
+ self.assertEqual(dump.seq_outbound, 0)
+ self.assertEqual(dump.last_seq_inbound, 0)
+ self.assertEqual(dump.replay_window, 0xFFFFFFFFFFFFFFFF)
+ self.assertEqual(dump.thread_index, 0xFFFF)
+ self.assertEqual(dump.stat_index, 0)
+
+ # check SA binding API
self.vapi.ipsec_sad_bind(sa_id=sa.id, worker=1)
-
- self.__check_sa_binding(sa.id, 2)
+ dump = self.__sa_dump(sa)
+ self.assertEqual(dump.thread_index, 2)
sa.remove_vpp_config()
diff --git a/test/test_ipsec_spd_fp_input.py b/test/test_ipsec_spd_fp_input.py
index eb04df49244..1953bbe5eaf 100644
--- a/test/test_ipsec_spd_fp_input.py
+++ b/test/test_ipsec_spd_fp_input.py
@@ -9,6 +9,7 @@ from template_ipsec import IPSecIPv6Fwd
from test_ipsec_esp import TemplateIpsecEsp
from template_ipsec import SpdFastPathTemplate
from config import config
+import pdb
def debug_signal_handler(signal, frame):
@@ -888,5 +889,50 @@ class IPSec6SpdTestCaseProtect(SpdFastPathIPv6InboundProtect):
self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
+class IPSec6SpdTestCaseTunProtect(SpdFastPathIPv6InboundProtect):
+ """IPSec/IPv6 inbound: Policy mode test case with fast path"""
+
+ # In this test sa_in defines a tunnel. Matching should be
+ # done based on the sa tunnel header.
+
+ @classmethod
+ def setUpClass(cls):
+ super(IPSec6SpdTestCaseTunProtect, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(IPSec6SpdTestCaseTunProtect, cls).tearDownClass()
+
+ def setUp(self):
+ super(IPSec6SpdTestCaseTunProtect, self).setUp()
+
+ def tearDown(self):
+ super(IPSec6SpdTestCaseTunProtect, self).tearDown()
+
+ def test_ipsec6_spd_inbound_tun_protect(self):
+ pkt_count = 5
+ payload_size = 64
+ p = self.params[socket.AF_INET6]
+ send_pkts = self.gen_encrypt_pkts6(
+ p,
+ p.scapy_tun_sa,
+ self.tun_if,
+ src=p.remote_tun_if_host,
+ dst=self.pg1.remote_ip6,
+ count=pkt_count,
+ payload_size=payload_size,
+ )
+ recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
+ self.logger.info(self.vapi.ppcli("show error"))
+ self.logger.info(self.vapi.ppcli("show ipsec all"))
+ pkts = p.tun_sa_in.get_stats()["packets"]
+ self.assertEqual(
+ pkts,
+ pkt_count,
+ "incorrect SA in counts: expected %d != %d" % (pkt_count, pkts),
+ )
+ self.assertEqual(p.tun_sa_in.get_err("lost"), 0)
+
+
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)
diff --git a/test/test_sflow.py b/test/test_sflow.py
new file mode 100644
index 00000000000..d37ed84f252
--- /dev/null
+++ b/test/test_sflow.py
@@ -0,0 +1,212 @@
+#!/usr/bin/env python3
+
+import unittest
+from framework import VppTestCase
+from asfframework import VppTestRunner
+from scapy.layers.l2 import Ether
+from scapy.packet import Raw
+from scapy.layers.inet import IP, UDP
+from random import randint
+import re # for finding counters in "sh errors" output
+
+
+class SFlowTestCase(VppTestCase):
+ """sFlow test case"""
+
+ @classmethod
+ def setUpClass(self):
+ super(SFlowTestCase, self).setUpClass()
+
+ @classmethod
+ def teadDownClass(cls):
+ super(SFlowTestCase, cls).tearDownClass()
+
+ def setUp(self):
+ self.create_pg_interfaces(range(2)) # create pg0 and pg1
+ for i in self.pg_interfaces:
+ i.admin_up() # put the interface up
+ i.config_ip4() # configure IPv4 address on the interface
+ i.resolve_arp() # resolve ARP, so that we know VPP MAC
+
+ def tearDown(self):
+ for i in self.pg_interfaces:
+ i.admin_down()
+ i.unconfig()
+ i.set_table_ip4(0)
+ i.set_table_ip6(0)
+
+ def is_hw_interface_in_dump(self, dump, hw_if_index):
+ for i in dump:
+ if i.hw_if_index == hw_if_index:
+ return True
+ else:
+ return False
+
+ def enable_sflow_via_api(self):
+ ## TEST: Enable one interface
+ ret = self.vapi.sflow_enable_disable(hw_if_index=1, enable_disable=True)
+ self.assertEqual(ret.retval, 0)
+
+ ## TEST: interface dump all
+ ret = self.vapi.sflow_interface_dump()
+ self.assertTrue(self.is_hw_interface_in_dump(ret, 1))
+
+ ## TEST: Disable one interface
+ ret = self.vapi.sflow_enable_disable(hw_if_index=1, enable_disable=False)
+ self.assertEqual(ret.retval, 0)
+
+ ## TEST: interface dump all after enable + disable
+ ret = self.vapi.sflow_interface_dump()
+ self.assertEqual(len(ret), 0)
+
+ ## TEST: Enable both interfaces
+ ret = self.vapi.sflow_enable_disable(hw_if_index=1, enable_disable=True)
+ self.assertEqual(ret.retval, 0)
+ ret = self.vapi.sflow_enable_disable(hw_if_index=2, enable_disable=True)
+ self.assertEqual(ret.retval, 0)
+
+ ## TEST: interface dump all
+ ret = self.vapi.sflow_interface_dump()
+ self.assertTrue(self.is_hw_interface_in_dump(ret, 1))
+ self.assertTrue(self.is_hw_interface_in_dump(ret, 2))
+
+ ## TEST: the default sampling rate
+ ret = self.vapi.sflow_sampling_rate_get()
+ self.assert_equal(ret.sampling_N, 10000)
+
+ ## TEST: sflow_sampling_rate_set()
+ self.vapi.sflow_sampling_rate_set(sampling_N=1)
+ ret = self.vapi.sflow_sampling_rate_get()
+ self.assert_equal(ret.sampling_N, 1)
+
+ ## TEST: the default polling interval
+ ret = self.vapi.sflow_polling_interval_get()
+ self.assert_equal(ret.polling_S, 20)
+
+ ## TEST: sflow_polling_interval_set()
+ self.vapi.sflow_polling_interval_set(polling_S=10)
+ ret = self.vapi.sflow_polling_interval_get()
+ self.assert_equal(ret.polling_S, 10)
+
+ ## TEST: the default header bytes
+ ret = self.vapi.sflow_header_bytes_get()
+ self.assert_equal(ret.header_B, 128)
+
+ ## TEST: sflow_header_bytes_set()
+ self.vapi.sflow_header_bytes_set(header_B=96)
+ ret = self.vapi.sflow_header_bytes_get()
+ self.assert_equal(ret.header_B, 96)
+
+ def create_stream(self, src_if, dst_if, count):
+ packets = []
+ for i in range(count):
+ # create packet info stored in the test case instance
+ info = self.create_packet_info(src_if, dst_if)
+ # convert the info into packet payload
+ payload = self.info_to_payload(info)
+ # create the packet itself
+ p = (
+ Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+ / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+ / UDP(sport=randint(49152, 65535), dport=5678)
+ / Raw(payload)
+ )
+ # store a copy of the packet in the packet info
+ info.data = p.copy()
+ # append the packet to the list
+ packets.append(p)
+ # return the created packet list
+ return packets
+
+ def verify_capture(self, src_if, dst_if, capture):
+ packet_info = None
+ for packet in capture:
+ try:
+ ip = packet[IP]
+ udp = packet[UDP]
+ # convert the payload to packet info object
+ payload_info = self.payload_to_info(packet[Raw])
+ # make sure the indexes match
+ self.assert_equal(
+ payload_info.src, src_if.sw_if_index, "source sw_if_index"
+ )
+ self.assert_equal(
+ payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
+ )
+ packet_info = self.get_next_packet_info_for_interface2(
+ src_if.sw_if_index, dst_if.sw_if_index, packet_info
+ )
+ # make sure we didn't run out of saved packets
+ self.assertIsNotNone(packet_info)
+ self.assert_equal(
+ payload_info.index, packet_info.index, "packet info index"
+ )
+ saved_packet = packet_info.data # fetch the saved packet
+ # assert the values match
+ self.assert_equal(ip.src, saved_packet[IP].src, "IP source address")
+ self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
+ except:
+ self.logger.error("Unexpected or invalid packet:", packet)
+ raise
+ remaining_packet = self.get_next_packet_info_for_interface2(
+ src_if.sw_if_index, dst_if.sw_if_index, packet_info
+ )
+ self.assertIsNone(
+ remaining_packet,
+ "Interface %s: Packet expected from interface "
+ "%s didn't arrive" % (dst_if.name, src_if.name),
+ )
+
+ def get_sflow_counter(self, counter):
+ counters = self.vapi.cli("sh errors").split("\n")
+ for i in range(1, len(counters) - 1):
+ results = counters[i].split()
+ if results[1] == "sflow":
+ if re.search(counter, counters[i]) is not None:
+ return int(results[0])
+ return None
+
+ def verify_sflow(self, count):
+ ctr_processed = "sflow packets processed"
+ ctr_sampled = "sflow packets sampled"
+ ctr_dropped = "sflow packets dropped"
+ ctr_ps_sent = "sflow PSAMPLE sent"
+ ctr_ps_fail = "sflow PSAMPLE send failed"
+ processed = self.get_sflow_counter(ctr_processed)
+ sampled = self.get_sflow_counter(ctr_sampled)
+ dropped = self.get_sflow_counter(ctr_dropped)
+ ps_sent = self.get_sflow_counter(ctr_ps_sent)
+ ps_fail = self.get_sflow_counter(ctr_ps_fail)
+ self.assert_equal(processed, count, ctr_processed)
+ self.assert_equal(sampled, count, ctr_sampled)
+ self.assert_equal(dropped, None, ctr_dropped)
+ # TODO decide how to warn if PSAMPLE is not working
+ # It requires a prior "sudo modprobe psample", but
+ # that should probably be done at system boot time
+ # or maybe in a systemctl startup script, so we
+ # should only warn here.
+ self.logger.info(ctr_ps_sent + "=" + str(ps_sent))
+ self.logger.info(ctr_ps_fail + "=" + str(ps_fail))
+
+ def test_basic(self):
+ self.enable_sflow_via_api()
+ count = 7
+ # create the packet stream
+ packets = self.create_stream(self.pg0, self.pg1, count)
+ # add the stream to the source interface
+ self.pg0.add_stream(packets)
+ # enable capture on both interfaces
+ self.pg0.enable_capture()
+ self.pg1.enable_capture()
+ # start the packet generator
+ self.pg_start()
+ # get capture - the proper count of packets was saved by
+ # create_packet_info() based on dst_if parameter
+ capture = self.pg1.get_capture()
+ # assert nothing captured on pg0 (always do this last, so that
+ # some time has already passed since pg_start())
+ self.pg0.assert_nothing_captured()
+ # verify capture
+ self.verify_capture(self.pg0, self.pg1, capture)
+ # verify sflow counters
+ self.verify_sflow(count)
diff --git a/test/test_snort.py b/test/test_snort.py
index 19401cb7b85..5335091dba7 100644
--- a/test/test_snort.py
+++ b/test/test_snort.py
@@ -12,10 +12,10 @@ class TestSnort(VppTestCase):
def setUpClass(cls):
super(TestSnort, cls).setUpClass()
try:
- cls.create_pg_interfaces(range(2))
+ cls.create_pg_interfaces(range(4))
for i in cls.pg_interfaces:
i.config_ip4().resolve_arp()
- i.admin_up()
+ i.admin_down()
except Exception:
cls.tearDownClass()
raise
@@ -24,26 +24,28 @@ class TestSnort(VppTestCase):
def tearDownClass(cls):
for i in cls.pg_interfaces:
i.unconfig_ip4()
- i.admin_down()
super(TestSnort, cls).tearDownClass()
def test_snort_cli(self):
# TODO: add a test with packets
# { cli command : part of the expected reply }
- print("TEST SNORT CLI")
commands_replies = {
"snort create-instance name snortTest queue-size 16 on-disconnect drop": "",
"snort create-instance name snortTest2 queue-size 16 on-disconnect pass": "",
"snort attach instance snortTest interface pg0 output": "",
"snort attach instance snortTest2 interface pg1 input": "",
+ "snort attach all-instances interface pg2 inout": "",
+ "snort attach instance snortTest instance snortTest2 interface pg3 inout": "",
"show snort instances": "snortTest",
"show snort interfaces": "pg0",
"show snort clients": "number of clients",
"show snort mode": "input mode: interrupt",
"snort mode polling": "",
"snort mode interrupt": "",
- "snort detach interface pg0": "",
- "snort detach interface pg1": "",
+ "snort detach instance snortTest interface pg0": "",
+ "snort detach instance snortTest2 interface pg1": "",
+ "snort detach all-instances interface pg2": "",
+ "snort detach instance snortTest instance snortTest2 interface pg3": "",
"snort delete instance snortTest": "",
}
@@ -64,7 +66,7 @@ class TestSnortVapi(VppTestCase):
for i in cls.pg_interfaces:
i.config_ip4()
i.resolve_arp()
- i.admin_up()
+ i.admin_down()
except Exception:
cls.tearDownClass()
raise
@@ -73,7 +75,6 @@ class TestSnortVapi(VppTestCase):
def tearDownClass(cls):
for i in cls.pg_interfaces:
i.unconfig_ip4()
- i.admin_down()
super(TestSnortVapi, cls).tearDownClass()
def test_snort_01_modes_set_interrupt(self):
@@ -109,14 +110,20 @@ class TestSnortVapi(VppTestCase):
reply = self.vapi.snort_interface_attach(
instance_index=0, sw_if_index=1, snort_dir=1
)
+ reply = self.vapi.snort_interface_attach(
+ instance_index=0, sw_if_index=2, snort_dir=2
+ )
+ # verify attaching with an invalid direction is rejected
try:
reply = self.vapi.snort_interface_attach(
- instance_index=1, sw_if_index=1, snort_dir=1
+ instance_index=1, sw_if_index=2, snort_dir=4
)
except:
pass
else:
self.assertNotEqual(reply.retval, 0)
+ reply = self.vapi.cli("show snort interfaces")
+ self.assertNotIn("snortTest1", reply)
reply = self.vapi.snort_interface_attach(
instance_index=1, sw_if_index=2, snort_dir=3
@@ -124,6 +131,31 @@ class TestSnortVapi(VppTestCase):
reply = self.vapi.cli("show snort interfaces")
self.assertIn("snortTest0", reply)
self.assertIn("snortTest1", reply)
+ self.assertIn("input", reply)
+ self.assertIn("inout", reply)
+ self.assertIn("output", reply)
+
+ # verify attaching a previously attached interface is rejected
+ try:
+ reply = self.vapi.snort_interface_attach(
+ instance_index=1, sw_if_index=2, snort_dir=2
+ )
+ except:
+ pass
+ else:
+ self.assertNotEqual(reply.retval, 0)
+
+ # verify attaching an invalid sw_if_index is rejected
+ try:
+ reply = self.vapi.snort_interface_attach(
+ instance_index=1, sw_if_index=3, snort_dir=2
+ )
+ except:
+ pass
+ else:
+ self.assertNotEqual(reply.retval, 0)
+ reply = self.vapi.cli("show snort interfaces")
+ self.assertIn("snortTest1", reply)
def test_snort_05_delete_instance(self):
"""Instances can be deleted"""
@@ -131,14 +163,14 @@ class TestSnortVapi(VppTestCase):
reply = self.vapi.cli("show snort interfaces")
self.assertNotIn("snortTest0", reply)
self.assertIn("snortTest1", reply)
- reply = self.vapi.cli("show snort interfaces")
self.assertNotIn("pg0", reply)
self.assertIn("pg1", reply)
def test_snort_06_detach_if(self):
"""Interfaces can be detached"""
+ # verify detaching an invalid sw_if_index is rejected
try:
- reply = self.vapi.snort_interface_detach(sw_if_index=1)
+ reply = self.vapi.snort_interface_detach(sw_if_index=3)
except:
pass
else: