aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_urpf.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_urpf.py')
-rw-r--r--test/test_urpf.py312
1 files changed, 192 insertions, 120 deletions
diff --git a/test/test_urpf.py b/test/test_urpf.py
index 8f4e563f8bc..1e4a6c5bb34 100644
--- a/test/test_urpf.py
+++ b/test/test_urpf.py
@@ -2,11 +2,12 @@
import unittest
-from framework import VppTestCase, VppTestRunner
+from framework import VppTestCase
+from asfframework import VppTestRunner
from scapy.packet import Raw
from scapy.layers.l2 import Ether
-from scapy.layers.inet import IP, UDP, ICMP
+from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6
from vpp_papi import VppEnum
@@ -15,7 +16,7 @@ N_PKTS = 63
class TestURPF(VppTestCase):
- """ Unicast Reverse Path Forwarding Test Case """
+ """Unicast Reverse Path Forwarding Test Case"""
@classmethod
def setUpClass(cls):
@@ -47,26 +48,27 @@ class TestURPF(VppTestCase):
super(TestURPF, self).tearDown()
def test_urpf4(self):
- """ uRPF IP4 """
+ """uRPF IP4"""
e = VppEnum
- p_spoof_loose = (Ether(dst=self.pg0.local_mac,
- src=self.pg0.remote_mac) /
- IP(src="3.3.3.3", dst=self.pg1.remote_ip4) /
- UDP(sport=1234, dport=1234) /
- Raw(b'\xa5' * 100)) * N_PKTS
- p_spoof_strict = (Ether(dst=self.pg0.local_mac,
- src=self.pg0.remote_mac) /
- IP(src=self.pg2.remote_ip4,
- dst=self.pg1.remote_ip4) /
- UDP(sport=1234, dport=1234) /
- Raw(b'\xa5' * 100)) * N_PKTS
- p_good = (Ether(dst=self.pg0.local_mac,
- src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4,
- dst=self.pg1.remote_ip4) /
- UDP(sport=1234, dport=1234) /
- Raw(b'\xa5' * 100)) * N_PKTS
+ p_spoof_loose = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src="3.3.3.3", dst=self.pg1.remote_ip4)
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ ) * N_PKTS
+ p_spoof_strict = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg2.remote_ip4, dst=self.pg1.remote_ip4)
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ ) * N_PKTS
+ p_good = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ ) * N_PKTS
#
# before adding the uRPF, ensure all packets are forwarded
@@ -78,10 +80,12 @@ class TestURPF(VppTestCase):
#
# apply loose uRPF check on pg0 rx
#
- self.vapi.urpf_update(is_input=True,
- mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE,
- af=e.vl_api_address_family_t.ADDRESS_IP4,
- sw_if_index=self.pg0.sw_if_index)
+ self.vapi.urpf_update(
+ is_input=True,
+ mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE,
+ af=e.vl_api_address_family_t.ADDRESS_IP4,
+ sw_if_index=self.pg0.sw_if_index,
+ )
# good packets still pass
self.send_and_expect(self.pg0, p_good, self.pg1)
@@ -90,16 +94,17 @@ class TestURPF(VppTestCase):
# packets from address to which there is no route are dropped
self.send_and_assert_no_replies(self.pg0, p_spoof_loose)
- self.assert_error_counter_equal("/err/ip4-rx-urpf-loose/uRPF Drop",
- N_PKTS)
+ self.assert_error_counter_equal("/err/ip4-rx-urpf-loose/uRPF Drop", N_PKTS)
#
# crank it up to strict mode
#
- self.vapi.urpf_update(is_input=True,
- mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT,
- af=e.vl_api_address_family_t.ADDRESS_IP4,
- sw_if_index=self.pg0.sw_if_index)
+ self.vapi.urpf_update(
+ is_input=True,
+ mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT,
+ af=e.vl_api_address_family_t.ADDRESS_IP4,
+ sw_if_index=self.pg0.sw_if_index,
+ )
# good packets still pass
self.send_and_expect(self.pg0, p_good, self.pg1)
@@ -107,16 +112,17 @@ class TestURPF(VppTestCase):
self.send_and_assert_no_replies(self.pg0, p_spoof_strict)
self.send_and_assert_no_replies(self.pg0, p_spoof_loose)
- self.assert_error_counter_equal("/err/ip4-rx-urpf-strict/uRPF Drop",
- 2 * N_PKTS)
+ self.assert_error_counter_equal("/err/ip4-rx-urpf-strict/uRPF Drop", 2 * N_PKTS)
#
# disable uRPF, all traffic should pass
#
- self.vapi.urpf_update(is_input=True,
- mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF,
- af=e.vl_api_address_family_t.ADDRESS_IP4,
- sw_if_index=self.pg0.sw_if_index)
+ self.vapi.urpf_update(
+ is_input=True,
+ mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF,
+ af=e.vl_api_address_family_t.ADDRESS_IP4,
+ sw_if_index=self.pg0.sw_if_index,
+ )
self.send_and_expect(self.pg0, p_good, self.pg1)
self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
@@ -129,22 +135,25 @@ class TestURPF(VppTestCase):
# for strict they should not be forwarded if they would be
# forwarded thru that interface.
#
- self.vapi.urpf_update(is_input=False,
- mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE,
- af=e.vl_api_address_family_t.ADDRESS_IP4,
- sw_if_index=self.pg1.sw_if_index)
+ self.vapi.urpf_update(
+ is_input=False,
+ mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE,
+ af=e.vl_api_address_family_t.ADDRESS_IP4,
+ sw_if_index=self.pg1.sw_if_index,
+ )
self.send_and_expect(self.pg0, p_good, self.pg1)
self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
self.send_and_assert_no_replies(self.pg0, p_spoof_loose)
- self.assert_error_counter_equal("/err/ip4-tx-urpf-loose/uRPF Drop",
- N_PKTS)
+ self.assert_error_counter_equal("/err/ip4-tx-urpf-loose/uRPF Drop", N_PKTS)
- self.vapi.urpf_update(is_input=False,
- mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT,
- af=e.vl_api_address_family_t.ADDRESS_IP4,
- sw_if_index=self.pg1.sw_if_index)
+ self.vapi.urpf_update(
+ is_input=False,
+ mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT,
+ af=e.vl_api_address_family_t.ADDRESS_IP4,
+ sw_if_index=self.pg1.sw_if_index,
+ )
self.send_and_expect(self.pg0, p_good, self.pg1)
# the strict packet, from a peer is allowed, since it does
@@ -152,48 +161,49 @@ class TestURPF(VppTestCase):
self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
self.send_and_assert_no_replies(self.pg0, p_spoof_loose)
- self.assert_error_counter_equal("/err/ip4-tx-urpf-strict/uRPF Drop",
- N_PKTS)
+ self.assert_error_counter_equal("/err/ip4-tx-urpf-strict/uRPF Drop", N_PKTS)
# change the strict packet so that it would forward through pg1
- p_spoof_strict = (Ether(dst=self.pg0.local_mac,
- src=self.pg0.remote_mac) /
- IP(src=self.pg1.remote_ip4,
- dst=self.pg1.remote_ip4) /
- UDP(sport=1234, dport=1234) /
- Raw(b'\xa5' * 100)) * N_PKTS
+ p_spoof_strict = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg1.remote_ip4, dst=self.pg1.remote_ip4)
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ ) * N_PKTS
self.send_and_assert_no_replies(self.pg0, p_spoof_strict)
- self.assert_error_counter_equal("/err/ip4-tx-urpf-strict/uRPF Drop",
- 2 * N_PKTS)
+ self.assert_error_counter_equal("/err/ip4-tx-urpf-strict/uRPF Drop", 2 * N_PKTS)
# cleanup
- self.vapi.urpf_update(is_input=False,
- mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF,
- af=e.vl_api_address_family_t.ADDRESS_IP4,
- sw_if_index=self.pg1.sw_if_index)
+ self.vapi.urpf_update(
+ is_input=False,
+ mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF,
+ af=e.vl_api_address_family_t.ADDRESS_IP4,
+ sw_if_index=self.pg1.sw_if_index,
+ )
def test_urpf6(self):
- """ uRPF IP6 """
+ """uRPF IP6"""
e = VppEnum
- p_spoof_loose = (Ether(dst=self.pg0.local_mac,
- src=self.pg0.remote_mac) /
- IPv6(src="3::3", dst=self.pg1.remote_ip6) /
- UDP(sport=1236, dport=1236) /
- Raw(b'\xa5' * 100)) * N_PKTS
- p_spoof_strict = (Ether(dst=self.pg0.local_mac,
- src=self.pg0.remote_mac) /
- IPv6(src=self.pg2.remote_ip6,
- dst=self.pg1.remote_ip6) /
- UDP(sport=1236, dport=1236) /
- Raw(b'\xa5' * 100)) * N_PKTS
- p_good = (Ether(dst=self.pg0.local_mac,
- src=self.pg0.remote_mac) /
- IPv6(src=self.pg0.remote_ip6,
- dst=self.pg1.remote_ip6) /
- UDP(sport=1236, dport=1236) /
- Raw(b'\xa5' * 100)) * N_PKTS
+ p_spoof_loose = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IPv6(src="3::3", dst=self.pg1.remote_ip6)
+ / UDP(sport=1236, dport=1236)
+ / Raw(b"\xa5" * 100)
+ ) * N_PKTS
+ p_spoof_strict = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IPv6(src=self.pg2.remote_ip6, dst=self.pg1.remote_ip6)
+ / UDP(sport=1236, dport=1236)
+ / Raw(b"\xa5" * 100)
+ ) * N_PKTS
+ p_good = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6)
+ / UDP(sport=1236, dport=1236)
+ / Raw(b"\xa5" * 100)
+ ) * N_PKTS
#
# before adding the uRPF, ensure all packets are forwarded
@@ -205,10 +215,12 @@ class TestURPF(VppTestCase):
#
# apply loose uRPF check on pg0 rx
#
- self.vapi.urpf_update(is_input=True,
- mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE,
- af=e.vl_api_address_family_t.ADDRESS_IP6,
- sw_if_index=self.pg0.sw_if_index)
+ self.vapi.urpf_update(
+ is_input=True,
+ mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE,
+ af=e.vl_api_address_family_t.ADDRESS_IP6,
+ sw_if_index=self.pg0.sw_if_index,
+ )
# good packets still pass
self.send_and_expect(self.pg0, p_good, self.pg1)
@@ -217,16 +229,17 @@ class TestURPF(VppTestCase):
# packets from address to which there is no route are dropped
self.send_and_assert_no_replies(self.pg0, p_spoof_loose)
- self.assert_error_counter_equal("/err/ip6-rx-urpf-loose/uRPF Drop",
- N_PKTS)
+ self.assert_error_counter_equal("/err/ip6-rx-urpf-loose/uRPF Drop", N_PKTS)
#
# crank it up to strict mode
#
- self.vapi.urpf_update(is_input=True,
- mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT,
- af=e.vl_api_address_family_t.ADDRESS_IP6,
- sw_if_index=self.pg0.sw_if_index)
+ self.vapi.urpf_update(
+ is_input=True,
+ mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT,
+ af=e.vl_api_address_family_t.ADDRESS_IP6,
+ sw_if_index=self.pg0.sw_if_index,
+ )
# good packets still pass
self.send_and_expect(self.pg0, p_good, self.pg1)
@@ -234,16 +247,17 @@ class TestURPF(VppTestCase):
self.send_and_assert_no_replies(self.pg0, p_spoof_strict)
self.send_and_assert_no_replies(self.pg0, p_spoof_loose)
- self.assert_error_counter_equal("/err/ip6-rx-urpf-strict/uRPF Drop",
- 2 * N_PKTS)
+ self.assert_error_counter_equal("/err/ip6-rx-urpf-strict/uRPF Drop", 2 * N_PKTS)
#
# disable uRPF, all traffic should pass
#
- self.vapi.urpf_update(is_input=True,
- mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF,
- af=e.vl_api_address_family_t.ADDRESS_IP6,
- sw_if_index=self.pg0.sw_if_index)
+ self.vapi.urpf_update(
+ is_input=True,
+ mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF,
+ af=e.vl_api_address_family_t.ADDRESS_IP6,
+ sw_if_index=self.pg0.sw_if_index,
+ )
self.send_and_expect(self.pg0, p_good, self.pg1)
self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
@@ -256,22 +270,25 @@ class TestURPF(VppTestCase):
# for strict they should not be forwarded if they would be
# forwarded thru that interface.
#
- self.vapi.urpf_update(is_input=False,
- mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE,
- af=e.vl_api_address_family_t.ADDRESS_IP6,
- sw_if_index=self.pg1.sw_if_index)
+ self.vapi.urpf_update(
+ is_input=False,
+ mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE,
+ af=e.vl_api_address_family_t.ADDRESS_IP6,
+ sw_if_index=self.pg1.sw_if_index,
+ )
self.send_and_expect(self.pg0, p_good, self.pg1)
self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
self.send_and_assert_no_replies(self.pg0, p_spoof_loose)
- self.assert_error_counter_equal("/err/ip6-tx-urpf-loose/uRPF Drop",
- N_PKTS)
+ self.assert_error_counter_equal("/err/ip6-tx-urpf-loose/uRPF Drop", N_PKTS)
- self.vapi.urpf_update(is_input=False,
- mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT,
- af=e.vl_api_address_family_t.ADDRESS_IP6,
- sw_if_index=self.pg1.sw_if_index)
+ self.vapi.urpf_update(
+ is_input=False,
+ mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT,
+ af=e.vl_api_address_family_t.ADDRESS_IP6,
+ sw_if_index=self.pg1.sw_if_index,
+ )
self.send_and_expect(self.pg0, p_good, self.pg1)
# the strict packet, from a peer is allowed, since it does
@@ -279,27 +296,82 @@ class TestURPF(VppTestCase):
self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
self.send_and_assert_no_replies(self.pg0, p_spoof_loose)
- self.assert_error_counter_equal("/err/ip6-tx-urpf-strict/uRPF Drop",
- N_PKTS)
+ self.assert_error_counter_equal("/err/ip6-tx-urpf-strict/uRPF Drop", N_PKTS)
# change the strict packet so that it would forward through pg1
- p_spoof_strict = (Ether(dst=self.pg0.local_mac,
- src=self.pg0.remote_mac) /
- IPv6(src=self.pg1.remote_ip6,
- dst=self.pg1.remote_ip6) /
- UDP(sport=1236, dport=1236) /
- Raw(b'\xa5' * 100)) * N_PKTS
+ p_spoof_strict = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6)
+ / UDP(sport=1236, dport=1236)
+ / Raw(b"\xa5" * 100)
+ ) * N_PKTS
self.send_and_assert_no_replies(self.pg0, p_spoof_strict)
- self.assert_error_counter_equal("/err/ip6-tx-urpf-strict/uRPF Drop",
- 2 * N_PKTS)
+ self.assert_error_counter_equal("/err/ip6-tx-urpf-strict/uRPF Drop", 2 * N_PKTS)
# cleanup
- self.vapi.urpf_update(is_input=False,
- mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF,
- af=e.vl_api_address_family_t.ADDRESS_IP6,
- sw_if_index=self.pg1.sw_if_index)
+ self.vapi.urpf_update(
+ is_input=False,
+ mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF,
+ af=e.vl_api_address_family_t.ADDRESS_IP6,
+ sw_if_index=self.pg1.sw_if_index,
+ )
+ def test_interface_dump(self):
+ """uRPF Interface Dump"""
-if __name__ == '__main__':
+ self.create_loopback_interfaces(3)
+ e = VppEnum
+ self.vapi.urpf_update(
+ is_input=True,
+ mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT,
+ af=e.vl_api_address_family_t.ADDRESS_IP4,
+ sw_if_index=self.loop1.sw_if_index,
+ )
+ self.vapi.urpf_update(
+ is_input=False,
+ mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE,
+ af=e.vl_api_address_family_t.ADDRESS_IP6,
+ sw_if_index=self.loop2.sw_if_index,
+ )
+
+ ret = self.vapi.urpf_interface_dump()
+ self.assertEqual(len(ret), 2)
+
+ dump_loop1 = ret[0]
+ dump_loop2 = ret[1]
+ self.assertEqual(dump_loop1.sw_if_index, self.loop1.sw_if_index)
+ self.assertTrue(dump_loop1.is_input)
+ self.assertEqual(dump_loop1.mode, e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT)
+ self.assertEqual(dump_loop1.af, e.vl_api_address_family_t.ADDRESS_IP4)
+ self.assertEqual(dump_loop2.sw_if_index, self.loop2.sw_if_index)
+ self.assertFalse(dump_loop2.is_input)
+ self.assertEqual(dump_loop2.mode, e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE)
+ self.assertEqual(dump_loop2.af, e.vl_api_address_family_t.ADDRESS_IP6)
+
+ ret = self.vapi.urpf_interface_dump(sw_if_index=self.loop1.sw_if_index)
+ self.assertEqual(len(ret), 1)
+
+ dump_loop1 = ret[0]
+ self.assertEqual(dump_loop1.sw_if_index, self.loop1.sw_if_index)
+ self.assertTrue(dump_loop1.is_input)
+ self.assertEqual(dump_loop1.mode, e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT)
+ self.assertEqual(dump_loop1.af, e.vl_api_address_family_t.ADDRESS_IP4)
+
+ # cleanup
+ self.vapi.urpf_update(
+ is_input=False,
+ mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF,
+ af=e.vl_api_address_family_t.ADDRESS_IP4,
+ sw_if_index=self.loop1.sw_if_index,
+ )
+ self.vapi.urpf_update(
+ is_input=False,
+ mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF,
+ af=e.vl_api_address_family_t.ADDRESS_IP6,
+ sw_if_index=self.loop2.sw_if_index,
+ )
+
+
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)