summaryrefslogtreecommitdiffstats
path: root/src/plugins/abf/test/test_abf.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugins/abf/test/test_abf.py')
-rw-r--r--src/plugins/abf/test/test_abf.py330
1 files changed, 0 insertions, 330 deletions
diff --git a/src/plugins/abf/test/test_abf.py b/src/plugins/abf/test/test_abf.py
deleted file mode 100644
index 097476b879a..00000000000
--- a/src/plugins/abf/test/test_abf.py
+++ /dev/null
@@ -1,330 +0,0 @@
-#!/usr/bin/env python3
-
-from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
-import unittest
-
-from framework import VppTestCase, VppTestRunner
-from vpp_ip import DpoProto
-from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsLabel, \
- VppIpTable, FibPathProto
-from vpp_acl import AclRule, VppAcl
-
-from scapy.packet import Raw
-from scapy.layers.l2 import Ether
-from scapy.layers.inet import IP, UDP
-from scapy.layers.inet6 import IPv6
-from ipaddress import IPv4Network, IPv6Network
-
-from vpp_object import VppObject
-
-NUM_PKTS = 67
-
-
-def find_abf_policy(test, id):
- policies = test.vapi.abf_policy_dump()
- for p in policies:
- if id == p.policy.policy_id:
- return True
- return False
-
-
-def find_abf_itf_attach(test, id, sw_if_index):
- attachs = test.vapi.abf_itf_attach_dump()
- for a in attachs:
- if id == a.attach.policy_id and \
- sw_if_index == a.attach.sw_if_index:
- return True
- return False
-
-
-class VppAbfPolicy(VppObject):
-
- def __init__(self,
- test,
- policy_id,
- acl,
- paths):
- self._test = test
- self.policy_id = policy_id
- self.acl = acl
- self.paths = paths
- self.encoded_paths = []
- for path in self.paths:
- self.encoded_paths.append(path.encode())
-
- def add_vpp_config(self):
- self._test.vapi.abf_policy_add_del(
- 1,
- {'policy_id': self.policy_id,
- 'acl_index': self.acl.acl_index,
- 'n_paths': len(self.paths),
- 'paths': self.encoded_paths})
- self._test.registry.register(self, self._test.logger)
-
- def remove_vpp_config(self):
- self._test.vapi.abf_policy_add_del(
- 0,
- {'policy_id': self.policy_id,
- 'acl_index': self.acl.acl_index,
- 'n_paths': len(self.paths),
- 'paths': self.encoded_paths})
-
- def query_vpp_config(self):
- return find_abf_policy(self._test, self.policy_id)
-
- def object_id(self):
- return ("abf-policy-%d" % self.policy_id)
-
-
-class VppAbfAttach(VppObject):
-
- def __init__(self,
- test,
- policy_id,
- sw_if_index,
- priority,
- is_ipv6=0):
- self._test = test
- self.policy_id = policy_id
- self.sw_if_index = sw_if_index
- self.priority = priority
- self.is_ipv6 = is_ipv6
-
- def add_vpp_config(self):
- self._test.vapi.abf_itf_attach_add_del(
- 1,
- {'policy_id': self.policy_id,
- 'sw_if_index': self.sw_if_index,
- 'priority': self.priority,
- 'is_ipv6': self.is_ipv6})
- self._test.registry.register(self, self._test.logger)
-
- def remove_vpp_config(self):
- self._test.vapi.abf_itf_attach_add_del(
- 0,
- {'policy_id': self.policy_id,
- 'sw_if_index': self.sw_if_index,
- 'priority': self.priority,
- 'is_ipv6': self.is_ipv6})
-
- def query_vpp_config(self):
- return find_abf_itf_attach(self._test,
- self.policy_id,
- self.sw_if_index)
-
- def object_id(self):
- return ("abf-attach-%d-%d" % (self.policy_id, self.sw_if_index))
-
-
-class TestAbf(VppTestCase):
- """ ABF Test Case """
-
- @classmethod
- def setUpClass(cls):
- super(TestAbf, cls).setUpClass()
-
- @classmethod
- def tearDownClass(cls):
- super(TestAbf, cls).tearDownClass()
-
- def setUp(self):
- super(TestAbf, self).setUp()
-
- self.create_pg_interfaces(range(5))
-
- for i in self.pg_interfaces[:4]:
- i.admin_up()
- i.config_ip4()
- i.resolve_arp()
- i.config_ip6()
- i.resolve_ndp()
-
- def tearDown(self):
- for i in self.pg_interfaces:
- i.unconfig_ip4()
- i.unconfig_ip6()
- i.admin_down()
- super(TestAbf, self).tearDown()
-
- def test_abf4(self):
- """ IPv4 ACL Based Forwarding
- """
-
- #
- # We are not testing the various matching capabilities
- # of ACLs, that's done elsewhere. Here ware are testing
- # the application of ACLs to a forwarding path to achieve
- # ABF
- # So we construct just a few ACLs to ensure the ABF policies
- # are correctly constructed and used. And a few path types
- # to test the API path decoding.
- #
-
- #
- # Rule 1
- #
- rule_1 = AclRule(is_permit=1, proto=17, ports=1234,
- src_prefix=IPv4Network("1.1.1.1/32"),
- dst_prefix=IPv4Network("1.1.1.2/32"))
- acl_1 = VppAcl(self, rules=[rule_1])
- acl_1.add_vpp_config()
-
- #
- # ABF policy for ACL 1 - path via interface 1
- #
- abf_1 = VppAbfPolicy(self, 10, acl_1,
- [VppRoutePath(self.pg1.remote_ip4,
- self.pg1.sw_if_index)])
- abf_1.add_vpp_config()
-
- #
- # Attach the policy to input interface Pg0
- #
- attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
- attach_1.add_vpp_config()
-
- #
- # fire in packet matching the ACL src,dst. If it's forwarded
- # then the ABF was successful, since default routing will drop it
- #
- p_1 = (Ether(src=self.pg0.remote_mac,
- dst=self.pg0.local_mac) /
- IP(src="1.1.1.1", dst="1.1.1.2") /
- UDP(sport=1234, dport=1234) /
- Raw(b'\xa5' * 100))
- self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg1)
-
- #
- # Attach a 'better' priority policy to the same interface
- #
- abf_2 = VppAbfPolicy(self, 11, acl_1,
- [VppRoutePath(self.pg2.remote_ip4,
- self.pg2.sw_if_index)])
- abf_2.add_vpp_config()
- attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
- attach_2.add_vpp_config()
-
- self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
-
- #
- # Attach a policy with priority in the middle
- #
- abf_3 = VppAbfPolicy(self, 12, acl_1,
- [VppRoutePath(self.pg3.remote_ip4,
- self.pg3.sw_if_index)])
- abf_3.add_vpp_config()
- attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
- attach_3.add_vpp_config()
-
- self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
-
- #
- # remove the best priority
- #
- attach_2.remove_vpp_config()
- self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg3)
-
- #
- # Attach one of the same policies to Pg1
- #
- attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
- attach_4.add_vpp_config()
-
- p_2 = (Ether(src=self.pg1.remote_mac,
- dst=self.pg1.local_mac) /
- IP(src="1.1.1.1", dst="1.1.1.2") /
- UDP(sport=1234, dport=1234) /
- Raw(b'\xa5' * 100))
- self.send_and_expect(self.pg1, p_2 * NUM_PKTS, self.pg3)
-
- #
- # detach the policy from PG1, now expect traffic to be dropped
- #
- attach_4.remove_vpp_config()
-
- self.send_and_assert_no_replies(self.pg1, p_2 * NUM_PKTS, "Detached")
-
- #
- # Swap to route via a next-hop in the non-default table
- #
- table_20 = VppIpTable(self, 20)
- table_20.add_vpp_config()
-
- self.pg4.set_table_ip4(table_20.table_id)
- self.pg4.admin_up()
- self.pg4.config_ip4()
- self.pg4.resolve_arp()
-
- abf_13 = VppAbfPolicy(self, 13, acl_1,
- [VppRoutePath(self.pg4.remote_ip4,
- 0xffffffff,
- nh_table_id=table_20.table_id)])
- abf_13.add_vpp_config()
- attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
- attach_5.add_vpp_config()
-
- self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg4)
-
- self.pg4.unconfig_ip4()
- self.pg4.set_table_ip4(0)
-
- def test_abf6(self):
- """ IPv6 ACL Based Forwarding
- """
-
- #
- # Simple test for matching IPv6 packets
- #
-
- #
- # Rule 1
- #
- rule_1 = AclRule(is_permit=1, proto=17, ports=1234,
- src_prefix=IPv6Network("2001::2/128"),
- dst_prefix=IPv6Network("2001::1/128"))
- acl_1 = VppAcl(self, rules=[rule_1])
- acl_1.add_vpp_config()
-
- #
- # ABF policy for ACL 1 - path via interface 1
- #
- abf_1 = VppAbfPolicy(self, 10, acl_1,
- [VppRoutePath("3001::1",
- 0xffffffff)])
- abf_1.add_vpp_config()
-
- attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index,
- 45, is_ipv6=True)
- attach_1.add_vpp_config()
-
- #
- # a packet matching the rule
- #
- p = (Ether(src=self.pg0.remote_mac,
- dst=self.pg0.local_mac) /
- IPv6(src="2001::2", dst="2001::1") /
- UDP(sport=1234, dport=1234) /
- Raw(b'\xa5' * 100))
-
- #
- # packets are dropped because there is no route to the policy's
- # next hop
- #
- self.send_and_assert_no_replies(self.pg1, p * NUM_PKTS, "no route")
-
- #
- # add a route resolving the next-hop
- #
- route = VppIpRoute(self, "3001::1", 32,
- [VppRoutePath(self.pg1.remote_ip6,
- self.pg1.sw_if_index)])
- route.add_vpp_config()
-
- #
- # now expect packets forwarded.
- #
- self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
-
-
-if __name__ == '__main__':
- unittest.main(testRunner=VppTestRunner)