aboutsummaryrefslogtreecommitdiffstats
path: root/test/bfd.py
diff options
context:
space:
mode:
authorKlement Sekera <klement.sekera@gmail.com>2022-04-26 19:02:15 +0200
committerOle Tr�an <otroan@employees.org>2022-05-10 18:52:08 +0000
commitd9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch)
tree4f786cfd8ebc2443cb11e11b74c8657204068898 /test/bfd.py
parentf90348bcb4afd0af2611cefc43b17ef3042b511c (diff)
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is much faster, stable PEP8 compliant code style checker offering also automatic formatting. It aims to be very stable and produce smallest diffs. It's used by many small and big projects. Running checkstyle with black takes a few seconds with a terse output. Thus, test-checkstyle-diff is no longer necessary. Expand scope of checkstyle to all python files in the repo, replacing test-checkstyle with checkstyle-python. Also, fixstyle-python is now available for automatic style formatting. Note: python virtualenv has been consolidated in test/Makefile, test/requirements*.txt which will eventually be moved to a central location. This is required to simply the automated generation of docker executor images in the CI. Type: improvement Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8 Signed-off-by: Klement Sekera <klement.sekera@gmail.com> Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/bfd.py')
-rw-r--r--test/bfd.py261
1 files changed, 152 insertions, 109 deletions
diff --git a/test/bfd.py b/test/bfd.py
index bbfa5945593..4189983b430 100644
--- a/test/bfd.py
+++ b/test/bfd.py
@@ -5,15 +5,22 @@ from socket import AF_INET, AF_INET6, inet_pton
from scapy.all import bind_layers
from scapy.layers.inet import UDP
from scapy.packet import Packet
-from scapy.fields import BitField, BitEnumField, XByteField, FlagsField,\
- ConditionalField, StrField
+from scapy.fields import (
+ BitField,
+ BitEnumField,
+ XByteField,
+ FlagsField,
+ ConditionalField,
+ StrField,
+)
from vpp_object import VppObject
from util import NumericConstant
from vpp_papi import VppEnum
class BFDDiagCode(NumericConstant):
- """ BFD Diagnostic Code """
+ """BFD Diagnostic Code"""
+
no_diagnostic = 0
control_detection_time_expired = 1
echo_function_failed = 2
@@ -38,7 +45,8 @@ class BFDDiagCode(NumericConstant):
class BFDState(NumericConstant):
- """ BFD State """
+ """BFD State"""
+
admin_down = 0
down = 1
init = 2
@@ -53,7 +61,8 @@ class BFDState(NumericConstant):
class BFDAuthType(NumericConstant):
- """ BFD Authentication Type """
+ """BFD Authentication Type"""
+
no_auth = 0
simple_pwd = 1
keyed_md5 = 2
@@ -72,34 +81,38 @@ class BFDAuthType(NumericConstant):
def bfd_is_auth_used(pkt):
- """ is packet authenticated? """
+ """is packet authenticated?"""
return "A" in pkt.sprintf("%BFD.flags%")
def bfd_is_simple_pwd_used(pkt):
- """ is simple password authentication used? """
+ """is simple password authentication used?"""
return bfd_is_auth_used(pkt) and pkt.auth_type == BFDAuthType.simple_pwd
def bfd_is_sha1_used(pkt):
- """ is sha1 authentication used? """
- return bfd_is_auth_used(pkt) and pkt.auth_type in \
- (BFDAuthType.keyed_sha1, BFDAuthType.meticulous_keyed_sha1)
+ """is sha1 authentication used?"""
+ return bfd_is_auth_used(pkt) and pkt.auth_type in (
+ BFDAuthType.keyed_sha1,
+ BFDAuthType.meticulous_keyed_sha1,
+ )
def bfd_is_md5_used(pkt):
- """ is md5 authentication used? """
- return bfd_is_auth_used(pkt) and pkt.auth_type in \
- (BFDAuthType.keyed_md5, BFDAuthType.meticulous_keyed_md5)
+ """is md5 authentication used?"""
+ return bfd_is_auth_used(pkt) and pkt.auth_type in (
+ BFDAuthType.keyed_md5,
+ BFDAuthType.meticulous_keyed_md5,
+ )
def bfd_is_md5_or_sha1_used(pkt):
- """ is md5 or sha1 used? """
+ """is md5 or sha1 used?"""
return bfd_is_md5_used(pkt) or bfd_is_sha1_used(pkt)
class BFD(Packet):
- """ BFD protocol layer for scapy """
+ """BFD protocol layer for scapy"""
udp_dport = 3784 #: BFD destination port per RFC 5881
udp_dport_echo = 3785 # : BFD destination port for ECHO per RFC 5881
@@ -114,7 +127,7 @@ class BFD(Packet):
BitField("version", 1, 3),
BitEnumField("diag", 0, 5, BFDDiagCode.desc_dict),
BitEnumField("state", 0, 2, BFDState.desc_dict),
- FlagsField("flags", 0, 6, ['M', 'D', 'A', 'C', 'F', 'P']),
+ FlagsField("flags", 0, 6, ["M", "D", "A", "C", "F", "P"]),
XByteField("detect_mult", 0),
BitField("length", bfd_pkt_len, 8),
BitField("my_discriminator", 0, 32),
@@ -123,22 +136,20 @@ class BFD(Packet):
BitField("required_min_rx_interval", 0, 32),
BitField("required_min_echo_rx_interval", 0, 32),
ConditionalField(
- BitEnumField("auth_type", 0, 8, BFDAuthType.desc_dict),
- bfd_is_auth_used),
+ BitEnumField("auth_type", 0, 8, BFDAuthType.desc_dict), bfd_is_auth_used
+ ),
ConditionalField(BitField("auth_len", 0, 8), bfd_is_auth_used),
ConditionalField(BitField("auth_key_id", 0, 8), bfd_is_auth_used),
- ConditionalField(BitField("auth_reserved", 0, 8),
- bfd_is_md5_or_sha1_used),
- ConditionalField(
- BitField("auth_seq_num", 0, 32), bfd_is_md5_or_sha1_used),
+ ConditionalField(BitField("auth_reserved", 0, 8), bfd_is_md5_or_sha1_used),
+ ConditionalField(BitField("auth_seq_num", 0, 32), bfd_is_md5_or_sha1_used),
ConditionalField(StrField("auth_key_hash", "0" * 16), bfd_is_md5_used),
- ConditionalField(
- StrField("auth_key_hash", "0" * 20), bfd_is_sha1_used),
+ ConditionalField(StrField("auth_key_hash", "0" * 20), bfd_is_sha1_used),
]
def mysummary(self):
- return self.sprintf("BFD(my_disc=%BFD.my_discriminator%,"
- "your_disc=%BFD.your_discriminator%)")
+ return self.sprintf(
+ "BFD(my_disc=%BFD.my_discriminator%, your_disc=%BFD.your_discriminator%)"
+ )
# glue the BFD packet class to scapy parser
@@ -146,7 +157,7 @@ bind_layers(UDP, BFD, dport=BFD.udp_dport)
class BFD_vpp_echo(Packet):
- """ BFD echo packet as used by VPP (non-rfc, as rfc doesn't define one) """
+ """BFD echo packet as used by VPP (non-rfc, as rfc doesn't define one)"""
udp_dport = 3785 #: BFD echo destination port per RFC 5881
name = "BFD_VPP_ECHO"
@@ -154,13 +165,14 @@ class BFD_vpp_echo(Packet):
fields_desc = [
BitField("discriminator", 0, 32),
BitField("expire_time_clocks", 0, 64),
- BitField("checksum", 0, 64)
+ BitField("checksum", 0, 64),
]
def mysummary(self):
return self.sprintf(
"BFD_VPP_ECHO(disc=%BFD_VPP_ECHO.discriminator%,"
- "expire_time_clocks=%BFD_VPP_ECHO.expire_time_clocks%)")
+ "expire_time_clocks=%BFD_VPP_ECHO.expire_time_clocks%)"
+ )
# glue the BFD echo packet class to scapy parser
@@ -168,7 +180,7 @@ bind_layers(UDP, BFD_vpp_echo, dport=BFD_vpp_echo.udp_dport)
class VppBFDAuthKey(VppObject):
- """ Represents BFD authentication key in VPP """
+ """Represents BFD authentication key in VPP"""
def __init__(self, test, conf_key_id, auth_type, key):
self._test = test
@@ -179,17 +191,17 @@ class VppBFDAuthKey(VppObject):
@property
def test(self):
- """ Test which created this key """
+ """Test which created this key"""
return self._test
@property
def auth_type(self):
- """ Authentication type for this key """
+ """Authentication type for this key"""
return self._auth_type
@property
def key(self):
- """ key data """
+ """key data"""
return self._key
@key.setter
@@ -198,17 +210,20 @@ class VppBFDAuthKey(VppObject):
@property
def conf_key_id(self):
- """ configuration key ID """
+ """configuration key ID"""
return self._conf_key_id
def add_vpp_config(self):
self.test.vapi.bfd_auth_set_key(
- conf_key_id=self._conf_key_id, auth_type=self._auth_type,
- key=self._key, key_len=len(self._key))
+ conf_key_id=self._conf_key_id,
+ auth_type=self._auth_type,
+ key=self._key,
+ key_len=len(self._key),
+ )
self._test.registry.register(self, self.test.logger)
def get_bfd_auth_keys_dump_entry(self):
- """ get the entry in the auth keys dump corresponding to this key """
+ """get the entry in the auth keys dump corresponding to this key"""
result = self.test.vapi.bfd_auth_keys_dump()
for k in result:
if k.conf_key_id == self._conf_key_id:
@@ -226,11 +241,22 @@ class VppBFDAuthKey(VppObject):
class VppBFDUDPSession(VppObject):
- """ Represents BFD UDP session in VPP """
-
- def __init__(self, test, interface, peer_addr, local_addr=None, af=AF_INET,
- desired_min_tx=300000, required_min_rx=300000, detect_mult=3,
- sha1_key=None, bfd_key_id=None, is_tunnel=False):
+ """Represents BFD UDP session in VPP"""
+
+ def __init__(
+ self,
+ test,
+ interface,
+ peer_addr,
+ local_addr=None,
+ af=AF_INET,
+ desired_min_tx=300000,
+ required_min_rx=300000,
+ detect_mult=3,
+ sha1_key=None,
+ bfd_key_id=None,
+ is_tunnel=False,
+ ):
self._test = test
self._interface = interface
self._af = af
@@ -251,22 +277,22 @@ class VppBFDUDPSession(VppObject):
@property
def test(self):
- """ Test which created this session """
+ """Test which created this session"""
return self._test
@property
def interface(self):
- """ Interface on which this session lives """
+ """Interface on which this session lives"""
return self._interface
@property
def af(self):
- """ Address family - AF_INET or AF_INET6 """
+ """Address family - AF_INET or AF_INET6"""
return self._af
@property
def local_addr(self):
- """ BFD session local address (VPP address) """
+ """BFD session local address (VPP address)"""
if self._local_addr is None:
if self.af == AF_INET:
return self._interface.local_ip4
@@ -278,28 +304,32 @@ class VppBFDUDPSession(VppObject):
@property
def peer_addr(self):
- """ BFD session peer address """
+ """BFD session peer address"""
return self._peer_addr
def get_bfd_udp_session_dump_entry(self):
- """ get the namedtuple entry from bfd udp session dump """
+ """get the namedtuple entry from bfd udp session dump"""
result = self.test.vapi.bfd_udp_session_dump()
for s in result:
self.test.logger.debug("session entry: %s" % str(s))
if s.sw_if_index == self.interface.sw_if_index:
- if self.af == AF_INET \
- and self.interface.local_ip4 == str(s.local_addr) \
- and self.interface.remote_ip4 == str(s.peer_addr):
+ if (
+ self.af == AF_INET
+ and self.interface.local_ip4 == str(s.local_addr)
+ and self.interface.remote_ip4 == str(s.peer_addr)
+ ):
return s
- if self.af == AF_INET6 \
- and self.interface.local_ip6 == str(s.local_addr) \
- and self.interface.remote_ip6 == str(s.peer_addr):
+ if (
+ self.af == AF_INET6
+ and self.interface.local_ip6 == str(s.local_addr)
+ and self.interface.remote_ip6 == str(s.peer_addr)
+ ):
return s
return None
@property
def state(self):
- """ BFD session state """
+ """BFD session state"""
session = self.get_bfd_udp_session_dump_entry()
if session is None:
raise Exception("Could not find BFD session in VPP response")
@@ -307,27 +337,27 @@ class VppBFDUDPSession(VppObject):
@property
def desired_min_tx(self):
- """ desired minimum tx interval """
+ """desired minimum tx interval"""
return self._desired_min_tx
@property
def required_min_rx(self):
- """ required minimum rx interval """
+ """required minimum rx interval"""
return self._required_min_rx
@property
def detect_mult(self):
- """ detect multiplier """
+ """detect multiplier"""
return self._detect_mult
@property
def sha1_key(self):
- """ sha1 key """
+ """sha1 key"""
return self._sha1_key
@property
def bfd_key_id(self):
- """ bfd key id in use """
+ """bfd key id in use"""
return self._bfd_key_id
@property
@@ -335,7 +365,7 @@ class VppBFDUDPSession(VppObject):
return self._is_tunnel
def activate_auth(self, key, bfd_key_id=None, delayed=False):
- """ activate authentication for this session """
+ """activate authentication for this session"""
self._bfd_key_id = bfd_key_id if bfd_key_id else randint(0, 255)
self._sha1_key = key
conf_key_id = self._sha1_key.conf_key_id
@@ -346,10 +376,11 @@ class VppBFDUDPSession(VppObject):
peer_addr=self.peer_addr,
bfd_key_id=self._bfd_key_id,
conf_key_id=conf_key_id,
- is_delayed=is_delayed)
+ is_delayed=is_delayed,
+ )
def deactivate_auth(self, delayed=False):
- """ deactivate authentication """
+ """deactivate authentication"""
self._bfd_key_id = None
self._sha1_key = None
is_delayed = 1 if delayed else 0
@@ -357,45 +388,48 @@ class VppBFDUDPSession(VppObject):
sw_if_index=self._interface.sw_if_index,
local_addr=self.local_addr,
peer_addr=self.peer_addr,
- is_delayed=is_delayed)
+ is_delayed=is_delayed,
+ )
- def modify_parameters(self,
- detect_mult=None,
- desired_min_tx=None,
- required_min_rx=None):
- """ modify session parameters """
+ def modify_parameters(
+ self, detect_mult=None, desired_min_tx=None, required_min_rx=None
+ ):
+ """modify session parameters"""
if detect_mult:
self._detect_mult = detect_mult
if desired_min_tx:
self._desired_min_tx = desired_min_tx
if required_min_rx:
self._required_min_rx = required_min_rx
- self.test.vapi.bfd_udp_mod(sw_if_index=self._interface.sw_if_index,
- desired_min_tx=self.desired_min_tx,
- required_min_rx=self.required_min_rx,
- detect_mult=self.detect_mult,
- local_addr=self.local_addr,
- peer_addr=self.peer_addr)
+ self.test.vapi.bfd_udp_mod(
+ sw_if_index=self._interface.sw_if_index,
+ desired_min_tx=self.desired_min_tx,
+ required_min_rx=self.required_min_rx,
+ detect_mult=self.detect_mult,
+ local_addr=self.local_addr,
+ peer_addr=self.peer_addr,
+ )
def add_vpp_config(self):
bfd_key_id = self._bfd_key_id if self._sha1_key else None
conf_key_id = self._sha1_key.conf_key_id if self._sha1_key else None
is_authenticated = True if self._sha1_key else False
- self.test.vapi.bfd_udp_add(sw_if_index=self._interface.sw_if_index,
- desired_min_tx=self.desired_min_tx,
- required_min_rx=self.required_min_rx,
- detect_mult=self.detect_mult,
- local_addr=self.local_addr,
- peer_addr=self.peer_addr,
- bfd_key_id=bfd_key_id,
- conf_key_id=conf_key_id,
- is_authenticated=is_authenticated)
+ self.test.vapi.bfd_udp_add(
+ sw_if_index=self._interface.sw_if_index,
+ desired_min_tx=self.desired_min_tx,
+ required_min_rx=self.required_min_rx,
+ detect_mult=self.detect_mult,
+ local_addr=self.local_addr,
+ peer_addr=self.peer_addr,
+ bfd_key_id=bfd_key_id,
+ conf_key_id=conf_key_id,
+ is_authenticated=is_authenticated,
+ )
self._test.registry.register(self, self.test.logger)
- def upd_vpp_config(self,
- detect_mult=None,
- desired_min_tx=None,
- required_min_rx=None):
+ def upd_vpp_config(
+ self, detect_mult=None, desired_min_tx=None, required_min_rx=None
+ ):
if desired_min_tx:
self._desired_min_tx = desired_min_tx
if required_min_rx:
@@ -405,15 +439,17 @@ class VppBFDUDPSession(VppObject):
bfd_key_id = self._bfd_key_id if self._sha1_key else None
conf_key_id = self._sha1_key.conf_key_id if self._sha1_key else None
is_authenticated = True if self._sha1_key else False
- self.test.vapi.bfd_udp_upd(sw_if_index=self._interface.sw_if_index,
- desired_min_tx=self.desired_min_tx,
- required_min_rx=self.required_min_rx,
- detect_mult=self.detect_mult,
- local_addr=self.local_addr,
- peer_addr=self.peer_addr,
- bfd_key_id=bfd_key_id,
- conf_key_id=conf_key_id,
- is_authenticated=is_authenticated)
+ self.test.vapi.bfd_udp_upd(
+ sw_if_index=self._interface.sw_if_index,
+ desired_min_tx=self.desired_min_tx,
+ required_min_rx=self.required_min_rx,
+ detect_mult=self.detect_mult,
+ local_addr=self.local_addr,
+ peer_addr=self.peer_addr,
+ bfd_key_id=bfd_key_id,
+ conf_key_id=conf_key_id,
+ is_authenticated=is_authenticated,
+ )
self._test.registry.register(self, self.test.logger)
def query_vpp_config(self):
@@ -421,27 +457,34 @@ class VppBFDUDPSession(VppObject):
return session is not None
def remove_vpp_config(self):
- self.test.vapi.bfd_udp_del(self._interface.sw_if_index,
- local_addr=self.local_addr,
- peer_addr=self.peer_addr)
+ self.test.vapi.bfd_udp_del(
+ self._interface.sw_if_index,
+ local_addr=self.local_addr,
+ peer_addr=self.peer_addr,
+ )
def object_id(self):
- return "bfd-udp-%s-%s-%s-%s" % (self._interface.sw_if_index,
- self.local_addr,
- self.peer_addr,
- self.af)
+ return "bfd-udp-%s-%s-%s-%s" % (
+ self._interface.sw_if_index,
+ self.local_addr,
+ self.peer_addr,
+ self.af,
+ )
def admin_up(self):
- """ set bfd session admin-up """
+ """set bfd session admin-up"""
self.test.vapi.bfd_udp_session_set_flags(
flags=VppEnum.vl_api_if_status_flags_t.IF_STATUS_API_FLAG_ADMIN_UP,
sw_if_index=self._interface.sw_if_index,
local_addr=self.local_addr,
- peer_addr=self.peer_addr)
+ peer_addr=self.peer_addr,
+ )
def admin_down(self):
- """ set bfd session admin-down """
+ """set bfd session admin-down"""
self.test.vapi.bfd_udp_session_set_flags(
- flags=0, sw_if_index=self._interface.sw_if_index,
+ flags=0,
+ sw_if_index=self._interface.sw_if_index,
local_addr=self.local_addr,
- peer_addr=self.peer_addr)
+ peer_addr=self.peer_addr,
+ )