aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_nat.py
diff options
context:
space:
mode:
authorMatus Fabian <matfabia@cisco.com>2018-01-04 04:03:14 -0800
committerMatus Fabian <matfabia@cisco.com>2018-01-08 01:51:24 -0800
commita431ad1c486ad0fd9ca35e14c527fe7611965fc2 (patch)
tree38326f15ef87cba103db422bf7744a33fd8413b1 /test/test_nat.py
parent1049139a6d323e07bfb87710c9d2f1d467e980a9 (diff)
NAT64: IPFix (VPP-1106)
Change-Id: Ib90dc5613c9fdac0344b3bd7f163e2f7163c64d8 Signed-off-by: Matus Fabian <matfabia@cisco.com>
Diffstat (limited to 'test/test_nat.py')
-rw-r--r--test/test_nat.py502
1 files changed, 477 insertions, 25 deletions
diff --git a/test/test_nat.py b/test/test_nat.py
index 4ced0af46e0..7194704046d 100644
--- a/test/test_nat.py
+++ b/test/test_nat.py
@@ -749,6 +749,148 @@ class MethodHolder(VppTestCase):
# natPoolID
self.assertEqual(struct.pack("!I", 0), record[283])
+ def verify_ipfix_max_sessions(self, data, limit):
+ """
+ Verify IPFIX maximum session entries exceeded event
+
+ :param data: Decoded IPFIX data records
+ :param limit: Number of maximum session entries that can be created.
+ """
+ self.assertEqual(1, len(data))
+ record = data[0]
+ # natEvent
+ self.assertEqual(ord(record[230]), 13)
+ # natQuotaExceededEvent
+ self.assertEqual(struct.pack("I", 1), record[466])
+ # maxSessionEntries
+ self.assertEqual(struct.pack("I", limit), record[471])
+
+ def verify_ipfix_max_bibs(self, data, limit):
+ """
+ Verify IPFIX maximum BIB entries exceeded event
+
+ :param data: Decoded IPFIX data records
+ :param limit: Number of maximum BIB entries that can be created.
+ """
+ self.assertEqual(1, len(data))
+ record = data[0]
+ # natEvent
+ self.assertEqual(ord(record[230]), 13)
+ # natQuotaExceededEvent
+ self.assertEqual(struct.pack("I", 2), record[466])
+ # maxBIBEntries
+ self.assertEqual(struct.pack("I", limit), record[472])
+
+ def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
+ """
+ Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
+
+ :param data: Decoded IPFIX data records
+ :param limit: Number of maximum fragments pending reassembly
+ :param src_addr: IPv6 source address
+ """
+ self.assertEqual(1, len(data))
+ record = data[0]
+ # natEvent
+ self.assertEqual(ord(record[230]), 13)
+ # natQuotaExceededEvent
+ self.assertEqual(struct.pack("I", 5), record[466])
+ # maxFragmentsPendingReassembly
+ self.assertEqual(struct.pack("I", limit), record[475])
+ # sourceIPv6Address
+ self.assertEqual(src_addr, record[27])
+
+ def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
+ """
+ Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
+
+ :param data: Decoded IPFIX data records
+ :param limit: Number of maximum fragments pending reassembly
+ :param src_addr: IPv4 source address
+ """
+ self.assertEqual(1, len(data))
+ record = data[0]
+ # natEvent
+ self.assertEqual(ord(record[230]), 13)
+ # natQuotaExceededEvent
+ self.assertEqual(struct.pack("I", 5), record[466])
+ # maxFragmentsPendingReassembly
+ self.assertEqual(struct.pack("I", limit), record[475])
+ # sourceIPv4Address
+ self.assertEqual(src_addr, record[8])
+
+ def verify_ipfix_bib(self, data, is_create, src_addr):
+ """
+ Verify IPFIX NAT64 BIB create and delete events
+
+ :param data: Decoded IPFIX data records
+ :param is_create: Create event if nonzero value otherwise delete event
+ :param src_addr: IPv6 source address
+ """
+ self.assertEqual(1, len(data))
+ record = data[0]
+ # natEvent
+ if is_create:
+ self.assertEqual(ord(record[230]), 10)
+ else:
+ self.assertEqual(ord(record[230]), 11)
+ # sourceIPv6Address
+ self.assertEqual(src_addr, record[27])
+ # postNATSourceIPv4Address
+ self.assertEqual(self.nat_addr_n, record[225])
+ # protocolIdentifier
+ self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
+ # ingressVRFID
+ self.assertEqual(struct.pack("!I", 0), record[234])
+ # sourceTransportPort
+ self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
+ # postNAPTSourceTransportPort
+ self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
+
+ def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
+ dst_port):
+ """
+ Verify IPFIX NAT64 session create and delete events
+
+ :param data: Decoded IPFIX data records
+ :param is_create: Create event if nonzero value otherwise delete event
+ :param src_addr: IPv6 source address
+ :param dst_addr: IPv4 destination address
+ :param dst_port: destination TCP port
+ """
+ self.assertEqual(1, len(data))
+ record = data[0]
+ # natEvent
+ if is_create:
+ self.assertEqual(ord(record[230]), 6)
+ else:
+ self.assertEqual(ord(record[230]), 7)
+ # sourceIPv6Address
+ self.assertEqual(src_addr, record[27])
+ # destinationIPv6Address
+ self.assertEqual(socket.inet_pton(socket.AF_INET6,
+ self.compose_ip6(dst_addr,
+ '64:ff9b::',
+ 96)),
+ record[28])
+ # postNATSourceIPv4Address
+ self.assertEqual(self.nat_addr_n, record[225])
+ # postNATDestinationIPv4Address
+ self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
+ record[226])
+ # protocolIdentifier
+ self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
+ # ingressVRFID
+ self.assertEqual(struct.pack("!I", 0), record[234])
+ # sourceTransportPort
+ self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
+ # postNAPTSourceTransportPort
+ self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
+ # destinationTransportPort
+ self.assertEqual(struct.pack("!H", dst_port), record[11])
+ # postNAPTDestinationTransportPort
+ self.assertEqual(struct.pack("!H", dst_port), record[228])
+
class TestNAT44(MethodHolder):
""" NAT44 Test Cases """
@@ -2064,7 +2206,7 @@ class TestNAT44(MethodHolder):
self.verify_capture_out(capture)
self.nat44_add_address(self.nat_addr, is_add=0)
self.vapi.cli("ipfix flush") # FIXME this should be an API call
- capture = self.pg3.get_capture(3)
+ capture = self.pg3.get_capture(9)
ipfix = IPFIXDecoder()
# first load template
for p in capture:
@@ -2103,7 +2245,7 @@ class TestNAT44(MethodHolder):
self.pg_start()
capture = self.pg1.get_capture(0)
self.vapi.cli("ipfix flush") # FIXME this should be an API call
- capture = self.pg3.get_capture(3)
+ capture = self.pg3.get_capture(9)
ipfix = IPFIXDecoder()
# first load template
for p in capture:
@@ -2122,6 +2264,63 @@ class TestNAT44(MethodHolder):
data = ipfix.decode_data_set(p.getlayer(Set))
self.verify_ipfix_addr_exhausted(data)
+ @unittest.skipUnless(running_extended_tests(), "part of extended tests")
+ def test_ipfix_max_sessions(self):
+ """ IPFIX logging maximum session entries exceeded """
+ self.nat44_add_address(self.nat_addr)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ nat44_config = self.vapi.nat_show_config()
+ max_sessions = 10 * nat44_config.translation_buckets
+
+ pkts = []
+ for i in range(0, max_sessions):
+ src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=src, dst=self.pg1.remote_ip4) /
+ TCP(sport=1025))
+ pkts.append(p)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ self.pg1.get_capture(max_sessions)
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
+ src_address=self.pg3.local_ip4n,
+ path_mtu=512,
+ template_interval=10)
+ self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
+ src_port=self.ipfix_src_port)
+
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=1025))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(0)
+ self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ capture = self.pg3.get_capture(9)
+ ipfix = IPFIXDecoder()
+ # first load template
+ for p in capture:
+ self.assertTrue(p.haslayer(IPFIX))
+ self.assertEqual(p[IP].src, self.pg3.local_ip4)
+ self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
+ self.assertEqual(p[UDP].sport, self.ipfix_src_port)
+ self.assertEqual(p[UDP].dport, 4739)
+ self.assertEqual(p[IPFIX].observationDomainID,
+ self.ipfix_domain_id)
+ if p.haslayer(Template):
+ ipfix.add_template(p.getlayer(Template))
+ # verify events in data set
+ for p in capture:
+ if p.haslayer(Data):
+ data = ipfix.decode_data_set(p.getlayer(Set))
+ self.verify_ipfix_max_sessions(data, max_sessions)
+
def test_pool_addr_fib(self):
""" NAT44 add pool addresses to FIB """
static_addr = '10.0.0.10'
@@ -3196,6 +3395,52 @@ class TestNAT44(MethodHolder):
adresses = self.vapi.nat44_address_dump()
self.assertEqual(0, len(adresses))
+ def test_ipfix_max_frags(self):
+ """ IPFIX logging maximum fragments pending reassembly exceeded """
+ self.nat44_add_address(self.nat_addr)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+ self.vapi.nat_set_reass(max_frag=0)
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
+ src_address=self.pg3.local_ip4n,
+ path_mtu=512,
+ template_interval=10)
+ self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
+ src_port=self.ipfix_src_port)
+
+ data = "A" * 4 + "B" * 16 + "C" * 3
+ self.tcp_port_in = random.randint(1025, 65535)
+ pkts = self.create_stream_frag(self.pg0,
+ self.pg1.remote_ip4,
+ self.tcp_port_in,
+ 20,
+ data)
+ self.pg0.add_stream(pkts[-1])
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ frags = self.pg1.get_capture(0)
+ self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ capture = self.pg3.get_capture(9)
+ ipfix = IPFIXDecoder()
+ # first load template
+ for p in capture:
+ self.assertTrue(p.haslayer(IPFIX))
+ self.assertEqual(p[IP].src, self.pg3.local_ip4)
+ self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
+ self.assertEqual(p[UDP].sport, self.ipfix_src_port)
+ self.assertEqual(p[UDP].dport, 4739)
+ self.assertEqual(p[IPFIX].observationDomainID,
+ self.ipfix_domain_id)
+ if p.haslayer(Template):
+ ipfix.add_template(p.getlayer(Template))
+ # verify events in data set
+ for p in capture:
+ if p.haslayer(Data):
+ data = ipfix.decode_data_set(p.getlayer(Set))
+ self.verify_ipfix_max_fragments_ip4(data, 0,
+ self.pg0.remote_ip4n)
+
def tearDown(self):
super(TestNAT44, self).tearDown()
if not self.vpp_dead:
@@ -3499,6 +3744,8 @@ class TestDeterministicNAT(MethodHolder):
self.assertEqual(ord(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual('\x03\x00\x00\x00', record[466])
+ # maxEntriesPerUser
+ self.assertEqual('\xe8\x03\x00\x00', record[473])
# sourceIPv4Address
self.assertEqual(self.pg0.remote_ip4n, record[8])
@@ -3969,6 +4216,12 @@ class TestNAT64(MethodHolder):
""" NAT64 Test Cases """
@classmethod
+ def setUpConstants(cls):
+ super(TestNAT64, cls).setUpConstants()
+ cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
+ "nat64 st hash buckets 256", "}"])
+
+ @classmethod
def setUpClass(cls):
super(TestNAT64, cls).setUpClass()
@@ -3985,6 +4238,8 @@ class TestNAT64(MethodHolder):
cls.vrf1_nat_addr = '10.0.10.3'
cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
cls.vrf1_nat_addr)
+ cls.ipfix_src_port = 4739
+ cls.ipfix_domain_id = 1
cls.create_pg_interfaces(range(5))
cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
@@ -4914,6 +5169,220 @@ class TestNAT64(MethodHolder):
addresses = self.vapi.nat64_pool_addr_dump()
self.assertEqual(0, len(adresses))
+ @unittest.skipUnless(running_extended_tests(), "part of extended tests")
+ def test_ipfix_max_bibs_sessions(self):
+ """ IPFIX logging maximum session and BIB entries exceeded """
+ max_bibs = 1280
+ max_sessions = 2560
+ remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
+ '64:ff9b::',
+ 96)
+
+ self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+ self.nat_addr_n)
+ self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+ self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+
+ pkts = []
+ src = ""
+ for i in range(0, max_bibs):
+ src = "fd01:aa::%x" % (i)
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IPv6(src=src, dst=remote_host_ip6) /
+ TCP(sport=12345, dport=80))
+ pkts.append(p)
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IPv6(src=src, dst=remote_host_ip6) /
+ TCP(sport=12345, dport=22))
+ pkts.append(p)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(max_sessions)
+
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
+ src_address=self.pg3.local_ip4n,
+ path_mtu=512,
+ template_interval=10)
+ self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
+ src_port=self.ipfix_src_port)
+
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IPv6(src=src, dst=remote_host_ip6) /
+ TCP(sport=12345, dport=25))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(0)
+ self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ capture = self.pg3.get_capture(9)
+ ipfix = IPFIXDecoder()
+ # first load template
+ for p in capture:
+ self.assertTrue(p.haslayer(IPFIX))
+ self.assertEqual(p[IP].src, self.pg3.local_ip4)
+ self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
+ self.assertEqual(p[UDP].sport, self.ipfix_src_port)
+ self.assertEqual(p[UDP].dport, 4739)
+ self.assertEqual(p[IPFIX].observationDomainID,
+ self.ipfix_domain_id)
+ if p.haslayer(Template):
+ ipfix.add_template(p.getlayer(Template))
+ # verify events in data set
+ for p in capture:
+ if p.haslayer(Data):
+ data = ipfix.decode_data_set(p.getlayer(Set))
+ self.verify_ipfix_max_sessions(data, max_sessions)
+
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
+ TCP(sport=12345, dport=80))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(0)
+ self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ capture = self.pg3.get_capture(1)
+ # verify events in data set
+ for p in capture:
+ self.assertTrue(p.haslayer(IPFIX))
+ self.assertEqual(p[IP].src, self.pg3.local_ip4)
+ self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
+ self.assertEqual(p[UDP].sport, self.ipfix_src_port)
+ self.assertEqual(p[UDP].dport, 4739)
+ self.assertEqual(p[IPFIX].observationDomainID,
+ self.ipfix_domain_id)
+ if p.haslayer(Data):
+ data = ipfix.decode_data_set(p.getlayer(Set))
+ self.verify_ipfix_max_bibs(data, max_bibs)
+
+ def test_ipfix_max_frags(self):
+ """ IPFIX logging maximum fragments pending reassembly exceeded """
+ self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+ self.nat_addr_n)
+ self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+ self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+ self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
+ src_address=self.pg3.local_ip4n,
+ path_mtu=512,
+ template_interval=10)
+ self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
+ src_port=self.ipfix_src_port)
+
+ data = 'a' * 200
+ pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
+ self.tcp_port_in, 20, data)
+ self.pg0.add_stream(pkts[-1])
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(0)
+ self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ capture = self.pg3.get_capture(9)
+ ipfix = IPFIXDecoder()
+ # first load template
+ for p in capture:
+ self.assertTrue(p.haslayer(IPFIX))
+ self.assertEqual(p[IP].src, self.pg3.local_ip4)
+ self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
+ self.assertEqual(p[UDP].sport, self.ipfix_src_port)
+ self.assertEqual(p[UDP].dport, 4739)
+ self.assertEqual(p[IPFIX].observationDomainID,
+ self.ipfix_domain_id)
+ if p.haslayer(Template):
+ ipfix.add_template(p.getlayer(Template))
+ # verify events in data set
+ for p in capture:
+ if p.haslayer(Data):
+ data = ipfix.decode_data_set(p.getlayer(Set))
+ self.verify_ipfix_max_fragments_ip6(data, 0,
+ self.pg0.remote_ip6n)
+
+ def test_ipfix_bib_ses(self):
+ """ IPFIX logging NAT64 BIB/session create and delete events """
+ self.tcp_port_in = random.randint(1025, 65535)
+ remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
+ '64:ff9b::',
+ 96)
+
+ self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+ self.nat_addr_n)
+ self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+ self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
+ src_address=self.pg3.local_ip4n,
+ path_mtu=512,
+ template_interval=10)
+ self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
+ src_port=self.ipfix_src_port)
+
+ # Create
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
+ TCP(sport=self.tcp_port_in, dport=25))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ p = self.pg1.get_capture(1)
+ self.tcp_port_out = p[0][TCP].sport
+ self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ capture = self.pg3.get_capture(10)
+ ipfix = IPFIXDecoder()
+ # first load template
+ for p in capture:
+ self.assertTrue(p.haslayer(IPFIX))
+ self.assertEqual(p[IP].src, self.pg3.local_ip4)
+ self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
+ self.assertEqual(p[UDP].sport, self.ipfix_src_port)
+ self.assertEqual(p[UDP].dport, 4739)
+ self.assertEqual(p[IPFIX].observationDomainID,
+ self.ipfix_domain_id)
+ if p.haslayer(Template):
+ ipfix.add_template(p.getlayer(Template))
+ # verify events in data set
+ for p in capture:
+ if p.haslayer(Data):
+ data = ipfix.decode_data_set(p.getlayer(Set))
+ if ord(data[0][230]) == 10:
+ self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
+ elif ord(data[0][230]) == 6:
+ self.verify_ipfix_nat64_ses(data,
+ 1,
+ self.pg0.remote_ip6n,
+ self.pg1.remote_ip4,
+ 25)
+ else:
+ self.logger.error(ppp("Unexpected or invalid packet: ", p))
+
+ # Delete
+ self.pg_enable_capture(self.pg_interfaces)
+ self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+ self.nat_addr_n,
+ is_add=0)
+ self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ capture = self.pg3.get_capture(2)
+ # verify events in data set
+ for p in capture:
+ self.assertTrue(p.haslayer(IPFIX))
+ self.assertEqual(p[IP].src, self.pg3.local_ip4)
+ self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
+ self.assertEqual(p[UDP].sport, self.ipfix_src_port)
+ self.assertEqual(p[UDP].dport, 4739)
+ self.assertEqual(p[IPFIX].observationDomainID,
+ self.ipfix_domain_id)
+ if p.haslayer(Data):
+ data = ipfix.decode_data_set(p.getlayer(Set))
+ if ord(data[0][230]) == 11:
+ self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
+ elif ord(data[0][230]) == 7:
+ self.verify_ipfix_nat64_ses(data,
+ 0,
+ self.pg0.remote_ip6n,
+ self.pg1.remote_ip4,
+ 25)
+ else:
+ self.logger.error(ppp("Unexpected or invalid packet: ", p))
+
def nat64_get_ses_num(self):
"""
Return number of active NAT64 sessions.
@@ -4925,6 +5394,11 @@ class TestNAT64(MethodHolder):
"""
Clear NAT64 configuration.
"""
+ self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
+ domain_id=self.ipfix_domain_id)
+ self.ipfix_src_port = 4739
+ self.ipfix_domain_id = 1
+
self.vapi.nat64_set_timeouts()
interfaces = self.vapi.nat64_interface_dump()
@@ -4937,29 +5411,7 @@ class TestNAT64(MethodHolder):
intf.is_inside,
is_add=0)
- bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
- for bibe in bib:
- if bibe.is_static:
- self.vapi.nat64_add_del_static_bib(bibe.i_addr,
- bibe.o_addr,
- bibe.i_port,
- bibe.o_port,
- bibe.proto,
- bibe.vrf_id,
- is_add=0)
-
- bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
- for bibe in bib:
- if bibe.is_static:
- self.vapi.nat64_add_del_static_bib(bibe.i_addr,
- bibe.o_addr,
- bibe.i_port,
- bibe.o_port,
- bibe.proto,
- bibe.vrf_id,
- is_add=0)
-
- bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
+ bib = self.vapi.nat64_bib_dump(255)
for bibe in bib:
if bibe.is_static:
self.vapi.nat64_add_del_static_bib(bibe.i_addr,