From 603e75465899385a95350f3c96499050f7f960a5 Mon Sep 17 00:00:00 2001 From: Filip Varga Date: Tue, 21 Jul 2020 10:27:39 +0200 Subject: nat: move deterministic nat to det44 sub feature Type: refactor Change-Id: I0bb203102a0e13dd7448e2125925ab356bbd7937 Signed-off-by: Filip Varga --- src/plugins/nat/test/test_det44.py | 682 +++++++++++++++++++++++++++++++++++++ src/plugins/nat/test/test_nat.py | 619 --------------------------------- 2 files changed, 682 insertions(+), 619 deletions(-) create mode 100644 src/plugins/nat/test/test_det44.py (limited to 'src/plugins/nat/test') diff --git a/src/plugins/nat/test/test_det44.py b/src/plugins/nat/test/test_det44.py new file mode 100644 index 00000000000..ced77468959 --- /dev/null +++ b/src/plugins/nat/test/test_det44.py @@ -0,0 +1,682 @@ +#!/usr/bin/env python3 + +import socket +import struct +import unittest +import scapy.compat +from time import sleep +from framework import VppTestCase, running_extended_tests +from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder +from scapy.layers.inet import IP, TCP, UDP, ICMP +from scapy.layers.inet import IPerror, UDPerror +from scapy.layers.l2 import Ether +from util import ppp + + +class TestDET44(VppTestCase): + """ Deterministic NAT Test Cases """ + + @classmethod + def setUpClass(cls): + super(TestDET44, cls).setUpClass() + cls.vapi.cli("set log class det44 level debug") + + cls.tcp_port_in = 6303 + cls.tcp_external_port = 6303 + cls.udp_port_in = 6304 + cls.udp_external_port = 6304 + cls.icmp_id_in = 6305 + cls.nat_addr = '10.0.0.3' + + cls.create_pg_interfaces(range(3)) + cls.interfaces = list(cls.pg_interfaces) + + for i in cls.interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + + cls.pg0.generate_remote_hosts(2) + cls.pg0.configure_ipv4_neighbors() + + @classmethod + def tearDownClass(cls): + super(TestDET44, cls).tearDownClass() + + def setUp(self): + super(TestDET44, self).setUp() + self.vapi.det44_plugin_enable_disable(enable=1) + + def tearDown(self): + super(TestDET44, self).tearDown() + if not self.vpp_dead: + self.vapi.det44_plugin_enable_disable(enable=0) + + def show_commands_at_teardown(self): + self.logger.info(self.vapi.cli("show det44 interfaces")) + self.logger.info(self.vapi.cli("show det44 timeouts")) + self.logger.info(self.vapi.cli("show det44 mappings")) + self.logger.info(self.vapi.cli("show det44 sessions")) + + def verify_capture_in(self, capture, in_if): + """ + Verify captured packets on inside network + + :param capture: Captured packets + :param in_if: Inside interface + """ + fired = False + for packet in capture: + try: + self.assert_packet_checksums_valid(packet) + self.assertEqual(packet[IP].dst, in_if.remote_ip4) + if packet.haslayer(TCP): + self.assertEqual(packet[TCP].dport, self.tcp_port_in) + elif packet.haslayer(UDP): + self.assertEqual(packet[UDP].dport, self.udp_port_in) + else: + self.assertEqual(packet[ICMP].id, self.icmp_id_in) + except: + fired = True + self.logger.error(ppp("Unexpected or invalid packet " + "(inside network):", packet)) + if fired: + raise + + def verify_ipfix_max_entries_per_user(self, data, limit, src_addr): + """ + Verify IPFIX maximum entries per user exceeded event + + :param data: Decoded IPFIX data records + :param limit: Number of maximum entries per user + :param src_addr: IPv4 source address + """ + self.assertEqual(1, len(data)) + record = data[0] + # natEvent + self.assertEqual(scapy.compat.orb(record[230]), 13) + # natQuotaExceededEvent + self.assertEqual(struct.pack("I", 3), record[466]) + # maxEntriesPerUser + self.assertEqual(struct.pack("I", limit), record[473]) + # sourceIPv4Address + self.assertEqual(socket.inet_pton(socket.AF_INET, src_addr), record[8]) + + def initiate_tcp_session(self, in_if, out_if): + """ + Initiates TCP session 3 WAY HAND SHAKE + + :param in_if: Inside interface + :param out_if: Outside interface + """ + + # SYN packet in->out + p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / + TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, + flags="S")) + in_if.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = out_if.get_capture(1) + p = capture[0] + self.tcp_port_out = p[TCP].sport + + # SYN + ACK packet out->in + p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) / + IP(src=out_if.remote_ip4, dst=self.nat_addr) / + TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, + flags="SA")) + out_if.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + in_if.get_capture(1) + + # ACK packet in->out + p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / + TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, + flags="A")) + in_if.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + out_if.get_capture(1) + + def create_stream_in(self, in_if, out_if, ttl=64): + """ + Create packet stream for inside network + + :param in_if: Inside interface + :param out_if: Outside interface + :param ttl: TTL of generated packets + """ + pkts = [] + # TCP + p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / + TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)) + pkts.append(p) + + # UDP + p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / + UDP(sport=self.udp_port_in, dport=self.udp_external_port)) + pkts.append(p) + + # ICMP + p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / + ICMP(id=self.icmp_id_in, type='echo-request')) + pkts.append(p) + + return pkts + + def create_stream_out(self, out_if, dst_ip=None, ttl=64): + """ + Create packet stream for outside network + + :param out_if: Outside interface + :param dst_ip: Destination IP address (Default use global NAT address) + :param ttl: TTL of generated packets + """ + if dst_ip is None: + dst_ip = self.nat_addr + pkts = [] + # TCP + p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / + IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / + TCP(dport=self.tcp_port_out, sport=self.tcp_external_port)) + pkts.append(p) + + # UDP + p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / + IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / + UDP(dport=self.udp_port_out, sport=self.udp_external_port)) + pkts.append(p) + + # ICMP + p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / + IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / + ICMP(id=self.icmp_external_id, type='echo-reply')) + pkts.append(p) + + return pkts + + def verify_capture_out(self, capture, nat_ip=None): + """ + Verify captured packets on outside network + + :param capture: Captured packets + :param nat_ip: Translated IP address (Default use global NAT address) + :param same_port: Source port number is not translated (Default False) + """ + if nat_ip is None: + nat_ip = self.nat_addr + for packet in capture: + try: + self.assertEqual(packet[IP].src, nat_ip) + if packet.haslayer(TCP): + self.tcp_port_out = packet[TCP].sport + elif packet.haslayer(UDP): + self.udp_port_out = packet[UDP].sport + else: + self.icmp_external_id = packet[ICMP].id + except: + self.logger.error(ppp("Unexpected or invalid packet " + "(outside network):", packet)) + raise + + def test_deterministic_mode(self): + """ NAT plugin run deterministic mode """ + in_addr = '172.16.255.0' + out_addr = '172.17.255.50' + in_addr_t = '172.16.255.20' + in_plen = 24 + out_plen = 32 + + self.vapi.det44_add_del_map(is_add=1, in_addr=in_addr, + in_plen=in_plen, out_addr=out_addr, + out_plen=out_plen) + + rep1 = self.vapi.det44_forward(in_addr_t) + self.assertEqual(str(rep1.out_addr), out_addr) + rep2 = self.vapi.det44_reverse(rep1.out_port_hi, out_addr) + + self.assertEqual(str(rep2.in_addr), in_addr_t) + + deterministic_mappings = self.vapi.det44_map_dump() + self.assertEqual(len(deterministic_mappings), 1) + dsm = deterministic_mappings[0] + self.assertEqual(in_addr, str(dsm.in_addr)) + self.assertEqual(in_plen, dsm.in_plen) + self.assertEqual(out_addr, str(dsm.out_addr)) + self.assertEqual(out_plen, dsm.out_plen) + + def test_set_timeouts(self): + """ Set deterministic NAT timeouts """ + timeouts_before = self.vapi.det44_get_timeouts() + + self.vapi.det44_set_timeouts( + udp=timeouts_before.udp + 10, + tcp_established=timeouts_before.tcp_established + 10, + tcp_transitory=timeouts_before.tcp_transitory + 10, + icmp=timeouts_before.icmp + 10) + + timeouts_after = self.vapi.det44_get_timeouts() + + self.assertNotEqual(timeouts_before.udp, timeouts_after.udp) + self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp) + self.assertNotEqual(timeouts_before.tcp_established, + timeouts_after.tcp_established) + self.assertNotEqual(timeouts_before.tcp_transitory, + timeouts_after.tcp_transitory) + + def test_in(self): + """ DET44 translation test (TCP, UDP, ICMP) """ + + nat_ip = "10.0.0.10" + + self.vapi.det44_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, + in_plen=32, + out_addr=socket.inet_aton(nat_ip), + out_plen=32) + + self.vapi.det44_interface_add_del_feature( + sw_if_index=self.pg0.sw_if_index, + is_add=1, is_inside=1) + self.vapi.det44_interface_add_del_feature( + sw_if_index=self.pg1.sw_if_index, + is_add=1, is_inside=0) + + # in2out + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out(capture, nat_ip) + + # out2in + pkts = self.create_stream_out(self.pg1, nat_ip) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(len(pkts)) + self.verify_capture_in(capture, self.pg0) + + # session dump test + sessions = self.vapi.det44_session_dump(self.pg0.remote_ip4) + self.assertEqual(len(sessions), 3) + + # TCP session + s = sessions[0] + self.assertEqual(str(s.ext_addr), self.pg1.remote_ip4) + self.assertEqual(s.in_port, self.tcp_port_in) + self.assertEqual(s.out_port, self.tcp_port_out) + self.assertEqual(s.ext_port, self.tcp_external_port) + + # UDP session + s = sessions[1] + self.assertEqual(str(s.ext_addr), self.pg1.remote_ip4) + self.assertEqual(s.in_port, self.udp_port_in) + self.assertEqual(s.out_port, self.udp_port_out) + self.assertEqual(s.ext_port, self.udp_external_port) + + # ICMP session + s = sessions[2] + self.assertEqual(str(s.ext_addr), self.pg1.remote_ip4) + self.assertEqual(s.in_port, self.icmp_id_in) + self.assertEqual(s.out_port, self.icmp_external_id) + + def test_multiple_users(self): + """ Deterministic NAT multiple users """ + + nat_ip = "10.0.0.10" + port_in = 80 + external_port = 6303 + + host0 = self.pg0.remote_hosts[0] + host1 = self.pg0.remote_hosts[1] + + self.vapi.det44_add_del_map(is_add=1, in_addr=host0.ip4, in_plen=24, + out_addr=socket.inet_aton(nat_ip), + out_plen=32) + self.vapi.det44_interface_add_del_feature( + sw_if_index=self.pg0.sw_if_index, + is_add=1, is_inside=1) + self.vapi.det44_interface_add_del_feature( + sw_if_index=self.pg1.sw_if_index, + is_add=1, is_inside=0) + + # host0 to out + p = (Ether(src=host0.mac, dst=self.pg0.local_mac) / + IP(src=host0.ip4, dst=self.pg1.remote_ip4) / + TCP(sport=port_in, dport=external_port)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, nat_ip) + self.assertEqual(ip.dst, self.pg1.remote_ip4) + self.assertEqual(tcp.dport, external_port) + port_out0 = tcp.sport + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + # host1 to out + p = (Ether(src=host1.mac, dst=self.pg0.local_mac) / + IP(src=host1.ip4, dst=self.pg1.remote_ip4) / + TCP(sport=port_in, dport=external_port)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, nat_ip) + self.assertEqual(ip.dst, self.pg1.remote_ip4) + self.assertEqual(tcp.dport, external_port) + port_out1 = tcp.sport + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + dms = self.vapi.det44_map_dump() + self.assertEqual(1, len(dms)) + self.assertEqual(2, dms[0].ses_num) + + # out to host0 + p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / + IP(src=self.pg1.remote_ip4, dst=nat_ip) / + TCP(sport=external_port, dport=port_out0)) + self.pg1.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, self.pg1.remote_ip4) + self.assertEqual(ip.dst, host0.ip4) + self.assertEqual(tcp.dport, port_in) + self.assertEqual(tcp.sport, external_port) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + # out to host1 + p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / + IP(src=self.pg1.remote_ip4, dst=nat_ip) / + TCP(sport=external_port, dport=port_out1)) + self.pg1.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, self.pg1.remote_ip4) + self.assertEqual(ip.dst, host1.ip4) + self.assertEqual(tcp.dport, port_in) + self.assertEqual(tcp.sport, external_port) + except: + self.logger.error(ppp("Unexpected or invalid packet", p)) + raise + + # session close api test + self.vapi.det44_close_session_out(socket.inet_aton(nat_ip), + port_out1, + self.pg1.remote_ip4, + external_port) + dms = self.vapi.det44_map_dump() + self.assertEqual(dms[0].ses_num, 1) + + self.vapi.det44_close_session_in(host0.ip4, + port_in, + self.pg1.remote_ip4, + external_port) + dms = self.vapi.det44_map_dump() + self.assertEqual(dms[0].ses_num, 0) + + def test_tcp_session_close_detection_in(self): + """ DET44 TCP session close from inside network """ + self.vapi.det44_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, + in_plen=32, + out_addr=socket.inet_aton(self.nat_addr), + out_plen=32) + self.vapi.det44_interface_add_del_feature( + sw_if_index=self.pg0.sw_if_index, + is_add=1, is_inside=1) + self.vapi.det44_interface_add_del_feature( + sw_if_index=self.pg1.sw_if_index, + is_add=1, is_inside=0) + + self.initiate_tcp_session(self.pg0, self.pg1) + + # close the session from inside + try: + # FIN packet in -> out + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, + flags="F")) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.pg1.get_capture(1) + + pkts = [] + + # ACK packet out -> in + p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / + IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / + TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, + flags="A")) + pkts.append(p) + + # FIN packet out -> in + p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / + IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / + TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, + flags="F")) + pkts.append(p) + + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.pg0.get_capture(2) + + # ACK packet in -> out + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, + flags="A")) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.pg1.get_capture(1) + + # Check if deterministic NAT44 closed the session + dms = self.vapi.det44_map_dump() + self.assertEqual(0, dms[0].ses_num) + except: + self.logger.error("TCP session termination failed") + raise + + def test_tcp_session_close_detection_out(self): + """ Deterministic NAT TCP session close from outside network """ + self.vapi.det44_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, + in_plen=32, + out_addr=socket.inet_aton(self.nat_addr), + out_plen=32) + self.vapi.det44_interface_add_del_feature( + sw_if_index=self.pg0.sw_if_index, + is_add=1, is_inside=1) + self.vapi.det44_interface_add_del_feature( + sw_if_index=self.pg1.sw_if_index, + is_add=1, is_inside=0) + + self.initiate_tcp_session(self.pg0, self.pg1) + + # close the session from outside + try: + # FIN packet out -> in + p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / + IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / + TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, + flags="F")) + self.pg1.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.pg0.get_capture(1) + + pkts = [] + + # ACK packet in -> out + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, + flags="A")) + pkts.append(p) + + # ACK packet in -> out + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, + flags="F")) + pkts.append(p) + + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.pg1.get_capture(2) + + # ACK packet out -> in + p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / + IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / + TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, + flags="A")) + self.pg1.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.pg0.get_capture(1) + + # Check if deterministic NAT44 closed the session + dms = self.vapi.det44_map_dump() + self.assertEqual(0, dms[0].ses_num) + except: + self.logger.error("TCP session termination failed") + raise + + @unittest.skipUnless(running_extended_tests, "part of extended tests") + def test_session_timeout(self): + """ Deterministic NAT session timeouts """ + self.vapi.det44_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, + in_plen=32, + out_addr=socket.inet_aton(self.nat_addr), + out_plen=32) + self.vapi.det44_interface_add_del_feature( + sw_if_index=self.pg0.sw_if_index, + is_add=1, is_inside=1) + self.vapi.det44_interface_add_del_feature( + sw_if_index=self.pg1.sw_if_index, + is_add=1, is_inside=0) + + self.initiate_tcp_session(self.pg0, self.pg1) + self.vapi.det44_set_timeouts(udp=5, tcp_established=5, + tcp_transitory=5, icmp=5) + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.pg1.get_capture(len(pkts)) + sleep(15) + + dms = self.vapi.det44_map_dump() + self.assertEqual(0, dms[0].ses_num) + + # TODO: ipfix needs to be separated from NAT base plugin + @unittest.skipUnless(running_extended_tests, "part of extended tests") + def test_session_limit_per_user(self): + """ Deterministic NAT maximum sessions per user limit """ + self.vapi.det44_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, + in_plen=32, + out_addr=socket.inet_aton(self.nat_addr), + out_plen=32) + self.vapi.det44_interface_add_del_feature( + sw_if_index=self.pg0.sw_if_index, + is_add=1, is_inside=1) + self.vapi.det44_interface_add_del_feature( + sw_if_index=self.pg1.sw_if_index, + is_add=1, is_inside=0) + self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4, + src_address=self.pg2.local_ip4, + path_mtu=512, + template_interval=10) + self.vapi.nat_ipfix_enable_disable(domain_id=1, src_port=4739, + enable=1) + + pkts = [] + for port in range(1025, 2025): + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + UDP(sport=port, dport=port)) + pkts.append(p) + + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.pg1.get_capture(len(pkts)) + + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + UDP(sport=3001, dport=3002)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.pg1.assert_nothing_captured() + + # verify ICMP error packet + capture = self.pg0.get_capture(1) + p = capture[0] + self.assertTrue(p.haslayer(ICMP)) + icmp = p[ICMP] + self.assertEqual(icmp.type, 3) + self.assertEqual(icmp.code, 1) + self.assertTrue(icmp.haslayer(IPerror)) + inner_ip = icmp[IPerror] + self.assertEqual(inner_ip[UDPerror].sport, 3001) + self.assertEqual(inner_ip[UDPerror].dport, 3002) + + dms = self.vapi.det44_map_dump() + + self.assertEqual(1000, dms[0].ses_num) + + # verify IPFIX logging + self.vapi.ipfix_flush() + sleep(1) + capture = self.pg2.get_capture(2) + ipfix = IPFIXDecoder() + # first load template + for p in capture: + self.assertTrue(p.haslayer(IPFIX)) + if p.haslayer(Template): + ipfix.add_template(p.getlayer(Template)) + # verify events in data set + for p in capture: + if p.haslayer(Data): + data = ipfix.decode_data_set(p.getlayer(Set)) + self.verify_ipfix_max_entries_per_user(data, + 1000, + self.pg0.remote_ip4) + self.vapi.nat_ipfix_enable_disable(domain_id=1, src_port=4739, + enable=0) diff --git a/src/plugins/nat/test/test_nat.py b/src/plugins/nat/test/test_nat.py index 16ffcfde941..09bf8a2d41e 100644 --- a/src/plugins/nat/test/test_nat.py +++ b/src/plugins/nat/test/test_nat.py @@ -7462,625 +7462,6 @@ class TestNAT44Out2InDPO(MethodHolder): self.verify_capture_in(capture, self.pg0) -class TestDeterministicNAT(MethodHolder): - """ Deterministic NAT Test Cases """ - - @classmethod - def setUpConstants(cls): - super(TestDeterministicNAT, cls).setUpConstants() - cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"]) - - @classmethod - def setUpClass(cls): - super(TestDeterministicNAT, cls).setUpClass() - cls.vapi.cli("set log class nat level debug") - - cls.tcp_port_in = 6303 - cls.tcp_external_port = 6303 - cls.udp_port_in = 6304 - cls.udp_external_port = 6304 - cls.icmp_id_in = 6305 - cls.nat_addr = '10.0.0.3' - - cls.create_pg_interfaces(range(3)) - cls.interfaces = list(cls.pg_interfaces) - - for i in cls.interfaces: - i.admin_up() - i.config_ip4() - i.resolve_arp() - - cls.pg0.generate_remote_hosts(2) - cls.pg0.configure_ipv4_neighbors() - - @classmethod - def tearDownClass(cls): - super(TestDeterministicNAT, cls).tearDownClass() - - def create_stream_in(self, in_if, out_if, ttl=64): - """ - Create packet stream for inside network - - :param in_if: Inside interface - :param out_if: Outside interface - :param ttl: TTL of generated packets - """ - pkts = [] - # TCP - p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)) - pkts.append(p) - - # UDP - p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / - UDP(sport=self.udp_port_in, dport=self.udp_external_port)) - pkts.append(p) - - # ICMP - p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / - ICMP(id=self.icmp_id_in, type='echo-request')) - pkts.append(p) - - return pkts - - def create_stream_out(self, out_if, dst_ip=None, ttl=64): - """ - Create packet stream for outside network - - :param out_if: Outside interface - :param dst_ip: Destination IP address (Default use global NAT address) - :param ttl: TTL of generated packets - """ - if dst_ip is None: - dst_ip = self.nat_addr - pkts = [] - # TCP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - TCP(dport=self.tcp_port_out, sport=self.tcp_external_port)) - pkts.append(p) - - # UDP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - UDP(dport=self.udp_port_out, sport=self.udp_external_port)) - pkts.append(p) - - # ICMP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - ICMP(id=self.icmp_external_id, type='echo-reply')) - pkts.append(p) - - return pkts - - def verify_capture_out(self, capture, nat_ip=None): - """ - Verify captured packets on outside network - - :param capture: Captured packets - :param nat_ip: Translated IP address (Default use global NAT address) - :param same_port: Source port number is not translated (Default False) - """ - if nat_ip is None: - nat_ip = self.nat_addr - for packet in capture: - try: - self.assertEqual(packet[IP].src, nat_ip) - if packet.haslayer(TCP): - self.tcp_port_out = packet[TCP].sport - elif packet.haslayer(UDP): - self.udp_port_out = packet[UDP].sport - else: - self.icmp_external_id = packet[ICMP].id - except: - self.logger.error(ppp("Unexpected or invalid packet " - "(outside network):", packet)) - raise - - def test_deterministic_mode(self): - """ NAT plugin run deterministic mode """ - in_addr = '172.16.255.0' - out_addr = '172.17.255.50' - in_addr_t = '172.16.255.20' - in_plen = 24 - out_plen = 32 - - nat_config = self.vapi.nat_show_config() - self.assertEqual(1, nat_config.deterministic) - - self.vapi.nat_det_add_del_map(is_add=1, in_addr=in_addr, - in_plen=in_plen, out_addr=out_addr, - out_plen=out_plen) - - rep1 = self.vapi.nat_det_forward(in_addr_t) - self.assertEqual(str(rep1.out_addr), out_addr) - rep2 = self.vapi.nat_det_reverse(rep1.out_port_hi, out_addr) - - self.assertEqual(str(rep2.in_addr), in_addr_t) - - deterministic_mappings = self.vapi.nat_det_map_dump() - self.assertEqual(len(deterministic_mappings), 1) - dsm = deterministic_mappings[0] - self.assertEqual(in_addr, str(dsm.in_addr)) - self.assertEqual(in_plen, dsm.in_plen) - self.assertEqual(out_addr, str(dsm.out_addr)) - self.assertEqual(out_plen, dsm.out_plen) - - self.clear_nat_det() - deterministic_mappings = self.vapi.nat_det_map_dump() - self.assertEqual(len(deterministic_mappings), 0) - - def test_set_timeouts(self): - """ Set deterministic NAT timeouts """ - timeouts_before = self.vapi.nat_get_timeouts() - - self.vapi.nat_set_timeouts( - udp=timeouts_before.udp + 10, - tcp_established=timeouts_before.tcp_established + 10, - tcp_transitory=timeouts_before.tcp_transitory + 10, - icmp=timeouts_before.icmp + 10) - - timeouts_after = self.vapi.nat_get_timeouts() - - self.assertNotEqual(timeouts_before.udp, timeouts_after.udp) - self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp) - self.assertNotEqual(timeouts_before.tcp_established, - timeouts_after.tcp_established) - self.assertNotEqual(timeouts_before.tcp_transitory, - timeouts_after.tcp_transitory) - - def test_det_in(self): - """ Deterministic NAT translation test (TCP, UDP, ICMP) """ - - nat_ip = "10.0.0.10" - - self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, - in_plen=32, - out_addr=socket.inet_aton(nat_ip), - out_plen=32) - - flags = self.config_flags.NAT_IS_INSIDE - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - - # in2out - pkts = self.create_stream_in(self.pg0, self.pg1) - self.pg0.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out(capture, nat_ip) - - # out2in - pkts = self.create_stream_out(self.pg1, nat_ip) - self.pg1.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg0.get_capture(len(pkts)) - self.verify_capture_in(capture, self.pg0) - - # session dump test - sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4) - self.assertEqual(len(sessions), 3) - - # TCP session - s = sessions[0] - self.assertEqual(str(s.ext_addr), self.pg1.remote_ip4) - self.assertEqual(s.in_port, self.tcp_port_in) - self.assertEqual(s.out_port, self.tcp_port_out) - self.assertEqual(s.ext_port, self.tcp_external_port) - - # UDP session - s = sessions[1] - self.assertEqual(str(s.ext_addr), self.pg1.remote_ip4) - self.assertEqual(s.in_port, self.udp_port_in) - self.assertEqual(s.out_port, self.udp_port_out) - self.assertEqual(s.ext_port, self.udp_external_port) - - # ICMP session - s = sessions[2] - self.assertEqual(str(s.ext_addr), self.pg1.remote_ip4) - self.assertEqual(s.in_port, self.icmp_id_in) - self.assertEqual(s.out_port, self.icmp_external_id) - - def test_multiple_users(self): - """ Deterministic NAT multiple users """ - - nat_ip = "10.0.0.10" - port_in = 80 - external_port = 6303 - - host0 = self.pg0.remote_hosts[0] - host1 = self.pg0.remote_hosts[1] - - self.vapi.nat_det_add_del_map(is_add=1, in_addr=host0.ip4, in_plen=24, - out_addr=socket.inet_aton(nat_ip), - out_plen=32) - flags = self.config_flags.NAT_IS_INSIDE - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - - # host0 to out - p = (Ether(src=host0.mac, dst=self.pg0.local_mac) / - IP(src=host0.ip4, dst=self.pg1.remote_ip4) / - TCP(sport=port_in, dport=external_port)) - self.pg0.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg1.get_capture(1) - p = capture[0] - try: - ip = p[IP] - tcp = p[TCP] - self.assertEqual(ip.src, nat_ip) - self.assertEqual(ip.dst, self.pg1.remote_ip4) - self.assertEqual(tcp.dport, external_port) - port_out0 = tcp.sport - except: - self.logger.error(ppp("Unexpected or invalid packet:", p)) - raise - - # host1 to out - p = (Ether(src=host1.mac, dst=self.pg0.local_mac) / - IP(src=host1.ip4, dst=self.pg1.remote_ip4) / - TCP(sport=port_in, dport=external_port)) - self.pg0.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg1.get_capture(1) - p = capture[0] - try: - ip = p[IP] - tcp = p[TCP] - self.assertEqual(ip.src, nat_ip) - self.assertEqual(ip.dst, self.pg1.remote_ip4) - self.assertEqual(tcp.dport, external_port) - port_out1 = tcp.sport - except: - self.logger.error(ppp("Unexpected or invalid packet:", p)) - raise - - dms = self.vapi.nat_det_map_dump() - self.assertEqual(1, len(dms)) - self.assertEqual(2, dms[0].ses_num) - - # out to host0 - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=nat_ip) / - TCP(sport=external_port, dport=port_out0)) - self.pg1.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg0.get_capture(1) - p = capture[0] - try: - ip = p[IP] - tcp = p[TCP] - self.assertEqual(ip.src, self.pg1.remote_ip4) - self.assertEqual(ip.dst, host0.ip4) - self.assertEqual(tcp.dport, port_in) - self.assertEqual(tcp.sport, external_port) - except: - self.logger.error(ppp("Unexpected or invalid packet:", p)) - raise - - # out to host1 - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=nat_ip) / - TCP(sport=external_port, dport=port_out1)) - self.pg1.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg0.get_capture(1) - p = capture[0] - try: - ip = p[IP] - tcp = p[TCP] - self.assertEqual(ip.src, self.pg1.remote_ip4) - self.assertEqual(ip.dst, host1.ip4) - self.assertEqual(tcp.dport, port_in) - self.assertEqual(tcp.sport, external_port) - except: - self.logger.error(ppp("Unexpected or invalid packet", p)) - raise - - # session close api test - self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip), - port_out1, - self.pg1.remote_ip4, - external_port) - dms = self.vapi.nat_det_map_dump() - self.assertEqual(dms[0].ses_num, 1) - - self.vapi.nat_det_close_session_in(host0.ip4, - port_in, - self.pg1.remote_ip4, - external_port) - dms = self.vapi.nat_det_map_dump() - self.assertEqual(dms[0].ses_num, 0) - - def test_tcp_session_close_detection_in(self): - """ Deterministic NAT TCP session close from inside network """ - self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, - in_plen=32, - out_addr=socket.inet_aton(self.nat_addr), - out_plen=32) - flags = self.config_flags.NAT_IS_INSIDE - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - - self.initiate_tcp_session(self.pg0, self.pg1) - - # close the session from inside - try: - # FIN packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, - flags="F")) - self.pg0.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - self.pg1.get_capture(1) - - pkts = [] - - # ACK packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, - flags="A")) - pkts.append(p) - - # FIN packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, - flags="F")) - pkts.append(p) - - self.pg1.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - self.pg0.get_capture(2) - - # ACK packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, - flags="A")) - self.pg0.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - self.pg1.get_capture(1) - - # Check if deterministic NAT44 closed the session - dms = self.vapi.nat_det_map_dump() - self.assertEqual(0, dms[0].ses_num) - except: - self.logger.error("TCP session termination failed") - raise - - def test_tcp_session_close_detection_out(self): - """ Deterministic NAT TCP session close from outside network """ - self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, - in_plen=32, - out_addr=socket.inet_aton(self.nat_addr), - out_plen=32) - flags = self.config_flags.NAT_IS_INSIDE - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - - self.initiate_tcp_session(self.pg0, self.pg1) - - # close the session from outside - try: - # FIN packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, - flags="F")) - self.pg1.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - self.pg0.get_capture(1) - - pkts = [] - - # ACK packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, - flags="A")) - pkts.append(p) - - # ACK packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, - flags="F")) - pkts.append(p) - - self.pg0.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - self.pg1.get_capture(2) - - # ACK packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, - flags="A")) - self.pg1.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - self.pg0.get_capture(1) - - # Check if deterministic NAT44 closed the session - dms = self.vapi.nat_det_map_dump() - self.assertEqual(0, dms[0].ses_num) - except: - self.logger.error("TCP session termination failed") - raise - - @unittest.skipUnless(running_extended_tests, "part of extended tests") - def test_session_timeout(self): - """ Deterministic NAT session timeouts """ - self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, - in_plen=32, - out_addr=socket.inet_aton(self.nat_addr), - out_plen=32) - flags = self.config_flags.NAT_IS_INSIDE - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - - self.initiate_tcp_session(self.pg0, self.pg1) - self.vapi.nat_set_timeouts(udp=5, tcp_established=5, tcp_transitory=5, - icmp=5) - pkts = self.create_stream_in(self.pg0, self.pg1) - self.pg0.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg1.get_capture(len(pkts)) - sleep(15) - - dms = self.vapi.nat_det_map_dump() - self.assertEqual(0, dms[0].ses_num) - - @unittest.skipUnless(running_extended_tests, "part of extended tests") - def test_session_limit_per_user(self): - """ Deterministic NAT maximum sessions per user limit """ - self.vapi.nat_det_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4, - in_plen=32, - out_addr=socket.inet_aton(self.nat_addr), - out_plen=32) - flags = self.config_flags.NAT_IS_INSIDE - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4, - src_address=self.pg2.local_ip4, - path_mtu=512, - template_interval=10) - self.vapi.nat_ipfix_enable_disable(domain_id=1, src_port=4739, - enable=1) - - pkts = [] - for port in range(1025, 2025): - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - UDP(sport=port, dport=port)) - pkts.append(p) - - self.pg0.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg1.get_capture(len(pkts)) - - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - UDP(sport=3001, dport=3002)) - self.pg0.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg1.assert_nothing_captured() - - # verify ICMP error packet - capture = self.pg0.get_capture(1) - p = capture[0] - self.assertTrue(p.haslayer(ICMP)) - icmp = p[ICMP] - self.assertEqual(icmp.type, 3) - self.assertEqual(icmp.code, 1) - self.assertTrue(icmp.haslayer(IPerror)) - inner_ip = icmp[IPerror] - self.assertEqual(inner_ip[UDPerror].sport, 3001) - self.assertEqual(inner_ip[UDPerror].dport, 3002) - - dms = self.vapi.nat_det_map_dump() - - self.assertEqual(1000, dms[0].ses_num) - - # verify IPFIX logging - self.vapi.ipfix_flush() - sleep(1) - capture = self.pg2.get_capture(2) - ipfix = IPFIXDecoder() - # first load template - for p in capture: - self.assertTrue(p.haslayer(IPFIX)) - if p.haslayer(Template): - ipfix.add_template(p.getlayer(Template)) - # verify events in data set - for p in capture: - if p.haslayer(Data): - data = ipfix.decode_data_set(p.getlayer(Set)) - self.verify_ipfix_max_entries_per_user(data, - 1000, - self.pg0.remote_ip4) - - def clear_nat_det(self): - """ - Clear deterministic NAT configuration. - """ - self.vapi.nat_ipfix_enable_disable(domain_id=1, src_port=4739, - enable=0) - self.vapi.nat_set_timeouts(udp=300, tcp_established=7440, - tcp_transitory=240, icmp=60) - deterministic_mappings = self.vapi.nat_det_map_dump() - for dsm in deterministic_mappings: - self.vapi.nat_det_add_del_map(is_add=0, in_addr=dsm.in_addr, - in_plen=dsm.in_plen, - out_addr=dsm.out_addr, - out_plen=dsm.out_plen) - - interfaces = self.vapi.nat44_interface_dump() - for intf in interfaces: - self.vapi.nat44_interface_add_del_feature( - sw_if_index=intf.sw_if_index, - flags=intf.flags) - - def tearDown(self): - super(TestDeterministicNAT, self).tearDown() - if not self.vpp_dead: - self.clear_nat_det() - - def show_commands_at_teardown(self): - self.logger.info(self.vapi.cli("show nat44 interfaces")) - self.logger.info(self.vapi.cli("show nat timeouts")) - self.logger.info( - self.vapi.cli("show nat44 deterministic mappings")) - self.logger.info( - self.vapi.cli("show nat44 deterministic sessions")) - - class TestNAT64(MethodHolder): """ NAT64 Test Cases """ -- cgit 1.2.3-korg