aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_acl_plugin.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_acl_plugin.py')
-rw-r--r--test/test_acl_plugin.py999
1 files changed, 592 insertions, 407 deletions
diff --git a/test/test_acl_plugin.py b/test/test_acl_plugin.py
index 53d96215949..32ecedba2a6 100644
--- a/test/test_acl_plugin.py
+++ b/test/test_acl_plugin.py
@@ -22,7 +22,7 @@ from vpp_ip import INVALID_INDEX
@tag_fixme_vpp_workers
class TestACLplugin(VppTestCase):
- """ ACL plugin Test Case """
+ """ACL plugin Test Case"""
# traffic types
IP = 0
@@ -39,7 +39,7 @@ class TestACLplugin(VppTestCase):
# supported protocols
proto = [[6, 17], [1, 58]]
- proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
+ proto_map = {1: "ICMP", 58: "ICMPv6EchoRequest", 6: "TCP", 17: "UDP"}
ICMPv4 = 0
ICMPv6 = 1
TCP = 0
@@ -105,11 +105,11 @@ class TestACLplugin(VppTestCase):
# Create BD with MAC learning and unknown unicast flooding disabled
# and put interfaces to this BD
- cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
- learn=1)
+ cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1, learn=1)
for pg_if in cls.pg_interfaces:
cls.vapi.sw_interface_set_l2_bridge(
- rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id)
+ rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id
+ )
# Set up all interfaces
for i in cls.pg_interfaces:
@@ -135,14 +135,16 @@ class TestACLplugin(VppTestCase):
for pg_if in cls.pg_interfaces:
i += 1
start_nr = macs_per_if * i + start
- end_nr = count + start if i == (n_int - 1) \
- else macs_per_if * (i + 1) + start
+ end_nr = (
+ count + start if i == (n_int - 1) else macs_per_if * (i + 1) + start
+ )
hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
for j in range(int(start_nr), int(end_nr)):
host = Host(
"00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
"172.17.1%02x.%u" % (pg_if.sw_if_index, j),
- "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
+ "2017:dead:%02x::%u" % (pg_if.sw_if_index, j),
+ )
hosts.append(host)
except Exception:
@@ -176,20 +178,32 @@ class TestACLplugin(VppTestCase):
self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
- self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
- % self.bd_id))
-
- def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
- s_prefix=0, s_ip=0,
- d_prefix=0, d_ip=0):
+ self.logger.info(self.vapi.ppcli("show bridge-domain %s detail" % self.bd_id))
+
+ def create_rule(
+ self,
+ ip=0,
+ permit_deny=0,
+ ports=PORTS_ALL,
+ proto=-1,
+ s_prefix=0,
+ s_ip=0,
+ d_prefix=0,
+ d_ip=0,
+ ):
if ip:
src_prefix = IPv6Network((s_ip, s_prefix))
dst_prefix = IPv6Network((d_ip, d_prefix))
else:
src_prefix = IPv4Network((s_ip, s_prefix))
dst_prefix = IPv4Network((d_ip, d_prefix))
- return AclRule(is_permit=permit_deny, ports=ports, proto=proto,
- src_prefix=src_prefix, dst_prefix=dst_prefix)
+ return AclRule(
+ is_permit=permit_deny,
+ ports=ports,
+ proto=proto,
+ src_prefix=src_prefix,
+ dst_prefix=dst_prefix,
+ )
def apply_rules(self, rules, tag=None):
acl = VppAcl(self, rules, tag=tag)
@@ -198,7 +212,8 @@ class TestACLplugin(VppTestCase):
# Apply a ACL on the interface as inbound
for i in self.pg_interfaces:
acl_if = VppAclInterface(
- self, sw_if_index=i.sw_if_index, n_input=1, acls=[acl])
+ self, sw_if_index=i.sw_if_index, n_input=1, acls=[acl]
+ )
acl_if.add_vpp_config()
return acl.acl_index
@@ -207,8 +222,7 @@ class TestACLplugin(VppTestCase):
acl.add_vpp_config()
self.logger.info("Dumped ACL: " + str(acl.dump()))
# Apply a ACL on the interface as inbound
- acl_if = VppAclInterface(self, sw_if_index=sw_if_index, n_input=1,
- acls=[acl])
+ acl_if = VppAclInterface(self, sw_if_index=sw_if_index, n_input=1, acls=[acl])
return acl.acl_index
def etype_whitelist(self, whitelist, n_input, add=True):
@@ -216,9 +230,14 @@ class TestACLplugin(VppTestCase):
if add:
self._wl = []
for i in self.pg_interfaces:
- self._wl.append(VppEtypeWhitelist(
- self, sw_if_index=i.sw_if_index, whitelist=whitelist,
- n_input=n_input).add_vpp_config())
+ self._wl.append(
+ VppEtypeWhitelist(
+ self,
+ sw_if_index=i.sw_if_index,
+ whitelist=whitelist,
+ n_input=n_input,
+ ).add_vpp_config()
+ )
else:
if hasattr(self, "_wl"):
for wl in self._wl:
@@ -226,27 +245,36 @@ class TestACLplugin(VppTestCase):
def create_upper_layer(self, packet_index, proto, ports=0):
p = self.proto_map[proto]
- if p == 'UDP':
+ if p == "UDP":
if ports == 0:
- return UDP(sport=random.randint(self.udp_sport_from,
- self.udp_sport_to),
- dport=random.randint(self.udp_dport_from,
- self.udp_dport_to))
+ return UDP(
+ sport=random.randint(self.udp_sport_from, self.udp_sport_to),
+ dport=random.randint(self.udp_dport_from, self.udp_dport_to),
+ )
else:
return UDP(sport=ports, dport=ports)
- elif p == 'TCP':
+ elif p == "TCP":
if ports == 0:
- return TCP(sport=random.randint(self.tcp_sport_from,
- self.tcp_sport_to),
- dport=random.randint(self.tcp_dport_from,
- self.tcp_dport_to))
+ return TCP(
+ sport=random.randint(self.tcp_sport_from, self.tcp_sport_to),
+ dport=random.randint(self.tcp_dport_from, self.tcp_dport_to),
+ )
else:
return TCP(sport=ports, dport=ports)
- return ''
-
- def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
- proto=-1, ports=0, fragments=False,
- pkt_raw=True, etype=-1):
+ return ""
+
+ def create_stream(
+ self,
+ src_if,
+ packet_sizes,
+ traffic_type=0,
+ ipv6=0,
+ proto=-1,
+ ports=0,
+ fragments=False,
+ pkt_raw=True,
+ etype=-1,
+ ):
"""
Create input packet stream for defined interface using hosts or
deleted_hosts list.
@@ -279,26 +307,25 @@ class TestACLplugin(VppTestCase):
payload = self.info_to_payload(pkt_info)
p = Ether(dst=dst_host.mac, src=src_host.mac)
if etype > 0:
- p = Ether(dst=dst_host.mac,
- src=src_host.mac,
- type=etype)
+ p = Ether(dst=dst_host.mac, src=src_host.mac, type=etype)
if pkt_info.ip:
p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
if fragments:
p /= IPv6ExtHdrFragment(offset=64, m=1)
else:
if fragments:
- p /= IP(src=src_host.ip4, dst=dst_host.ip4,
- flags=1, frag=64)
+ p /= IP(
+ src=src_host.ip4, dst=dst_host.ip4, flags=1, frag=64
+ )
else:
p /= IP(src=src_host.ip4, dst=dst_host.ip4)
if traffic_type == self.ICMP:
if pkt_info.ip:
- p /= ICMPv6EchoRequest(type=self.icmp6_type,
- code=self.icmp6_code)
+ p /= ICMPv6EchoRequest(
+ type=self.icmp6_type, code=self.icmp6_code
+ )
else:
- p /= ICMP(type=self.icmp4_type,
- code=self.icmp4_code)
+ p /= ICMP(type=self.icmp4_type, code=self.icmp4_code)
else:
p /= self.create_upper_layer(i, pkt_info.proto, ports)
if pkt_raw:
@@ -310,8 +337,7 @@ class TestACLplugin(VppTestCase):
pkts.append(p)
return pkts
- def verify_capture(self, pg_if, capture,
- traffic_type=0, ip_type=0, etype=-1):
+ def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0, etype=-1):
"""
Verify captured input packet stream for defined interface.
@@ -326,22 +352,23 @@ class TestACLplugin(VppTestCase):
for packet in capture:
if etype > 0:
if packet[Ether].type != etype:
- self.logger.error(ppp("Unexpected ethertype in packet:",
- packet))
+ self.logger.error(ppp("Unexpected ethertype in packet:", packet))
else:
continue
try:
# Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
if traffic_type == self.ICMP and ip_type == self.IPV6:
payload_info = self.payload_to_info(
- packet[ICMPv6EchoRequest], 'data')
+ packet[ICMPv6EchoRequest], "data"
+ )
payload = packet[ICMPv6EchoRequest]
else:
payload_info = self.payload_to_info(packet[Raw])
payload = packet[self.proto_map[payload_info.proto]]
except:
- self.logger.error(ppp("Unexpected or invalid packet "
- "(outside network):", packet))
+ self.logger.error(
+ ppp("Unexpected or invalid packet (outside network):", packet)
+ )
raise
if ip_type != 0:
@@ -355,8 +382,9 @@ class TestACLplugin(VppTestCase):
self.assertEqual(payload.type, self.icmp6_type)
self.assertEqual(payload.code, self.icmp6_code)
except:
- self.logger.error(ppp("Unexpected or invalid packet "
- "(outside network):", packet))
+ self.logger.error(
+ ppp("Unexpected or invalid packet (outside network):", packet)
+ )
raise
else:
try:
@@ -366,12 +394,13 @@ class TestACLplugin(VppTestCase):
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
- self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
- (pg_if.name, payload_info.src,
- packet_index))
+ self.logger.debug(
+ "Got packet on port %s: src=%u (id=%u)"
+ % (pg_if.name, payload_info.src, packet_index)
+ )
next_info = self.get_next_packet_info_for_interface2(
- payload_info.src, dst_sw_if_index,
- last_info[payload_info.src])
+ payload_info.src, dst_sw_if_index, last_info[payload_info.src]
+ )
last_info[payload_info.src] = next_info
self.assertTrue(next_info is not None)
self.assertEqual(packet_index, next_info.index)
@@ -380,29 +409,26 @@ class TestACLplugin(VppTestCase):
self.assertEqual(ip.src, saved_packet[ip_version].src)
self.assertEqual(ip.dst, saved_packet[ip_version].dst)
p = self.proto_map[payload_info.proto]
- if p == 'TCP':
+ if p == "TCP":
tcp = packet[TCP]
- self.assertEqual(tcp.sport, saved_packet[
- TCP].sport)
- self.assertEqual(tcp.dport, saved_packet[
- TCP].dport)
- elif p == 'UDP':
+ self.assertEqual(tcp.sport, saved_packet[TCP].sport)
+ self.assertEqual(tcp.dport, saved_packet[TCP].dport)
+ elif p == "UDP":
udp = packet[UDP]
- self.assertEqual(udp.sport, saved_packet[
- UDP].sport)
- self.assertEqual(udp.dport, saved_packet[
- UDP].dport)
+ self.assertEqual(udp.sport, saved_packet[UDP].sport)
+ self.assertEqual(udp.dport, saved_packet[UDP].dport)
except:
- self.logger.error(ppp("Unexpected or invalid packet:",
- packet))
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
for i in self.pg_interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
- i, dst_sw_if_index, last_info[i.sw_if_index])
+ i, dst_sw_if_index, last_info[i.sw_if_index]
+ )
self.assertTrue(
remaining_packet is None,
- "Port %u: Packet expected from source %u didn't arrive" %
- (dst_sw_if_index, i.sw_if_index))
+ "Port %u: Packet expected from source %u didn't arrive"
+ % (dst_sw_if_index, i.sw_if_index),
+ )
def run_traffic_no_check(self):
# Test
@@ -417,16 +443,32 @@ class TestACLplugin(VppTestCase):
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
- frags=False, pkt_raw=True, etype=-1):
+ def run_verify_test(
+ self,
+ traffic_type=0,
+ ip_type=0,
+ proto=-1,
+ ports=0,
+ frags=False,
+ pkt_raw=True,
+ etype=-1,
+ ):
# Test
# Create incoming packet streams for packet-generator interfaces
pkts_cnt = 0
for i in self.pg_interfaces:
if self.flows.__contains__(i):
- pkts = self.create_stream(i, self.pg_if_packet_sizes,
- traffic_type, ip_type, proto, ports,
- frags, pkt_raw, etype)
+ pkts = self.create_stream(
+ i,
+ self.pg_if_packet_sizes,
+ traffic_type,
+ ip_type,
+ proto,
+ ports,
+ frags,
+ pkt_raw,
+ etype,
+ )
if len(pkts) > 0:
i.add_stream(pkts)
pkts_cnt += len(pkts)
@@ -442,21 +484,28 @@ class TestACLplugin(VppTestCase):
if self.flows.__contains__(src_if):
for dst_if in self.flows[src_if]:
capture = dst_if.get_capture(pkts_cnt)
- self.logger.info("Verifying capture on interface %s" %
- dst_if.name)
- self.verify_capture(dst_if, capture,
- traffic_type, ip_type, etype)
+ self.logger.info("Verifying capture on interface %s" % dst_if.name)
+ self.verify_capture(dst_if, capture, traffic_type, ip_type, etype)
- def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
- ports=0, frags=False, etype=-1):
+ def run_verify_negat_test(
+ self, traffic_type=0, ip_type=0, proto=-1, ports=0, frags=False, etype=-1
+ ):
# Test
pkts_cnt = 0
self.reset_packet_infos()
for i in self.pg_interfaces:
if self.flows.__contains__(i):
- pkts = self.create_stream(i, self.pg_if_packet_sizes,
- traffic_type, ip_type, proto, ports,
- frags, True, etype)
+ pkts = self.create_stream(
+ i,
+ self.pg_if_packet_sizes,
+ traffic_type,
+ ip_type,
+ proto,
+ ports,
+ frags,
+ True,
+ etype,
+ )
if len(pkts) > 0:
i.add_stream(pkts)
pkts_cnt += len(pkts)
@@ -471,24 +520,22 @@ class TestACLplugin(VppTestCase):
for src_if in self.pg_interfaces:
if self.flows.__contains__(src_if):
for dst_if in self.flows[src_if]:
- self.logger.info("Verifying capture on interface %s" %
- dst_if.name)
+ self.logger.info("Verifying capture on interface %s" % dst_if.name)
capture = dst_if.get_capture(0)
self.assertEqual(len(capture), 0)
def test_0000_warmup_test(self):
- """ ACL plugin version check; learn MACs
- """
+ """ACL plugin version check; learn MACs"""
reply = self.vapi.papi.acl_plugin_get_version()
self.assertEqual(reply.major, 1)
- self.logger.info("Working with ACL plugin version: %d.%d" % (
- reply.major, reply.minor))
+ self.logger.info(
+ "Working with ACL plugin version: %d.%d" % (reply.major, reply.minor)
+ )
# minor version changes are non breaking
# self.assertEqual(reply.minor, 0)
def test_0001_acl_create(self):
- """ ACL create/delete test
- """
+ """ACL create/delete test"""
self.logger.info("ACLP_TEST_START_0001")
# Create a permit-1234 ACL
@@ -510,12 +557,13 @@ class TestACLplugin(VppTestCase):
for i_rule in range(0, len(r) - 1):
encoded_rule = r[i_rule].encode()
for rule_key in encoded_rule:
- self.assertEqual(rr[0].r[i_rule][rule_key],
- encoded_rule[rule_key])
+ self.assertEqual(rr[0].r[i_rule][rule_key], encoded_rule[rule_key])
# Create a deny-1234 ACL
- r_deny = [AclRule(is_permit=0, proto=17, ports=1234, sport_to=1235),
- AclRule(is_permit=1, proto=17, ports=0)]
+ r_deny = [
+ AclRule(is_permit=0, proto=17, ports=1234, sport_to=1235),
+ AclRule(is_permit=1, proto=17, ports=0),
+ ]
second_acl = VppAcl(self, rules=r_deny, tag="deny 1234;permit all")
second_acl.add_vpp_config()
self.assertTrue(second_acl.query_vpp_config())
@@ -528,8 +576,8 @@ class TestACLplugin(VppTestCase):
# apply an ACL on an interface inbound, try to delete ACL, must fail
acl_if_list = VppAclInterface(
- self, sw_if_index=self.pg0.sw_if_index, n_input=1,
- acls=[first_acl])
+ self, sw_if_index=self.pg0.sw_if_index, n_input=1, acls=[first_acl]
+ )
acl_if_list.add_vpp_config()
first_acl.remove_vpp_config(expect_error=True)
# Unapply an ACL and then try to delete it - must be ok
@@ -538,8 +586,8 @@ class TestACLplugin(VppTestCase):
# apply an ACL on an interface inbound, try to delete ACL, must fail
acl_if_list = VppAclInterface(
- self, sw_if_index=self.pg0.sw_if_index, n_input=0,
- acls=[second_acl])
+ self, sw_if_index=self.pg0.sw_if_index, n_input=0, acls=[second_acl]
+ )
acl_if_list.add_vpp_config()
second_acl.remove_vpp_config(expect_error=True)
# Unapply an ACL and then try to delete it - must be ok
@@ -548,22 +596,23 @@ class TestACLplugin(VppTestCase):
# try to apply a nonexistent ACL - must fail
acl_if_list = VppAclInterface(
- self, sw_if_index=self.pg0.sw_if_index, n_input=0,
- acls=[invalid_acl])
+ self, sw_if_index=self.pg0.sw_if_index, n_input=0, acls=[invalid_acl]
+ )
acl_if_list.add_vpp_config(expect_error=True)
self.logger.info("ACLP_TEST_FINISH_0001")
def test_0002_acl_permit_apply(self):
- """ permit ACL apply test
- """
+ """permit ACL apply test"""
self.logger.info("ACLP_TEST_START_0002")
rules = []
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- 0, self.proto[self.IP][self.UDP]))
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- 0, self.proto[self.IP][self.TCP]))
+ rules.append(
+ self.create_rule(self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP])
+ )
+ rules.append(
+ self.create_rule(self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP])
+ )
# Apply rules
acl_idx = self.apply_rules(rules, "permit per-flow")
@@ -574,14 +623,14 @@ class TestACLplugin(VppTestCase):
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4, -1)
- matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
+ matches = self.statistics.get_counter("/acl/%d/matches" % acl_idx)
self.logger.info("stat segment counters: %s" % repr(matches))
cli = "show acl-plugin acl"
self.logger.info(self.vapi.ppcli(cli))
cli = "show acl-plugin tables"
self.logger.info(self.vapi.ppcli(cli))
- total_hits = matches[0][0]['packets'] + matches[0][1]['packets']
+ total_hits = matches[0][0]["packets"] + matches[0][1]["packets"]
self.assertEqual(total_hits, 64)
# disable counters
@@ -590,17 +639,17 @@ class TestACLplugin(VppTestCase):
self.logger.info("ACLP_TEST_FINISH_0002")
def test_0003_acl_deny_apply(self):
- """ deny ACL apply test
- """
+ """deny ACL apply test"""
self.logger.info("ACLP_TEST_START_0003")
# Add a deny-flows ACL
rules = []
- rules.append(self.create_rule(
- self.IPV4, self.DENY, self.PORTS_ALL,
- self.proto[self.IP][self.UDP]))
+ rules.append(
+ self.create_rule(
+ self.IPV4, self.DENY, self.PORTS_ALL, self.proto[self.IP][self.UDP]
+ )
+ )
# Permit ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- self.PORTS_ALL, 0))
+ rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
# Apply rules
acl_idx = self.apply_rules(rules, "deny per-flow;permit all")
@@ -609,30 +658,34 @@ class TestACLplugin(VppTestCase):
reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
# Traffic should not pass
- self.run_verify_negat_test(self.IP, self.IPV4,
- self.proto[self.IP][self.UDP])
+ self.run_verify_negat_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
- matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
+ matches = self.statistics.get_counter("/acl/%d/matches" % acl_idx)
self.logger.info("stat segment counters: %s" % repr(matches))
cli = "show acl-plugin acl"
self.logger.info(self.vapi.ppcli(cli))
cli = "show acl-plugin tables"
self.logger.info(self.vapi.ppcli(cli))
- self.assertEqual(matches[0][0]['packets'], 64)
+ self.assertEqual(matches[0][0]["packets"], 64)
# disable counters
reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
self.logger.info("ACLP_TEST_FINISH_0003")
# self.assertEqual(, 0)
def test_0004_vpp624_permit_icmpv4(self):
- """ VPP_624 permit ICMPv4
- """
+ """VPP_624 permit ICMPv4"""
self.logger.info("ACLP_TEST_START_0004")
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.ICMP][self.ICMPv4]))
+ rules.append(
+ self.create_rule(
+ self.IPV4,
+ self.PERMIT,
+ self.PORTS_RANGE,
+ self.proto[self.ICMP][self.ICMPv4],
+ )
+ )
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
@@ -640,20 +693,24 @@ class TestACLplugin(VppTestCase):
self.apply_rules(rules, "permit icmpv4")
# Traffic should still pass
- self.run_verify_test(self.ICMP, self.IPV4,
- self.proto[self.ICMP][self.ICMPv4])
+ self.run_verify_test(self.ICMP, self.IPV4, self.proto[self.ICMP][self.ICMPv4])
self.logger.info("ACLP_TEST_FINISH_0004")
def test_0005_vpp624_permit_icmpv6(self):
- """ VPP_624 permit ICMPv6
- """
+ """VPP_624 permit ICMPv6"""
self.logger.info("ACLP_TEST_START_0005")
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.ICMP][self.ICMPv6]))
+ rules.append(
+ self.create_rule(
+ self.IPV6,
+ self.PERMIT,
+ self.PORTS_RANGE,
+ self.proto[self.ICMP][self.ICMPv6],
+ )
+ )
# deny ip any any in the end
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
@@ -661,22 +718,25 @@ class TestACLplugin(VppTestCase):
self.apply_rules(rules, "permit icmpv6")
# Traffic should still pass
- self.run_verify_test(self.ICMP, self.IPV6,
- self.proto[self.ICMP][self.ICMPv6])
+ self.run_verify_test(self.ICMP, self.IPV6, self.proto[self.ICMP][self.ICMPv6])
self.logger.info("ACLP_TEST_FINISH_0005")
def test_0006_vpp624_deny_icmpv4(self):
- """ VPP_624 deny ICMPv4
- """
+ """VPP_624 deny ICMPv4"""
self.logger.info("ACLP_TEST_START_0006")
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
- self.proto[self.ICMP][self.ICMPv4]))
+ rules.append(
+ self.create_rule(
+ self.IPV4,
+ self.DENY,
+ self.PORTS_RANGE,
+ self.proto[self.ICMP][self.ICMPv4],
+ )
+ )
# permit ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- self.PORTS_ALL, 0))
+ rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
# Apply rules
self.apply_rules(rules, "deny icmpv4")
@@ -687,16 +747,20 @@ class TestACLplugin(VppTestCase):
self.logger.info("ACLP_TEST_FINISH_0006")
def test_0007_vpp624_deny_icmpv6(self):
- """ VPP_624 deny ICMPv6
- """
+ """VPP_624 deny ICMPv6"""
self.logger.info("ACLP_TEST_START_0007")
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
- self.proto[self.ICMP][self.ICMPv6]))
+ rules.append(
+ self.create_rule(
+ self.IPV6,
+ self.DENY,
+ self.PORTS_RANGE,
+ self.proto[self.ICMP][self.ICMPv6],
+ )
+ )
# deny ip any any in the end
- rules.append(self.create_rule(self.IPV6, self.PERMIT,
- self.PORTS_ALL, 0))
+ rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
# Apply rules
self.apply_rules(rules, "deny icmpv6")
@@ -707,14 +771,16 @@ class TestACLplugin(VppTestCase):
self.logger.info("ACLP_TEST_FINISH_0007")
def test_0008_tcp_permit_v4(self):
- """ permit TCPv4
- """
+ """permit TCPv4"""
self.logger.info("ACLP_TEST_START_0008")
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ rules.append(
+ self.create_rule(
+ self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+ )
+ )
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
@@ -727,14 +793,16 @@ class TestACLplugin(VppTestCase):
self.logger.info("ACLP_TEST_FINISH_0008")
def test_0009_tcp_permit_v6(self):
- """ permit TCPv6
- """
+ """permit TCPv6"""
self.logger.info("ACLP_TEST_START_0009")
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ rules.append(
+ self.create_rule(
+ self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+ )
+ )
# deny ip any any in the end
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
@@ -747,14 +815,16 @@ class TestACLplugin(VppTestCase):
self.logger.info("ACLP_TEST_FINISH_0008")
def test_0010_udp_permit_v4(self):
- """ permit UDPv4
- """
+ """permit UDPv4"""
self.logger.info("ACLP_TEST_START_0010")
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.UDP]))
+ rules.append(
+ self.create_rule(
+ self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
+ )
+ )
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
@@ -767,14 +837,16 @@ class TestACLplugin(VppTestCase):
self.logger.info("ACLP_TEST_FINISH_0010")
def test_0011_udp_permit_v6(self):
- """ permit UDPv6
- """
+ """permit UDPv6"""
self.logger.info("ACLP_TEST_START_0011")
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.UDP]))
+ rules.append(
+ self.create_rule(
+ self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
+ )
+ )
# deny ip any any in the end
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
@@ -787,81 +859,89 @@ class TestACLplugin(VppTestCase):
self.logger.info("ACLP_TEST_FINISH_0011")
def test_0012_tcp_deny(self):
- """ deny TCPv4/v6
- """
+ """deny TCPv4/v6"""
self.logger.info("ACLP_TEST_START_0012")
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
- rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ rules.append(
+ self.create_rule(
+ self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+ )
+ )
+ rules.append(
+ self.create_rule(
+ self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+ )
+ )
# permit ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- self.PORTS_ALL, 0))
- rules.append(self.create_rule(self.IPV6, self.PERMIT,
- self.PORTS_ALL, 0))
+ rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
+ rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
# Apply rules
self.apply_rules(rules, "deny ip4/ip6 tcp")
# Traffic should not pass
- self.run_verify_negat_test(self.IP, self.IPRANDOM,
- self.proto[self.IP][self.TCP])
+ self.run_verify_negat_test(
+ self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP]
+ )
self.logger.info("ACLP_TEST_FINISH_0012")
def test_0013_udp_deny(self):
- """ deny UDPv4/v6
- """
+ """deny UDPv4/v6"""
self.logger.info("ACLP_TEST_START_0013")
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
- self.proto[self.IP][self.UDP]))
- rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
- self.proto[self.IP][self.UDP]))
+ rules.append(
+ self.create_rule(
+ self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
+ )
+ )
+ rules.append(
+ self.create_rule(
+ self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
+ )
+ )
# permit ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- self.PORTS_ALL, 0))
- rules.append(self.create_rule(self.IPV6, self.PERMIT,
- self.PORTS_ALL, 0))
+ rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
+ rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
# Apply rules
self.apply_rules(rules, "deny ip4/ip6 udp")
# Traffic should not pass
- self.run_verify_negat_test(self.IP, self.IPRANDOM,
- self.proto[self.IP][self.UDP])
+ self.run_verify_negat_test(
+ self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP]
+ )
self.logger.info("ACLP_TEST_FINISH_0013")
def test_0014_acl_dump(self):
- """ verify add/dump acls
- """
+ """verify add/dump acls"""
self.logger.info("ACLP_TEST_START_0014")
- r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
- [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
- [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
- [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
- [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
- [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
- [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
- [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
- [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
- [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
- [self.IPV4, self.DENY, self.PORTS_ALL, 0],
- [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
- [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
- [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
- [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
- [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
- [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
- [self.IPV6, self.DENY, self.PORTS_ALL, 0]
- ]
+ r = [
+ [self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
+ [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
+ [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
+ [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
+ [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
+ [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
+ [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
+ [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
+ [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
+ [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
+ [self.IPV4, self.DENY, self.PORTS_ALL, 0],
+ [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
+ [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
+ [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
+ [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
+ [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
+ [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
+ [self.IPV6, self.DENY, self.PORTS_ALL, 0],
+ ]
# Add and verify new ACLs
rules = []
@@ -886,37 +966,47 @@ class TestACLplugin(VppTestCase):
self.assertEqual(dr.srcport_or_icmptype_last, 65535)
else:
if dr.proto == self.proto[self.IP][self.TCP]:
- self.assertGreater(dr.srcport_or_icmptype_first,
- self.tcp_sport_from-1)
- self.assertLess(dr.srcport_or_icmptype_first,
- self.tcp_sport_to+1)
- self.assertGreater(dr.dstport_or_icmpcode_last,
- self.tcp_dport_from-1)
- self.assertLess(dr.dstport_or_icmpcode_last,
- self.tcp_dport_to+1)
+ self.assertGreater(
+ dr.srcport_or_icmptype_first, self.tcp_sport_from - 1
+ )
+ self.assertLess(
+ dr.srcport_or_icmptype_first, self.tcp_sport_to + 1
+ )
+ self.assertGreater(
+ dr.dstport_or_icmpcode_last, self.tcp_dport_from - 1
+ )
+ self.assertLess(
+ dr.dstport_or_icmpcode_last, self.tcp_dport_to + 1
+ )
elif dr.proto == self.proto[self.IP][self.UDP]:
- self.assertGreater(dr.srcport_or_icmptype_first,
- self.udp_sport_from-1)
- self.assertLess(dr.srcport_or_icmptype_first,
- self.udp_sport_to+1)
- self.assertGreater(dr.dstport_or_icmpcode_last,
- self.udp_dport_from-1)
- self.assertLess(dr.dstport_or_icmpcode_last,
- self.udp_dport_to+1)
+ self.assertGreater(
+ dr.srcport_or_icmptype_first, self.udp_sport_from - 1
+ )
+ self.assertLess(
+ dr.srcport_or_icmptype_first, self.udp_sport_to + 1
+ )
+ self.assertGreater(
+ dr.dstport_or_icmpcode_last, self.udp_dport_from - 1
+ )
+ self.assertLess(
+ dr.dstport_or_icmpcode_last, self.udp_dport_to + 1
+ )
i += 1
self.logger.info("ACLP_TEST_FINISH_0014")
def test_0015_tcp_permit_port_v4(self):
- """ permit single TCPv4
- """
+ """permit single TCPv4"""
self.logger.info("ACLP_TEST_START_0015")
port = random.randint(16384, 65535)
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
- self.proto[self.IP][self.TCP]))
+ rules.append(
+ self.create_rule(
+ self.IPV4, self.PERMIT, port, self.proto[self.IP][self.TCP]
+ )
+ )
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
@@ -924,21 +1014,22 @@ class TestACLplugin(VppTestCase):
self.apply_rules(rules, "permit ip4 tcp %d" % port)
# Traffic should still pass
- self.run_verify_test(self.IP, self.IPV4,
- self.proto[self.IP][self.TCP], port)
+ self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], port)
self.logger.info("ACLP_TEST_FINISH_0015")
def test_0016_udp_permit_port_v4(self):
- """ permit single UDPv4
- """
+ """permit single UDPv4"""
self.logger.info("ACLP_TEST_START_0016")
port = random.randint(16384, 65535)
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
- self.proto[self.IP][self.UDP]))
+ rules.append(
+ self.create_rule(
+ self.IPV4, self.PERMIT, port, self.proto[self.IP][self.UDP]
+ )
+ )
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
@@ -946,21 +1037,22 @@ class TestACLplugin(VppTestCase):
self.apply_rules(rules, "permit ip4 tcp %d" % port)
# Traffic should still pass
- self.run_verify_test(self.IP, self.IPV4,
- self.proto[self.IP][self.UDP], port)
+ self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP], port)
self.logger.info("ACLP_TEST_FINISH_0016")
def test_0017_tcp_permit_port_v6(self):
- """ permit single TCPv6
- """
+ """permit single TCPv6"""
self.logger.info("ACLP_TEST_START_0017")
port = random.randint(16384, 65535)
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
- self.proto[self.IP][self.TCP]))
+ rules.append(
+ self.create_rule(
+ self.IPV6, self.PERMIT, port, self.proto[self.IP][self.TCP]
+ )
+ )
# deny ip any any in the end
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
@@ -968,90 +1060,89 @@ class TestACLplugin(VppTestCase):
self.apply_rules(rules, "permit ip4 tcp %d" % port)
# Traffic should still pass
- self.run_verify_test(self.IP, self.IPV6,
- self.proto[self.IP][self.TCP], port)
+ self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP], port)
self.logger.info("ACLP_TEST_FINISH_0017")
def test_0018_udp_permit_port_v6(self):
- """ permit single UDPv6
- """
+ """permit single UDPv6"""
self.logger.info("ACLP_TEST_START_0018")
port = random.randint(16384, 65535)
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
- self.proto[self.IP][self.UDP]))
+ rules.append(
+ self.create_rule(
+ self.IPV6, self.PERMIT, port, self.proto[self.IP][self.UDP]
+ )
+ )
# deny ip any any in the end
- rules.append(self.create_rule(self.IPV6, self.DENY,
- self.PORTS_ALL, 0))
+ rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
self.apply_rules(rules, "permit ip4 tcp %d" % port)
# Traffic should still pass
- self.run_verify_test(self.IP, self.IPV6,
- self.proto[self.IP][self.UDP], port)
+ self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP], port)
self.logger.info("ACLP_TEST_FINISH_0018")
def test_0019_udp_deny_port(self):
- """ deny single TCPv4/v6
- """
+ """deny single TCPv4/v6"""
self.logger.info("ACLP_TEST_START_0019")
port = random.randint(16384, 65535)
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, port,
- self.proto[self.IP][self.TCP]))
- rules.append(self.create_rule(self.IPV6, self.DENY, port,
- self.proto[self.IP][self.TCP]))
+ rules.append(
+ self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.TCP])
+ )
+ rules.append(
+ self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.TCP])
+ )
# Permit ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- self.PORTS_ALL, 0))
- rules.append(self.create_rule(self.IPV6, self.PERMIT,
- self.PORTS_ALL, 0))
+ rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
+ rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
# Apply rules
self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
# Traffic should not pass
- self.run_verify_negat_test(self.IP, self.IPRANDOM,
- self.proto[self.IP][self.TCP], port)
+ self.run_verify_negat_test(
+ self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP], port
+ )
self.logger.info("ACLP_TEST_FINISH_0019")
def test_0020_udp_deny_port(self):
- """ deny single UDPv4/v6
- """
+ """deny single UDPv4/v6"""
self.logger.info("ACLP_TEST_START_0020")
port = random.randint(16384, 65535)
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, port,
- self.proto[self.IP][self.UDP]))
- rules.append(self.create_rule(self.IPV6, self.DENY, port,
- self.proto[self.IP][self.UDP]))
+ rules.append(
+ self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.UDP])
+ )
+ rules.append(
+ self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.UDP])
+ )
# Permit ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- self.PORTS_ALL, 0))
- rules.append(self.create_rule(self.IPV6, self.PERMIT,
- self.PORTS_ALL, 0))
+ rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
+ rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
# Apply rules
self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
# Traffic should not pass
- self.run_verify_negat_test(self.IP, self.IPRANDOM,
- self.proto[self.IP][self.UDP], port)
+ self.run_verify_negat_test(
+ self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP], port
+ )
self.logger.info("ACLP_TEST_FINISH_0020")
def test_0021_udp_deny_port_verify_fragment_deny(self):
- """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
+ """deny single UDPv4/v6, permit ip any, verify non-initial fragment
blocked
"""
self.logger.info("ACLP_TEST_START_0021")
@@ -1059,37 +1150,40 @@ class TestACLplugin(VppTestCase):
port = random.randint(16384, 65535)
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, port,
- self.proto[self.IP][self.UDP]))
- rules.append(self.create_rule(self.IPV6, self.DENY, port,
- self.proto[self.IP][self.UDP]))
+ rules.append(
+ self.create_rule(self.IPV4, self.DENY, port, self.proto[self.IP][self.UDP])
+ )
+ rules.append(
+ self.create_rule(self.IPV6, self.DENY, port, self.proto[self.IP][self.UDP])
+ )
# deny ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- self.PORTS_ALL, 0))
- rules.append(self.create_rule(self.IPV6, self.PERMIT,
- self.PORTS_ALL, 0))
+ rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
+ rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
# Apply rules
self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
# Traffic should not pass
- self.run_verify_negat_test(self.IP, self.IPRANDOM,
- self.proto[self.IP][self.UDP], port, True)
+ self.run_verify_negat_test(
+ self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP], port, True
+ )
self.logger.info("ACLP_TEST_FINISH_0021")
def test_0022_zero_length_udp_ipv4(self):
- """ VPP-687 zero length udp ipv4 packet"""
+ """VPP-687 zero length udp ipv4 packet"""
self.logger.info("ACLP_TEST_START_0022")
port = random.randint(16384, 65535)
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
- self.proto[self.IP][self.UDP]))
- # deny ip any any in the end
rules.append(
- self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+ self.create_rule(
+ self.IPV4, self.PERMIT, port, self.proto[self.IP][self.UDP]
+ )
+ )
+ # deny ip any any in the end
+ rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
self.apply_rules(rules, "permit empty udp ip4 %d" % port)
@@ -1097,10 +1191,16 @@ class TestACLplugin(VppTestCase):
# Traffic should still pass
# Create incoming packet streams for packet-generator interfaces
pkts_cnt = 0
- pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
- self.IP, self.IPV4,
- self.proto[self.IP][self.UDP], port,
- False, False)
+ pkts = self.create_stream(
+ self.pg0,
+ self.pg_if_packet_sizes,
+ self.IP,
+ self.IPV4,
+ self.proto[self.IP][self.UDP],
+ port,
+ False,
+ False,
+ )
if len(pkts) > 0:
self.pg0.add_stream(pkts)
pkts_cnt += len(pkts)
@@ -1114,14 +1214,17 @@ class TestACLplugin(VppTestCase):
self.logger.info("ACLP_TEST_FINISH_0022")
def test_0023_zero_length_udp_ipv6(self):
- """ VPP-687 zero length udp ipv6 packet"""
+ """VPP-687 zero length udp ipv6 packet"""
self.logger.info("ACLP_TEST_START_0023")
port = random.randint(16384, 65535)
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
- self.proto[self.IP][self.UDP]))
+ rules.append(
+ self.create_rule(
+ self.IPV6, self.PERMIT, port, self.proto[self.IP][self.UDP]
+ )
+ )
# deny ip any any in the end
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
@@ -1131,10 +1234,16 @@ class TestACLplugin(VppTestCase):
# Traffic should still pass
# Create incoming packet streams for packet-generator interfaces
pkts_cnt = 0
- pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
- self.IP, self.IPV6,
- self.proto[self.IP][self.UDP], port,
- False, False)
+ pkts = self.create_stream(
+ self.pg0,
+ self.pg_if_packet_sizes,
+ self.IP,
+ self.IPV6,
+ self.proto[self.IP][self.UDP],
+ port,
+ False,
+ False,
+ )
if len(pkts) > 0:
self.pg0.add_stream(pkts)
pkts_cnt += len(pkts)
@@ -1149,16 +1258,21 @@ class TestACLplugin(VppTestCase):
self.logger.info("ACLP_TEST_FINISH_0023")
def test_0108_tcp_permit_v4(self):
- """ permit TCPv4 + non-match range
- """
+ """permit TCPv4 + non-match range"""
self.logger.info("ACLP_TEST_START_0108")
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
- rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ rules.append(
+ self.create_rule(
+ self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
+ )
+ )
+ rules.append(
+ self.create_rule(
+ self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+ )
+ )
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
@@ -1171,16 +1285,21 @@ class TestACLplugin(VppTestCase):
self.logger.info("ACLP_TEST_FINISH_0108")
def test_0109_tcp_permit_v6(self):
- """ permit TCPv6 + non-match range
- """
+ """permit TCPv6 + non-match range"""
self.logger.info("ACLP_TEST_START_0109")
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
- rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ rules.append(
+ self.create_rule(
+ self.IPV6, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
+ )
+ )
+ rules.append(
+ self.create_rule(
+ self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+ )
+ )
# deny ip any any in the end
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
@@ -1193,16 +1312,21 @@ class TestACLplugin(VppTestCase):
self.logger.info("ACLP_TEST_FINISH_0109")
def test_0110_udp_permit_v4(self):
- """ permit UDPv4 + non-match range
- """
+ """permit UDPv4 + non-match range"""
self.logger.info("ACLP_TEST_START_0110")
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.UDP]))
- rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.UDP]))
+ rules.append(
+ self.create_rule(
+ self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.UDP]
+ )
+ )
+ rules.append(
+ self.create_rule(
+ self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
+ )
+ )
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
@@ -1215,16 +1339,21 @@ class TestACLplugin(VppTestCase):
self.logger.info("ACLP_TEST_FINISH_0110")
def test_0111_udp_permit_v6(self):
- """ permit UDPv6 + non-match range
- """
+ """permit UDPv6 + non-match range"""
self.logger.info("ACLP_TEST_START_0111")
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.UDP]))
- rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.UDP]))
+ rules.append(
+ self.create_rule(
+ self.IPV6, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.UDP]
+ )
+ )
+ rules.append(
+ self.create_rule(
+ self.IPV6, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
+ )
+ )
# deny ip any any in the end
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
@@ -1237,80 +1366,113 @@ class TestACLplugin(VppTestCase):
self.logger.info("ACLP_TEST_FINISH_0111")
def test_0112_tcp_deny(self):
- """ deny TCPv4/v6 + non-match range
- """
+ """deny TCPv4/v6 + non-match range"""
self.logger.info("ACLP_TEST_START_0112")
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
- rules.append(self.create_rule(self.IPV6, self.PERMIT,
- self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
- rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ rules.append(
+ self.create_rule(
+ self.IPV4,
+ self.PERMIT,
+ self.PORTS_RANGE_2,
+ self.proto[self.IP][self.TCP],
+ )
+ )
+ rules.append(
+ self.create_rule(
+ self.IPV6,
+ self.PERMIT,
+ self.PORTS_RANGE_2,
+ self.proto[self.IP][self.TCP],
+ )
+ )
+ rules.append(
+ self.create_rule(
+ self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+ )
+ )
+ rules.append(
+ self.create_rule(
+ self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+ )
+ )
# permit ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- self.PORTS_ALL, 0))
- rules.append(self.create_rule(self.IPV6, self.PERMIT,
- self.PORTS_ALL, 0))
+ rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
+ rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
# Apply rules
self.apply_rules(rules, "deny ip4/ip6 tcp")
# Traffic should not pass
- self.run_verify_negat_test(self.IP, self.IPRANDOM,
- self.proto[self.IP][self.TCP])
+ self.run_verify_negat_test(
+ self.IP, self.IPRANDOM, self.proto[self.IP][self.TCP]
+ )
self.logger.info("ACLP_TEST_FINISH_0112")
def test_0113_udp_deny(self):
- """ deny UDPv4/v6 + non-match range
- """
+ """deny UDPv4/v6 + non-match range"""
self.logger.info("ACLP_TEST_START_0113")
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- self.PORTS_RANGE_2,
- self.proto[self.IP][self.UDP]))
- rules.append(self.create_rule(self.IPV6, self.PERMIT,
- self.PORTS_RANGE_2,
- self.proto[self.IP][self.UDP]))
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
- self.proto[self.IP][self.UDP]))
- rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
- self.proto[self.IP][self.UDP]))
+ rules.append(
+ self.create_rule(
+ self.IPV4,
+ self.PERMIT,
+ self.PORTS_RANGE_2,
+ self.proto[self.IP][self.UDP],
+ )
+ )
+ rules.append(
+ self.create_rule(
+ self.IPV6,
+ self.PERMIT,
+ self.PORTS_RANGE_2,
+ self.proto[self.IP][self.UDP],
+ )
+ )
+ rules.append(
+ self.create_rule(
+ self.IPV4, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
+ )
+ )
+ rules.append(
+ self.create_rule(
+ self.IPV6, self.DENY, self.PORTS_RANGE, self.proto[self.IP][self.UDP]
+ )
+ )
# permit ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- self.PORTS_ALL, 0))
- rules.append(self.create_rule(self.IPV6, self.PERMIT,
- self.PORTS_ALL, 0))
+ rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0))
+ rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0))
# Apply rules
self.apply_rules(rules, "deny ip4/ip6 udp")
# Traffic should not pass
- self.run_verify_negat_test(self.IP, self.IPRANDOM,
- self.proto[self.IP][self.UDP])
+ self.run_verify_negat_test(
+ self.IP, self.IPRANDOM, self.proto[self.IP][self.UDP]
+ )
self.logger.info("ACLP_TEST_FINISH_0113")
def test_0300_tcp_permit_v4_etype_aaaa(self):
- """ permit TCPv4, send 0xAAAA etype
- """
+ """permit TCPv4, send 0xAAAA etype"""
self.logger.info("ACLP_TEST_START_0300")
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
- rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ rules.append(
+ self.create_rule(
+ self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
+ )
+ )
+ rules.append(
+ self.create_rule(
+ self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+ )
+ )
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
@@ -1318,33 +1480,39 @@ class TestACLplugin(VppTestCase):
self.apply_rules(rules, "permit ipv4 tcp")
# Traffic should still pass also for an odd ethertype
- self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
- 0, False, True, 0xaaaa)
+ self.run_verify_test(
+ self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0xAAAA
+ )
self.logger.info("ACLP_TEST_FINISH_0300")
def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
- """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked
- """
+ """permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked"""
self.logger.info("ACLP_TEST_START_0305")
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
- rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ rules.append(
+ self.create_rule(
+ self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
+ )
+ )
+ rules.append(
+ self.create_rule(
+ self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+ )
+ )
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
self.apply_rules(rules, "permit ipv4 tcp")
# whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
- self.etype_whitelist([0xbbb], 1)
+ self.etype_whitelist([0xBBB], 1)
# The oddball ethertype should be blocked
- self.run_verify_negat_test(self.IP, self.IPV4,
- self.proto[self.IP][self.TCP],
- 0, False, 0xaaaa)
+ self.run_verify_negat_test(
+ self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, 0xAAAA
+ )
# remove the whitelist
self.etype_whitelist([], 0, add=False)
@@ -1352,27 +1520,33 @@ class TestACLplugin(VppTestCase):
self.logger.info("ACLP_TEST_FINISH_0305")
def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
- """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass
- """
+ """permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass"""
self.logger.info("ACLP_TEST_START_0306")
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
- rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ rules.append(
+ self.create_rule(
+ self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
+ )
+ )
+ rules.append(
+ self.create_rule(
+ self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+ )
+ )
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
self.apply_rules(rules, "permit ipv4 tcp")
# whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
- self.etype_whitelist([0xbbb], 1)
+ self.etype_whitelist([0xBBB], 1)
# The whitelisted traffic, should pass
- self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
- 0, False, True, 0x0bbb)
+ self.run_verify_test(
+ self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0x0BBB
+ )
# remove the whitelist, the previously blocked 0xAAAA should pass now
self.etype_whitelist([], 0, add=False)
@@ -1380,16 +1554,21 @@ class TestACLplugin(VppTestCase):
self.logger.info("ACLP_TEST_FINISH_0306")
def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
- """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass
- """
+ """permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass"""
self.logger.info("ACLP_TEST_START_0307")
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
- rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ rules.append(
+ self.create_rule(
+ self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
+ )
+ )
+ rules.append(
+ self.create_rule(
+ self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+ )
+ )
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
@@ -1397,27 +1576,33 @@ class TestACLplugin(VppTestCase):
self.apply_rules(rules, "permit ipv4 tcp")
# whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
- self.etype_whitelist([0xbbb], 1)
+ self.etype_whitelist([0xBBB], 1)
# remove the whitelist, the previously blocked 0xAAAA should pass now
self.etype_whitelist([], 0, add=False)
# The whitelisted traffic, should pass
- self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
- 0, False, True, 0xaaaa)
+ self.run_verify_test(
+ self.IP, self.IPV4, self.proto[self.IP][self.TCP], 0, False, True, 0xAAAA
+ )
self.logger.info("ACLP_TEST_FINISH_0306")
def test_0315_del_intf(self):
- """ apply an acl and delete the interface
- """
+ """apply an acl and delete the interface"""
self.logger.info("ACLP_TEST_START_0315")
# Add an ACL
rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
- rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ rules.append(
+ self.create_rule(
+ self.IPV4, self.DENY, self.PORTS_RANGE_2, self.proto[self.IP][self.TCP]
+ )
+ )
+ rules.append(
+ self.create_rule(
+ self.IPV4, self.PERMIT, self.PORTS_RANGE, self.proto[self.IP][self.TCP]
+ )
+ )
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
@@ -1434,5 +1619,5 @@ class TestACLplugin(VppTestCase):
self.logger.info("ACLP_TEST_FINISH_0315")
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)