summaryrefslogtreecommitdiffstats
path: root/test/test_abf.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_abf.py')
-rw-r--r--test/test_abf.py200
1 files changed, 109 insertions, 91 deletions
diff --git a/test/test_abf.py b/test/test_abf.py
index 097476b879a..856d02a8185 100644
--- a/test/test_abf.py
+++ b/test/test_abf.py
@@ -5,8 +5,13 @@ import unittest
from framework import VppTestCase, VppTestRunner
from vpp_ip import DpoProto
-from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsLabel, \
- VppIpTable, FibPathProto
+from vpp_ip_route import (
+ VppIpRoute,
+ VppRoutePath,
+ VppMplsLabel,
+ VppIpTable,
+ FibPathProto,
+)
from vpp_acl import AclRule, VppAcl
from scapy.packet import Raw
@@ -31,19 +36,13 @@ def find_abf_policy(test, id):
def find_abf_itf_attach(test, id, sw_if_index):
attachs = test.vapi.abf_itf_attach_dump()
for a in attachs:
- if id == a.attach.policy_id and \
- sw_if_index == a.attach.sw_if_index:
+ if id == a.attach.policy_id and sw_if_index == a.attach.sw_if_index:
return True
return False
class VppAbfPolicy(VppObject):
-
- def __init__(self,
- test,
- policy_id,
- acl,
- paths):
+ def __init__(self, test, policy_id, acl, paths):
self._test = test
self.policy_id = policy_id
self.acl = acl
@@ -55,35 +54,35 @@ class VppAbfPolicy(VppObject):
def add_vpp_config(self):
self._test.vapi.abf_policy_add_del(
1,
- {'policy_id': self.policy_id,
- 'acl_index': self.acl.acl_index,
- 'n_paths': len(self.paths),
- 'paths': self.encoded_paths})
+ {
+ "policy_id": self.policy_id,
+ "acl_index": self.acl.acl_index,
+ "n_paths": len(self.paths),
+ "paths": self.encoded_paths,
+ },
+ )
self._test.registry.register(self, self._test.logger)
def remove_vpp_config(self):
self._test.vapi.abf_policy_add_del(
0,
- {'policy_id': self.policy_id,
- 'acl_index': self.acl.acl_index,
- 'n_paths': len(self.paths),
- 'paths': self.encoded_paths})
+ {
+ "policy_id": self.policy_id,
+ "acl_index": self.acl.acl_index,
+ "n_paths": len(self.paths),
+ "paths": self.encoded_paths,
+ },
+ )
def query_vpp_config(self):
return find_abf_policy(self._test, self.policy_id)
def object_id(self):
- return ("abf-policy-%d" % self.policy_id)
+ return "abf-policy-%d" % self.policy_id
class VppAbfAttach(VppObject):
-
- def __init__(self,
- test,
- policy_id,
- sw_if_index,
- priority,
- is_ipv6=0):
+ def __init__(self, test, policy_id, sw_if_index, priority, is_ipv6=0):
self._test = test
self.policy_id = policy_id
self.sw_if_index = sw_if_index
@@ -93,31 +92,35 @@ class VppAbfAttach(VppObject):
def add_vpp_config(self):
self._test.vapi.abf_itf_attach_add_del(
1,
- {'policy_id': self.policy_id,
- 'sw_if_index': self.sw_if_index,
- 'priority': self.priority,
- 'is_ipv6': self.is_ipv6})
+ {
+ "policy_id": self.policy_id,
+ "sw_if_index": self.sw_if_index,
+ "priority": self.priority,
+ "is_ipv6": self.is_ipv6,
+ },
+ )
self._test.registry.register(self, self._test.logger)
def remove_vpp_config(self):
self._test.vapi.abf_itf_attach_add_del(
0,
- {'policy_id': self.policy_id,
- 'sw_if_index': self.sw_if_index,
- 'priority': self.priority,
- 'is_ipv6': self.is_ipv6})
+ {
+ "policy_id": self.policy_id,
+ "sw_if_index": self.sw_if_index,
+ "priority": self.priority,
+ "is_ipv6": self.is_ipv6,
+ },
+ )
def query_vpp_config(self):
- return find_abf_itf_attach(self._test,
- self.policy_id,
- self.sw_if_index)
+ return find_abf_itf_attach(self._test, self.policy_id, self.sw_if_index)
def object_id(self):
- return ("abf-attach-%d-%d" % (self.policy_id, self.sw_if_index))
+ return "abf-attach-%d-%d" % (self.policy_id, self.sw_if_index)
class TestAbf(VppTestCase):
- """ ABF Test Case """
+ """ABF Test Case"""
@classmethod
def setUpClass(cls):
@@ -147,8 +150,7 @@ class TestAbf(VppTestCase):
super(TestAbf, self).tearDown()
def test_abf4(self):
- """ IPv4 ACL Based Forwarding
- """
+ """IPv4 ACL Based Forwarding"""
#
# We are not testing the various matching capabilities
@@ -163,18 +165,22 @@ class TestAbf(VppTestCase):
#
# Rule 1
#
- rule_1 = AclRule(is_permit=1, proto=17, ports=1234,
- src_prefix=IPv4Network("1.1.1.1/32"),
- dst_prefix=IPv4Network("1.1.1.2/32"))
+ rule_1 = AclRule(
+ is_permit=1,
+ proto=17,
+ ports=1234,
+ src_prefix=IPv4Network("1.1.1.1/32"),
+ dst_prefix=IPv4Network("1.1.1.2/32"),
+ )
acl_1 = VppAcl(self, rules=[rule_1])
acl_1.add_vpp_config()
#
# ABF policy for ACL 1 - path via interface 1
#
- abf_1 = VppAbfPolicy(self, 10, acl_1,
- [VppRoutePath(self.pg1.remote_ip4,
- self.pg1.sw_if_index)])
+ abf_1 = VppAbfPolicy(
+ self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
+ )
abf_1.add_vpp_config()
#
@@ -187,42 +193,43 @@ class TestAbf(VppTestCase):
# fire in packet matching the ACL src,dst. If it's forwarded
# then the ABF was successful, since default routing will drop it
#
- p_1 = (Ether(src=self.pg0.remote_mac,
- dst=self.pg0.local_mac) /
- IP(src="1.1.1.1", dst="1.1.1.2") /
- UDP(sport=1234, dport=1234) /
- Raw(b'\xa5' * 100))
- self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg1)
+ p_1 = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(src="1.1.1.1", dst="1.1.1.2")
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ )
+ self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg1)
#
# Attach a 'better' priority policy to the same interface
#
- abf_2 = VppAbfPolicy(self, 11, acl_1,
- [VppRoutePath(self.pg2.remote_ip4,
- self.pg2.sw_if_index)])
+ abf_2 = VppAbfPolicy(
+ self, 11, acl_1, [VppRoutePath(self.pg2.remote_ip4, self.pg2.sw_if_index)]
+ )
abf_2.add_vpp_config()
attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
attach_2.add_vpp_config()
- self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
+ self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg2)
#
# Attach a policy with priority in the middle
#
- abf_3 = VppAbfPolicy(self, 12, acl_1,
- [VppRoutePath(self.pg3.remote_ip4,
- self.pg3.sw_if_index)])
+ abf_3 = VppAbfPolicy(
+ self, 12, acl_1, [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)]
+ )
abf_3.add_vpp_config()
attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
attach_3.add_vpp_config()
- self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
+ self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg2)
#
# remove the best priority
#
attach_2.remove_vpp_config()
- self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg3)
+ self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg3)
#
# Attach one of the same policies to Pg1
@@ -230,11 +237,12 @@ class TestAbf(VppTestCase):
attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
attach_4.add_vpp_config()
- p_2 = (Ether(src=self.pg1.remote_mac,
- dst=self.pg1.local_mac) /
- IP(src="1.1.1.1", dst="1.1.1.2") /
- UDP(sport=1234, dport=1234) /
- Raw(b'\xa5' * 100))
+ p_2 = (
+ Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
+ / IP(src="1.1.1.1", dst="1.1.1.2")
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ )
self.send_and_expect(self.pg1, p_2 * NUM_PKTS, self.pg3)
#
@@ -255,22 +263,27 @@ class TestAbf(VppTestCase):
self.pg4.config_ip4()
self.pg4.resolve_arp()
- abf_13 = VppAbfPolicy(self, 13, acl_1,
- [VppRoutePath(self.pg4.remote_ip4,
- 0xffffffff,
- nh_table_id=table_20.table_id)])
+ abf_13 = VppAbfPolicy(
+ self,
+ 13,
+ acl_1,
+ [
+ VppRoutePath(
+ self.pg4.remote_ip4, 0xFFFFFFFF, nh_table_id=table_20.table_id
+ )
+ ],
+ )
abf_13.add_vpp_config()
attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
attach_5.add_vpp_config()
- self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg4)
+ self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg4)
self.pg4.unconfig_ip4()
self.pg4.set_table_ip4(0)
def test_abf6(self):
- """ IPv6 ACL Based Forwarding
- """
+ """IPv6 ACL Based Forwarding"""
#
# Simple test for matching IPv6 packets
@@ -279,32 +292,34 @@ class TestAbf(VppTestCase):
#
# Rule 1
#
- rule_1 = AclRule(is_permit=1, proto=17, ports=1234,
- src_prefix=IPv6Network("2001::2/128"),
- dst_prefix=IPv6Network("2001::1/128"))
+ rule_1 = AclRule(
+ is_permit=1,
+ proto=17,
+ ports=1234,
+ src_prefix=IPv6Network("2001::2/128"),
+ dst_prefix=IPv6Network("2001::1/128"),
+ )
acl_1 = VppAcl(self, rules=[rule_1])
acl_1.add_vpp_config()
#
# ABF policy for ACL 1 - path via interface 1
#
- abf_1 = VppAbfPolicy(self, 10, acl_1,
- [VppRoutePath("3001::1",
- 0xffffffff)])
+ abf_1 = VppAbfPolicy(self, 10, acl_1, [VppRoutePath("3001::1", 0xFFFFFFFF)])
abf_1.add_vpp_config()
- attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index,
- 45, is_ipv6=True)
+ attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 45, is_ipv6=True)
attach_1.add_vpp_config()
#
# a packet matching the rule
#
- p = (Ether(src=self.pg0.remote_mac,
- dst=self.pg0.local_mac) /
- IPv6(src="2001::2", dst="2001::1") /
- UDP(sport=1234, dport=1234) /
- Raw(b'\xa5' * 100))
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IPv6(src="2001::2", dst="2001::1")
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ )
#
# packets are dropped because there is no route to the policy's
@@ -315,9 +330,12 @@ class TestAbf(VppTestCase):
#
# add a route resolving the next-hop
#
- route = VppIpRoute(self, "3001::1", 32,
- [VppRoutePath(self.pg1.remote_ip6,
- self.pg1.sw_if_index)])
+ route = VppIpRoute(
+ self,
+ "3001::1",
+ 32,
+ [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
+ )
route.add_vpp_config()
#
@@ -326,5 +344,5 @@ class TestAbf(VppTestCase):
self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)