diff options
Diffstat (limited to 'test/test_npt66.py')
-rw-r--r-- | test/test_npt66.py | 141 |
1 files changed, 141 insertions, 0 deletions
diff --git a/test/test_npt66.py b/test/test_npt66.py new file mode 100644 index 00000000000..307dbabc39b --- /dev/null +++ b/test/test_npt66.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python3 + +import unittest +import ipaddress +from framework import VppTestCase +from asfframework import VppTestRunner + +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6DestUnreach +from scapy.layers.l2 import Ether +from scapy.packet import Raw + + +class TestNPT66(VppTestCase): + """NPTv6 Test Case""" + + extra_vpp_plugin_config = [ + "plugin npt66_plugin.so {enable}", + ] + + def setUp(self): + super(TestNPT66, self).setUp() + + # create 2 pg interfaces + self.create_pg_interfaces(range(2)) + + for i in self.pg_interfaces: + i.admin_up() + i.config_ip6() + i.resolve_ndp() + + def tearDown(self): + for i in self.pg_interfaces: + i.unconfig_ip6() + i.admin_down() + super(TestNPT66, self).tearDown() + + def send_and_verify(self, internal, reply_icmp_error=False): + sendif = self.pg0 + recvif = self.pg1 + local_mac = self.pg0.local_mac + remote_mac = self.pg0.remote_mac + src = ipaddress.ip_interface(internal).ip + 1 + dst = self.pg1.remote_ip6 + + p = ( + Ether(dst=local_mac, src=remote_mac) + / IPv6(src=src, dst=dst) + / ICMPv6EchoRequest() + / Raw(b"Request") + ) + # print('Sending packet') + # p.show2() + rxs = self.send_and_expect(sendif, p, recvif) + for rx in rxs: + # print('Received packet') + # rx.show2() + original_cksum = rx[ICMPv6EchoRequest].cksum + del rx[ICMPv6EchoRequest].cksum + rx = rx.__class__(bytes(rx)) + self.assertEqual(original_cksum, rx[ICMPv6EchoRequest].cksum) + + # Generate a replies + if reply_icmp_error: + # print('Generating an ICMP error message') + reply = ( + Ether(dst=rx[Ether].src, src=local_mac) + / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src) + / ICMPv6DestUnreach() + / rx[IPv6] + ) + # print('Sending ICMP error message reply') + # reply.show2() + replies = self.send_and_expect(recvif, reply, sendif) + for r in replies: + # print('Received ICMP error message reply on the other side') + # r.show2() + self.assertEqual(str(p[IPv6].src), r[IPv6].dst) + original_cksum = r[ICMPv6EchoRequest].cksum + del r[ICMPv6EchoRequest].cksum + r = r.__class__(bytes(r)) + self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum) + + else: + reply = ( + Ether(dst=rx[Ether].src, src=local_mac) + / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src) + / ICMPv6EchoRequest() + / Raw(b"Reply") + ) + + replies = self.send_and_expect(recvif, reply, sendif) + for r in replies: + # r.show2() + self.assertEqual(str(p[IPv6].src), r[IPv6].dst) + original_cksum = r[ICMPv6EchoRequest].cksum + del r[ICMPv6EchoRequest].cksum + r = r.__class__(bytes(r)) + self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum) + + def do_test(self, internal, external, reply_icmp_error=False): + """Add NPT66 binding and send packet""" + self.vapi.npt66_binding_add_del( + sw_if_index=self.pg1.sw_if_index, + internal=internal, + external=external, + is_add=True, + ) + ## TODO use route api + self.vapi.cli(f"ip route add {internal} via {self.pg0.remote_ip6}") + + self.send_and_verify(internal, reply_icmp_error=reply_icmp_error) + + self.vapi.npt66_binding_add_del( + sw_if_index=self.pg1.sw_if_index, + internal=internal, + external=external, + is_add=False, + ) + + def test_npt66_simple(self): + """Send and receive a packet through NPT66""" + + self.do_test("fd00:0000:0000::/48", "2001:4650:c3ed::/48") + self.do_test("fc00:1::/48", "2001:db8:1::/48") + self.do_test("fc00:1234::/32", "2001:db8:1::/32") + self.do_test("fc00:1234::/63", "2001:db8:1::/56") + + def test_npt66_icmp6(self): + """Send and receive a packet through NPT66""" + + # Test ICMP6 error packets + self.do_test( + "fd00:0000:0000::/48", "2001:4650:c3ed::/48", reply_icmp_error=True + ) + self.do_test("fc00:1::/48", "2001:db8:1::/48", reply_icmp_error=True) + self.do_test("fc00:1234::/32", "2001:db8:1::/32", reply_icmp_error=True) + self.do_test("fc00:1234::/63", "2001:db8:1::/56", reply_icmp_error=True) + + +if __name__ == "__main__": + unittest.main(testRunner=VppTestRunner) |