diff options
author | Vladislav Grishenko <themiron@yandex-team.ru> | 2023-09-14 22:14:38 +0500 |
---|---|---|
committer | Ole Tr�an <otroan@employees.org> | 2023-10-16 13:13:00 +0000 |
commit | da34f4add5f141d58670d81d53553986e9a472b4 (patch) | |
tree | da22aadb4d979440daf735ec8640f87a204fef55 /test/test_nat64.py | |
parent | ff344a98afd2057cd0df312a9d7277a95853fd0a (diff) |
nat: add ipfix rate-limiter for nat44-ed, nat44-ei and nat64
This prevents ipfix flood with the repeating events and allows
to enable nat64 max_session and max_bibs events. Also fix wrong
endian for det44 and nat64 ipfix tests, now should be fine with
extended tests enabled.
Max session per user event @ nat44-ei requires more precise rate
limiter per user address, probably with sparse vec, not handled.
Type: improvement
Signed-off-by: Vladislav Grishenko <themiron@yandex-team.ru>
Change-Id: Ib20cc1ee3f81e7acc88a415fe83b4e2deae2a836
Diffstat (limited to 'test/test_nat64.py')
-rw-r--r-- | test/test_nat64.py | 22 |
1 files changed, 14 insertions, 8 deletions
diff --git a/test/test_nat64.py b/test/test_nat64.py index 05a2031f921..4ae9f6614dd 100644 --- a/test/test_nat64.py +++ b/test/test_nat64.py @@ -462,9 +462,10 @@ class TestNAT64(VppTestCase): # natEvent self.assertEqual(scapy.compat.orb(record[230]), 13) # natQuotaExceededEvent - self.assertEqual(struct.pack("I", 2), record[466]) + self.assertEqual(struct.pack("!I", 2), record[466]) # maxBIBEntries - self.assertEqual(struct.pack("I", limit), record[472]) + self.assertEqual(struct.pack("!I", limit), record[472]) + return len(data) def verify_ipfix_bib(self, data, is_create, src_addr): """ @@ -622,9 +623,10 @@ class TestNAT64(VppTestCase): # natEvent self.assertEqual(scapy.compat.orb(record[230]), 13) # natQuotaExceededEvent - self.assertEqual(struct.pack("I", 1), record[466]) + self.assertEqual(struct.pack("!I", 1), record[466]) # maxSessionEntries - self.assertEqual(struct.pack("I", limit), record[471]) + self.assertEqual(struct.pack("!I", limit), record[471]) + return len(data) def test_nat64_inside_interface_handles_neighbor_advertisement(self): """NAT64 inside interface handles Neighbor Advertisement""" @@ -1859,7 +1861,7 @@ class TestNAT64(VppTestCase): Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IPv6(src=src, dst=remote_host_ip6) / TCP(sport=12345, dport=25) - ) + ) * 3 self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1878,16 +1880,18 @@ class TestNAT64(VppTestCase): if p.haslayer(Template): ipfix.add_template(p.getlayer(Template)) # verify events in data set + event_count = 0 for p in capture: if p.haslayer(Data): data = ipfix.decode_data_set(p.getlayer(Set)) - self.verify_ipfix_max_sessions(data, max_sessions) + event_count += self.verify_ipfix_max_sessions(data, max_sessions) + self.assertEqual(event_count, 1) p = ( Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) / TCP(sport=12345, dport=80) - ) + ) * 3 self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1895,6 +1899,7 @@ class TestNAT64(VppTestCase): self.vapi.ipfix_flush() capture = self.pg3.get_capture(1) # verify events in data set + event_count = 0 for p in capture: self.assertTrue(p.haslayer(IPFIX)) self.assertEqual(p[IP].src, self.pg3.local_ip4) @@ -1904,7 +1909,8 @@ class TestNAT64(VppTestCase): self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id) if p.haslayer(Data): data = ipfix.decode_data_set(p.getlayer(Set)) - self.verify_ipfix_max_bibs(data, max_bibs) + event_count += self.verify_ipfix_max_bibs(data, max_bibs) + self.assertEqual(event_count, 1) def test_ipfix_bib_ses(self): """IPFIX logging NAT64 BIB/session create and delete events""" |