aboutsummaryrefslogtreecommitdiffstats
path: root/test/util.py
diff options
context:
space:
mode:
authorKlement Sekera <klement.sekera@gmail.com>2022-04-26 19:02:15 +0200
committerOle Tr�an <otroan@employees.org>2022-05-10 18:52:08 +0000
commitd9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch)
tree4f786cfd8ebc2443cb11e11b74c8657204068898 /test/util.py
parentf90348bcb4afd0af2611cefc43b17ef3042b511c (diff)
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is much faster, stable PEP8 compliant code style checker offering also automatic formatting. It aims to be very stable and produce smallest diffs. It's used by many small and big projects. Running checkstyle with black takes a few seconds with a terse output. Thus, test-checkstyle-diff is no longer necessary. Expand scope of checkstyle to all python files in the repo, replacing test-checkstyle with checkstyle-python. Also, fixstyle-python is now available for automatic style formatting. Note: python virtualenv has been consolidated in test/Makefile, test/requirements*.txt which will eventually be moved to a central location. This is required to simply the automated generation of docker executor images in the CI. Type: improvement Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8 Signed-off-by: Klement Sekera <klement.sekera@gmail.com> Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/util.py')
-rw-r--r--test/util.py188
1 files changed, 110 insertions, 78 deletions
diff --git a/test/util.py b/test/util.py
index 2c24571c350..249bd864259 100644
--- a/test/util.py
+++ b/test/util.py
@@ -11,8 +11,12 @@ from collections import UserDict
import scapy.compat
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP
-from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
- IPv6ExtHdrHopByHop
+from scapy.layers.inet6 import (
+ IPv6,
+ IPv6ExtHdrFragment,
+ IPv6ExtHdrRouting,
+ IPv6ExtHdrHopByHop,
+)
from scapy.packet import Raw
from scapy.utils import hexdump
from scapy.utils6 import in6_mactoifaceid
@@ -21,7 +25,7 @@ from io import BytesIO
from vpp_papi import mac_pton
# Set up an empty logger for the testcase that can be overridden as necessary
-null_logger = logging.getLogger('VppTestCase.util')
+null_logger = logging.getLogger("VppTestCase.util")
null_logger.addHandler(logging.NullHandler())
@@ -30,14 +34,16 @@ def pr(packet):
def ppp(headline, packet):
- """ Return string containing headline and output of scapy packet.show() """
- return '%s\n%s\n\n%s\n' % (headline,
- hexdump(packet, dump=True),
- packet.show(dump=True))
+ """Return string containing headline and output of scapy packet.show()"""
+ return "%s\n%s\n\n%s\n" % (
+ headline,
+ hexdump(packet, dump=True),
+ packet.show(dump=True),
+ )
def ppc(headline, capture, limit=10):
- """ Return string containing ppp() printout for a capture.
+ """Return string containing ppp() printout for a capture.
:param headline: printed as first line of output
:param capture: packets to print
@@ -48,14 +54,17 @@ def ppc(headline, capture, limit=10):
tail = ""
if limit < len(capture):
tail = "\nPrint limit reached, %s out of %s packets printed" % (
- limit, len(capture))
- body = "".join([ppp("Packet #%s:" % count, p)
- for count, p in zip(range(0, limit), capture)])
+ limit,
+ len(capture),
+ )
+ body = "".join(
+ [ppp("Packet #%s:" % count, p) for count, p in zip(range(0, limit), capture)]
+ )
return "%s\n%s%s" % (headline, body, tail)
def ip4_range(ip4, s, e):
- tmp = ip4.rsplit('.', 1)[0]
+ tmp = ip4.rsplit(".", 1)[0]
return ("%s.%d" % (tmp, i) for i in range(s, e))
@@ -65,14 +74,18 @@ def mcast_ip_to_mac(ip):
raise ValueError("Must be multicast address.")
ip_as_int = int(ip)
if ip.version == 4:
- mcast_mac = "01:00:5e:%02x:%02x:%02x" % ((ip_as_int >> 16) & 0x7f,
- (ip_as_int >> 8) & 0xff,
- ip_as_int & 0xff)
+ mcast_mac = "01:00:5e:%02x:%02x:%02x" % (
+ (ip_as_int >> 16) & 0x7F,
+ (ip_as_int >> 8) & 0xFF,
+ ip_as_int & 0xFF,
+ )
else:
- mcast_mac = "33:33:%02x:%02x:%02x:%02x" % ((ip_as_int >> 24) & 0xff,
- (ip_as_int >> 16) & 0xff,
- (ip_as_int >> 8) & 0xff,
- ip_as_int & 0xff)
+ mcast_mac = "33:33:%02x:%02x:%02x:%02x" % (
+ (ip_as_int >> 24) & 0xFF,
+ (ip_as_int >> 16) & 0xFF,
+ (ip_as_int >> 8) & 0xFF,
+ ip_as_int & 0xFF,
+ )
return mcast_mac
@@ -84,8 +97,7 @@ def mk_ll_addr(mac):
def ip6_normalize(ip6):
- return socket.inet_ntop(socket.AF_INET6,
- socket.inet_pton(socket.AF_INET6, ip6))
+ return socket.inet_ntop(socket.AF_INET6, socket.inet_pton(socket.AF_INET6, ip6))
def get_core_path(tempdir):
@@ -107,13 +119,14 @@ def check_core_path(logger, core_path):
if corefmt.startswith("|"):
logger.error(
"WARNING: redirecting the core dump through a"
- " filter may result in truncated dumps.")
+ " filter may result in truncated dumps."
+ )
logger.error(
" You may want to check the filter settings"
" or uninstall it and edit the"
- " /proc/sys/kernel/core_pattern accordingly.")
- logger.error(
- " current core pattern is: %s" % corefmt)
+ " /proc/sys/kernel/core_pattern accordingly."
+ )
+ logger.error(" current core pattern is: %s" % corefmt)
class NumericConstant:
@@ -136,55 +149,57 @@ class NumericConstant:
class Host:
- """ Generic test host "connected" to VPPs interface. """
+ """Generic test host "connected" to VPPs interface."""
@property
def mac(self):
- """ MAC address """
+ """MAC address"""
return self._mac
@property
def bin_mac(self):
- """ MAC address """
+ """MAC address"""
return mac_pton(self._mac)
@property
def ip4(self):
- """ IPv4 address - string """
+ """IPv4 address - string"""
return self._ip4
@property
def ip4n(self):
- """ IPv4 address of remote host - raw, suitable as API parameter."""
+ """IPv4 address of remote host - raw, suitable as API parameter."""
return socket.inet_pton(socket.AF_INET, self._ip4)
@property
def ip6(self):
- """ IPv6 address - string """
+ """IPv6 address - string"""
return self._ip6
@property
def ip6n(self):
- """ IPv6 address of remote host - raw, suitable as API parameter."""
+ """IPv6 address of remote host - raw, suitable as API parameter."""
return socket.inet_pton(socket.AF_INET6, self._ip6)
@property
def ip6_ll(self):
- """ IPv6 link-local address - string """
+ """IPv6 link-local address - string"""
return self._ip6_ll
@property
def ip6n_ll(self):
- """ IPv6 link-local address of remote host -
+ """IPv6 link-local address of remote host -
raw, suitable as API parameter."""
return socket.inet_pton(socket.AF_INET6, self._ip6_ll)
def __eq__(self, other):
if isinstance(other, Host):
- return (self.mac == other.mac and
- self.ip4 == other.ip4 and
- self.ip6 == other.ip6 and
- self.ip6_ll == other.ip6_ll)
+ return (
+ self.mac == other.mac
+ and self.ip4 == other.ip4
+ and self.ip6 == other.ip6
+ and self.ip6_ll == other.ip6_ll
+ )
else:
return False
@@ -192,10 +207,12 @@ class Host:
return not self.__eq__(other)
def __repr__(self):
- return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % (self.mac,
- self.ip4,
- self.ip6,
- self.ip6_ll)
+ return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % (
+ self.mac,
+ self.ip4,
+ self.ip6,
+ self.ip6_ll,
+ )
def __hash__(self):
return hash(self.__repr__())
@@ -207,8 +224,8 @@ class Host:
self._ip6_ll = ip6_ll
-class L4_Conn():
- """ L4 'connection' tied to two VPP interfaces """
+class L4_Conn:
+ """L4 'connection' tied to two VPP interfaces"""
def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
self.testcase = testcase
@@ -228,22 +245,25 @@ class L4_Conn():
s1 = 1 - side
src_if = self.ifs[s0]
dst_if = self.ifs[s1]
- layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
- IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
- merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
+ layer_3 = [
+ IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
+ IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6),
+ ]
+ merged_l4args = {"sport": self.ports[s0], "dport": self.ports[s1]}
merged_l4args.update(l4args)
- p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
- layer_3[is_ip6] /
- self.l4proto(**merged_l4args) /
- Raw(payload))
+ p = (
+ Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+ / layer_3[is_ip6]
+ / self.l4proto(**merged_l4args)
+ / Raw(payload)
+ )
return p
def send(self, side, flags=None, payload=""):
l4args = {}
if flags is not None:
- l4args['flags'] = flags
- self.ifs[side].add_stream(self.pkt(side,
- l4args=l4args, payload=payload))
+ l4args["flags"] = flags
+ self.ifs[side].add_stream(self.pkt(side, l4args=l4args, payload=payload))
self.ifs[1 - side].enable_capture()
self.testcase.pg_start()
@@ -285,8 +305,8 @@ def fragment_rfc791(packet, fragsize, logger=null_logger):
pre_ip_len = len(packet) - len(packet[IP])
ip_header_len = packet[IP].ihl * 4
hex_packet = scapy.compat.raw(packet)
- hex_headers = hex_packet[:(pre_ip_len + ip_header_len)]
- hex_payload = hex_packet[(pre_ip_len + ip_header_len):]
+ hex_headers = hex_packet[: (pre_ip_len + ip_header_len)]
+ hex_payload = hex_packet[(pre_ip_len + ip_header_len) :]
pkts = []
ihl = packet[IP].ihl
@@ -294,14 +314,14 @@ def fragment_rfc791(packet, fragsize, logger=null_logger):
nfb = int((fragsize - pre_ip_len - ihl * 4) / 8)
fo = packet[IP].frag
- p = packet.__class__(hex_headers + hex_payload[:nfb * 8])
+ p = packet.__class__(hex_headers + hex_payload[: nfb * 8])
p[IP].flags = "MF"
p[IP].frag = fo
p[IP].len = ihl * 4 + nfb * 8
del p[IP].chksum
pkts.append(p)
- p = packet.__class__(hex_headers + hex_payload[nfb * 8:])
+ p = packet.__class__(hex_headers + hex_payload[nfb * 8 :])
p[IP].len = otl - nfb * 8
p[IP].frag = fo + nfb
del p[IP].chksum
@@ -345,15 +365,19 @@ def fragment_rfc8200(packet, identification, fragsize, logger=null_logger):
routing_hdr = counter
elif l.__class__ is IPv6ExtHdrHopByHop:
hop_by_hop_hdr = counter
- elif seen_ipv6 and not upper_layer and \
- not l.__class__.__name__.startswith('IPv6ExtHdr'):
+ elif (
+ seen_ipv6
+ and not upper_layer
+ and not l.__class__.__name__.startswith("IPv6ExtHdr")
+ ):
upper_layer = counter
counter = counter + 1
l = packet.getlayer(counter)
logger.debug(
- "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)" %
- (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer))
+ "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)"
+ % (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer)
+ )
if upper_layer is None:
raise Exception("Upper layer header not found in IPv6 packet")
@@ -379,18 +403,27 @@ def fragment_rfc8200(packet, identification, fragsize, logger=null_logger):
logger.debug(ppp("Fragment header:", fragment_ext_hdr))
len_ext_and_upper_layer_payload = len(ext_and_upper_layer.payload)
- if not len_ext_and_upper_layer_payload and \
- hasattr(ext_and_upper_layer, "data"):
+ if not len_ext_and_upper_layer_payload and hasattr(ext_and_upper_layer, "data"):
len_ext_and_upper_layer_payload = len(ext_and_upper_layer.data)
- if len(per_fragment_headers) + len(fragment_ext_hdr) +\
- len(ext_and_upper_layer) - len_ext_and_upper_layer_payload\
- > fragsize:
- raise Exception("Cannot fragment this packet - MTU too small "
- "(%s, %s, %s, %s, %s)" % (
- len(per_fragment_headers), len(fragment_ext_hdr),
- len(ext_and_upper_layer),
- len_ext_and_upper_layer_payload, fragsize))
+ if (
+ len(per_fragment_headers)
+ + len(fragment_ext_hdr)
+ + len(ext_and_upper_layer)
+ - len_ext_and_upper_layer_payload
+ > fragsize
+ ):
+ raise Exception(
+ "Cannot fragment this packet - MTU too small "
+ "(%s, %s, %s, %s, %s)"
+ % (
+ len(per_fragment_headers),
+ len(fragment_ext_hdr),
+ len(ext_and_upper_layer),
+ len_ext_and_upper_layer_payload,
+ fragsize,
+ )
+ )
orig_nh = packet[IPv6].nh
p = per_fragment_headers
@@ -399,7 +432,7 @@ def fragment_rfc8200(packet, identification, fragsize, logger=null_logger):
p = p / fragment_ext_hdr
del p[IPv6ExtHdrFragment].nh
first_payload_len_nfb = int((fragsize - len(p)) / 8)
- p = p / Raw(hex_payload[:first_payload_len_nfb * 8])
+ p = p / Raw(hex_payload[: first_payload_len_nfb * 8])
del p[IPv6].plen
p[IPv6ExtHdrFragment].nh = orig_nh
p[IPv6ExtHdrFragment].id = identification
@@ -417,7 +450,7 @@ def fragment_rfc8200(packet, identification, fragsize, logger=null_logger):
p = p / fragment_ext_hdr
del p[IPv6ExtHdrFragment].nh
l_nfb = int((fragsize - len(p)) / 8)
- p = p / Raw(hex_payload[offset:offset + l_nfb * 8])
+ p = p / Raw(hex_payload[offset : offset + l_nfb * 8])
p[IPv6ExtHdrFragment].nh = orig_nh
p[IPv6ExtHdrFragment].id = identification
p[IPv6ExtHdrFragment].offset = int(offset / 8)
@@ -437,11 +470,11 @@ def reassemble4_core(listoffragments, return_ip):
first = listoffragments[0]
buffer.seek(20)
for pkt in listoffragments:
- buffer.seek(pkt[IP].frag*8)
+ buffer.seek(pkt[IP].frag * 8)
buffer.write(bytes(pkt[IP].payload))
first.len = len(buffer.getvalue()) + 20
first.flags = 0
- del(first.chksum)
+ del first.chksum
if return_ip:
header = bytes(first[IP])[:20]
return first[IP].__class__(header + buffer.getvalue())
@@ -472,8 +505,7 @@ def recursive_dict_merge(dict_base, dict_update):
for key in dict_update:
if key in dict_base:
if type(dict_update[key]) is dict:
- dict_base[key] = recursive_dict_merge(dict_base[key],
- dict_update[key])
+ dict_base[key] = recursive_dict_merge(dict_base[key], dict_update[key])
else:
dict_base[key] = dict_update[key]
else: