From 2f2db1c2021c5aae564d2783a33366527afb5fce Mon Sep 17 00:00:00 2001 From: Matus Fabian Date: Tue, 18 Apr 2017 05:29:59 -0700 Subject: CGN: IPFIX logging maximum entries per user exceeded event Change-Id: Ie35d7f40f55001e2ef4a38f934f176594f25b189 Signed-off-by: Matus Fabian --- test/test_snat.py | 38 +++++++++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) (limited to 'test') diff --git a/test/test_snat.py b/test/test_snat.py index 42b07193..ace17237 100644 --- a/test/test_snat.py +++ b/test/test_snat.py @@ -1338,7 +1338,7 @@ class TestDeterministicNAT(MethodHolder): cls.icmp_id_in = 6305 cls.snat_addr = '10.0.0.3' - cls.create_pg_interfaces(range(2)) + cls.create_pg_interfaces(range(3)) cls.interfaces = list(cls.pg_interfaces) for i in cls.interfaces: @@ -1483,6 +1483,21 @@ class TestDeterministicNAT(MethodHolder): self.logger.error("TCP 3 way handshake failed") raise + def verify_ipfix_max_entries_per_user(self, data): + """ + Verify IPFIX maximum entries per user exceeded event + + :param data: Decoded IPFIX data records + """ + self.assertEqual(1, len(data)) + record = data[0] + # natEvent + self.assertEqual(ord(record[230]), 13) + # natQuotaExceededEvent + self.assertEqual('\x03\x00\x00\x00', record[466]) + # sourceIPv4Address + self.assertEqual(self.pg0.remote_ip4n, record[8]) + def test_deterministic_mode(self): """ S-NAT run deterministic mode """ in_addr = '172.16.255.0' @@ -1827,6 +1842,11 @@ class TestDeterministicNAT(MethodHolder): self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, is_inside=0) + self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n, + src_address=self.pg2.local_ip4n, + path_mtu=512, + template_interval=10) + self.vapi.snat_ipfix() pkts = [] for port in range(1025, 2025): @@ -1852,10 +1872,26 @@ class TestDeterministicNAT(MethodHolder): self.assertEqual(1000, dms[0].ses_num) + # verify IPFIX logging + self.vapi.cli("ipfix flush") # FIXME this should be an API call + capture = self.pg2.get_capture(2) + ipfix = IPFIXDecoder() + # first load template + for p in capture: + self.assertTrue(p.haslayer(IPFIX)) + if p.haslayer(Template): + ipfix.add_template(p.getlayer(Template)) + # verify events in data set + for p in capture: + if p.haslayer(Data): + data = ipfix.decode_data_set(p.getlayer(Set)) + self.verify_ipfix_max_entries_per_user(data) + def clear_snat(self): """ Clear SNAT configuration. """ + self.vapi.snat_ipfix(enable=0) self.vapi.snat_det_set_timeouts() deterministic_mappings = self.vapi.snat_det_map_dump() for dsm in deterministic_mappings: -- cgit 1.2.3-korg