diff options
Diffstat (limited to 'test/test_urpf.py')
-rw-r--r-- | test/test_urpf.py | 252 |
1 files changed, 134 insertions, 118 deletions
diff --git a/test/test_urpf.py b/test/test_urpf.py index 8f4e563f8bc..e0dc1210bfc 100644 --- a/test/test_urpf.py +++ b/test/test_urpf.py @@ -15,7 +15,7 @@ N_PKTS = 63 class TestURPF(VppTestCase): - """ Unicast Reverse Path Forwarding Test Case """ + """Unicast Reverse Path Forwarding Test Case""" @classmethod def setUpClass(cls): @@ -47,26 +47,27 @@ class TestURPF(VppTestCase): super(TestURPF, self).tearDown() def test_urpf4(self): - """ uRPF IP4 """ + """uRPF IP4""" e = VppEnum - p_spoof_loose = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IP(src="3.3.3.3", dst=self.pg1.remote_ip4) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) * N_PKTS - p_spoof_strict = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IP(src=self.pg2.remote_ip4, - dst=self.pg1.remote_ip4) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) * N_PKTS - p_good = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, - dst=self.pg1.remote_ip4) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) * N_PKTS + p_spoof_loose = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src="3.3.3.3", dst=self.pg1.remote_ip4) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) * N_PKTS + p_spoof_strict = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg2.remote_ip4, dst=self.pg1.remote_ip4) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) * N_PKTS + p_good = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) * N_PKTS # # before adding the uRPF, ensure all packets are forwarded @@ -78,10 +79,12 @@ class TestURPF(VppTestCase): # # apply loose uRPF check on pg0 rx # - self.vapi.urpf_update(is_input=True, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, - af=e.vl_api_address_family_t.ADDRESS_IP4, - sw_if_index=self.pg0.sw_if_index) + self.vapi.urpf_update( + is_input=True, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, + af=e.vl_api_address_family_t.ADDRESS_IP4, + sw_if_index=self.pg0.sw_if_index, + ) # good packets still pass self.send_and_expect(self.pg0, p_good, self.pg1) @@ -90,16 +93,17 @@ class TestURPF(VppTestCase): # packets from address to which there is no route are dropped self.send_and_assert_no_replies(self.pg0, p_spoof_loose) - self.assert_error_counter_equal("/err/ip4-rx-urpf-loose/uRPF Drop", - N_PKTS) + self.assert_error_counter_equal("/err/ip4-rx-urpf-loose/uRPF Drop", N_PKTS) # # crank it up to strict mode # - self.vapi.urpf_update(is_input=True, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, - af=e.vl_api_address_family_t.ADDRESS_IP4, - sw_if_index=self.pg0.sw_if_index) + self.vapi.urpf_update( + is_input=True, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, + af=e.vl_api_address_family_t.ADDRESS_IP4, + sw_if_index=self.pg0.sw_if_index, + ) # good packets still pass self.send_and_expect(self.pg0, p_good, self.pg1) @@ -107,16 +111,17 @@ class TestURPF(VppTestCase): self.send_and_assert_no_replies(self.pg0, p_spoof_strict) self.send_and_assert_no_replies(self.pg0, p_spoof_loose) - self.assert_error_counter_equal("/err/ip4-rx-urpf-strict/uRPF Drop", - 2 * N_PKTS) + self.assert_error_counter_equal("/err/ip4-rx-urpf-strict/uRPF Drop", 2 * N_PKTS) # # disable uRPF, all traffic should pass # - self.vapi.urpf_update(is_input=True, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, - af=e.vl_api_address_family_t.ADDRESS_IP4, - sw_if_index=self.pg0.sw_if_index) + self.vapi.urpf_update( + is_input=True, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, + af=e.vl_api_address_family_t.ADDRESS_IP4, + sw_if_index=self.pg0.sw_if_index, + ) self.send_and_expect(self.pg0, p_good, self.pg1) self.send_and_expect(self.pg0, p_spoof_strict, self.pg1) @@ -129,22 +134,25 @@ class TestURPF(VppTestCase): # for strict they should not be forwarded if they would be # forwarded thru that interface. # - self.vapi.urpf_update(is_input=False, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, - af=e.vl_api_address_family_t.ADDRESS_IP4, - sw_if_index=self.pg1.sw_if_index) + self.vapi.urpf_update( + is_input=False, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, + af=e.vl_api_address_family_t.ADDRESS_IP4, + sw_if_index=self.pg1.sw_if_index, + ) self.send_and_expect(self.pg0, p_good, self.pg1) self.send_and_expect(self.pg0, p_spoof_strict, self.pg1) self.send_and_assert_no_replies(self.pg0, p_spoof_loose) - self.assert_error_counter_equal("/err/ip4-tx-urpf-loose/uRPF Drop", - N_PKTS) + self.assert_error_counter_equal("/err/ip4-tx-urpf-loose/uRPF Drop", N_PKTS) - self.vapi.urpf_update(is_input=False, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, - af=e.vl_api_address_family_t.ADDRESS_IP4, - sw_if_index=self.pg1.sw_if_index) + self.vapi.urpf_update( + is_input=False, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, + af=e.vl_api_address_family_t.ADDRESS_IP4, + sw_if_index=self.pg1.sw_if_index, + ) self.send_and_expect(self.pg0, p_good, self.pg1) # the strict packet, from a peer is allowed, since it does @@ -152,48 +160,49 @@ class TestURPF(VppTestCase): self.send_and_expect(self.pg0, p_spoof_strict, self.pg1) self.send_and_assert_no_replies(self.pg0, p_spoof_loose) - self.assert_error_counter_equal("/err/ip4-tx-urpf-strict/uRPF Drop", - N_PKTS) + self.assert_error_counter_equal("/err/ip4-tx-urpf-strict/uRPF Drop", N_PKTS) # change the strict packet so that it would forward through pg1 - p_spoof_strict = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IP(src=self.pg1.remote_ip4, - dst=self.pg1.remote_ip4) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) * N_PKTS + p_spoof_strict = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg1.remote_ip4, dst=self.pg1.remote_ip4) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) * N_PKTS self.send_and_assert_no_replies(self.pg0, p_spoof_strict) - self.assert_error_counter_equal("/err/ip4-tx-urpf-strict/uRPF Drop", - 2 * N_PKTS) + self.assert_error_counter_equal("/err/ip4-tx-urpf-strict/uRPF Drop", 2 * N_PKTS) # cleanup - self.vapi.urpf_update(is_input=False, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, - af=e.vl_api_address_family_t.ADDRESS_IP4, - sw_if_index=self.pg1.sw_if_index) + self.vapi.urpf_update( + is_input=False, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, + af=e.vl_api_address_family_t.ADDRESS_IP4, + sw_if_index=self.pg1.sw_if_index, + ) def test_urpf6(self): - """ uRPF IP6 """ + """uRPF IP6""" e = VppEnum - p_spoof_loose = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IPv6(src="3::3", dst=self.pg1.remote_ip6) / - UDP(sport=1236, dport=1236) / - Raw(b'\xa5' * 100)) * N_PKTS - p_spoof_strict = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IPv6(src=self.pg2.remote_ip6, - dst=self.pg1.remote_ip6) / - UDP(sport=1236, dport=1236) / - Raw(b'\xa5' * 100)) * N_PKTS - p_good = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IPv6(src=self.pg0.remote_ip6, - dst=self.pg1.remote_ip6) / - UDP(sport=1236, dport=1236) / - Raw(b'\xa5' * 100)) * N_PKTS + p_spoof_loose = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IPv6(src="3::3", dst=self.pg1.remote_ip6) + / UDP(sport=1236, dport=1236) + / Raw(b"\xa5" * 100) + ) * N_PKTS + p_spoof_strict = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IPv6(src=self.pg2.remote_ip6, dst=self.pg1.remote_ip6) + / UDP(sport=1236, dport=1236) + / Raw(b"\xa5" * 100) + ) * N_PKTS + p_good = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) + / UDP(sport=1236, dport=1236) + / Raw(b"\xa5" * 100) + ) * N_PKTS # # before adding the uRPF, ensure all packets are forwarded @@ -205,10 +214,12 @@ class TestURPF(VppTestCase): # # apply loose uRPF check on pg0 rx # - self.vapi.urpf_update(is_input=True, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, - af=e.vl_api_address_family_t.ADDRESS_IP6, - sw_if_index=self.pg0.sw_if_index) + self.vapi.urpf_update( + is_input=True, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, + af=e.vl_api_address_family_t.ADDRESS_IP6, + sw_if_index=self.pg0.sw_if_index, + ) # good packets still pass self.send_and_expect(self.pg0, p_good, self.pg1) @@ -217,16 +228,17 @@ class TestURPF(VppTestCase): # packets from address to which there is no route are dropped self.send_and_assert_no_replies(self.pg0, p_spoof_loose) - self.assert_error_counter_equal("/err/ip6-rx-urpf-loose/uRPF Drop", - N_PKTS) + self.assert_error_counter_equal("/err/ip6-rx-urpf-loose/uRPF Drop", N_PKTS) # # crank it up to strict mode # - self.vapi.urpf_update(is_input=True, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, - af=e.vl_api_address_family_t.ADDRESS_IP6, - sw_if_index=self.pg0.sw_if_index) + self.vapi.urpf_update( + is_input=True, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, + af=e.vl_api_address_family_t.ADDRESS_IP6, + sw_if_index=self.pg0.sw_if_index, + ) # good packets still pass self.send_and_expect(self.pg0, p_good, self.pg1) @@ -234,16 +246,17 @@ class TestURPF(VppTestCase): self.send_and_assert_no_replies(self.pg0, p_spoof_strict) self.send_and_assert_no_replies(self.pg0, p_spoof_loose) - self.assert_error_counter_equal("/err/ip6-rx-urpf-strict/uRPF Drop", - 2 * N_PKTS) + self.assert_error_counter_equal("/err/ip6-rx-urpf-strict/uRPF Drop", 2 * N_PKTS) # # disable uRPF, all traffic should pass # - self.vapi.urpf_update(is_input=True, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, - af=e.vl_api_address_family_t.ADDRESS_IP6, - sw_if_index=self.pg0.sw_if_index) + self.vapi.urpf_update( + is_input=True, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, + af=e.vl_api_address_family_t.ADDRESS_IP6, + sw_if_index=self.pg0.sw_if_index, + ) self.send_and_expect(self.pg0, p_good, self.pg1) self.send_and_expect(self.pg0, p_spoof_strict, self.pg1) @@ -256,22 +269,25 @@ class TestURPF(VppTestCase): # for strict they should not be forwarded if they would be # forwarded thru that interface. # - self.vapi.urpf_update(is_input=False, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, - af=e.vl_api_address_family_t.ADDRESS_IP6, - sw_if_index=self.pg1.sw_if_index) + self.vapi.urpf_update( + is_input=False, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, + af=e.vl_api_address_family_t.ADDRESS_IP6, + sw_if_index=self.pg1.sw_if_index, + ) self.send_and_expect(self.pg0, p_good, self.pg1) self.send_and_expect(self.pg0, p_spoof_strict, self.pg1) self.send_and_assert_no_replies(self.pg0, p_spoof_loose) - self.assert_error_counter_equal("/err/ip6-tx-urpf-loose/uRPF Drop", - N_PKTS) + self.assert_error_counter_equal("/err/ip6-tx-urpf-loose/uRPF Drop", N_PKTS) - self.vapi.urpf_update(is_input=False, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, - af=e.vl_api_address_family_t.ADDRESS_IP6, - sw_if_index=self.pg1.sw_if_index) + self.vapi.urpf_update( + is_input=False, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, + af=e.vl_api_address_family_t.ADDRESS_IP6, + sw_if_index=self.pg1.sw_if_index, + ) self.send_and_expect(self.pg0, p_good, self.pg1) # the strict packet, from a peer is allowed, since it does @@ -279,27 +295,27 @@ class TestURPF(VppTestCase): self.send_and_expect(self.pg0, p_spoof_strict, self.pg1) self.send_and_assert_no_replies(self.pg0, p_spoof_loose) - self.assert_error_counter_equal("/err/ip6-tx-urpf-strict/uRPF Drop", - N_PKTS) + self.assert_error_counter_equal("/err/ip6-tx-urpf-strict/uRPF Drop", N_PKTS) # change the strict packet so that it would forward through pg1 - p_spoof_strict = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IPv6(src=self.pg1.remote_ip6, - dst=self.pg1.remote_ip6) / - UDP(sport=1236, dport=1236) / - Raw(b'\xa5' * 100)) * N_PKTS + p_spoof_strict = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) + / UDP(sport=1236, dport=1236) + / Raw(b"\xa5" * 100) + ) * N_PKTS self.send_and_assert_no_replies(self.pg0, p_spoof_strict) - self.assert_error_counter_equal("/err/ip6-tx-urpf-strict/uRPF Drop", - 2 * N_PKTS) + self.assert_error_counter_equal("/err/ip6-tx-urpf-strict/uRPF Drop", 2 * N_PKTS) # cleanup - self.vapi.urpf_update(is_input=False, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, - af=e.vl_api_address_family_t.ADDRESS_IP6, - sw_if_index=self.pg1.sw_if_index) + self.vapi.urpf_update( + is_input=False, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, + af=e.vl_api_address_family_t.ADDRESS_IP6, + sw_if_index=self.pg1.sw_if_index, + ) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) |