diff options
author | Ole Troan <otroan@employees.org> | 2023-10-12 18:54:55 +0200 |
---|---|---|
committer | Ole Tr�an <otroan@employees.org> | 2023-10-16 10:59:22 +0000 |
commit | ff344a98afd2057cd0df312a9d7277a95853fd0a (patch) | |
tree | 08243d8cf38c29e6d14610aa5169adeab2575471 /test/test_npt66.py | |
parent | 1fe132ec1a33edf89503140aff0a26ed73542296 (diff) |
npt66: icmp6 alg to handle icmp6 error messages
Support rewriting the inner packet for ICMP6 error messages.
Type: feature
Change-Id: I7e11f53626037075a23310f1cb7e673b0cb52843
Signed-off-by: Ole Troan <otroan@employees.org>
Diffstat (limited to 'test/test_npt66.py')
-rw-r--r-- | test/test_npt66.py | 76 |
1 files changed, 57 insertions, 19 deletions
diff --git a/test/test_npt66.py b/test/test_npt66.py index 44a9e87cb89..c8676219458 100644 --- a/test/test_npt66.py +++ b/test/test_npt66.py @@ -4,7 +4,7 @@ import unittest import ipaddress from framework import VppTestCase, VppTestRunner -from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6DestUnreach from scapy.layers.l2 import Ether from scapy.packet import Raw @@ -33,7 +33,7 @@ class TestNPT66(VppTestCase): i.admin_down() super(TestNPT66, self).tearDown() - def send_and_verify(self, internal): + def send_and_verify(self, internal, reply_icmp_error=False): sendif = self.pg0 recvif = self.pg1 local_mac = self.pg0.local_mac @@ -47,30 +47,57 @@ class TestNPT66(VppTestCase): / ICMPv6EchoRequest() / Raw(b"Request") ) + # print('Sending packet') + # p.show2() rxs = self.send_and_expect(sendif, p, recvif) for rx in rxs: + # print('Received packet') + # rx.show2() original_cksum = rx[ICMPv6EchoRequest].cksum del rx[ICMPv6EchoRequest].cksum rx = rx.__class__(bytes(rx)) self.assertEqual(original_cksum, rx[ICMPv6EchoRequest].cksum) # Generate a replies - reply = ( - Ether(dst=rx[Ether].src, src=local_mac) - / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src) - / ICMPv6EchoRequest() - / Raw(b"Reply") - ) - - replies = self.send_and_expect(recvif, reply, sendif) - for r in replies: - self.assertEqual(str(p[IPv6].src), r[IPv6].dst) - original_cksum = r[ICMPv6EchoRequest].cksum - del r[ICMPv6EchoRequest].cksum - r = r.__class__(bytes(r)) - self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum) - - def do_test(self, internal, external): + if reply_icmp_error: + # print('Generating an ICMP error message') + reply = ( + Ether(dst=rx[Ether].src, src=local_mac) + / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src) + / ICMPv6DestUnreach() + / rx[IPv6] + ) + # print('Sending ICMP error message reply') + # reply.show2() + replies = self.send_and_expect(recvif, reply, sendif) + for r in replies: + # print('Received ICMP error message reply on the other side') + # r.show2() + self.assertEqual(str(p[IPv6].src), r[IPv6].dst) + original_cksum = r[ICMPv6EchoRequest].cksum + del r[ICMPv6EchoRequest].cksum + r = r.__class__(bytes(r)) + self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum) + + else: + reply = ( + Ether(dst=rx[Ether].src, src=local_mac) + / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src) + / ICMPv6EchoRequest() + / Raw(b"Reply") + ) + + replies = self.send_and_expect(recvif, reply, sendif) + for r in replies: + r.show2() + self.assertEqual(str(p[IPv6].src), r[IPv6].dst) + original_cksum = r[ICMPv6EchoRequest].cksum + del r[ICMPv6EchoRequest].cksum + r = r.__class__(bytes(r)) + self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum) + + def do_test(self, internal, external, reply_icmp_error=False): + """Add NPT66 binding and send packet""" self.vapi.npt66_binding_add_del( sw_if_index=self.pg1.sw_if_index, internal=internal, @@ -80,7 +107,7 @@ class TestNPT66(VppTestCase): ## TODO use route api self.vapi.cli(f"ip route add {internal} via {self.pg0.remote_ip6}") - self.send_and_verify(internal) + self.send_and_verify(internal, reply_icmp_error=reply_icmp_error) self.vapi.npt66_binding_add_del( sw_if_index=self.pg1.sw_if_index, @@ -97,6 +124,17 @@ class TestNPT66(VppTestCase): self.do_test("fc00:1234::/32", "2001:db8:1::/32") self.do_test("fc00:1234::/63", "2001:db8:1::/56") + def test_npt66_icmp6(self): + """Send and receive a packet through NPT66""" + + # Test ICMP6 error packets + self.do_test( + "fd00:0000:0000::/48", "2001:4650:c3ed::/48", reply_icmp_error=True + ) + self.do_test("fc00:1::/48", "2001:db8:1::/48", reply_icmp_error=True) + self.do_test("fc00:1234::/32", "2001:db8:1::/32", reply_icmp_error=True) + self.do_test("fc00:1234::/63", "2001:db8:1::/56", reply_icmp_error=True) + if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) |