diff options
author | Klement Sekera <klement.sekera@gmail.com> | 2022-04-26 19:02:15 +0200 |
---|---|---|
committer | Ole Tr�an <otroan@employees.org> | 2022-05-10 18:52:08 +0000 |
commit | d9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch) | |
tree | 4f786cfd8ebc2443cb11e11b74c8657204068898 /test/util.py | |
parent | f90348bcb4afd0af2611cefc43b17ef3042b511c (diff) |
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is
much faster, stable PEP8 compliant code style checker offering also
automatic formatting. It aims to be very stable and produce smallest
diffs. It's used by many small and big projects.
Running checkstyle with black takes a few seconds with a terse output.
Thus, test-checkstyle-diff is no longer necessary.
Expand scope of checkstyle to all python files in the repo, replacing
test-checkstyle with checkstyle-python.
Also, fixstyle-python is now available for automatic style formatting.
Note: python virtualenv has been consolidated in test/Makefile,
test/requirements*.txt which will eventually be moved to a central
location. This is required to simply the automated generation of
docker executor images in the CI.
Type: improvement
Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/util.py')
-rw-r--r-- | test/util.py | 188 |
1 files changed, 110 insertions, 78 deletions
diff --git a/test/util.py b/test/util.py index 2c24571c350..249bd864259 100644 --- a/test/util.py +++ b/test/util.py @@ -11,8 +11,12 @@ from collections import UserDict import scapy.compat from scapy.layers.l2 import Ether from scapy.layers.inet import IP -from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\ - IPv6ExtHdrHopByHop +from scapy.layers.inet6 import ( + IPv6, + IPv6ExtHdrFragment, + IPv6ExtHdrRouting, + IPv6ExtHdrHopByHop, +) from scapy.packet import Raw from scapy.utils import hexdump from scapy.utils6 import in6_mactoifaceid @@ -21,7 +25,7 @@ from io import BytesIO from vpp_papi import mac_pton # Set up an empty logger for the testcase that can be overridden as necessary -null_logger = logging.getLogger('VppTestCase.util') +null_logger = logging.getLogger("VppTestCase.util") null_logger.addHandler(logging.NullHandler()) @@ -30,14 +34,16 @@ def pr(packet): def ppp(headline, packet): - """ Return string containing headline and output of scapy packet.show() """ - return '%s\n%s\n\n%s\n' % (headline, - hexdump(packet, dump=True), - packet.show(dump=True)) + """Return string containing headline and output of scapy packet.show()""" + return "%s\n%s\n\n%s\n" % ( + headline, + hexdump(packet, dump=True), + packet.show(dump=True), + ) def ppc(headline, capture, limit=10): - """ Return string containing ppp() printout for a capture. + """Return string containing ppp() printout for a capture. :param headline: printed as first line of output :param capture: packets to print @@ -48,14 +54,17 @@ def ppc(headline, capture, limit=10): tail = "" if limit < len(capture): tail = "\nPrint limit reached, %s out of %s packets printed" % ( - limit, len(capture)) - body = "".join([ppp("Packet #%s:" % count, p) - for count, p in zip(range(0, limit), capture)]) + limit, + len(capture), + ) + body = "".join( + [ppp("Packet #%s:" % count, p) for count, p in zip(range(0, limit), capture)] + ) return "%s\n%s%s" % (headline, body, tail) def ip4_range(ip4, s, e): - tmp = ip4.rsplit('.', 1)[0] + tmp = ip4.rsplit(".", 1)[0] return ("%s.%d" % (tmp, i) for i in range(s, e)) @@ -65,14 +74,18 @@ def mcast_ip_to_mac(ip): raise ValueError("Must be multicast address.") ip_as_int = int(ip) if ip.version == 4: - mcast_mac = "01:00:5e:%02x:%02x:%02x" % ((ip_as_int >> 16) & 0x7f, - (ip_as_int >> 8) & 0xff, - ip_as_int & 0xff) + mcast_mac = "01:00:5e:%02x:%02x:%02x" % ( + (ip_as_int >> 16) & 0x7F, + (ip_as_int >> 8) & 0xFF, + ip_as_int & 0xFF, + ) else: - mcast_mac = "33:33:%02x:%02x:%02x:%02x" % ((ip_as_int >> 24) & 0xff, - (ip_as_int >> 16) & 0xff, - (ip_as_int >> 8) & 0xff, - ip_as_int & 0xff) + mcast_mac = "33:33:%02x:%02x:%02x:%02x" % ( + (ip_as_int >> 24) & 0xFF, + (ip_as_int >> 16) & 0xFF, + (ip_as_int >> 8) & 0xFF, + ip_as_int & 0xFF, + ) return mcast_mac @@ -84,8 +97,7 @@ def mk_ll_addr(mac): def ip6_normalize(ip6): - return socket.inet_ntop(socket.AF_INET6, - socket.inet_pton(socket.AF_INET6, ip6)) + return socket.inet_ntop(socket.AF_INET6, socket.inet_pton(socket.AF_INET6, ip6)) def get_core_path(tempdir): @@ -107,13 +119,14 @@ def check_core_path(logger, core_path): if corefmt.startswith("|"): logger.error( "WARNING: redirecting the core dump through a" - " filter may result in truncated dumps.") + " filter may result in truncated dumps." + ) logger.error( " You may want to check the filter settings" " or uninstall it and edit the" - " /proc/sys/kernel/core_pattern accordingly.") - logger.error( - " current core pattern is: %s" % corefmt) + " /proc/sys/kernel/core_pattern accordingly." + ) + logger.error(" current core pattern is: %s" % corefmt) class NumericConstant: @@ -136,55 +149,57 @@ class NumericConstant: class Host: - """ Generic test host "connected" to VPPs interface. """ + """Generic test host "connected" to VPPs interface.""" @property def mac(self): - """ MAC address """ + """MAC address""" return self._mac @property def bin_mac(self): - """ MAC address """ + """MAC address""" return mac_pton(self._mac) @property def ip4(self): - """ IPv4 address - string """ + """IPv4 address - string""" return self._ip4 @property def ip4n(self): - """ IPv4 address of remote host - raw, suitable as API parameter.""" + """IPv4 address of remote host - raw, suitable as API parameter.""" return socket.inet_pton(socket.AF_INET, self._ip4) @property def ip6(self): - """ IPv6 address - string """ + """IPv6 address - string""" return self._ip6 @property def ip6n(self): - """ IPv6 address of remote host - raw, suitable as API parameter.""" + """IPv6 address of remote host - raw, suitable as API parameter.""" return socket.inet_pton(socket.AF_INET6, self._ip6) @property def ip6_ll(self): - """ IPv6 link-local address - string """ + """IPv6 link-local address - string""" return self._ip6_ll @property def ip6n_ll(self): - """ IPv6 link-local address of remote host - + """IPv6 link-local address of remote host - raw, suitable as API parameter.""" return socket.inet_pton(socket.AF_INET6, self._ip6_ll) def __eq__(self, other): if isinstance(other, Host): - return (self.mac == other.mac and - self.ip4 == other.ip4 and - self.ip6 == other.ip6 and - self.ip6_ll == other.ip6_ll) + return ( + self.mac == other.mac + and self.ip4 == other.ip4 + and self.ip6 == other.ip6 + and self.ip6_ll == other.ip6_ll + ) else: return False @@ -192,10 +207,12 @@ class Host: return not self.__eq__(other) def __repr__(self): - return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % (self.mac, - self.ip4, - self.ip6, - self.ip6_ll) + return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % ( + self.mac, + self.ip4, + self.ip6, + self.ip6_ll, + ) def __hash__(self): return hash(self.__repr__()) @@ -207,8 +224,8 @@ class Host: self._ip6_ll = ip6_ll -class L4_Conn(): - """ L4 'connection' tied to two VPP interfaces """ +class L4_Conn: + """L4 'connection' tied to two VPP interfaces""" def __init__(self, testcase, if1, if2, af, l4proto, port1, port2): self.testcase = testcase @@ -228,22 +245,25 @@ class L4_Conn(): s1 = 1 - side src_if = self.ifs[s0] dst_if = self.ifs[s1] - layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4), - IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)] - merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]} + layer_3 = [ + IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4), + IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6), + ] + merged_l4args = {"sport": self.ports[s0], "dport": self.ports[s1]} merged_l4args.update(l4args) - p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / - layer_3[is_ip6] / - self.l4proto(**merged_l4args) / - Raw(payload)) + p = ( + Ether(dst=src_if.local_mac, src=src_if.remote_mac) + / layer_3[is_ip6] + / self.l4proto(**merged_l4args) + / Raw(payload) + ) return p def send(self, side, flags=None, payload=""): l4args = {} if flags is not None: - l4args['flags'] = flags - self.ifs[side].add_stream(self.pkt(side, - l4args=l4args, payload=payload)) + l4args["flags"] = flags + self.ifs[side].add_stream(self.pkt(side, l4args=l4args, payload=payload)) self.ifs[1 - side].enable_capture() self.testcase.pg_start() @@ -285,8 +305,8 @@ def fragment_rfc791(packet, fragsize, logger=null_logger): pre_ip_len = len(packet) - len(packet[IP]) ip_header_len = packet[IP].ihl * 4 hex_packet = scapy.compat.raw(packet) - hex_headers = hex_packet[:(pre_ip_len + ip_header_len)] - hex_payload = hex_packet[(pre_ip_len + ip_header_len):] + hex_headers = hex_packet[: (pre_ip_len + ip_header_len)] + hex_payload = hex_packet[(pre_ip_len + ip_header_len) :] pkts = [] ihl = packet[IP].ihl @@ -294,14 +314,14 @@ def fragment_rfc791(packet, fragsize, logger=null_logger): nfb = int((fragsize - pre_ip_len - ihl * 4) / 8) fo = packet[IP].frag - p = packet.__class__(hex_headers + hex_payload[:nfb * 8]) + p = packet.__class__(hex_headers + hex_payload[: nfb * 8]) p[IP].flags = "MF" p[IP].frag = fo p[IP].len = ihl * 4 + nfb * 8 del p[IP].chksum pkts.append(p) - p = packet.__class__(hex_headers + hex_payload[nfb * 8:]) + p = packet.__class__(hex_headers + hex_payload[nfb * 8 :]) p[IP].len = otl - nfb * 8 p[IP].frag = fo + nfb del p[IP].chksum @@ -345,15 +365,19 @@ def fragment_rfc8200(packet, identification, fragsize, logger=null_logger): routing_hdr = counter elif l.__class__ is IPv6ExtHdrHopByHop: hop_by_hop_hdr = counter - elif seen_ipv6 and not upper_layer and \ - not l.__class__.__name__.startswith('IPv6ExtHdr'): + elif ( + seen_ipv6 + and not upper_layer + and not l.__class__.__name__.startswith("IPv6ExtHdr") + ): upper_layer = counter counter = counter + 1 l = packet.getlayer(counter) logger.debug( - "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)" % - (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer)) + "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)" + % (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer) + ) if upper_layer is None: raise Exception("Upper layer header not found in IPv6 packet") @@ -379,18 +403,27 @@ def fragment_rfc8200(packet, identification, fragsize, logger=null_logger): logger.debug(ppp("Fragment header:", fragment_ext_hdr)) len_ext_and_upper_layer_payload = len(ext_and_upper_layer.payload) - if not len_ext_and_upper_layer_payload and \ - hasattr(ext_and_upper_layer, "data"): + if not len_ext_and_upper_layer_payload and hasattr(ext_and_upper_layer, "data"): len_ext_and_upper_layer_payload = len(ext_and_upper_layer.data) - if len(per_fragment_headers) + len(fragment_ext_hdr) +\ - len(ext_and_upper_layer) - len_ext_and_upper_layer_payload\ - > fragsize: - raise Exception("Cannot fragment this packet - MTU too small " - "(%s, %s, %s, %s, %s)" % ( - len(per_fragment_headers), len(fragment_ext_hdr), - len(ext_and_upper_layer), - len_ext_and_upper_layer_payload, fragsize)) + if ( + len(per_fragment_headers) + + len(fragment_ext_hdr) + + len(ext_and_upper_layer) + - len_ext_and_upper_layer_payload + > fragsize + ): + raise Exception( + "Cannot fragment this packet - MTU too small " + "(%s, %s, %s, %s, %s)" + % ( + len(per_fragment_headers), + len(fragment_ext_hdr), + len(ext_and_upper_layer), + len_ext_and_upper_layer_payload, + fragsize, + ) + ) orig_nh = packet[IPv6].nh p = per_fragment_headers @@ -399,7 +432,7 @@ def fragment_rfc8200(packet, identification, fragsize, logger=null_logger): p = p / fragment_ext_hdr del p[IPv6ExtHdrFragment].nh first_payload_len_nfb = int((fragsize - len(p)) / 8) - p = p / Raw(hex_payload[:first_payload_len_nfb * 8]) + p = p / Raw(hex_payload[: first_payload_len_nfb * 8]) del p[IPv6].plen p[IPv6ExtHdrFragment].nh = orig_nh p[IPv6ExtHdrFragment].id = identification @@ -417,7 +450,7 @@ def fragment_rfc8200(packet, identification, fragsize, logger=null_logger): p = p / fragment_ext_hdr del p[IPv6ExtHdrFragment].nh l_nfb = int((fragsize - len(p)) / 8) - p = p / Raw(hex_payload[offset:offset + l_nfb * 8]) + p = p / Raw(hex_payload[offset : offset + l_nfb * 8]) p[IPv6ExtHdrFragment].nh = orig_nh p[IPv6ExtHdrFragment].id = identification p[IPv6ExtHdrFragment].offset = int(offset / 8) @@ -437,11 +470,11 @@ def reassemble4_core(listoffragments, return_ip): first = listoffragments[0] buffer.seek(20) for pkt in listoffragments: - buffer.seek(pkt[IP].frag*8) + buffer.seek(pkt[IP].frag * 8) buffer.write(bytes(pkt[IP].payload)) first.len = len(buffer.getvalue()) + 20 first.flags = 0 - del(first.chksum) + del first.chksum if return_ip: header = bytes(first[IP])[:20] return first[IP].__class__(header + buffer.getvalue()) @@ -472,8 +505,7 @@ def recursive_dict_merge(dict_base, dict_update): for key in dict_update: if key in dict_base: if type(dict_update[key]) is dict: - dict_base[key] = recursive_dict_merge(dict_base[key], - dict_update[key]) + dict_base[key] = recursive_dict_merge(dict_base[key], dict_update[key]) else: dict_base[key] = dict_update[key] else: |