aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
authorMohsin Kazmi <sykazmi@cisco.com>2024-12-05 20:38:56 +0000
committerBeno�t Ganne <bganne@cisco.com>2024-12-09 09:21:42 +0000
commitf0a126a1eb0129139188a888500689708ef488a4 (patch)
treea9b590cebc095aecc556579cc51f392dee64c820 /test
parent2e67a3f37798bb7ae68e53554a163a4945a46e4b (diff)
cnat: add support for icmp traceroute
Type: improvement Signed-off-by: Mohsin Kazmi <sykazmi@cisco.com> Change-Id: Ief1e97d03b7a934547add35ac3ed1f93f2499a20
Diffstat (limited to 'test')
-rw-r--r--test/test_cnat.py126
1 files changed, 124 insertions, 2 deletions
diff --git a/test/test_cnat.py b/test/test_cnat.py
index 9e979a4e09e..8d8f3210577 100644
--- a/test/test_cnat.py
+++ b/test/test_cnat.py
@@ -11,9 +11,10 @@ from config import config
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, TCP, ICMP
-from scapy.layers.inet import IPerror, TCPerror, UDPerror
+from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
+from scapy.layers.inet6 import ICMPv6TimeExceeded
from ipaddress import ip_network
@@ -760,12 +761,14 @@ class TestCNatSourceNAT(CnatCommonTestCase):
self.sourcenat_test_tcp_udp_conf(TCP, is_v6=True)
self.sourcenat_test_tcp_udp_conf(UDP, is_v6=True)
self.sourcenat_test_icmp_echo_conf(is_v6=True)
+ self.sourcenat_test_icmp_traceroute_conf(is_v6=True)
def test_snat_v4(self):
# """ CNat Source Nat v4 """
self.sourcenat_test_tcp_udp_conf(TCP)
self.sourcenat_test_tcp_udp_conf(UDP)
self.sourcenat_test_icmp_echo_conf()
+ self.sourcenat_test_icmp_traceroute_conf()
def sourcenat_test_icmp_echo_conf(self, is_v6=False):
ctx = CnatTestContext(self, ICMP, is_v6=is_v6)
@@ -774,6 +777,125 @@ class TestCNatSourceNAT(CnatCommonTestCase):
ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 8)
ctx.cnat_send_return().cnat_expect_return()
+ def sourcenat_test_icmp_traceroute_conf(self, is_v6=False):
+ # IPv4 ICMP
+ if not is_v6:
+ # Create an ICMP traceroute packet with TTL set to 1.
+ # The CNAT translates the packet, but the NATted packet is dropped
+ # due to the TTL of 1. An ICMP Time Exceeded message is sent
+ # to the source (which is the NATted address).
+ # The packet will be translated once more to the original
+ # source IP address.
+
+ icmp = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(
+ ttl=1,
+ src=self.pg0.remote_hosts[0].ip4,
+ dst=self.pg1.remote_hosts[0].ip4,
+ )
+ / ICMP(id=0xFEED, type=8) # ICMP Type Echo Request
+ / Raw()
+ )
+
+ self.rxs = self.send_and_expect(self.pg0, icmp, self.pg0)
+
+ for rx in self.rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[0].ip4)
+ self.assertEqual(rx[IP].src, "172.16.1.1")
+ self.assertEqual(rx[ICMP].type, 11) # ICMP Type 11 (Time Exceeded)
+ self.assertEqual(
+ rx[ICMP].code, 0
+ ) # ICMP Code 0 (TTL Zero During Transit)
+ inner = rx[ICMP].payload
+ self.assertEqual(inner[IPerror].src, self.pg0.remote_hosts[0].ip4)
+ self.assertEqual(inner[IPerror].dst, self.pg1.remote_hosts[0].ip4)
+ self.assertEqual(inner[ICMPerror].type, 8) # ICMP Echo Request
+ self.assertEqual(inner[ICMPerror].id, 0xFEED)
+
+ # source ---> NATted Transit ---> Transit 2 ... ---> Transit N ---> Destination
+ # Simulate an ICMP Time Exceeded message arriving at the NATted Transit
+ # from the Transit N-2 node. This occurs because the NATted packet
+ # is dropped due to a TTL of 1.
+ # An ICMP Time Exceeded message is sent back to the source
+ # (initially the NATted address). The CNAT then translates the message
+ # back to the original source IP address.
+
+ # For ICMP based traffic, snat session uses identifier for session key.
+ # snat allocates a new identifier. To hit the snat session from Transit N-2
+ # to NATed Transit, packet should use snat allocated identifier. To get the
+ # snat allocated identifier, echo request will be sent and captured at the
+ # destination, taken out the identifier from the packet and use it to set
+ # the identifier in the ICMP time exceed packet
+ icmp[IP].ttl = 64
+ rxs = self.send_and_expect(self.pg0, icmp, self.pg1)
+
+ icmp_error = (
+ Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
+ / IP(src="172.16.1.1", dst=self.pg2.remote_hosts[0].ip4)
+ / ICMP(type=11, code=0)
+ / IPerror(
+ src=self.pg2.remote_hosts[0].ip4, dst=self.pg1.remote_hosts[0].ip4
+ )
+ / ICMPerror(id=rxs[0][ICMP].id, type=8)
+ / Raw()
+ )
+
+ self.rxs = self.send_and_expect(self.pg1, icmp_error, self.pg0)
+ for rx in self.rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[0].ip4)
+ self.assertEqual(rx[IP].src, "172.16.1.1")
+ self.assertEqual(rx[ICMP].type, 11) # ICMP Type 11 (Time Exceeded)
+ self.assertEqual(
+ rx[ICMP].code, 0
+ ) # ICMP Code 0 (TTL Zero During Transit)
+ inner = rx[ICMP].payload
+ self.assertEqual(inner[IPerror].src, self.pg0.remote_hosts[0].ip4)
+ self.assertEqual(inner[IPerror].dst, self.pg1.remote_hosts[0].ip4)
+ self.assertEqual(inner[ICMPerror].type, 8) # ICMP Echo Request
+ self.assertEqual(inner[ICMPerror].id, 0xFEED)
+
+ # IPv6 ICMPv6
+ if is_v6:
+
+ # Create an ICMPv6 traceroute packet with Hop Limit set to 1.
+ # The CNAT translates the packet, but the NATted packet is dropped
+ # due to the Hop Limit of 1. An ICMPv6 Time Exceeded message is sent
+ # back to the source (which is the NATted address).
+ # The CNAT translates the message once more to restore
+ # the original source IPv6 address.
+ icmp6 = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IPv6(
+ hlim=1,
+ src=self.pg0.remote_hosts[0].ip6,
+ dst=self.pg1.remote_hosts[0].ip6,
+ )
+ / ICMPv6EchoRequest(id=0xFEED)
+ / Raw()
+ )
+ self.rxs = self.send_and_expect(self.pg0, icmp6, self.pg0)
+
+ for rx in self.rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IPv6].dst, self.pg0.remote_hosts[0].ip6)
+ self.assertEqual(rx[IPv6].src, "fd01:1::1")
+ self.assertEqual(
+ rx[ICMPv6TimeExceeded].type, 3
+ ) # ICMPv6 Type 3 (Time Exceeded)
+ self.assertEqual(
+ rx[ICMPv6TimeExceeded].code, 0
+ ) # ICMPv6 Code 0 (TTL Zero During Transit)
+ inner = rx[ICMPv6TimeExceeded].payload
+ self.assertEqual(inner[IPerror6].src, self.pg0.remote_hosts[0].ip6)
+ self.assertEqual(inner[IPerror6].dst, self.pg1.remote_hosts[0].ip6)
+ self.assertEqual(
+ inner[ICMPv6EchoRequest].type, 128
+ ) # ICMPv6 Echo Request
+ self.assertEqual(inner[ICMPv6EchoRequest].id, 0xFEED)
+
def sourcenat_test_tcp_udp_conf(self, L4PROTO, is_v6=False):
ctx = CnatTestContext(self, L4PROTO, is_v6)
# we should source NAT
@@ -823,7 +945,7 @@ class TestCNatSourceNAT(CnatCommonTestCase):
@unittest.skipIf("cnat" in config.excluded_plugins, "Exclude CNAT plugin tests")
class TestCNatDHCP(CnatCommonTestCase):
- """CNat Translation"""
+ """CNat DHCP"""
@classmethod
def setUpClass(cls):