aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/test_snat.py38
1 files changed, 37 insertions, 1 deletions
diff --git a/test/test_snat.py b/test/test_snat.py
index 42b07193178..ace17237b38 100644
--- a/test/test_snat.py
+++ b/test/test_snat.py
@@ -1338,7 +1338,7 @@ class TestDeterministicNAT(MethodHolder):
cls.icmp_id_in = 6305
cls.snat_addr = '10.0.0.3'
- cls.create_pg_interfaces(range(2))
+ cls.create_pg_interfaces(range(3))
cls.interfaces = list(cls.pg_interfaces)
for i in cls.interfaces:
@@ -1483,6 +1483,21 @@ class TestDeterministicNAT(MethodHolder):
self.logger.error("TCP 3 way handshake failed")
raise
+ def verify_ipfix_max_entries_per_user(self, data):
+ """
+ Verify IPFIX maximum entries per user exceeded event
+
+ :param data: Decoded IPFIX data records
+ """
+ self.assertEqual(1, len(data))
+ record = data[0]
+ # natEvent
+ self.assertEqual(ord(record[230]), 13)
+ # natQuotaExceededEvent
+ self.assertEqual('\x03\x00\x00\x00', record[466])
+ # sourceIPv4Address
+ self.assertEqual(self.pg0.remote_ip4n, record[8])
+
def test_deterministic_mode(self):
""" S-NAT run deterministic mode """
in_addr = '172.16.255.0'
@@ -1827,6 +1842,11 @@ class TestDeterministicNAT(MethodHolder):
self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
is_inside=0)
+ self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
+ src_address=self.pg2.local_ip4n,
+ path_mtu=512,
+ template_interval=10)
+ self.vapi.snat_ipfix()
pkts = []
for port in range(1025, 2025):
@@ -1852,10 +1872,26 @@ class TestDeterministicNAT(MethodHolder):
self.assertEqual(1000, dms[0].ses_num)
+ # verify IPFIX logging
+ self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ capture = self.pg2.get_capture(2)
+ ipfix = IPFIXDecoder()
+ # first load template
+ for p in capture:
+ self.assertTrue(p.haslayer(IPFIX))
+ if p.haslayer(Template):
+ ipfix.add_template(p.getlayer(Template))
+ # verify events in data set
+ for p in capture:
+ if p.haslayer(Data):
+ data = ipfix.decode_data_set(p.getlayer(Set))
+ self.verify_ipfix_max_entries_per_user(data)
+
def clear_snat(self):
"""
Clear SNAT configuration.
"""
+ self.vapi.snat_ipfix(enable=0)
self.vapi.snat_det_set_timeouts()
deterministic_mappings = self.vapi.snat_det_map_dump()
for dsm in deterministic_mappings: