aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_classify_l2_acl.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_classify_l2_acl.py')
-rw-r--r--test/test_classify_l2_acl.py324
1 files changed, 183 insertions, 141 deletions
diff --git a/test/test_classify_l2_acl.py b/test/test_classify_l2_acl.py
index b1309881e58..cc3617fb232 100644
--- a/test/test_classify_l2_acl.py
+++ b/test/test_classify_l2_acl.py
@@ -4,9 +4,6 @@
import unittest
import random
-import binascii
-import socket
-
from scapy.packet import Raw
from scapy.data import ETH_P_IP
@@ -14,13 +11,13 @@ from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
from scapy.layers.inet6 import IPv6ExtHdrFragment
-from framework import VppTestCase, VppTestRunner
+from asfframework import VppTestRunner
from util import Host, ppp
from template_classifier import TestClassifier
class TestClassifyAcl(TestClassifier):
- """ Classifier-based L2 input and output ACL Test Case """
+ """Classifier-based L2 input and output ACL Test Case"""
# traffic types
IP = 0
@@ -37,7 +34,7 @@ class TestClassifyAcl(TestClassifier):
# supported protocols
proto = [[6, 17], [1, 58]]
- proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
+ proto_map = {1: "ICMP", 58: "ICMPv6EchoRequest", 6: "TCP", 17: "UDP"}
ICMPv4 = 0
ICMPv6 = 1
TCP = 0
@@ -104,11 +101,13 @@ class TestClassifyAcl(TestClassifier):
# Create BD with MAC learning and unknown unicast flooding disabled
# and put interfaces to this BD
- cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
- learn=1)
+ cls.vapi.bridge_domain_add_del_v2(
+ bd_id=cls.bd_id, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
+ )
for pg_if in cls.pg_interfaces:
cls.vapi.sw_interface_set_l2_bridge(
- rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id)
+ rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id
+ )
# Set up all interfaces
for i in cls.pg_interfaces:
@@ -128,7 +127,7 @@ class TestClassifyAcl(TestClassifier):
# self.warmup_test()
# Holder of the active classify table key
- cls.acl_active_table = ''
+ cls.acl_active_table = ""
except Exception:
super(TestClassifyAcl, cls).tearDownClass()
@@ -147,25 +146,30 @@ class TestClassifyAcl(TestClassifier):
Show various debug prints after each test.
"""
if not self.vpp_dead:
- if self.acl_active_table == 'mac_inout':
+ if self.acl_active_table == "mac_inout":
self.output_acl_set_interface(
- self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0)
+ self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0
+ )
self.input_acl_set_interface(
- self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
- self.acl_active_table = ''
- elif self.acl_active_table == 'mac_out':
+ self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0
+ )
+ self.acl_active_table = ""
+ elif self.acl_active_table == "mac_out":
self.output_acl_set_interface(
- self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0)
- self.acl_active_table = ''
- elif self.acl_active_table == 'mac_in':
+ self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0
+ )
+ self.acl_active_table = ""
+ elif self.acl_active_table == "mac_in":
self.input_acl_set_interface(
- self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
- self.acl_active_table = ''
+ self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0
+ )
+ self.acl_active_table = ""
super(TestClassifyAcl, self).tearDown()
- def create_classify_session(self, intf, table_index, match,
- hit_next_index=0xffffffff, is_add=1):
+ def create_classify_session(
+ self, intf, table_index, match, hit_next_index=0xFFFFFFFF, is_add=1
+ ):
"""Create Classify Session
:param VppInterface intf: Interface to apply classify session.
@@ -180,8 +184,9 @@ class TestClassifyAcl(TestClassifier):
table_index=table_index,
match=mask_match,
match_len=mask_match_len,
- hit_next_index=hit_next_index)
- self.assertIsNotNone(r, 'No response msg for add_del_session')
+ hit_next_index=hit_next_index,
+ )
+ self.assertIsNotNone(r, "No response msg for add_del_session")
def create_hosts(self, count, start=0):
"""
@@ -197,39 +202,50 @@ class TestClassifyAcl(TestClassifier):
for pg_if in self.pg_interfaces:
i += 1
start_nr = macs_per_if * i + start
- end_nr = count + start if i == (n_int - 1) \
- else macs_per_if * (i + 1) + start
+ end_nr = (
+ count + start if i == (n_int - 1) else macs_per_if * (i + 1) + start
+ )
hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
for j in range(start_nr, end_nr):
host = Host(
"00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
"172.17.1%02x.%u" % (pg_if.sw_if_index, j),
- "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
+ "2017:dead:%02x::%u" % (pg_if.sw_if_index, j),
+ )
hosts.append(host)
def create_upper_layer(self, packet_index, proto, ports=0):
p = self.proto_map[proto]
- if p == 'UDP':
+ if p == "UDP":
if ports == 0:
- return UDP(sport=random.randint(self.udp_sport_from,
- self.udp_sport_to),
- dport=random.randint(self.udp_dport_from,
- self.udp_dport_to))
+ return UDP(
+ sport=random.randint(self.udp_sport_from, self.udp_sport_to),
+ dport=random.randint(self.udp_dport_from, self.udp_dport_to),
+ )
else:
return UDP(sport=ports, dport=ports)
- elif p == 'TCP':
+ elif p == "TCP":
if ports == 0:
- return TCP(sport=random.randint(self.tcp_sport_from,
- self.tcp_sport_to),
- dport=random.randint(self.tcp_dport_from,
- self.tcp_dport_to))
+ return TCP(
+ sport=random.randint(self.tcp_sport_from, self.tcp_sport_to),
+ dport=random.randint(self.tcp_dport_from, self.tcp_dport_to),
+ )
else:
return TCP(sport=ports, dport=ports)
- return ''
-
- def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
- proto=-1, ports=0, fragments=False,
- pkt_raw=True, etype=-1):
+ return ""
+
+ def create_stream(
+ self,
+ src_if,
+ packet_sizes,
+ traffic_type=0,
+ ipv6=0,
+ proto=-1,
+ ports=0,
+ fragments=False,
+ pkt_raw=True,
+ etype=-1,
+ ):
"""
Create input packet stream for defined interface using hosts or
deleted_hosts list.
@@ -262,26 +278,25 @@ class TestClassifyAcl(TestClassifier):
payload = self.info_to_payload(pkt_info)
p = Ether(dst=dst_host.mac, src=src_host.mac)
if etype > 0:
- p = Ether(dst=dst_host.mac,
- src=src_host.mac,
- type=etype)
+ p = Ether(dst=dst_host.mac, src=src_host.mac, type=etype)
if pkt_info.ip:
p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
if fragments:
p /= IPv6ExtHdrFragment(offset=64, m=1)
else:
if fragments:
- p /= IP(src=src_host.ip4, dst=dst_host.ip4,
- flags=1, frag=64)
+ p /= IP(
+ src=src_host.ip4, dst=dst_host.ip4, flags=1, frag=64
+ )
else:
p /= IP(src=src_host.ip4, dst=dst_host.ip4)
if traffic_type == self.ICMP:
if pkt_info.ip:
- p /= ICMPv6EchoRequest(type=self.icmp6_type,
- code=self.icmp6_code)
+ p /= ICMPv6EchoRequest(
+ type=self.icmp6_type, code=self.icmp6_code
+ )
else:
- p /= ICMP(type=self.icmp4_type,
- code=self.icmp4_code)
+ p /= ICMP(type=self.icmp4_type, code=self.icmp4_code)
else:
p /= self.create_upper_layer(i, pkt_info.proto, ports)
if pkt_raw:
@@ -293,8 +308,7 @@ class TestClassifyAcl(TestClassifier):
pkts.append(p)
return pkts
- def verify_capture(self, pg_if, capture,
- traffic_type=0, ip_type=0, etype=-1):
+ def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0, etype=-1):
"""
Verify captured input packet stream for defined interface.
@@ -309,22 +323,21 @@ class TestClassifyAcl(TestClassifier):
for packet in capture:
if etype > 0:
if packet[Ether].type != etype:
- self.logger.error(ppp("Unexpected ethertype in packet:",
- packet))
+ self.logger.error(ppp("Unexpected ethertype in packet:", packet))
else:
continue
try:
# Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
if traffic_type == self.ICMP and ip_type == self.IPV6:
- payload_info = self.payload_to_info(
- packet[ICMPv6EchoRequest].data)
+ payload_info = self.payload_to_info(packet[ICMPv6EchoRequest].data)
payload = packet[ICMPv6EchoRequest]
else:
payload_info = self.payload_to_info(packet[Raw])
payload = packet[self.proto_map[payload_info.proto]]
except:
- self.logger.error(ppp("Unexpected or invalid packet "
- "(outside network):", packet))
+ self.logger.error(
+ ppp("Unexpected or invalid packet (outside network):", packet)
+ )
raise
if ip_type != 0:
@@ -338,8 +351,9 @@ class TestClassifyAcl(TestClassifier):
self.assertEqual(payload.type, self.icmp6_type)
self.assertEqual(payload.code, self.icmp6_code)
except:
- self.logger.error(ppp("Unexpected or invalid packet "
- "(outside network):", packet))
+ self.logger.error(
+ ppp("Unexpected or invalid packet (outside network):", packet)
+ )
raise
else:
try:
@@ -349,12 +363,13 @@ class TestClassifyAcl(TestClassifier):
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
- self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
- (pg_if.name, payload_info.src,
- packet_index))
+ self.logger.debug(
+ "Got packet on port %s: src=%u (id=%u)"
+ % (pg_if.name, payload_info.src, packet_index)
+ )
next_info = self.get_next_packet_info_for_interface2(
- payload_info.src, dst_sw_if_index,
- last_info[payload_info.src])
+ payload_info.src, dst_sw_if_index, last_info[payload_info.src]
+ )
last_info[payload_info.src] = next_info
self.assertTrue(next_info is not None)
self.assertEqual(packet_index, next_info.index)
@@ -363,29 +378,26 @@ class TestClassifyAcl(TestClassifier):
self.assertEqual(ip.src, saved_packet[ip_version].src)
self.assertEqual(ip.dst, saved_packet[ip_version].dst)
p = self.proto_map[payload_info.proto]
- if p == 'TCP':
+ if p == "TCP":
tcp = packet[TCP]
- self.assertEqual(tcp.sport, saved_packet[
- TCP].sport)
- self.assertEqual(tcp.dport, saved_packet[
- TCP].dport)
- elif p == 'UDP':
+ self.assertEqual(tcp.sport, saved_packet[TCP].sport)
+ self.assertEqual(tcp.dport, saved_packet[TCP].dport)
+ elif p == "UDP":
udp = packet[UDP]
- self.assertEqual(udp.sport, saved_packet[
- UDP].sport)
- self.assertEqual(udp.dport, saved_packet[
- UDP].dport)
+ self.assertEqual(udp.sport, saved_packet[UDP].sport)
+ self.assertEqual(udp.dport, saved_packet[UDP].dport)
except:
- self.logger.error(ppp("Unexpected or invalid packet:",
- packet))
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
for i in self.pg_interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
- i, dst_sw_if_index, last_info[i.sw_if_index])
+ i, dst_sw_if_index, last_info[i.sw_if_index]
+ )
self.assertTrue(
remaining_packet is None,
- "Port %u: Packet expected from source %u didn't arrive" %
- (dst_sw_if_index, i.sw_if_index))
+ "Port %u: Packet expected from source %u didn't arrive"
+ % (dst_sw_if_index, i.sw_if_index),
+ )
def run_traffic_no_check(self):
# Test
@@ -400,16 +412,32 @@ class TestClassifyAcl(TestClassifier):
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
- frags=False, pkt_raw=True, etype=-1):
+ def run_verify_test(
+ self,
+ traffic_type=0,
+ ip_type=0,
+ proto=-1,
+ ports=0,
+ frags=False,
+ pkt_raw=True,
+ etype=-1,
+ ):
# Test
# Create incoming packet streams for packet-generator interfaces
pkts_cnt = 0
for i in self.pg_interfaces:
if self.flows.__contains__(i):
- pkts = self.create_stream(i, self.pg_if_packet_sizes,
- traffic_type, ip_type, proto, ports,
- frags, pkt_raw, etype)
+ pkts = self.create_stream(
+ i,
+ self.pg_if_packet_sizes,
+ traffic_type,
+ ip_type,
+ proto,
+ ports,
+ frags,
+ pkt_raw,
+ etype,
+ )
if len(pkts) > 0:
i.add_stream(pkts)
pkts_cnt += len(pkts)
@@ -424,20 +452,27 @@ class TestClassifyAcl(TestClassifier):
if self.flows.__contains__(src_if):
for dst_if in self.flows[src_if]:
capture = dst_if.get_capture(pkts_cnt)
- self.logger.info("Verifying capture on interface %s" %
- dst_if.name)
- self.verify_capture(dst_if, capture,
- traffic_type, ip_type, etype)
+ self.logger.info("Verifying capture on interface %s" % dst_if.name)
+ self.verify_capture(dst_if, capture, traffic_type, ip_type, etype)
- def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
- ports=0, frags=False, etype=-1):
+ def run_verify_negat_test(
+ self, traffic_type=0, ip_type=0, proto=-1, ports=0, frags=False, etype=-1
+ ):
# Test
self.reset_packet_infos()
for i in self.pg_interfaces:
if self.flows.__contains__(i):
- pkts = self.create_stream(i, self.pg_if_packet_sizes,
- traffic_type, ip_type, proto, ports,
- frags, True, etype)
+ pkts = self.create_stream(
+ i,
+ self.pg_if_packet_sizes,
+ traffic_type,
+ ip_type,
+ proto,
+ ports,
+ frags,
+ True,
+ etype,
+ )
if len(pkts) > 0:
i.add_stream(pkts)
@@ -450,101 +485,110 @@ class TestClassifyAcl(TestClassifier):
for src_if in self.pg_interfaces:
if self.flows.__contains__(src_if):
for dst_if in self.flows[src_if]:
- self.logger.info("Verifying capture on interface %s" %
- dst_if.name)
+ self.logger.info("Verifying capture on interface %s" % dst_if.name)
capture = dst_if.get_capture(0)
self.assertEqual(len(capture), 0)
- def build_classify_table(self, src_mac='', dst_mac='', ether_type='',
- etype='', key='mac', hit_next_index=0xffffffff):
+ def build_classify_table(
+ self,
+ src_mac="",
+ dst_mac="",
+ ether_type="",
+ etype="",
+ key="mac",
+ hit_next_index=0xFFFFFFFF,
+ ):
# Basic ACL testing
- a_mask = self.build_mac_mask(src_mac=src_mac, dst_mac=dst_mac,
- ether_type=ether_type)
+ a_mask = self.build_mac_mask(
+ src_mac=src_mac, dst_mac=dst_mac, ether_type=ether_type
+ )
self.create_classify_table(key, a_mask)
for host in self.hosts_by_pg_idx[self.pg0.sw_if_index]:
- s_mac = host.mac if src_mac else ''
+ s_mac = host.mac if src_mac else ""
if dst_mac:
for dst_if in self.flows[self.pg0]:
for dst_host in self.hosts_by_pg_idx[dst_if.sw_if_index]:
self.create_classify_session(
- self.pg0, self.acl_tbl_idx.get(key),
- self.build_mac_match(src_mac=s_mac,
- dst_mac=dst_host.mac,
- ether_type=etype),
- hit_next_index=hit_next_index)
+ self.pg0,
+ self.acl_tbl_idx.get(key),
+ self.build_mac_match(
+ src_mac=s_mac, dst_mac=dst_host.mac, ether_type=etype
+ ),
+ hit_next_index=hit_next_index,
+ )
else:
self.create_classify_session(
- self.pg0, self.acl_tbl_idx.get(key),
- self.build_mac_match(src_mac=s_mac, dst_mac='',
- ether_type=etype),
- hit_next_index=hit_next_index)
+ self.pg0,
+ self.acl_tbl_idx.get(key),
+ self.build_mac_match(src_mac=s_mac, dst_mac="", ether_type=etype),
+ hit_next_index=hit_next_index,
+ )
def test_0000_warmup_test(self):
- """ Learn the MAC addresses
- """
+ """Learn the MAC addresses"""
self.create_hosts(2)
self.run_traffic_no_check()
def test_0010_inacl_permit_src_mac(self):
- """ Input L2 ACL test - permit source MAC
+ """Input L2 ACL test - permit source MAC
Test scenario for basic IP ACL with source IP
- Create IPv4 stream for pg0 -> pg1 interface.
- Create ACL with source MAC address.
- Send and verify received packets on pg1 interface.
"""
- key = 'mac_in'
- self.build_classify_table(src_mac='ffffffffffff', key=key)
+ key = "mac_in"
+ self.build_classify_table(src_mac="ffffffffffff", key=key)
self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
self.run_verify_test(self.IP, self.IPV4, -1)
def test_0011_inacl_permit_dst_mac(self):
- """ Input L2 ACL test - permit destination MAC
+ """Input L2 ACL test - permit destination MAC
Test scenario for basic IP ACL with source IP
- Create IPv4 stream for pg0 -> pg1 interface.
- Create ACL with destination MAC address.
- Send and verify received packets on pg1 interface.
"""
- key = 'mac_in'
- self.build_classify_table(dst_mac='ffffffffffff', key=key)
+ key = "mac_in"
+ self.build_classify_table(dst_mac="ffffffffffff", key=key)
self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
self.run_verify_test(self.IP, self.IPV4, -1)
def test_0012_inacl_permit_src_dst_mac(self):
- """ Input L2 ACL test - permit source and destination MAC
+ """Input L2 ACL test - permit source and destination MAC
Test scenario for basic IP ACL with source IP
- Create IPv4 stream for pg0 -> pg1 interface.
- Create ACL with source and destination MAC addresses.
- Send and verify received packets on pg1 interface.
"""
- key = 'mac_in'
+ key = "mac_in"
self.build_classify_table(
- src_mac='ffffffffffff', dst_mac='ffffffffffff', key=key)
+ src_mac="ffffffffffff", dst_mac="ffffffffffff", key=key
+ )
self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
self.run_verify_test(self.IP, self.IPV4, -1)
def test_0013_inacl_permit_ether_type(self):
- """ Input L2 ACL test - permit ether_type
+ """Input L2 ACL test - permit ether_type
Test scenario for basic IP ACL with source IP
- Create IPv4 stream for pg0 -> pg1 interface.
- Create ACL with destination MAC address.
- Send and verify received packets on pg1 interface.
"""
- key = 'mac_in'
- self.build_classify_table(
- ether_type='ffff', etype=hex(ETH_P_IP)[2:], key=key)
+ key = "mac_in"
+ self.build_classify_table(ether_type="ffff", etype=hex(ETH_P_IP)[2:], key=key)
self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
self.run_verify_test(self.IP, self.IPV4, -1)
def test_0015_inacl_deny(self):
- """ Input L2 ACL test - deny
+ """Input L2 ACL test - deny
Test scenario for basic IP ACL with source IP
- Create IPv4 stream for pg0 -> pg1 interface.
@@ -552,57 +596,55 @@ class TestClassifyAcl(TestClassifier):
- Create ACL with source MAC address.
- Send and verify no received packets on pg1 interface.
"""
- key = 'mac_in'
- self.build_classify_table(
- src_mac='ffffffffffff', hit_next_index=0, key=key)
+ key = "mac_in"
+ self.build_classify_table(src_mac="ffffffffffff", hit_next_index=0, key=key)
self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
self.run_verify_negat_test(self.IP, self.IPV4, -1)
def test_0020_outacl_permit(self):
- """ Output L2 ACL test - permit
+ """Output L2 ACL test - permit
Test scenario for basic IP ACL with source IP
- Create IPv4 stream for pg0 -> pg1 interface.
- Create ACL with source MAC address.
- Send and verify received packets on pg1 interface.
"""
- key = 'mac_out'
- self.build_classify_table(src_mac='ffffffffffff', key=key)
+ key = "mac_out"
+ self.build_classify_table(src_mac="ffffffffffff", key=key)
self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
self.acl_active_table = key
self.run_verify_test(self.IP, self.IPV4, -1)
def test_0025_outacl_deny(self):
- """ Output L2 ACL test - deny
+ """Output L2 ACL test - deny
Test scenario for basic IP ACL with source IP
- Create IPv4 stream for pg0 -> pg1 interface.
- Create ACL with source MAC address.
- Send and verify no received packets on pg1 interface.
"""
- key = 'mac_out'
- self.build_classify_table(
- src_mac='ffffffffffff', hit_next_index=0, key=key)
+ key = "mac_out"
+ self.build_classify_table(src_mac="ffffffffffff", hit_next_index=0, key=key)
self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
self.acl_active_table = key
self.run_verify_negat_test(self.IP, self.IPV4, -1)
def test_0030_inoutacl_permit(self):
- """ Input+Output L2 ACL test - permit
+ """Input+Output L2 ACL test - permit
Test scenario for basic IP ACL with source IP
- Create IPv4 stream for pg0 -> pg1 interface.
- Create ACLs with source MAC address.
- Send and verify received packets on pg1 interface.
"""
- key = 'mac_inout'
- self.build_classify_table(src_mac='ffffffffffff', key=key)
+ key = "mac_inout"
+ self.build_classify_table(src_mac="ffffffffffff", key=key)
self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
self.run_verify_test(self.IP, self.IPV4, -1)
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)