diff options
author | Matus Fabian <matfabia@cisco.com> | 2017-04-18 05:29:59 -0700 |
---|---|---|
committer | Ole Trøan <otroan@employees.org> | 2017-04-21 08:21:45 +0000 |
commit | 2f2db1c2021c5aae564d2783a33366527afb5fce (patch) | |
tree | 8dc4c75b51a6f1022c50ee21edd0f209c5f87815 /test/test_snat.py | |
parent | 5fec1e8b2282f4d3d1d02556020254a84c3b6e3d (diff) |
CGN: IPFIX logging
maximum entries per user exceeded event
Change-Id: Ie35d7f40f55001e2ef4a38f934f176594f25b189
Signed-off-by: Matus Fabian <matfabia@cisco.com>
Diffstat (limited to 'test/test_snat.py')
-rw-r--r-- | test/test_snat.py | 38 |
1 files changed, 37 insertions, 1 deletions
diff --git a/test/test_snat.py b/test/test_snat.py index 42b07193178..ace17237b38 100644 --- a/test/test_snat.py +++ b/test/test_snat.py @@ -1338,7 +1338,7 @@ class TestDeterministicNAT(MethodHolder): cls.icmp_id_in = 6305 cls.snat_addr = '10.0.0.3' - cls.create_pg_interfaces(range(2)) + cls.create_pg_interfaces(range(3)) cls.interfaces = list(cls.pg_interfaces) for i in cls.interfaces: @@ -1483,6 +1483,21 @@ class TestDeterministicNAT(MethodHolder): self.logger.error("TCP 3 way handshake failed") raise + def verify_ipfix_max_entries_per_user(self, data): + """ + Verify IPFIX maximum entries per user exceeded event + + :param data: Decoded IPFIX data records + """ + self.assertEqual(1, len(data)) + record = data[0] + # natEvent + self.assertEqual(ord(record[230]), 13) + # natQuotaExceededEvent + self.assertEqual('\x03\x00\x00\x00', record[466]) + # sourceIPv4Address + self.assertEqual(self.pg0.remote_ip4n, record[8]) + def test_deterministic_mode(self): """ S-NAT run deterministic mode """ in_addr = '172.16.255.0' @@ -1827,6 +1842,11 @@ class TestDeterministicNAT(MethodHolder): self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, is_inside=0) + self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n, + src_address=self.pg2.local_ip4n, + path_mtu=512, + template_interval=10) + self.vapi.snat_ipfix() pkts = [] for port in range(1025, 2025): @@ -1852,10 +1872,26 @@ class TestDeterministicNAT(MethodHolder): self.assertEqual(1000, dms[0].ses_num) + # verify IPFIX logging + self.vapi.cli("ipfix flush") # FIXME this should be an API call + capture = self.pg2.get_capture(2) + ipfix = IPFIXDecoder() + # first load template + for p in capture: + self.assertTrue(p.haslayer(IPFIX)) + if p.haslayer(Template): + ipfix.add_template(p.getlayer(Template)) + # verify events in data set + for p in capture: + if p.haslayer(Data): + data = ipfix.decode_data_set(p.getlayer(Set)) + self.verify_ipfix_max_entries_per_user(data) + def clear_snat(self): """ Clear SNAT configuration. """ + self.vapi.snat_ipfix(enable=0) self.vapi.snat_det_set_timeouts() deterministic_mappings = self.vapi.snat_det_map_dump() for dsm in deterministic_mappings: |