aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/test_dvr.py31
-rw-r--r--test/test_pipe.py64
-rw-r--r--test/vpp_papi_provider.py120
3 files changed, 176 insertions, 39 deletions
diff --git a/test/test_dvr.py b/test/test_dvr.py
index 8531b8553ca..d5ffd3b1577 100644
--- a/test/test_dvr.py
+++ b/test/test_dvr.py
@@ -5,13 +5,11 @@ from framework import VppTestCase, VppTestRunner
from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType
from vpp_l2 import L2_PORT_TYPE
from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
-from vpp_acl import AclRule, VppAcl, VppAclInterface
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet import IP, UDP
from socket import AF_INET, inet_pton
-from ipaddress import IPv4Network
NUM_PKTS = 67
@@ -188,18 +186,26 @@ class TestDVR(VppTestCase):
#
# Add an output L3 ACL that will block the traffic
#
- rule_1 = AclRule(is_permit=0, proto=17, ports=1234,
- src_prefix=IPv4Network((any_src_addr, 32)),
- dst_prefix=IPv4Network((ip_non_tag_bridged, 32)))
- acl = VppAcl(self, rules=[rule_1])
- acl.add_vpp_config()
+ rule_1 = ({'is_permit': 0,
+ 'is_ipv6': 0,
+ 'proto': 17,
+ 'srcport_or_icmptype_first': 1234,
+ 'srcport_or_icmptype_last': 1234,
+ 'src_ip_prefix_len': 32,
+ 'src_ip_addr': inet_pton(AF_INET, any_src_addr),
+ 'dstport_or_icmpcode_first': 1234,
+ 'dstport_or_icmpcode_last': 1234,
+ 'dst_ip_prefix_len': 32,
+ 'dst_ip_addr': inet_pton(AF_INET, ip_non_tag_bridged)})
+ acl = self.vapi.acl_add_replace(acl_index=4294967295,
+ r=[rule_1])
#
# Apply the ACL on the output interface
#
- acl_if1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index,
- n_input=0, acls=[acl])
- acl_if1.add_vpp_config()
+ self.vapi.acl_interface_set_acl_list(self.pg1.sw_if_index,
+ 0,
+ [acl.acl_index])
#
# Send packet's that should match the ACL and be dropped
@@ -210,8 +216,9 @@ class TestDVR(VppTestCase):
#
# cleanup
#
- acl_if1.remove_vpp_config()
- acl.remove_vpp_config()
+ self.vapi.acl_interface_set_acl_list(self.pg1.sw_if_index,
+ 0, [])
+ self.vapi.acl_del(acl.acl_index)
self.vapi.sw_interface_set_l2_bridge(
rx_sw_if_index=self.pg0.sw_if_index, bd_id=1, enable=0)
diff --git a/test/test_pipe.py b/test/test_pipe.py
index 0e766654d2a..6e3edca3c50 100644
--- a/test/test_pipe.py
+++ b/test/test_pipe.py
@@ -1,7 +1,6 @@
#!/usr/bin/env python3
from socket import AF_INET, AF_INET6, inet_pton
import unittest
-from ipaddress import IPv4Network
from scapy.packet import Raw
from scapy.layers.l2 import Ether
@@ -10,7 +9,6 @@ from scapy.layers.inet import IP, UDP
from framework import VppTestCase, VppTestRunner
from vpp_interface import VppInterface
from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
-from vpp_acl import AclRule, VppAcl, VppAclInterface
NUM_PKTS = 67
@@ -124,30 +122,39 @@ class TestPipe(VppTestCase):
#
# Attach ACL to ensure features are run on the pipe
#
- rule_1 = AclRule(is_permit=0, proto=17,
- src_prefix=IPv4Network("1.1.1.1/32"),
- dst_prefix=IPv4Network("1.1.1.2/32"), ports=1234)
- acl = VppAcl(self, rules=[rule_1])
- acl.add_vpp_config()
+ rule_1 = ({'is_permit': 0,
+ 'is_ipv6': 0,
+ 'proto': 17,
+ 'srcport_or_icmptype_first': 1234,
+ 'srcport_or_icmptype_last': 1234,
+ 'src_ip_prefix_len': 32,
+ 'src_ip_addr': inet_pton(AF_INET, "1.1.1.1"),
+ 'dstport_or_icmpcode_first': 1234,
+ 'dstport_or_icmpcode_last': 1234,
+ 'dst_ip_prefix_len': 32,
+ 'dst_ip_addr': inet_pton(AF_INET, "1.1.1.2")})
+ acl = self.vapi.acl_add_replace(acl_index=4294967295,
+ r=[rule_1])
# Apply the ACL on the pipe on output
- acl_if_e = VppAclInterface(self, sw_if_index=pipes[0].east, n_input=0,
- acls=[acl])
- acl_if_e.add_vpp_config()
-
+ self.vapi.acl_interface_set_acl_list(pipes[0].east,
+ 0,
+ [acl.acl_index])
self.send_and_assert_no_replies(self.pg0, p * NUM_PKTS)
self.send_and_expect(self.pg1, p * NUM_PKTS, self.pg0)
# remove from output and apply on input
- acl_if_e.remove_vpp_config()
- acl_if_w = VppAclInterface(self, sw_if_index=pipes[0].west, n_input=1,
- acls=[acl])
- acl_if_w.add_vpp_config()
-
+ self.vapi.acl_interface_set_acl_list(pipes[0].east,
+ 0,
+ [])
+ self.vapi.acl_interface_set_acl_list(pipes[0].west,
+ 1,
+ [acl.acl_index])
self.send_and_assert_no_replies(self.pg0, p * NUM_PKTS)
self.send_and_expect(self.pg1, p * NUM_PKTS, self.pg0)
-
- acl_if_w.remove_vpp_config()
+ self.vapi.acl_interface_set_acl_list(pipes[0].west,
+ 0,
+ [])
self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
self.send_and_expect(self.pg1, p * NUM_PKTS, self.pg0)
@@ -220,21 +227,24 @@ class TestPipe(VppTestCase):
#
# Use ACLs to test features run on the Pipes
#
- acl_if_e1 = VppAclInterface(self, sw_if_index=pipes[1].east, n_input=0,
- acls=[acl])
- acl_if_e1.add_vpp_config()
+ self.vapi.acl_interface_set_acl_list(pipes[1].east,
+ 0,
+ [acl.acl_index])
self.send_and_assert_no_replies(self.pg2, p_east * NUM_PKTS)
self.send_and_expect(self.pg3, p_west * NUM_PKTS, self.pg2)
# remove from output and apply on input
- acl_if_e1.remove_vpp_config()
- acl_if_w1 = VppAclInterface(self, sw_if_index=pipes[1].west, n_input=1,
- acls=[acl])
- acl_if_w1.add_vpp_config()
+ self.vapi.acl_interface_set_acl_list(pipes[1].east,
+ 0,
+ [])
+ self.vapi.acl_interface_set_acl_list(pipes[1].west,
+ 1,
+ [acl.acl_index])
self.send_and_assert_no_replies(self.pg2, p_east * NUM_PKTS)
self.send_and_expect(self.pg3, p_west * NUM_PKTS, self.pg2)
- acl_if_w1.remove_vpp_config()
-
+ self.vapi.acl_interface_set_acl_list(pipes[1].west,
+ 0,
+ [])
self.send_and_expect(self.pg2, p_east * NUM_PKTS, self.pg3)
self.send_and_expect(self.pg3, p_west * NUM_PKTS, self.pg2)
diff --git a/test/vpp_papi_provider.py b/test/vpp_papi_provider.py
index 1e0226c878e..651e07a98b1 100644
--- a/test/vpp_papi_provider.py
+++ b/test/vpp_papi_provider.py
@@ -925,6 +925,126 @@ class VppPapiProvider(object):
return self.api(self.papi.sr_mpls_policy_del,
{'bsid': bsid})
+ def acl_add_replace(self, acl_index, r, tag='',
+ expected_retval=0):
+ """Add/replace an ACL
+ :param int acl_index: ACL index to replace, 2^32-1 to create new ACL.
+ :param acl_rule r: ACL rules array.
+ :param str tag: symbolic tag (description) for this ACL.
+ :param int count: number of rules.
+ """
+ return self.api(self.papi.acl_add_replace,
+ {'acl_index': acl_index,
+ 'r': r,
+ 'count': len(r),
+ 'tag': tag},
+ expected_retval=expected_retval)
+
+ def acl_del(self, acl_index, expected_retval=0):
+ """
+
+ :param acl_index:
+ :return:
+ """
+ return self.api(self.papi.acl_del,
+ {'acl_index': acl_index},
+ expected_retval=expected_retval)
+
+ def acl_interface_set_acl_list(self, sw_if_index, n_input, acls,
+ expected_retval=0):
+ return self.api(self.papi.acl_interface_set_acl_list,
+ {'sw_if_index': sw_if_index,
+ 'count': len(acls),
+ 'n_input': n_input,
+ 'acls': acls},
+ expected_retval=expected_retval)
+
+ def acl_interface_set_etype_whitelist(self, sw_if_index,
+ n_input, whitelist,
+ expected_retval=0):
+ return self.api(self.papi.acl_interface_set_etype_whitelist,
+ {'sw_if_index': sw_if_index,
+ 'count': len(whitelist),
+ 'n_input': n_input,
+ 'whitelist': whitelist},
+ expected_retval=expected_retval)
+
+ def acl_interface_add_del(self,
+ sw_if_index,
+ acl_index,
+ is_add=1):
+ """ Add/Delete ACL to/from interface
+
+ :param sw_if_index:
+ :param acl_index:
+ :param is_add: (Default value = 1)
+ """
+
+ return self.api(self.papi.acl_interface_add_del,
+ {'is_add': is_add,
+ 'is_input': 1,
+ 'sw_if_index': sw_if_index,
+ 'acl_index': acl_index})
+
+ def acl_dump(self, acl_index, expected_retval=0):
+ return self.api(self.papi.acl_dump,
+ {'acl_index': acl_index},
+ expected_retval=expected_retval)
+
+ def acl_interface_list_dump(self, sw_if_index=0xFFFFFFFF,
+ expected_retval=0):
+ return self.api(self.papi.acl_interface_list_dump,
+ {'sw_if_index': sw_if_index},
+ expected_retval=expected_retval)
+
+ def macip_acl_add(self, rules, tag=""):
+ """ Add MACIP acl
+
+ :param rules: list of rules for given acl
+ :param tag: acl tag
+ """
+
+ return self.api(self.papi.macip_acl_add,
+ {'r': rules,
+ 'count': len(rules),
+ 'tag': tag})
+
+ def macip_acl_add_replace(self, rules, acl_index=0xFFFFFFFF, tag=""):
+ """ Add MACIP acl
+
+ :param rules: list of rules for given acl
+ :param tag: acl tag
+ """
+
+ return self.api(self.papi.macip_acl_add_replace,
+ {'acl_index': acl_index,
+ 'r': rules,
+ 'count': len(rules),
+ 'tag': tag})
+
+ def macip_acl_interface_add_del(self,
+ sw_if_index,
+ acl_index,
+ is_add=1):
+ """ Add MACIP acl to interface
+
+ :param sw_if_index:
+ :param acl_index:
+ :param is_add: (Default value = 1)
+ """
+
+ return self.api(self.papi.macip_acl_interface_add_del,
+ {'is_add': is_add,
+ 'sw_if_index': sw_if_index,
+ 'acl_index': acl_index})
+
+ def macip_acl_dump(self, acl_index=4294967295):
+ """ Return MACIP acl dump
+ """
+
+ return self.api(
+ self.papi.macip_acl_dump, {'acl_index': acl_index})
+
def ip_punt_police(self,
policer_index,
is_ip6=0,