aboutsummaryrefslogtreecommitdiffstats
path: root/src/plugins/nat
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugins/nat')
-rw-r--r--src/plugins/nat/test/test_nat.py65
1 files changed, 51 insertions, 14 deletions
diff --git a/src/plugins/nat/test/test_nat.py b/src/plugins/nat/test/test_nat.py
index ac9c65dd0f5..d5d41288c42 100644
--- a/src/plugins/nat/test/test_nat.py
+++ b/src/plugins/nat/test/test_nat.py
@@ -33,7 +33,6 @@ from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
from ipaddress import IPv6Network
from util import ppc, ppp
from socket import inet_pton, AF_INET
-from vpp_acl import AclRule, VppAcl, VppAclInterface
# NAT HA protocol event data
@@ -6526,24 +6525,53 @@ class TestNAT44EndpointDependent(MethodHolder):
self.verify_capture_in(capture, self.pg0)
# Create an ACL blocking everything
- out2in_deny_rule = AclRule(is_permit=0)
- out2in_acl = VppAcl(self, rules=[out2in_deny_rule])
- out2in_acl.add_vpp_config()
-
- # create an ACL to permit/reflect everything
- in2out_reflect_rule = AclRule(is_permit=2)
- in2out_acl = VppAcl(self, rules=[in2out_reflect_rule])
- in2out_acl.add_vpp_config()
+ out2in_deny_rule = {
+ 'is_permit': 0,
+ 'is_ipv6': 0,
+ 'src_ip_addr': inet_pton(AF_INET, "0.0.0.0"),
+ 'src_ip_prefix_len': 0,
+ 'dst_ip_addr': inet_pton(AF_INET, "0.0.0.0"),
+ 'dst_ip_prefix_len': 0,
+ 'srcport_or_icmptype_first': 0,
+ 'srcport_or_icmptype_last': 65535,
+ 'dstport_or_icmpcode_first': 0,
+ 'dstport_or_icmpcode_last': 65535,
+ 'proto': 0,
+ }
+ out2in_rules = [out2in_deny_rule]
+ res = self.vapi.acl_add_replace(0xffffffff, out2in_rules)
+ self.assertEqual(res.retval, 0, "error adding out2in ACL")
+ out2in_acl = res.acl_index
# apply as input acl on interface and confirm it blocks everything
- acl_if = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index,
- n_input=1, acls=[out2in_acl])
- acl_if.add_vpp_config()
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
+ n_input=1,
+ acls=[out2in_acl])
self.send_and_assert_no_replies(self.pg1, pkts_out2in)
+ # create an ACL to permit/reflect everything
+ in2out_reflect_rule = {
+ 'is_permit': 2,
+ 'is_ipv6': 0,
+ 'src_ip_addr': inet_pton(AF_INET, "0.0.0.0"),
+ 'src_ip_prefix_len': 0,
+ 'dst_ip_addr': inet_pton(AF_INET, "0.0.0.0"),
+ 'dst_ip_prefix_len': 0,
+ 'srcport_or_icmptype_first': 0,
+ 'srcport_or_icmptype_last': 65535,
+ 'dstport_or_icmpcode_first': 0,
+ 'dstport_or_icmpcode_last': 65535,
+ 'proto': 0,
+ }
+ in2out_rules = [in2out_reflect_rule]
+ res = self.vapi.acl_add_replace(0xffffffff, in2out_rules)
+ self.assertEqual(res.retval, 0, "error adding in2out ACL")
+ in2out_acl = res.acl_index
+
# apply output acl
- acl_if.acls = [out2in_acl, in2out_acl]
- acl_if.add_vpp_config()
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
+ n_input=1,
+ acls=[out2in_acl, in2out_acl])
# send in2out to generate ACL state (NAT state was created earlier)
capture = self.send_and_expect(self.pg0, pkts_in2out, self.pg1,
len(pkts_in2out))
@@ -6559,6 +6587,15 @@ class TestNAT44EndpointDependent(MethodHolder):
self.verify_capture_in(capture, self.pg0)
self.logger.info(self.vapi.cli("show trace"))
+ # Clean up
+ # Remove ACLs from interface
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
+ n_input=0,
+ acls=[])
+ # delete ACLs
+ self.vapi.acl_del(acl_index=out2in_acl, expected_retval=0)
+ self.vapi.acl_del(acl_index=in2out_acl, expected_retval=0)
+
def test_multiple_vrf(self):
""" Multiple VRF setup """
external_addr = '1.2.3.4'