Diffstat (limited to 'test/test_urpf.py')
-rw-r--r-- | test/test_urpf.py | 312 |
1 files changed, 192 insertions, 120 deletions
diff --git a/test/test_urpf.py b/test/test_urpf.py index 8f4e563f8bc..1e4a6c5bb34 100644 --- a/test/test_urpf.py +++ b/test/test_urpf.py @@ -2,11 +2,12 @@ import unittest -from framework import VppTestCase, VppTestRunner +from framework import VppTestCase +from asfframework import VppTestRunner from scapy.packet import Raw from scapy.layers.l2 import Ether -from scapy.layers.inet import IP, UDP, ICMP +from scapy.layers.inet import IP, UDP from scapy.layers.inet6 import IPv6 from vpp_papi import VppEnum @@ -15,7 +16,7 @@ N_PKTS = 63 class TestURPF(VppTestCase): - """ Unicast Reverse Path Forwarding Test Case """ + """Unicast Reverse Path Forwarding Test Case""" @classmethod def setUpClass(cls): 
@@ -47,26 +48,27 @@ class TestURPF(VppTestCase): super(TestURPF, self).tearDown() def test_urpf4(self): - """ uRPF IP4 """ + """uRPF IP4""" e = VppEnum - p_spoof_loose = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IP(src="3.3.3.3", dst=self.pg1.remote_ip4) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) * N_PKTS - p_spoof_strict = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IP(src=self.pg2.remote_ip4, - dst=self.pg1.remote_ip4) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) * N_PKTS - p_good = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, - dst=self.pg1.remote_ip4) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) * N_PKTS + p_spoof_loose = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src="3.3.3.3", dst=self.pg1.remote_ip4) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) * N_PKTS + p_spoof_strict = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg2.remote_ip4, dst=self.pg1.remote_ip4) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) * N_PKTS + p_good = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) * N_PKTS # # before adding the uRPF, ensure all packets are forwarded @@ -78,10 +80,12 @@ class TestURPF(VppTestCase): # # apply loose uRPF check on pg0 rx # - self.vapi.urpf_update(is_input=True, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, - af=e.vl_api_address_family_t.ADDRESS_IP4, - sw_if_index=self.pg0.sw_if_index) + self.vapi.urpf_update( + is_input=True, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, + af=e.vl_api_address_family_t.ADDRESS_IP4, + sw_if_index=self.pg0.sw_if_index, + ) # good packets still pass self.send_and_expect(self.pg0, p_good, self.pg1) 
@@ -90,16 +94,17 @@ class TestURPF(VppTestCase): # packets from address to which there is no route are dropped self.send_and_assert_no_replies(self.pg0, p_spoof_loose) - self.assert_error_counter_equal("/err/ip4-rx-urpf-loose/uRPF Drop", - N_PKTS) + self.assert_error_counter_equal("/err/ip4-rx-urpf-loose/uRPF Drop", N_PKTS) # # crank it up to strict mode # - self.vapi.urpf_update(is_input=True, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, - af=e.vl_api_address_family_t.ADDRESS_IP4, - sw_if_index=self.pg0.sw_if_index) + self.vapi.urpf_update( + is_input=True, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, + af=e.vl_api_address_family_t.ADDRESS_IP4, + sw_if_index=self.pg0.sw_if_index, + ) # good packets still pass self.send_and_expect(self.pg0, p_good, self.pg1) @@ -107,16 +112,17 @@ class TestURPF(VppTestCase): self.send_and_assert_no_replies(self.pg0, p_spoof_strict) self.send_and_assert_no_replies(self.pg0, p_spoof_loose) - self.assert_error_counter_equal("/err/ip4-rx-urpf-strict/uRPF Drop", - 2 * N_PKTS) + self.assert_error_counter_equal("/err/ip4-rx-urpf-strict/uRPF Drop", 2 * N_PKTS) # # disable uRPF, all traffic should pass # - self.vapi.urpf_update(is_input=True, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, - af=e.vl_api_address_family_t.ADDRESS_IP4, - sw_if_index=self.pg0.sw_if_index) + self.vapi.urpf_update( + is_input=True, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, + af=e.vl_api_address_family_t.ADDRESS_IP4, + sw_if_index=self.pg0.sw_if_index, + ) self.send_and_expect(self.pg0, p_good, self.pg1) self.send_and_expect(self.pg0, p_spoof_strict, self.pg1) 
@@ -129,22 +135,25 @@ class TestURPF(VppTestCase): # for strict they should not be forwarded if they would be # forwarded thru that interface. # - self.vapi.urpf_update(is_input=False, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, - af=e.vl_api_address_family_t.ADDRESS_IP4, - sw_if_index=self.pg1.sw_if_index) + self.vapi.urpf_update( + is_input=False, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, + af=e.vl_api_address_family_t.ADDRESS_IP4, + sw_if_index=self.pg1.sw_if_index, + ) self.send_and_expect(self.pg0, p_good, self.pg1) self.send_and_expect(self.pg0, p_spoof_strict, self.pg1) self.send_and_assert_no_replies(self.pg0, p_spoof_loose) - self.assert_error_counter_equal("/err/ip4-tx-urpf-loose/uRPF Drop", - N_PKTS) + self.assert_error_counter_equal("/err/ip4-tx-urpf-loose/uRPF Drop", N_PKTS) - self.vapi.urpf_update(is_input=False, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, - af=e.vl_api_address_family_t.ADDRESS_IP4, - sw_if_index=self.pg1.sw_if_index) + self.vapi.urpf_update( + is_input=False, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, + af=e.vl_api_address_family_t.ADDRESS_IP4, + sw_if_index=self.pg1.sw_if_index, + ) self.send_and_expect(self.pg0, p_good, self.pg1) # the strict packet, from a peer is allowed, since it does 
@@ -152,48 +161,49 @@ class TestURPF(VppTestCase): self.send_and_expect(self.pg0, p_spoof_strict, self.pg1) self.send_and_assert_no_replies(self.pg0, p_spoof_loose) - self.assert_error_counter_equal("/err/ip4-tx-urpf-strict/uRPF Drop", - N_PKTS) + self.assert_error_counter_equal("/err/ip4-tx-urpf-strict/uRPF Drop", N_PKTS) # change the strict packet so that it would forward through pg1 - p_spoof_strict = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IP(src=self.pg1.remote_ip4, - dst=self.pg1.remote_ip4) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) * N_PKTS + p_spoof_strict = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg1.remote_ip4, dst=self.pg1.remote_ip4) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) * N_PKTS self.send_and_assert_no_replies(self.pg0, p_spoof_strict) - self.assert_error_counter_equal("/err/ip4-tx-urpf-strict/uRPF Drop", - 2 * N_PKTS) + self.assert_error_counter_equal("/err/ip4-tx-urpf-strict/uRPF Drop", 2 * N_PKTS) # cleanup - self.vapi.urpf_update(is_input=False, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, - af=e.vl_api_address_family_t.ADDRESS_IP4, - sw_if_index=self.pg1.sw_if_index) + self.vapi.urpf_update( + is_input=False, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, + af=e.vl_api_address_family_t.ADDRESS_IP4, + sw_if_index=self.pg1.sw_if_index, + ) def test_urpf6(self): - """ uRPF IP6 """ + """uRPF IP6""" e = VppEnum - p_spoof_loose = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IPv6(src="3::3", dst=self.pg1.remote_ip6) / - UDP(sport=1236, dport=1236) / - Raw(b'\xa5' * 100)) * N_PKTS - p_spoof_strict = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IPv6(src=self.pg2.remote_ip6, - dst=self.pg1.remote_ip6) / - UDP(sport=1236, dport=1236) / - Raw(b'\xa5' * 100)) * N_PKTS - p_good = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IPv6(src=self.pg0.remote_ip6, - dst=self.pg1.remote_ip6) / - UDP(sport=1236, dport=1236) / - 
Raw(b'\xa5' * 100)) * N_PKTS + p_spoof_loose = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IPv6(src="3::3", dst=self.pg1.remote_ip6) + / UDP(sport=1236, dport=1236) + / Raw(b"\xa5" * 100) + ) * N_PKTS + p_spoof_strict = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IPv6(src=self.pg2.remote_ip6, dst=self.pg1.remote_ip6) + / UDP(sport=1236, dport=1236) + / Raw(b"\xa5" * 100) + ) * N_PKTS + p_good = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) + / UDP(sport=1236, dport=1236) + / Raw(b"\xa5" * 100) + ) * N_PKTS # # before adding the uRPF, ensure all packets are forwarded @@ -205,10 +215,12 @@ class TestURPF(VppTestCase): # # apply loose uRPF check on pg0 rx # - self.vapi.urpf_update(is_input=True, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, - af=e.vl_api_address_family_t.ADDRESS_IP6, - sw_if_index=self.pg0.sw_if_index) + self.vapi.urpf_update( + is_input=True, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, + af=e.vl_api_address_family_t.ADDRESS_IP6, + sw_if_index=self.pg0.sw_if_index, + ) # good packets still pass self.send_and_expect(self.pg0, p_good, self.pg1) @@ -217,16 +229,17 @@ class TestURPF(VppTestCase): # packets from address to which there is no route are dropped self.send_and_assert_no_replies(self.pg0, p_spoof_loose) - self.assert_error_counter_equal("/err/ip6-rx-urpf-loose/uRPF Drop", - N_PKTS) + self.assert_error_counter_equal("/err/ip6-rx-urpf-loose/uRPF Drop", N_PKTS) # # crank it up to strict mode # - self.vapi.urpf_update(is_input=True, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, - af=e.vl_api_address_family_t.ADDRESS_IP6, - sw_if_index=self.pg0.sw_if_index) + self.vapi.urpf_update( + is_input=True, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, + af=e.vl_api_address_family_t.ADDRESS_IP6, + sw_if_index=self.pg0.sw_if_index, + ) # good packets still pass self.send_and_expect(self.pg0, p_good, self.pg1) 
@@ -234,16 +247,17 @@ class TestURPF(VppTestCase): self.send_and_assert_no_replies(self.pg0, p_spoof_strict) self.send_and_assert_no_replies(self.pg0, p_spoof_loose) - self.assert_error_counter_equal("/err/ip6-rx-urpf-strict/uRPF Drop", - 2 * N_PKTS) + self.assert_error_counter_equal("/err/ip6-rx-urpf-strict/uRPF Drop", 2 * N_PKTS) # # disable uRPF, all traffic should pass # - self.vapi.urpf_update(is_input=True, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, - af=e.vl_api_address_family_t.ADDRESS_IP6, - sw_if_index=self.pg0.sw_if_index) + self.vapi.urpf_update( + is_input=True, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, + af=e.vl_api_address_family_t.ADDRESS_IP6, + sw_if_index=self.pg0.sw_if_index, + ) self.send_and_expect(self.pg0, p_good, self.pg1) self.send_and_expect(self.pg0, p_spoof_strict, self.pg1) 
@@ -256,22 +270,25 @@ class TestURPF(VppTestCase): # for strict they should not be forwarded if they would be # forwarded thru that interface. # - self.vapi.urpf_update(is_input=False, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, - af=e.vl_api_address_family_t.ADDRESS_IP6, - sw_if_index=self.pg1.sw_if_index) + self.vapi.urpf_update( + is_input=False, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, + af=e.vl_api_address_family_t.ADDRESS_IP6, + sw_if_index=self.pg1.sw_if_index, + ) self.send_and_expect(self.pg0, p_good, self.pg1) self.send_and_expect(self.pg0, p_spoof_strict, self.pg1) self.send_and_assert_no_replies(self.pg0, p_spoof_loose) - self.assert_error_counter_equal("/err/ip6-tx-urpf-loose/uRPF Drop", - N_PKTS) + self.assert_error_counter_equal("/err/ip6-tx-urpf-loose/uRPF Drop", N_PKTS) - self.vapi.urpf_update(is_input=False, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, - af=e.vl_api_address_family_t.ADDRESS_IP6, - sw_if_index=self.pg1.sw_if_index) + self.vapi.urpf_update( + is_input=False, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, + af=e.vl_api_address_family_t.ADDRESS_IP6, + sw_if_index=self.pg1.sw_if_index, + ) self.send_and_expect(self.pg0, p_good, self.pg1) # the strict packet, from a peer is allowed, since it does 
@@ -279,27 +296,82 @@ class TestURPF(VppTestCase): self.send_and_expect(self.pg0, p_spoof_strict, self.pg1) self.send_and_assert_no_replies(self.pg0, p_spoof_loose) - self.assert_error_counter_equal("/err/ip6-tx-urpf-strict/uRPF Drop", - N_PKTS) + self.assert_error_counter_equal("/err/ip6-tx-urpf-strict/uRPF Drop", N_PKTS) # change the strict packet so that it would forward through pg1 - p_spoof_strict = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IPv6(src=self.pg1.remote_ip6, - dst=self.pg1.remote_ip6) / - UDP(sport=1236, dport=1236) / - Raw(b'\xa5' * 100)) * N_PKTS + p_spoof_strict = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) + / UDP(sport=1236, dport=1236) + / Raw(b"\xa5" * 100) + ) * N_PKTS self.send_and_assert_no_replies(self.pg0, p_spoof_strict) - self.assert_error_counter_equal("/err/ip6-tx-urpf-strict/uRPF Drop", - 2 * N_PKTS) + self.assert_error_counter_equal("/err/ip6-tx-urpf-strict/uRPF Drop", 2 * N_PKTS) # cleanup - self.vapi.urpf_update(is_input=False, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, - af=e.vl_api_address_family_t.ADDRESS_IP6, - sw_if_index=self.pg1.sw_if_index) + self.vapi.urpf_update( + is_input=False, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, + af=e.vl_api_address_family_t.ADDRESS_IP6, + sw_if_index=self.pg1.sw_if_index, + ) + def test_interface_dump(self): + """uRPF Interface Dump""" -if __name__ == '__main__': + self.create_loopback_interfaces(3) + e = VppEnum + self.vapi.urpf_update( + is_input=True, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, + af=e.vl_api_address_family_t.ADDRESS_IP4, + sw_if_index=self.loop1.sw_if_index, + ) + self.vapi.urpf_update( + is_input=False, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, + af=e.vl_api_address_family_t.ADDRESS_IP6, + sw_if_index=self.loop2.sw_if_index, + ) + + ret = self.vapi.urpf_interface_dump() + self.assertEqual(len(ret), 2) + + dump_loop1 = ret[0] + dump_loop2 = ret[1] + 
self.assertEqual(dump_loop1.sw_if_index, self.loop1.sw_if_index) + self.assertTrue(dump_loop1.is_input) + self.assertEqual(dump_loop1.mode, e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT) + self.assertEqual(dump_loop1.af, e.vl_api_address_family_t.ADDRESS_IP4) + self.assertEqual(dump_loop2.sw_if_index, self.loop2.sw_if_index) + self.assertFalse(dump_loop2.is_input) + self.assertEqual(dump_loop2.mode, e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE) + self.assertEqual(dump_loop2.af, e.vl_api_address_family_t.ADDRESS_IP6) + + ret = self.vapi.urpf_interface_dump(sw_if_index=self.loop1.sw_if_index) + self.assertEqual(len(ret), 1) + + dump_loop1 = ret[0] + self.assertEqual(dump_loop1.sw_if_index, self.loop1.sw_if_index) + self.assertTrue(dump_loop1.is_input) + self.assertEqual(dump_loop1.mode, e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT) + self.assertEqual(dump_loop1.af, e.vl_api_address_family_t.ADDRESS_IP4) + + # cleanup + self.vapi.urpf_update( + is_input=False, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, + af=e.vl_api_address_family_t.ADDRESS_IP4, + sw_if_index=self.loop1.sw_if_index, + ) + self.vapi.urpf_update( + is_input=False, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, + af=e.vl_api_address_family_t.ADDRESS_IP6, + sw_if_index=self.loop2.sw_if_index, + ) + + +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) |
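Review bot: The cleanup at the end of the new `test_interface_dump` turns off the wrong direction for loop1: the test enables strict IP4 uRPF with `is_input=True`, but the first cleanup call passes `is_input=False`, so the rx anti-spoofing check presumably stays configured on loop1 after the test returns. Change that call's argument to `is_input=True,` so it mirrors the setup; the loop2 cleanup (`is_input=False`, IP6) already matches. The cleanup also runs only if every assertion passes, so a failing assert leaves both configurations in place; registering the two OFF calls with `self.addCleanup(...)` right after the setup calls would avoid leaking state into later tests. Separately, `ret[0]` and `ret[1]` assume the dump returns loop1 before loop2; unless the API guarantees that order, look the entries up by `sw_if_index` instead.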