diff options
Diffstat (limited to 'src/plugins/nat')
-rw-r--r-- | src/plugins/nat/test/test_nat.py | 65 |
1 files changed, 51 insertions, 14 deletions
diff --git a/src/plugins/nat/test/test_nat.py b/src/plugins/nat/test/test_nat.py index ac9c65dd0f5..d5d41288c42 100644 --- a/src/plugins/nat/test/test_nat.py +++ b/src/plugins/nat/test/test_nat.py @@ -33,7 +33,6 @@ from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \ from ipaddress import IPv6Network from util import ppc, ppp from socket import inet_pton, AF_INET -from vpp_acl import AclRule, VppAcl, VppAclInterface # NAT HA protocol event data @@ -6526,24 +6525,53 @@ class TestNAT44EndpointDependent(MethodHolder): self.verify_capture_in(capture, self.pg0) # Create an ACL blocking everything - out2in_deny_rule = AclRule(is_permit=0) - out2in_acl = VppAcl(self, rules=[out2in_deny_rule]) - out2in_acl.add_vpp_config() - - # create an ACL to permit/reflect everything - in2out_reflect_rule = AclRule(is_permit=2) - in2out_acl = VppAcl(self, rules=[in2out_reflect_rule]) - in2out_acl.add_vpp_config() + out2in_deny_rule = { + 'is_permit': 0, + 'is_ipv6': 0, + 'src_ip_addr': inet_pton(AF_INET, "0.0.0.0"), + 'src_ip_prefix_len': 0, + 'dst_ip_addr': inet_pton(AF_INET, "0.0.0.0"), + 'dst_ip_prefix_len': 0, + 'srcport_or_icmptype_first': 0, + 'srcport_or_icmptype_last': 65535, + 'dstport_or_icmpcode_first': 0, + 'dstport_or_icmpcode_last': 65535, + 'proto': 0, + } + out2in_rules = [out2in_deny_rule] + res = self.vapi.acl_add_replace(0xffffffff, out2in_rules) + self.assertEqual(res.retval, 0, "error adding out2in ACL") + out2in_acl = res.acl_index # apply as input acl on interface and confirm it blocks everything - acl_if = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index, - n_input=1, acls=[out2in_acl]) - acl_if.add_vpp_config() + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index, + n_input=1, + acls=[out2in_acl]) self.send_and_assert_no_replies(self.pg1, pkts_out2in) + # create an ACL to permit/reflect everything + in2out_reflect_rule = { + 'is_permit': 2, + 'is_ipv6': 0, + 'src_ip_addr': inet_pton(AF_INET, "0.0.0.0"), + 'src_ip_prefix_len': 0, + 'dst_ip_addr': inet_pton(AF_INET, "0.0.0.0"), + 'dst_ip_prefix_len': 0, + 'srcport_or_icmptype_first': 0, + 'srcport_or_icmptype_last': 65535, + 'dstport_or_icmpcode_first': 0, + 'dstport_or_icmpcode_last': 65535, + 'proto': 0, + } + in2out_rules = [in2out_reflect_rule] + res = self.vapi.acl_add_replace(0xffffffff, in2out_rules) + self.assertEqual(res.retval, 0, "error adding in2out ACL") + in2out_acl = res.acl_index + # apply output acl - acl_if.acls = [out2in_acl, in2out_acl] - acl_if.add_vpp_config() + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index, + n_input=1, + acls=[out2in_acl, in2out_acl]) # send in2out to generate ACL state (NAT state was created earlier) capture = self.send_and_expect(self.pg0, pkts_in2out, self.pg1, len(pkts_in2out)) @@ -6559,6 +6587,15 @@ class TestNAT44EndpointDependent(MethodHolder): self.verify_capture_in(capture, self.pg0) self.logger.info(self.vapi.cli("show trace")) + # Clean up + # Remove ACLs from interface + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index, + n_input=0, + acls=[]) + # delete ACLs + self.vapi.acl_del(acl_index=out2in_acl, expected_retval=0) + self.vapi.acl_del(acl_index=in2out_acl, expected_retval=0) + def test_multiple_vrf(self): """ Multiple VRF setup """ external_addr = '1.2.3.4' |