diff options
author | Klement Sekera <klement.sekera@gmail.com> | 2022-04-26 19:02:15 +0200 |
---|---|---|
committer | Ole Tr�an <otroan@employees.org> | 2022-05-10 18:52:08 +0000 |
commit | d9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch) | |
tree | 4f786cfd8ebc2443cb11e11b74c8657204068898 /test/bfd.py | |
parent | f90348bcb4afd0af2611cefc43b17ef3042b511c (diff) |
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is
much faster, stable PEP8 compliant code style checker offering also
automatic formatting. It aims to be very stable and produce smallest
diffs. It's used by many small and big projects.
Running checkstyle with black takes a few seconds with a terse output.
Thus, test-checkstyle-diff is no longer necessary.
Expand scope of checkstyle to all python files in the repo, replacing
test-checkstyle with checkstyle-python.
Also, fixstyle-python is now available for automatic style formatting.
Note: python virtualenv has been consolidated in test/Makefile,
test/requirements*.txt which will eventually be moved to a central
location. This is required to simply the automated generation of
docker executor images in the CI.
Type: improvement
Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/bfd.py')
-rw-r--r-- | test/bfd.py | 261 |
1 files changed, 152 insertions, 109 deletions
diff --git a/test/bfd.py b/test/bfd.py index bbfa5945593..4189983b430 100644 --- a/test/bfd.py +++ b/test/bfd.py @@ -5,15 +5,22 @@ from socket import AF_INET, AF_INET6, inet_pton from scapy.all import bind_layers from scapy.layers.inet import UDP from scapy.packet import Packet -from scapy.fields import BitField, BitEnumField, XByteField, FlagsField,\ - ConditionalField, StrField +from scapy.fields import ( + BitField, + BitEnumField, + XByteField, + FlagsField, + ConditionalField, + StrField, +) from vpp_object import VppObject from util import NumericConstant from vpp_papi import VppEnum class BFDDiagCode(NumericConstant): - """ BFD Diagnostic Code """ + """BFD Diagnostic Code""" + no_diagnostic = 0 control_detection_time_expired = 1 echo_function_failed = 2 @@ -38,7 +45,8 @@ class BFDDiagCode(NumericConstant): class BFDState(NumericConstant): - """ BFD State """ + """BFD State""" + admin_down = 0 down = 1 init = 2 @@ -53,7 +61,8 @@ class BFDState(NumericConstant): class BFDAuthType(NumericConstant): - """ BFD Authentication Type """ + """BFD Authentication Type""" + no_auth = 0 simple_pwd = 1 keyed_md5 = 2 @@ -72,34 +81,38 @@ class BFDAuthType(NumericConstant): def bfd_is_auth_used(pkt): - """ is packet authenticated? """ + """is packet authenticated?""" return "A" in pkt.sprintf("%BFD.flags%") def bfd_is_simple_pwd_used(pkt): - """ is simple password authentication used? """ + """is simple password authentication used?""" return bfd_is_auth_used(pkt) and pkt.auth_type == BFDAuthType.simple_pwd def bfd_is_sha1_used(pkt): - """ is sha1 authentication used? """ - return bfd_is_auth_used(pkt) and pkt.auth_type in \ - (BFDAuthType.keyed_sha1, BFDAuthType.meticulous_keyed_sha1) + """is sha1 authentication used?""" + return bfd_is_auth_used(pkt) and pkt.auth_type in ( + BFDAuthType.keyed_sha1, + BFDAuthType.meticulous_keyed_sha1, + ) def bfd_is_md5_used(pkt): - """ is md5 authentication used? """ - return bfd_is_auth_used(pkt) and pkt.auth_type in \ - (BFDAuthType.keyed_md5, BFDAuthType.meticulous_keyed_md5) + """is md5 authentication used?""" + return bfd_is_auth_used(pkt) and pkt.auth_type in ( + BFDAuthType.keyed_md5, + BFDAuthType.meticulous_keyed_md5, + ) def bfd_is_md5_or_sha1_used(pkt): - """ is md5 or sha1 used? """ + """is md5 or sha1 used?""" return bfd_is_md5_used(pkt) or bfd_is_sha1_used(pkt) class BFD(Packet): - """ BFD protocol layer for scapy """ + """BFD protocol layer for scapy""" udp_dport = 3784 #: BFD destination port per RFC 5881 udp_dport_echo = 3785 # : BFD destination port for ECHO per RFC 5881 @@ -114,7 +127,7 @@ class BFD(Packet): BitField("version", 1, 3), BitEnumField("diag", 0, 5, BFDDiagCode.desc_dict), BitEnumField("state", 0, 2, BFDState.desc_dict), - FlagsField("flags", 0, 6, ['M', 'D', 'A', 'C', 'F', 'P']), + FlagsField("flags", 0, 6, ["M", "D", "A", "C", "F", "P"]), XByteField("detect_mult", 0), BitField("length", bfd_pkt_len, 8), BitField("my_discriminator", 0, 32), @@ -123,22 +136,20 @@ class BFD(Packet): BitField("required_min_rx_interval", 0, 32), BitField("required_min_echo_rx_interval", 0, 32), ConditionalField( - BitEnumField("auth_type", 0, 8, BFDAuthType.desc_dict), - bfd_is_auth_used), + BitEnumField("auth_type", 0, 8, BFDAuthType.desc_dict), bfd_is_auth_used + ), ConditionalField(BitField("auth_len", 0, 8), bfd_is_auth_used), ConditionalField(BitField("auth_key_id", 0, 8), bfd_is_auth_used), - ConditionalField(BitField("auth_reserved", 0, 8), - bfd_is_md5_or_sha1_used), - ConditionalField( - BitField("auth_seq_num", 0, 32), bfd_is_md5_or_sha1_used), + ConditionalField(BitField("auth_reserved", 0, 8), bfd_is_md5_or_sha1_used), + ConditionalField(BitField("auth_seq_num", 0, 32), bfd_is_md5_or_sha1_used), ConditionalField(StrField("auth_key_hash", "0" * 16), bfd_is_md5_used), - ConditionalField( - StrField("auth_key_hash", "0" * 20), bfd_is_sha1_used), + ConditionalField(StrField("auth_key_hash", "0" * 20), bfd_is_sha1_used), ] def mysummary(self): - return self.sprintf("BFD(my_disc=%BFD.my_discriminator%," - "your_disc=%BFD.your_discriminator%)") + return self.sprintf( + "BFD(my_disc=%BFD.my_discriminator%, your_disc=%BFD.your_discriminator%)" + ) # glue the BFD packet class to scapy parser @@ -146,7 +157,7 @@ bind_layers(UDP, BFD, dport=BFD.udp_dport) class BFD_vpp_echo(Packet): - """ BFD echo packet as used by VPP (non-rfc, as rfc doesn't define one) """ + """BFD echo packet as used by VPP (non-rfc, as rfc doesn't define one)""" udp_dport = 3785 #: BFD echo destination port per RFC 5881 name = "BFD_VPP_ECHO" @@ -154,13 +165,14 @@ class BFD_vpp_echo(Packet): fields_desc = [ BitField("discriminator", 0, 32), BitField("expire_time_clocks", 0, 64), - BitField("checksum", 0, 64) + BitField("checksum", 0, 64), ] def mysummary(self): return self.sprintf( "BFD_VPP_ECHO(disc=%BFD_VPP_ECHO.discriminator%," - "expire_time_clocks=%BFD_VPP_ECHO.expire_time_clocks%)") + "expire_time_clocks=%BFD_VPP_ECHO.expire_time_clocks%)" + ) # glue the BFD echo packet class to scapy parser @@ -168,7 +180,7 @@ bind_layers(UDP, BFD_vpp_echo, dport=BFD_vpp_echo.udp_dport) class VppBFDAuthKey(VppObject): - """ Represents BFD authentication key in VPP """ + """Represents BFD authentication key in VPP""" def __init__(self, test, conf_key_id, auth_type, key): self._test = test @@ -179,17 +191,17 @@ class VppBFDAuthKey(VppObject): @property def test(self): - """ Test which created this key """ + """Test which created this key""" return self._test @property def auth_type(self): - """ Authentication type for this key """ + """Authentication type for this key""" return self._auth_type @property def key(self): - """ key data """ + """key data""" return self._key @key.setter @@ -198,17 +210,20 @@ class VppBFDAuthKey(VppObject): @property def conf_key_id(self): - """ configuration key ID """ + """configuration key ID""" return self._conf_key_id def add_vpp_config(self): self.test.vapi.bfd_auth_set_key( - conf_key_id=self._conf_key_id, auth_type=self._auth_type, - key=self._key, key_len=len(self._key)) + conf_key_id=self._conf_key_id, + auth_type=self._auth_type, + key=self._key, + key_len=len(self._key), + ) self._test.registry.register(self, self.test.logger) def get_bfd_auth_keys_dump_entry(self): - """ get the entry in the auth keys dump corresponding to this key """ + """get the entry in the auth keys dump corresponding to this key""" result = self.test.vapi.bfd_auth_keys_dump() for k in result: if k.conf_key_id == self._conf_key_id: @@ -226,11 +241,22 @@ class VppBFDAuthKey(VppObject): class VppBFDUDPSession(VppObject): - """ Represents BFD UDP session in VPP """ - - def __init__(self, test, interface, peer_addr, local_addr=None, af=AF_INET, - desired_min_tx=300000, required_min_rx=300000, detect_mult=3, - sha1_key=None, bfd_key_id=None, is_tunnel=False): + """Represents BFD UDP session in VPP""" + + def __init__( + self, + test, + interface, + peer_addr, + local_addr=None, + af=AF_INET, + desired_min_tx=300000, + required_min_rx=300000, + detect_mult=3, + sha1_key=None, + bfd_key_id=None, + is_tunnel=False, + ): self._test = test self._interface = interface self._af = af @@ -251,22 +277,22 @@ class VppBFDUDPSession(VppObject): @property def test(self): - """ Test which created this session """ + """Test which created this session""" return self._test @property def interface(self): - """ Interface on which this session lives """ + """Interface on which this session lives""" return self._interface @property def af(self): - """ Address family - AF_INET or AF_INET6 """ + """Address family - AF_INET or AF_INET6""" return self._af @property def local_addr(self): - """ BFD session local address (VPP address) """ + """BFD session local address (VPP address)""" if self._local_addr is None: if self.af == AF_INET: return self._interface.local_ip4 @@ -278,28 +304,32 @@ class VppBFDUDPSession(VppObject): @property def peer_addr(self): - """ BFD session peer address """ + """BFD session peer address""" return self._peer_addr def get_bfd_udp_session_dump_entry(self): - """ get the namedtuple entry from bfd udp session dump """ + """get the namedtuple entry from bfd udp session dump""" result = self.test.vapi.bfd_udp_session_dump() for s in result: self.test.logger.debug("session entry: %s" % str(s)) if s.sw_if_index == self.interface.sw_if_index: - if self.af == AF_INET \ - and self.interface.local_ip4 == str(s.local_addr) \ - and self.interface.remote_ip4 == str(s.peer_addr): + if ( + self.af == AF_INET + and self.interface.local_ip4 == str(s.local_addr) + and self.interface.remote_ip4 == str(s.peer_addr) + ): return s - if self.af == AF_INET6 \ - and self.interface.local_ip6 == str(s.local_addr) \ - and self.interface.remote_ip6 == str(s.peer_addr): + if ( + self.af == AF_INET6 + and self.interface.local_ip6 == str(s.local_addr) + and self.interface.remote_ip6 == str(s.peer_addr) + ): return s return None @property def state(self): - """ BFD session state """ + """BFD session state""" session = self.get_bfd_udp_session_dump_entry() if session is None: raise Exception("Could not find BFD session in VPP response") @@ -307,27 +337,27 @@ class VppBFDUDPSession(VppObject): @property def desired_min_tx(self): - """ desired minimum tx interval """ + """desired minimum tx interval""" return self._desired_min_tx @property def required_min_rx(self): - """ required minimum rx interval """ + """required minimum rx interval""" return self._required_min_rx @property def detect_mult(self): - """ detect multiplier """ + """detect multiplier""" return self._detect_mult @property def sha1_key(self): - """ sha1 key """ + """sha1 key""" return self._sha1_key @property def bfd_key_id(self): - """ bfd key id in use """ + """bfd key id in use""" return self._bfd_key_id @property @@ -335,7 +365,7 @@ class VppBFDUDPSession(VppObject): return self._is_tunnel def activate_auth(self, key, bfd_key_id=None, delayed=False): - """ activate authentication for this session """ + """activate authentication for this session""" self._bfd_key_id = bfd_key_id if bfd_key_id else randint(0, 255) self._sha1_key = key conf_key_id = self._sha1_key.conf_key_id @@ -346,10 +376,11 @@ class VppBFDUDPSession(VppObject): peer_addr=self.peer_addr, bfd_key_id=self._bfd_key_id, conf_key_id=conf_key_id, - is_delayed=is_delayed) + is_delayed=is_delayed, + ) def deactivate_auth(self, delayed=False): - """ deactivate authentication """ + """deactivate authentication""" self._bfd_key_id = None self._sha1_key = None is_delayed = 1 if delayed else 0 @@ -357,45 +388,48 @@ class VppBFDUDPSession(VppObject): sw_if_index=self._interface.sw_if_index, local_addr=self.local_addr, peer_addr=self.peer_addr, - is_delayed=is_delayed) + is_delayed=is_delayed, + ) - def modify_parameters(self, - detect_mult=None, - desired_min_tx=None, - required_min_rx=None): - """ modify session parameters """ + def modify_parameters( + self, detect_mult=None, desired_min_tx=None, required_min_rx=None + ): + """modify session parameters""" if detect_mult: self._detect_mult = detect_mult if desired_min_tx: self._desired_min_tx = desired_min_tx if required_min_rx: self._required_min_rx = required_min_rx - self.test.vapi.bfd_udp_mod(sw_if_index=self._interface.sw_if_index, - desired_min_tx=self.desired_min_tx, - required_min_rx=self.required_min_rx, - detect_mult=self.detect_mult, - local_addr=self.local_addr, - peer_addr=self.peer_addr) + self.test.vapi.bfd_udp_mod( + sw_if_index=self._interface.sw_if_index, + desired_min_tx=self.desired_min_tx, + required_min_rx=self.required_min_rx, + detect_mult=self.detect_mult, + local_addr=self.local_addr, + peer_addr=self.peer_addr, + ) def add_vpp_config(self): bfd_key_id = self._bfd_key_id if self._sha1_key else None conf_key_id = self._sha1_key.conf_key_id if self._sha1_key else None is_authenticated = True if self._sha1_key else False - self.test.vapi.bfd_udp_add(sw_if_index=self._interface.sw_if_index, - desired_min_tx=self.desired_min_tx, - required_min_rx=self.required_min_rx, - detect_mult=self.detect_mult, - local_addr=self.local_addr, - peer_addr=self.peer_addr, - bfd_key_id=bfd_key_id, - conf_key_id=conf_key_id, - is_authenticated=is_authenticated) + self.test.vapi.bfd_udp_add( + sw_if_index=self._interface.sw_if_index, + desired_min_tx=self.desired_min_tx, + required_min_rx=self.required_min_rx, + detect_mult=self.detect_mult, + local_addr=self.local_addr, + peer_addr=self.peer_addr, + bfd_key_id=bfd_key_id, + conf_key_id=conf_key_id, + is_authenticated=is_authenticated, + ) self._test.registry.register(self, self.test.logger) - def upd_vpp_config(self, - detect_mult=None, - desired_min_tx=None, - required_min_rx=None): + def upd_vpp_config( + self, detect_mult=None, desired_min_tx=None, required_min_rx=None + ): if desired_min_tx: self._desired_min_tx = desired_min_tx if required_min_rx: @@ -405,15 +439,17 @@ class VppBFDUDPSession(VppObject): bfd_key_id = self._bfd_key_id if self._sha1_key else None conf_key_id = self._sha1_key.conf_key_id if self._sha1_key else None is_authenticated = True if self._sha1_key else False - self.test.vapi.bfd_udp_upd(sw_if_index=self._interface.sw_if_index, - desired_min_tx=self.desired_min_tx, - required_min_rx=self.required_min_rx, - detect_mult=self.detect_mult, - local_addr=self.local_addr, - peer_addr=self.peer_addr, - bfd_key_id=bfd_key_id, - conf_key_id=conf_key_id, - is_authenticated=is_authenticated) + self.test.vapi.bfd_udp_upd( + sw_if_index=self._interface.sw_if_index, + desired_min_tx=self.desired_min_tx, + required_min_rx=self.required_min_rx, + detect_mult=self.detect_mult, + local_addr=self.local_addr, + peer_addr=self.peer_addr, + bfd_key_id=bfd_key_id, + conf_key_id=conf_key_id, + is_authenticated=is_authenticated, + ) self._test.registry.register(self, self.test.logger) def query_vpp_config(self): @@ -421,27 +457,34 @@ class VppBFDUDPSession(VppObject): return session is not None def remove_vpp_config(self): - self.test.vapi.bfd_udp_del(self._interface.sw_if_index, - local_addr=self.local_addr, - peer_addr=self.peer_addr) + self.test.vapi.bfd_udp_del( + self._interface.sw_if_index, + local_addr=self.local_addr, + peer_addr=self.peer_addr, + ) def object_id(self): - return "bfd-udp-%s-%s-%s-%s" % (self._interface.sw_if_index, - self.local_addr, - self.peer_addr, - self.af) + return "bfd-udp-%s-%s-%s-%s" % ( + self._interface.sw_if_index, + self.local_addr, + self.peer_addr, + self.af, + ) def admin_up(self): - """ set bfd session admin-up """ + """set bfd session admin-up""" self.test.vapi.bfd_udp_session_set_flags( flags=VppEnum.vl_api_if_status_flags_t.IF_STATUS_API_FLAG_ADMIN_UP, sw_if_index=self._interface.sw_if_index, local_addr=self.local_addr, - peer_addr=self.peer_addr) + peer_addr=self.peer_addr, + ) def admin_down(self): - """ set bfd session admin-down """ + """set bfd session admin-down""" self.test.vapi.bfd_udp_session_set_flags( - flags=0, sw_if_index=self._interface.sw_if_index, + flags=0, + sw_if_index=self._interface.sw_if_index, local_addr=self.local_addr, - peer_addr=self.peer_addr) + peer_addr=self.peer_addr, + ) |