summaryrefslogtreecommitdiffstats
path: root/test/template_classifier.py
diff options
context:
space:
mode:
authorKlement Sekera <klement.sekera@gmail.com>2022-04-26 19:02:15 +0200
committerOle Tr�an <otroan@employees.org>2022-05-10 18:52:08 +0000
commitd9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch)
tree4f786cfd8ebc2443cb11e11b74c8657204068898 /test/template_classifier.py
parentf90348bcb4afd0af2611cefc43b17ef3042b511c (diff)
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is much faster, stable PEP8 compliant code style checker offering also automatic formatting. It aims to be very stable and produce smallest diffs. It's used by many small and big projects. Running checkstyle with black takes a few seconds with a terse output. Thus, test-checkstyle-diff is no longer necessary. Expand scope of checkstyle to all python files in the repo, replacing test-checkstyle with checkstyle-python. Also, fixstyle-python is now available for automatic style formatting. Note: python virtualenv has been consolidated in test/Makefile, test/requirements*.txt which will eventually be moved to a central location. This is required to simply the automated generation of docker executor images in the CI. Type: improvement Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8 Signed-off-by: Klement Sekera <klement.sekera@gmail.com> Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/template_classifier.py')
-rw-r--r--test/template_classifier.py216
1 files changed, 117 insertions, 99 deletions
diff --git a/test/template_classifier.py b/test/template_classifier.py
index 7ce69d436f5..ec2a4143eec 100644
--- a/test/template_classifier.py
+++ b/test/template_classifier.py
@@ -30,13 +30,11 @@ class VarMatch:
class TestClassifier(VppTestCase):
-
@staticmethod
def _resolve_mask_match(mask_match):
mask_match = binascii.unhexlify(mask_match)
mask_match_len = ((len(mask_match) - 1) // 16 + 1) * 16
- mask_match = mask_match + b'\0' * \
- (mask_match_len - len(mask_match))
+ mask_match = mask_match + b"\0" * (mask_match_len - len(mask_match))
return mask_match, mask_match_len
@classmethod
@@ -47,7 +45,7 @@ class TestClassifier(VppTestCase):
variables and configure VPP.
"""
super(TestClassifier, cls).setUpClass()
- cls.acl_active_table = ''
+ cls.acl_active_table = ""
cls.af = AF_INET
def setUp(self):
@@ -113,13 +111,15 @@ class TestClassifier(VppTestCase):
self.logger.info(self.vapi.cli("show ip fib"))
self.logger.info(self.vapi.cli("show error"))
- if self.acl_active_table.endswith('out'):
+ if self.acl_active_table.endswith("out"):
self.output_acl_set_interface(
- self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
- elif self.acl_active_table != '':
+ self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0
+ )
+ elif self.acl_active_table != "":
self.input_acl_set_interface(
- self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
- self.acl_active_table = ''
+ self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0
+ )
+ self.acl_active_table = ""
for intf in self.interfaces:
if self.af == AF_INET:
@@ -131,7 +131,7 @@ class TestClassifier(VppTestCase):
super(TestClassifier, self).tearDown()
@staticmethod
- def build_mac_match(dst_mac='', src_mac='', ether_type=''):
+ def build_mac_match(dst_mac="", src_mac="", ether_type=""):
"""Build MAC ACL match data with hexstring format.
:param str dst_mac: source MAC address <x:x:x:x:x:x>
@@ -139,15 +139,16 @@ class TestClassifier(VppTestCase):
:param str ether_type: ethernet type <0-ffff>
"""
if dst_mac:
- dst_mac = dst_mac.replace(':', '')
+ dst_mac = dst_mac.replace(":", "")
if src_mac:
- src_mac = src_mac.replace(':', '')
+ src_mac = src_mac.replace(":", "")
- return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
- dst_mac, src_mac, ether_type)).rstrip()
+ return (
+ "{!s:0>12}{!s:0>12}{!s:0>4}".format(dst_mac, src_mac, ether_type)
+ ).rstrip()
@staticmethod
- def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
+ def build_mac_mask(dst_mac="", src_mac="", ether_type=""):
"""Build MAC ACL mask data with hexstring format.
:param str dst_mac: source MAC address <0-ffffffffffff>
@@ -155,12 +156,12 @@ class TestClassifier(VppTestCase):
:param str ether_type: ethernet type <0-ffff>
"""
- return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
- dst_mac, src_mac, ether_type)).rstrip()
+ return (
+ "{!s:0>12}{!s:0>12}{!s:0>4}".format(dst_mac, src_mac, ether_type)
+ ).rstrip()
@staticmethod
- def build_ip_mask(proto='', src_ip='', dst_ip='',
- src_port='', dst_port=''):
+ def build_ip_mask(proto="", src_ip="", dst_ip="", src_port="", dst_port=""):
"""Build IP ACL mask data with hexstring format.
:param str proto: protocol number <0-ff>
@@ -170,12 +171,14 @@ class TestClassifier(VppTestCase):
:param str dst_port: destination port number <0-ffff>
"""
- return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
- proto, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
+ return (
+ "{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}".format(
+ proto, src_ip, dst_ip, src_port, dst_port
+ )
+ ).rstrip("0")
@staticmethod
- def build_ip6_mask(nh='', src_ip='', dst_ip='',
- src_port='', dst_port=''):
+ def build_ip6_mask(nh="", src_ip="", dst_ip="", src_port="", dst_port=""):
"""Build IPv6 ACL mask data with hexstring format.
:param str nh: next header number <0-ff>
@@ -185,24 +188,26 @@ class TestClassifier(VppTestCase):
:param str dst_port: destination port number <0-ffff>
"""
- return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
- nh, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
+ return (
+ "{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}".format(
+ nh, src_ip, dst_ip, src_port, dst_port
+ )
+ ).rstrip("0")
@staticmethod
def build_payload_mask(masks):
- payload_mask = ''
+ payload_mask = ""
for mask in masks:
# offset is specified in bytes, convert to hex format.
length = (mask.offset * 2) + len(mask.spec)
- format_spec = '{!s:0>' + str(length) + '}'
+ format_spec = "{!s:0>" + str(length) + "}"
payload_mask += format_spec.format(mask.spec)
- return payload_mask.rstrip('0')
+ return payload_mask.rstrip("0")
@staticmethod
- def build_ip_match(proto=0, src_ip='', dst_ip='',
- src_port=0, dst_port=0):
+ def build_ip_match(proto=0, src_ip="", dst_ip="", src_port=0, dst_port=0):
"""Build IP ACL match data with hexstring format.
:param int proto: protocol number with valid option "x"
@@ -212,17 +217,18 @@ class TestClassifier(VppTestCase):
:param int dst_port: destination port number "x"
"""
if src_ip:
- src_ip = binascii.hexlify(socket.inet_aton(src_ip)).decode('ascii')
+ src_ip = binascii.hexlify(socket.inet_aton(src_ip)).decode("ascii")
if dst_ip:
- dst_ip = binascii.hexlify(socket.inet_aton(dst_ip)).decode('ascii')
+ dst_ip = binascii.hexlify(socket.inet_aton(dst_ip)).decode("ascii")
- return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
- hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:],
- hex(dst_port)[2:])).rstrip('0')
+ return (
+ "{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}".format(
+ hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:], hex(dst_port)[2:]
+ )
+ ).rstrip("0")
@staticmethod
- def build_ip6_match(nh=0, src_ip='', dst_ip='',
- src_port=0, dst_port=0):
+ def build_ip6_match(nh=0, src_ip="", dst_ip="", src_port=0, dst_port=0):
"""Build IPv6 ACL match data with hexstring format.
:param int nh: next header number with valid option "x"
@@ -234,39 +240,44 @@ class TestClassifier(VppTestCase):
"""
if src_ip:
if sys.version_info[0] == 2:
- src_ip = binascii.hexlify(socket.inet_pton(
- socket.AF_INET6, src_ip))
+ src_ip = binascii.hexlify(socket.inet_pton(socket.AF_INET6, src_ip))
else:
src_ip = socket.inet_pton(socket.AF_INET6, src_ip).hex()
if dst_ip:
if sys.version_info[0] == 2:
- dst_ip = binascii.hexlify(socket.inet_pton(
- socket.AF_INET6, dst_ip))
+ dst_ip = binascii.hexlify(socket.inet_pton(socket.AF_INET6, dst_ip))
else:
dst_ip = socket.inet_pton(socket.AF_INET6, dst_ip).hex()
- return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
- hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:],
- hex(dst_port)[2:])).rstrip('0')
+ return (
+ "{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}".format(
+ hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:], hex(dst_port)[2:]
+ )
+ ).rstrip("0")
@staticmethod
def build_payload_match(matches):
- payload_match = ''
+ payload_match = ""
for match in matches:
sval = str(hex(match.value)[2:])
# offset is specified in bytes, convert to hex format.
length = (match.offset + match.length) * 2
- format_spec = '{!s:0>' + str(length) + '}'
+ format_spec = "{!s:0>" + str(length) + "}"
payload_match += format_spec.format(sval)
- return payload_match.rstrip('0')
+ return payload_match.rstrip("0")
- def create_stream(self, src_if, dst_if, packet_sizes,
- proto_l=UDP(sport=1234, dport=5678),
- payload_ex=None):
+ def create_stream(
+ self,
+ src_if,
+ dst_if,
+ packet_sizes,
+ proto_l=UDP(sport=1234, dport=5678),
+ payload_ex=None,
+ ):
"""Create input packet stream for defined interfaces.
:param VppInterface src_if: Source Interface for packet stream.
@@ -285,15 +296,19 @@ class TestClassifier(VppTestCase):
payload += payload_ex
if self.af == AF_INET:
- p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
- IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
- proto_l /
- Raw(payload))
+ p = (
+ Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+ / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+ / proto_l
+ / Raw(payload)
+ )
elif self.af == AF_INET6:
- p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
- IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
- proto_l /
- Raw(payload))
+ p = (
+ Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+ / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
+ / proto_l
+ / Raw(payload)
+ )
info.data = p.copy()
self.extend_packet(p, size)
pkts.append(p)
@@ -322,11 +337,12 @@ class TestClassifier(VppTestCase):
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
self.logger.debug(
- "Got packet on port %s: src=%u (id=%u)" %
- (dst_if.name, payload_info.src, packet_index))
+ "Got packet on port %s: src=%u (id=%u)"
+ % (dst_if.name, payload_info.src, packet_index)
+ )
next_info = self.get_next_packet_info_for_interface2(
- payload_info.src, dst_sw_if_index,
- last_info[payload_info.src])
+ payload_info.src, dst_sw_if_index, last_info[payload_info.src]
+ )
last_info[payload_info.src] = next_info
self.assertTrue(next_info is not None)
self.assertEqual(packet_index, next_info.index)
@@ -343,13 +359,15 @@ class TestClassifier(VppTestCase):
raise
for i in self.interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
- i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
- self.assertTrue(remaining_packet is None,
- "Interface %s: Packet expected from interface %s "
- "didn't arrive" % (dst_if.name, i.name))
-
- def create_classify_table(self, key, mask, data_offset=0,
- next_table_index=None):
+ i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]
+ )
+ self.assertTrue(
+ remaining_packet is None,
+ "Interface %s: Packet expected from interface %s "
+ "didn't arrive" % (dst_if.name, i.name),
+ )
+
+ def create_classify_table(self, key, mask, data_offset=0, next_table_index=None):
"""Create Classify Table
:param str key: key for classify table (ex, ACL name).
@@ -366,12 +384,14 @@ class TestClassifier(VppTestCase):
miss_next_index=0,
current_data_flag=1,
current_data_offset=data_offset,
- next_table_index=next_table_index)
- self.assertIsNotNone(r, 'No response msg for add_del_table')
+ next_table_index=next_table_index,
+ )
+ self.assertIsNotNone(r, "No response msg for add_del_table")
self.acl_tbl_idx[key] = r.new_table_index
- def create_classify_session(self, table_index, match, pbr_option=0,
- vrfid=0, is_add=1):
+ def create_classify_session(
+ self, table_index, match, pbr_option=0, vrfid=0, is_add=1
+ ):
"""Create Classify Session
:param int table_index: table index to identify classify table.
@@ -389,8 +409,9 @@ class TestClassifier(VppTestCase):
match_len=mask_match_len,
opaque_index=0,
action=pbr_option,
- metadata=vrfid)
- self.assertIsNotNone(r, 'No response msg for add_del_session')
+ metadata=vrfid,
+ )
+ self.assertIsNotNone(r, "No response msg for add_del_session")
def input_acl_set_interface(self, intf, table_index, is_add=1):
"""Configure Input ACL interface
@@ -403,20 +424,17 @@ class TestClassifier(VppTestCase):
r = None
if self.af == AF_INET:
r = self.vapi.input_acl_set_interface(
- is_add,
- intf.sw_if_index,
- ip4_table_index=table_index)
+ is_add, intf.sw_if_index, ip4_table_index=table_index
+ )
elif self.af == AF_INET6:
r = self.vapi.input_acl_set_interface(
- is_add,
- intf.sw_if_index,
- ip6_table_index=table_index)
+ is_add, intf.sw_if_index, ip6_table_index=table_index
+ )
else:
r = self.vapi.input_acl_set_interface(
- is_add,
- intf.sw_if_index,
- l2_table_index=table_index)
- self.assertIsNotNone(r, 'No response msg for acl_set_interface')
+ is_add, intf.sw_if_index, l2_table_index=table_index
+ )
+ self.assertIsNotNone(r, "No response msg for acl_set_interface")
def output_acl_set_interface(self, intf, table_index, is_add=1):
"""Configure Output ACL interface
@@ -429,20 +447,17 @@ class TestClassifier(VppTestCase):
r = None
if self.af == AF_INET:
r = self.vapi.output_acl_set_interface(
- is_add,
- intf.sw_if_index,
- ip4_table_index=table_index)
+ is_add, intf.sw_if_index, ip4_table_index=table_index
+ )
elif self.af == AF_INET6:
r = self.vapi.output_acl_set_interface(
- is_add,
- intf.sw_if_index,
- ip6_table_index=table_index)
+ is_add, intf.sw_if_index, ip6_table_index=table_index
+ )
else:
r = self.vapi.output_acl_set_interface(
- is_add,
- intf.sw_if_index,
- l2_table_index=table_index)
- self.assertIsNotNone(r, 'No response msg for acl_set_interface')
+ is_add, intf.sw_if_index, l2_table_index=table_index
+ )
+ self.assertIsNotNone(r, "No response msg for acl_set_interface")
def config_pbr_fib_entry(self, intf, is_add=1):
"""Configure fib entry to route traffic toward PBR VRF table
@@ -451,10 +466,13 @@ class TestClassifier(VppTestCase):
"""
addr_len = 24
- self.vapi.ip_add_del_route(dst_address=intf.local_ip4,
- dst_address_length=addr_len,
- next_hop_address=intf.remote_ip4,
- table_id=self.pbr_vrfid, is_add=is_add)
+ self.vapi.ip_add_del_route(
+ dst_address=intf.local_ip4,
+ dst_address_length=addr_len,
+ next_hop_address=intf.remote_ip4,
+ table_id=self.pbr_vrfid,
+ is_add=is_add,
+ )
def verify_vrf(self, vrf_id):
"""