diff options
Diffstat (limited to 'test/test_ip6.py')
-rw-r--r-- | test/test_ip6.py | 114 |
1 files changed, 113 insertions, 1 deletions
diff --git a/test/test_ip6.py b/test/test_ip6.py index aad3713c4c0..dbe87465f7b 100644 --- a/test/test_ip6.py +++ b/test/test_ip6.py @@ -13,7 +13,7 @@ from vpp_neighbor import find_nbr, VppNeighbor from scapy.packet import Raw from scapy.layers.l2 import Ether, Dot1Q -from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \ +from scapy.layers.inet6 import IPv6, UDP, TCP, ICMPv6ND_NS, ICMPv6ND_RS, \ ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \ ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \ ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types @@ -1506,5 +1506,117 @@ class TestIP6LoadBalance(VppTestCase): self.send_and_expect_one_itf(self.pg0, port_pkts, self.pg3) +class TestIP6Punt(VppTestCase): + """ IPv6 Punt Police/Redirect """ + + def setUp(self): + super(TestIP6Punt, self).setUp() + + self.create_pg_interfaces(range(2)) + + for i in self.pg_interfaces: + i.admin_up() + i.config_ip6() + i.resolve_ndp() + + def tearDown(self): + super(TestIP6Punt, self).tearDown() + for i in self.pg_interfaces: + i.unconfig_ip6() + i.admin_down() + + def send_and_expect(self, input, pkts, output): + input.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + rx = output.get_capture(len(pkts)) + return rx + + def send_and_assert_no_replies(self, intf, pkts, remark): + self.vapi.cli("clear trace") + intf.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + for i in self.pg_interfaces: + i.get_capture(0) + i.assert_nothing_captured(remark=remark) + + def test_ip_punt(self): + """ IP6 punt police and redirect """ + + p = (Ether(src=self.pg0.remote_mac, + dst=self.pg0.local_mac) / + IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) / + TCP(sport=1234, dport=1234) / + Raw('\xa5' * 100)) + + pkts = p * 1025 + + # + # Configure a punt redirect via pg1. + # + nh_addr = inet_pton(AF_INET6, + self.pg1.remote_ip6) + self.vapi.ip_punt_redirect(self.pg0.sw_if_index, + self.pg1.sw_if_index, + nh_addr, + is_ip6=1) + + self.send_and_expect(self.pg0, pkts, self.pg1) + + # + # add a policer + # + policer = self.vapi.policer_add_del("ip6-punt", 400, 0, 10, 0, + rate_type=1) + self.vapi.ip_punt_police(policer.policer_index, is_ip6=1) + + self.vapi.cli("clear trace") + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + # + # the number of packet recieved should be greater than 0, + # but not equal to the number sent, since some were policed + # + rx = self.pg1._get_capture(1) + self.assertTrue(len(rx) > 0) + self.assertTrue(len(rx) < len(pkts)) + + # + # remove the poilcer. back to full rx + # + self.vapi.ip_punt_police(policer.policer_index, is_add=0, is_ip6=1) + self.vapi.policer_add_del("ip6-punt", 400, 0, 10, 0, + rate_type=1, is_add=0) + self.send_and_expect(self.pg0, pkts, self.pg1) + + # + # remove the redirect. expect full drop. + # + self.vapi.ip_punt_redirect(self.pg0.sw_if_index, + self.pg1.sw_if_index, + nh_addr, + is_add=0, + is_ip6=1) + self.send_and_assert_no_replies(self.pg0, pkts, + "IP no punt config") + + # + # Add a redirect that is not input port selective + # + self.vapi.ip_punt_redirect(0xffffffff, + self.pg1.sw_if_index, + nh_addr, + is_ip6=1) + self.send_and_expect(self.pg0, pkts, self.pg1) + + self.vapi.ip_punt_redirect(0xffffffff, + self.pg1.sw_if_index, + nh_addr, + is_add=0, + is_ip6=1) + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner) |