diff options
author | 2024-12-05 20:38:56 +0000 | |
---|---|---|
committer | 2024-12-09 09:21:42 +0000 | |
commit | f0a126a1eb0129139188a888500689708ef488a4 (patch) | |
tree | a9b590cebc095aecc556579cc51f392dee64c820 /test | |
parent | 2e67a3f37798bb7ae68e53554a163a4945a46e4b (diff) |
cnat: add support for icmp traceroute
Type: improvement
Signed-off-by: Mohsin Kazmi <sykazmi@cisco.com>
Change-Id: Ief1e97d03b7a934547add35ac3ed1f93f2499a20
Diffstat (limited to 'test')
-rw-r--r-- | test/test_cnat.py | 126 |
1 files changed, 124 insertions, 2 deletions
diff --git a/test/test_cnat.py b/test/test_cnat.py index 9e979a4e09e..8d8f3210577 100644 --- a/test/test_cnat.py +++ b/test/test_cnat.py @@ -11,9 +11,10 @@ from config import config from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP, UDP, TCP, ICMP -from scapy.layers.inet import IPerror, TCPerror, UDPerror +from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply +from scapy.layers.inet6 import ICMPv6TimeExceeded from ipaddress import ip_network @@ -760,12 +761,14 @@ class TestCNatSourceNAT(CnatCommonTestCase): self.sourcenat_test_tcp_udp_conf(TCP, is_v6=True) self.sourcenat_test_tcp_udp_conf(UDP, is_v6=True) self.sourcenat_test_icmp_echo_conf(is_v6=True) + self.sourcenat_test_icmp_traceroute_conf(is_v6=True) def test_snat_v4(self): # """ CNat Source Nat v4 """ self.sourcenat_test_tcp_udp_conf(TCP) self.sourcenat_test_tcp_udp_conf(UDP) self.sourcenat_test_icmp_echo_conf() + self.sourcenat_test_icmp_traceroute_conf() def sourcenat_test_icmp_echo_conf(self, is_v6=False): ctx = CnatTestContext(self, ICMP, is_v6=is_v6) @@ -774,6 +777,125 @@ class TestCNatSourceNAT(CnatCommonTestCase): ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 8) ctx.cnat_send_return().cnat_expect_return() + def sourcenat_test_icmp_traceroute_conf(self, is_v6=False): + # IPv4 ICMP + if not is_v6: + # Create an ICMP traceroute packet with TTL set to 1. + # The CNAT translates the packet, but the NATted packet is dropped + # due to the TTL of 1. An ICMP Time Exceeded message is sent + # to the source (which is the NATted address). + # The packet will be translated once more to the original + # source IP address. + + icmp = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP( + ttl=1, + src=self.pg0.remote_hosts[0].ip4, + dst=self.pg1.remote_hosts[0].ip4, + ) + / ICMP(id=0xFEED, type=8) # ICMP Type Echo Request + / Raw() + ) + + self.rxs = self.send_and_expect(self.pg0, icmp, self.pg0) + + for rx in self.rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[0].ip4) + self.assertEqual(rx[IP].src, "172.16.1.1") + self.assertEqual(rx[ICMP].type, 11) # ICMP Type 11 (Time Exceeded) + self.assertEqual( + rx[ICMP].code, 0 + ) # ICMP Code 0 (TTL Zero During Transit) + inner = rx[ICMP].payload + self.assertEqual(inner[IPerror].src, self.pg0.remote_hosts[0].ip4) + self.assertEqual(inner[IPerror].dst, self.pg1.remote_hosts[0].ip4) + self.assertEqual(inner[ICMPerror].type, 8) # ICMP Echo Request + self.assertEqual(inner[ICMPerror].id, 0xFEED) + + # source ---> NATted Transit ---> Transit 2 ... ---> Transit N ---> Destination + # Simulate an ICMP Time Exceeded message arriving at the NATted Transit + # from the Transit N-2 node. This occurs because the NATted packet + # is dropped due to a TTL of 1. + # An ICMP Time Exceeded message is sent back to the source + # (initially the NATted address). The CNAT then translates the message + # back to the original source IP address. + + # For ICMP based traffic, snat session uses identifier for session key. + # snat allocates a new identifier. To hit the snat session from Transit N-2 + # to NATed Transit, packet should use snat allocated identifier. To get the + # snat allocated identifier, echo request will be sent and captured at the + # destination, taken out the identifier from the packet and use it to set + # the identifier in the ICMP time exceed packet + icmp[IP].ttl = 64 + rxs = self.send_and_expect(self.pg0, icmp, self.pg1) + + icmp_error = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src="172.16.1.1", dst=self.pg2.remote_hosts[0].ip4) + / ICMP(type=11, code=0) + / IPerror( + src=self.pg2.remote_hosts[0].ip4, dst=self.pg1.remote_hosts[0].ip4 + ) + / ICMPerror(id=rxs[0][ICMP].id, type=8) + / Raw() + ) + + self.rxs = self.send_and_expect(self.pg1, icmp_error, self.pg0) + for rx in self.rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[0].ip4) + self.assertEqual(rx[IP].src, "172.16.1.1") + self.assertEqual(rx[ICMP].type, 11) # ICMP Type 11 (Time Exceeded) + self.assertEqual( + rx[ICMP].code, 0 + ) # ICMP Code 0 (TTL Zero During Transit) + inner = rx[ICMP].payload + self.assertEqual(inner[IPerror].src, self.pg0.remote_hosts[0].ip4) + self.assertEqual(inner[IPerror].dst, self.pg1.remote_hosts[0].ip4) + self.assertEqual(inner[ICMPerror].type, 8) # ICMP Echo Request + self.assertEqual(inner[ICMPerror].id, 0xFEED) + + # IPv6 ICMPv6 + if is_v6: + + # Create an ICMPv6 traceroute packet with Hop Limit set to 1. + # The CNAT translates the packet, but the NATted packet is dropped + # due to the Hop Limit of 1. An ICMPv6 Time Exceeded message is sent + # back to the source (which is the NATted address). + # The CNAT translates the message once more to restore + # the original source IPv6 address. + icmp6 = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IPv6( + hlim=1, + src=self.pg0.remote_hosts[0].ip6, + dst=self.pg1.remote_hosts[0].ip6, + ) + / ICMPv6EchoRequest(id=0xFEED) + / Raw() + ) + self.rxs = self.send_and_expect(self.pg0, icmp6, self.pg0) + + for rx in self.rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IPv6].dst, self.pg0.remote_hosts[0].ip6) + self.assertEqual(rx[IPv6].src, "fd01:1::1") + self.assertEqual( + rx[ICMPv6TimeExceeded].type, 3 + ) # ICMPv6 Type 3 (Time Exceeded) + self.assertEqual( + rx[ICMPv6TimeExceeded].code, 0 + ) # ICMPv6 Code 0 (TTL Zero During Transit) + inner = rx[ICMPv6TimeExceeded].payload + self.assertEqual(inner[IPerror6].src, self.pg0.remote_hosts[0].ip6) + self.assertEqual(inner[IPerror6].dst, self.pg1.remote_hosts[0].ip6) + self.assertEqual( + inner[ICMPv6EchoRequest].type, 128 + ) # ICMPv6 Echo Request + self.assertEqual(inner[ICMPv6EchoRequest].id, 0xFEED) + def sourcenat_test_tcp_udp_conf(self, L4PROTO, is_v6=False): ctx = CnatTestContext(self, L4PROTO, is_v6) # we should source NAT @@ -823,7 +945,7 @@ class TestCNatSourceNAT(CnatCommonTestCase): @unittest.skipIf("cnat" in config.excluded_plugins, "Exclude CNAT plugin tests") class TestCNatDHCP(CnatCommonTestCase): - """CNat Translation""" + """CNat DHCP""" @classmethod def setUpClass(cls): |