diff options
Diffstat (limited to 'test/test_nat44_ei.py')
-rw-r--r-- | test/test_nat44_ei.py | 16 |
1 files changed, 12 insertions, 4 deletions
diff --git a/test/test_nat44_ei.py b/test/test_nat44_ei.py index 15eb70a6664..dc74d03771b 100644 --- a/test/test_nat44_ei.py +++ b/test/test_nat44_ei.py @@ -663,6 +663,7 @@ class MethodHolder(VppTestCase): self.assertEqual(scapy.compat.orb(record[230]), 3) # natPoolID self.assertEqual(struct.pack("!I", 0), record[283]) + return len(data) def verify_ipfix_max_sessions(self, data, limit): self.assertEqual(1, len(data)) @@ -673,6 +674,7 @@ class MethodHolder(VppTestCase): self.assertEqual(struct.pack("!I", 1), record[466]) # maxSessionEntries self.assertEqual(struct.pack("!I", limit), record[471]) + return len(data) def verify_no_nat44_user(self): """Verify that there is no NAT44EI user""" @@ -2463,7 +2465,7 @@ class TestNAT44EI(MethodHolder): Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / TCP(sport=3025) - ) + ) * 3 self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2482,10 +2484,12 @@ class TestNAT44EI(MethodHolder): if p.haslayer(Template): ipfix.add_template(p.getlayer(Template)) # verify events in data set + event_count = 0 for p in capture: if p.haslayer(Data): data = ipfix.decode_data_set(p.getlayer(Set)) - self.verify_ipfix_addr_exhausted(data) + event_count += self.verify_ipfix_addr_exhausted(data) + self.assertEqual(event_count, 1) def test_ipfix_max_sessions(self): """NAT44EI IPFIX logging maximum session entries exceeded""" @@ -2529,7 +2533,7 @@ class TestNAT44EI(MethodHolder): Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / TCP(sport=1025) - ) + ) * 3 self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2548,10 +2552,14 @@ class TestNAT44EI(MethodHolder): if p.haslayer(Template): ipfix.add_template(p.getlayer(Template)) # verify events in data set + event_count = 0 for p in capture: if p.haslayer(Data): data = ipfix.decode_data_set(p.getlayer(Set)) - self.verify_ipfix_max_sessions(data, max_sessions_per_thread) + event_count += self.verify_ipfix_max_sessions( + data, max_sessions_per_thread + ) + self.assertEqual(event_count, 1) def test_syslog_apmap(self): """NAT44EI syslog address and port mapping creation and deletion""" |