diff options
author | Klement Sekera <klement.sekera@gmail.com> | 2022-04-26 19:02:15 +0200 |
---|---|---|
committer | Ole Tr�an <otroan@employees.org> | 2022-05-10 18:52:08 +0000 |
commit | d9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch) | |
tree | 4f786cfd8ebc2443cb11e11b74c8657204068898 /test/test_vrrp.py | |
parent | f90348bcb4afd0af2611cefc43b17ef3042b511c (diff) |
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is
much faster, stable PEP8 compliant code style checker offering also
automatic formatting. It aims to be very stable and produce smallest
diffs. It's used by many small and big projects.
Running checkstyle with black takes a few seconds with a terse output.
Thus, test-checkstyle-diff is no longer necessary.
Expand scope of checkstyle to all python files in the repo, replacing
test-checkstyle with checkstyle-python.
Also, fixstyle-python is now available for automatic style formatting.
Note: python virtualenv has been consolidated in test/Makefile,
test/requirements*.txt which will eventually be moved to a central
location. This is required to simply the automated generation of
docker executor images in the CI.
Type: improvement
Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/test_vrrp.py')
-rw-r--r-- | test/test_vrrp.py | 626 |
1 files changed, 362 insertions, 264 deletions
diff --git a/test/test_vrrp.py b/test/test_vrrp.py index 6a62a88c2a1..9319b0fa6da 100644 --- a/test/test_vrrp.py +++ b/test/test_vrrp.py @@ -17,9 +17,18 @@ from vpp_papi import VppEnum from scapy.packet import raw from scapy.layers.l2 import Ether, ARP from scapy.layers.inet import IP, ICMP, icmptypes -from scapy.layers.inet6 import IPv6, ipv6nh, IPv6ExtHdrHopByHop, \ - ICMPv6MLReport2, ICMPv6ND_NA, ICMPv6ND_NS, ICMPv6NDOptDstLLAddr, \ - ICMPv6NDOptSrcLLAddr, ICMPv6EchoRequest, ICMPv6EchoReply +from scapy.layers.inet6 import ( + IPv6, + ipv6nh, + IPv6ExtHdrHopByHop, + ICMPv6MLReport2, + ICMPv6ND_NA, + ICMPv6ND_NS, + ICMPv6NDOptDstLLAddr, + ICMPv6NDOptSrcLLAddr, + ICMPv6EchoRequest, + ICMPv6EchoReply, +) from scapy.contrib.igmpv3 import IGMPv3, IGMPv3mr, IGMPv3gr from scapy.layers.vrrp import IPPROTO_VRRP, VRRPv3 from scapy.utils6 import in6_getnsma, in6_getnsmac @@ -37,11 +46,11 @@ VRRP_VR_STATE_BACKUP = 1 VRRP_VR_STATE_MASTER = 2 VRRP_VR_STATE_INTF_DOWN = 3 -VRRP_INDEX_INVALID = 0xffffffff +VRRP_INDEX_INVALID = 0xFFFFFFFF def is_non_arp(p): - """ Want to filter out advertisements, igmp, etc""" + """Want to filter out advertisements, igmp, etc""" if p.haslayer(ARP): return False @@ -49,7 +58,7 @@ def is_non_arp(p): def is_not_adv(p): - """ Filter out everything but advertisements. E.g. multicast RD/ND """ + """Filter out everything but advertisements. E.g. multicast RD/ND""" if p.haslayer(VRRPv3): return False @@ -57,7 +66,7 @@ def is_not_adv(p): def is_not_echo_reply(p): - """ filter out advertisements and other while waiting for echo reply """ + """filter out advertisements and other while waiting for echo reply""" if p.haslayer(IP) and p.haslayer(ICMP): if icmptypes[p[ICMP].type] == "echo-reply": return False @@ -68,15 +77,16 @@ def is_not_echo_reply(p): class VppVRRPVirtualRouter(VppObject): - - def __init__(self, - test, - intf, - vr_id, - prio=100, - intvl=100, - flags=VRRP_VR_FLAG_PREEMPT, - vips=None): + def __init__( + self, + test, + intf, + vr_id, + prio=100, + intvl=100, + flags=VRRP_VR_FLAG_PREEMPT, + vips=None, + ): self._test = test self._intf = intf self._sw_if_index = self._intf.sw_if_index @@ -84,40 +94,44 @@ class VppVRRPVirtualRouter(VppObject): self._prio = prio self._intvl = intvl self._flags = flags - if (flags & VRRP_VR_FLAG_IPV6): + if flags & VRRP_VR_FLAG_IPV6: self._is_ipv6 = 1 self._adv_dest_mac = "33:33:00:00:00:12" self._virtual_mac = "00:00:5e:00:02:%02x" % vr_id self._adv_dest_ip = "ff02::12" - self._vips = ([intf.local_ip6] if vips is None else vips) + self._vips = [intf.local_ip6] if vips is None else vips else: self._is_ipv6 = 0 self._adv_dest_mac = "01:00:5e:00:00:12" self._virtual_mac = "00:00:5e:00:01:%02x" % vr_id self._adv_dest_ip = "224.0.0.18" - self._vips = ([intf.local_ip4] if vips is None else vips) + self._vips = [intf.local_ip4] if vips is None else vips self._tracked_ifs = [] self._vrrp_index = VRRP_INDEX_INVALID def add_vpp_config(self): - self._test.vapi.vrrp_vr_add_del(is_add=1, - sw_if_index=self._intf.sw_if_index, - vr_id=self._vr_id, - priority=self._prio, - interval=self._intvl, - flags=self._flags, - n_addrs=len(self._vips), - addrs=self._vips) + self._test.vapi.vrrp_vr_add_del( + is_add=1, + sw_if_index=self._intf.sw_if_index, + vr_id=self._vr_id, + priority=self._prio, + interval=self._intvl, + flags=self._flags, + n_addrs=len(self._vips), + addrs=self._vips, + ) def update_vpp_config(self): - r = self._test.vapi.vrrp_vr_update(vrrp_index=self._vrrp_index, - sw_if_index=self._intf.sw_if_index, - vr_id=self._vr_id, - priority=self._prio, - interval=self._intvl, - flags=self._flags, - n_addrs=len(self._vips), - addrs=self._vips) + r = self._test.vapi.vrrp_vr_update( + vrrp_index=self._vrrp_index, + sw_if_index=self._intf.sw_if_index, + vr_id=self._vr_id, + priority=self._prio, + interval=self._intvl, + flags=self._flags, + n_addrs=len(self._vips), + addrs=self._vips, + ) self._vrrp_index = r.vrrp_index def delete_vpp_config(self): @@ -129,7 +143,7 @@ class VppVRRPVirtualRouter(VppObject): if vr.config.vr_id != self._vr_id: continue - is_ipv6 = (1 if (vr.config.flags & VRRP_VR_FLAG_IPV6) else 0) + is_ipv6 = 1 if (vr.config.flags & VRRP_VR_FLAG_IPV6) else 0 if is_ipv6 != self._is_ipv6: continue @@ -138,41 +152,45 @@ class VppVRRPVirtualRouter(VppObject): return None def remove_vpp_config(self): - self._test.vapi.vrrp_vr_add_del(is_add=0, - sw_if_index=self._intf.sw_if_index, - vr_id=self._vr_id, - priority=self._prio, - interval=self._intvl, - flags=self._flags, - n_addrs=len(self._vips), - addrs=self._vips) + self._test.vapi.vrrp_vr_add_del( + is_add=0, + sw_if_index=self._intf.sw_if_index, + vr_id=self._vr_id, + priority=self._prio, + interval=self._intvl, + flags=self._flags, + n_addrs=len(self._vips), + addrs=self._vips, + ) def start_stop(self, is_start): - self._test.vapi.vrrp_vr_start_stop(is_start=is_start, - sw_if_index=self._intf.sw_if_index, - vr_id=self._vr_id, - is_ipv6=self._is_ipv6) - self._start_time = (time.time() if is_start else None) + self._test.vapi.vrrp_vr_start_stop( + is_start=is_start, + sw_if_index=self._intf.sw_if_index, + vr_id=self._vr_id, + is_ipv6=self._is_ipv6, + ) + self._start_time = time.time() if is_start else None def add_del_tracked_interface(self, is_add, sw_if_index, prio): args = { - 'sw_if_index': self._intf.sw_if_index, - 'is_ipv6': self._is_ipv6, - 'vr_id': self._vr_id, - 'is_add': is_add, - 'n_ifs': 1, - 'ifs': [{'sw_if_index': sw_if_index, 'priority': prio}] + "sw_if_index": self._intf.sw_if_index, + "is_ipv6": self._is_ipv6, + "vr_id": self._vr_id, + "is_add": is_add, + "n_ifs": 1, + "ifs": [{"sw_if_index": sw_if_index, "priority": prio}], } self._test.vapi.vrrp_vr_track_if_add_del(**args) - self._tracked_ifs.append(args['ifs'][0]) + self._tracked_ifs.append(args["ifs"][0]) def set_unicast_peers(self, addrs): args = { - 'sw_if_index': self._intf.sw_if_index, - 'is_ipv6': self._is_ipv6, - 'vr_id': self._vr_id, - 'n_addrs': len(addrs), - 'addrs': addrs + "sw_if_index": self._intf.sw_if_index, + "is_ipv6": self._is_ipv6, + "vr_id": self._vr_id, + "n_addrs": len(addrs), + "addrs": addrs, } self._test.vapi.vrrp_vr_set_peers(**args) self._unicast_peers = addrs @@ -210,21 +228,22 @@ class VppVRRPVirtualRouter(VppObject): def master_down_seconds(self): vr_details = self.query_vpp_config() - return (vr_details.runtime.master_down_int * 0.01) + return vr_details.runtime.master_down_int * 0.01 def vrrp_adv_packet(self, prio=None, src_ip=None): dst_ip = self._adv_dest_ip if prio is None: prio = self._prio eth = Ether(dst=self._adv_dest_mac, src=self._virtual_mac) - vrrp = VRRPv3(vrid=self._vr_id, priority=prio, - ipcount=len(self._vips), adv=self._intvl) + vrrp = VRRPv3( + vrid=self._vr_id, priority=prio, ipcount=len(self._vips), adv=self._intvl + ) if self._is_ipv6: - src_ip = (self._intf.local_ip6_ll if src_ip is None else src_ip) + src_ip = self._intf.local_ip6_ll if src_ip is None else src_ip ip = IPv6(src=src_ip, dst=dst_ip, nh=IPPROTO_VRRP, hlim=255) vrrp.addrlist = self._vips else: - src_ip = (self._intf.local_ip4 if src_ip is None else src_ip) + src_ip = self._intf.local_ip4 if src_ip is None else src_ip ip = IP(src=src_ip, dst=dst_ip, proto=IPPROTO_VRRP, ttl=255, id=0) vrrp.addrlist = self._vips @@ -234,7 +253,7 @@ class VppVRRPVirtualRouter(VppObject): class TestVRRP4(VppTestCase): - """ IPv4 VRRP Test Case """ + """IPv4 VRRP Test Case""" @classmethod def setUpClass(cls): @@ -284,8 +303,7 @@ class TestVRRP4(VppTestCase): self.assertEqual(ip.proto, 2) igmp = pkt[IGMPv3] - self.assertEqual(IGMPv3.igmpv3types[igmp.type], - "Version 3 Membership Report") + self.assertEqual(IGMPv3.igmpv3types[igmp.type], "Version 3 Membership Report") igmpmr = pkt[IGMPv3mr] self.assertEqual(igmpmr.numgrp, 1) @@ -330,15 +348,15 @@ class TestVRRP4(VppTestCase): # become master and start advertising immediately. @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_master_adv(self): - """ IPv4 Master VR advertises """ + """IPv4 Master VR advertises""" self.pg_enable_capture(self.pg_interfaces) self.pg_start() prio = 255 intvl = self._default_adv - vr = VppVRRPVirtualRouter(self, self.pg0, 100, - prio=prio, intvl=intvl, - flags=self._default_flags) + vr = VppVRRPVirtualRouter( + self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags + ) vr.add_vpp_config() vr.start_stop(is_start=1) @@ -362,25 +380,30 @@ class TestVRRP4(VppTestCase): # of parameters to test that too @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_master_adv_update(self): - """ IPv4 Master VR adv + Update to Backup """ + """IPv4 Master VR adv + Update to Backup""" self.pg_enable_capture(self.pg_interfaces) self.pg_start() prio = 255 intvl = self._default_adv - vr = VppVRRPVirtualRouter(self, self.pg0, 100, - prio=prio, intvl=intvl, - flags=self._default_flags) + vr = VppVRRPVirtualRouter( + self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags + ) vr.update_vpp_config() vr.start_stop(is_start=1) self.logger.info(self.vapi.cli("show vrrp vr")) # Update VR with lower prio and larger interval # we need to keep old VR for the adv checks - upd_vr = VppVRRPVirtualRouter(self, self.pg0, 100, - prio=100, intvl=2*intvl, - flags=self._default_flags, - vips=[self.pg0.remote_ip4]) + upd_vr = VppVRRPVirtualRouter( + self, + self.pg0, + 100, + prio=100, + intvl=2 * intvl, + flags=self._default_flags, + vips=[self.pg0.remote_ip4], + ) upd_vr._vrrp_index = vr._vrrp_index upd_vr.update_vpp_config() start_time = time.time() @@ -403,7 +426,7 @@ class TestVRRP4(VppTestCase): src_ip = self.pg0.remote_ip4 pkts = [upd_vr.vrrp_adv_packet(prio=110, src_ip=src_ip)] while time.time() < end_time: - self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl*0.01) + self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl * 0.01) self.logger.info(self.vapi.cli("show trace")) upd_vr.start_stop(is_start=0) @@ -413,7 +436,7 @@ class TestVRRP4(VppTestCase): # long as it receives higher priority advertisements @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_backup_noadv(self): - """ IPv4 Backup VR does not advertise """ + """IPv4 Backup VR does not advertise""" self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -421,10 +444,15 @@ class TestVRRP4(VppTestCase): prio = 100 intvl = self._default_adv intvl_s = intvl * 0.01 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[self.pg0.remote_ip4]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[self.pg0.remote_ip4], + ) self._vrs.append(vr) vr.add_vpp_config() @@ -440,7 +468,7 @@ class TestVRRP4(VppTestCase): # send higher prio advertisements, should not receive any src_ip = self.pg0.remote_ip4 - pkts = [vr.vrrp_adv_packet(prio=prio+10, src_ip=src_ip)] + pkts = [vr.vrrp_adv_packet(prio=prio + 10, src_ip=src_ip)] while time.time() < end_time: self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s) self.logger.info(self.vapi.cli("show trace")) @@ -451,16 +479,16 @@ class TestVRRP4(VppTestCase): self._vrs = [] def test_vrrp4_master_arp(self): - """ IPv4 Master VR replies to ARP """ + """IPv4 Master VR replies to ARP""" self.pg_start() # VR virtual IP is the default, which is the pg local IP vr_id = 100 prio = 255 intvl = self._default_adv - vr = VppVRRPVirtualRouter(self, self.pg0, 100, - prio=prio, intvl=intvl, - flags=self._default_flags) + vr = VppVRRPVirtualRouter( + self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags + ) self._vrs.append(vr) vr.add_vpp_config() @@ -484,7 +512,7 @@ class TestVRRP4(VppTestCase): @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_backup_noarp(self): - """ IPv4 Backup VR ignores ARP """ + """IPv4 Backup VR ignores ARP""" # We need an address for a virtual IP that is not the IP that # ARP requests will originate from @@ -492,16 +520,24 @@ class TestVRRP4(VppTestCase): prio = 100 intvl = self._default_adv vip = self.pg0.remote_hosts[1].ip4 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[vip], + ) self._vrs.append(vr) vr.add_vpp_config() - arp_req = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / - ARP(op=ARP.who_has, pdst=vip, - psrc=self.pg0.remote_ip4, hwsrc=self.pg0.remote_mac)) + arp_req = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP( + op=ARP.who_has, + pdst=vip, + psrc=self.pg0.remote_ip4, + hwsrc=self.pg0.remote_mac, + ) # Before the VR is started make sure no reply to request for VIP self.pg_start() @@ -510,7 +546,7 @@ class TestVRRP4(VppTestCase): # VR should start in backup state and still should not reply to ARP # send a higher priority adv to make sure it does not become master - adv = vr.vrrp_adv_packet(prio=prio+10, src_ip=self.pg0.remote_ip4) + adv = vr.vrrp_adv_packet(prio=prio + 10, src_ip=self.pg0.remote_ip4) vr.start_stop(is_start=1) self.send_and_assert_no_replies(self.pg0, [adv, arp_req], timeout=1) @@ -520,17 +556,22 @@ class TestVRRP4(VppTestCase): @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_election(self): - """ IPv4 Backup VR becomes master if no advertisements received """ + """IPv4 Backup VR becomes master if no advertisements received""" vr_id = 100 prio = 100 intvl = self._default_adv intvl_s = intvl * 0.01 vip = self.pg0.remote_ip4 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[vip], + ) self._vrs.append(vr) vr.add_vpp_config() @@ -557,17 +598,22 @@ class TestVRRP4(VppTestCase): @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_backup_preempts(self): - """ IPv4 Backup VR preempts lower priority master """ + """IPv4 Backup VR preempts lower priority master""" vr_id = 100 prio = 100 intvl = self._default_adv intvl_s = intvl * 0.01 vip = self.pg0.remote_ip4 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[vip], + ) self._vrs.append(vr) vr.add_vpp_config() @@ -583,7 +629,7 @@ class TestVRRP4(VppTestCase): # send lower prio advertisements until timer expires src_ip = self.pg0.remote_ip4 - pkts = [vr.vrrp_adv_packet(prio=prio-10, src_ip=src_ip)] + pkts = [vr.vrrp_adv_packet(prio=prio - 10, src_ip=src_ip)] while time.time() + intvl_s < end_time: self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s) self.logger.info(self.vapi.cli("show trace")) @@ -595,7 +641,7 @@ class TestVRRP4(VppTestCase): @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_master_preempted(self): - """ IPv4 Master VR preempted by higher priority backup """ + """IPv4 Master VR preempted by higher priority backup""" # A prio 255 VR cannot be preempted so the prio has to be lower and # we have to wait for it to take over @@ -603,10 +649,15 @@ class TestVRRP4(VppTestCase): prio = 100 intvl = self._default_adv vip = self.pg0.remote_ip4 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[vip], + ) self._vrs.append(vr) vr.add_vpp_config() @@ -632,7 +683,7 @@ class TestVRRP4(VppTestCase): @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_accept_mode_disabled(self): - """ IPv4 Master VR does not reply for VIP w/ accept mode off """ + """IPv4 Master VR does not reply for VIP w/ accept mode off""" # accept mode only matters when prio < 255, so it will have to # come up as a backup and take over as master after the timeout @@ -640,10 +691,15 @@ class TestVRRP4(VppTestCase): prio = 100 intvl = self._default_adv vip = self.pg0.remote_hosts[4].ip4 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[vip], + ) self._vrs.append(vr) vr.add_vpp_config() @@ -661,9 +717,11 @@ class TestVRRP4(VppTestCase): vr.assert_state_equals(VRRP_VR_STATE_MASTER) # send an ICMP echo to the VR virtual IP address - echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) / - IP(dst=vip, src=self.pg0.remote_ip4) / - ICMP(seq=1, id=self.pg0.sw_if_index, type='echo-request')) + echo = ( + Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) + / IP(dst=vip, src=self.pg0.remote_ip4) + / ICMP(seq=1, id=self.pg0.sw_if_index, type="echo-request") + ) self.pg_send(self.pg0, [echo]) # wait for an echo reply. none should be received @@ -672,7 +730,7 @@ class TestVRRP4(VppTestCase): @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_accept_mode_enabled(self): - """ IPv4 Master VR replies for VIP w/ accept mode on """ + """IPv4 Master VR replies for VIP w/ accept mode on""" # A prio 255 VR cannot be preempted so the prio has to be lower and # we have to wait for it to take over @@ -680,11 +738,10 @@ class TestVRRP4(VppTestCase): prio = 100 intvl = self._default_adv vip = self.pg0.remote_hosts[4].ip4 - flags = (VRRP_VR_FLAG_PREEMPT | VRRP_VR_FLAG_ACCEPT) - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=flags, - vips=[vip]) + flags = VRRP_VR_FLAG_PREEMPT | VRRP_VR_FLAG_ACCEPT + vr = VppVRRPVirtualRouter( + self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip] + ) self._vrs.append(vr) vr.add_vpp_config() @@ -702,15 +759,18 @@ class TestVRRP4(VppTestCase): vr.assert_state_equals(VRRP_VR_STATE_MASTER) # send an ICMP echo to the VR virtual IP address - echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) / - IP(dst=vip, src=self.pg0.remote_ip4) / - ICMP(seq=1, id=self.pg0.sw_if_index, type='echo-request')) + echo = ( + Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) + / IP(dst=vip, src=self.pg0.remote_ip4) + / ICMP(seq=1, id=self.pg0.sw_if_index, type="echo-request") + ) self.pg_send(self.pg0, [echo]) # wait for an echo reply. time.sleep(1) - rx_pkts = self.pg0.get_capture(expected_count=1, timeout=1, - filter_out_fn=is_not_echo_reply) + rx_pkts = self.pg0.get_capture( + expected_count=1, timeout=1, filter_out_fn=is_not_echo_reply + ) self.assertEqual(rx_pkts[0][IP].src, vip) self.assertEqual(rx_pkts[0][IP].dst, self.pg0.remote_ip4) @@ -720,17 +780,22 @@ class TestVRRP4(VppTestCase): @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_intf_tracking(self): - """ IPv4 Master VR adjusts priority based on tracked interface """ + """IPv4 Master VR adjusts priority based on tracked interface""" vr_id = 100 prio = 255 intvl = self._default_adv intvl_s = intvl * 0.01 vip = self.pg0.local_ip4 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[vip], + ) self._vrs.append(vr) vr.add_vpp_config() @@ -740,9 +805,9 @@ class TestVRRP4(VppTestCase): # add pg1 as a tracked interface and start the VR adjustment = 50 adjusted_prio = prio - adjustment - vr.add_del_tracked_interface(is_add=1, - sw_if_index=self.pg1.sw_if_index, - prio=adjustment) + vr.add_del_tracked_interface( + is_add=1, sw_if_index=self.pg1.sw_if_index, prio=adjustment + ) vr.start_stop(is_start=1) vr.assert_state_equals(VRRP_VR_STATE_MASTER) @@ -751,53 +816,47 @@ class TestVRRP4(VppTestCase): # tracked intf is up -> advertised priority == configured priority self.pg0.enable_capture() - rx = self.pg0.wait_for_packet(timeout=intvl_s, - filter_out_fn=is_not_adv) + rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_configured) # take down pg1, verify priority is now being adjusted self.pg1.admin_down() self.pg0.enable_capture() - rx = self.pg0.wait_for_packet(timeout=intvl_s, - filter_out_fn=is_not_adv) + rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_adjusted) # bring up pg1, verify priority now matches configured value self.pg1.admin_up() self.pg0.enable_capture() - rx = self.pg0.wait_for_packet(timeout=intvl_s, - filter_out_fn=is_not_adv) + rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_configured) # remove IP address from pg1, verify priority now being adjusted self.pg1.unconfig_ip4() self.pg0.enable_capture() - rx = self.pg0.wait_for_packet(timeout=intvl_s, - filter_out_fn=is_not_adv) + rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_adjusted) # add IP address to pg1, verify priority now matches configured value self.pg1.config_ip4() self.pg0.enable_capture() - rx = self.pg0.wait_for_packet(timeout=intvl_s, - filter_out_fn=is_not_adv) + rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_configured) @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_master_adv_unicast(self): - """ IPv4 Master VR advertises (unicast) """ + """IPv4 Master VR advertises (unicast)""" vr_id = 100 prio = 255 intvl = self._default_adv intvl_s = intvl * 0.01 vip = self.pg0.local_ip4 - flags = (self._default_flags | VRRP_VR_FLAG_UNICAST) + flags = self._default_flags | VRRP_VR_FLAG_UNICAST unicast_peer = self.pg0.remote_hosts[4] - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip] + ) self._vrs.append(vr) vr.add_vpp_config() vr.set_unicast_peers([unicast_peer.ip4]) @@ -810,8 +869,7 @@ class TestVRRP4(VppTestCase): vr.assert_state_equals(VRRP_VR_STATE_MASTER) self.pg0.enable_capture() - rx = self.pg0.wait_for_packet(timeout=intvl_s, - filter_out_fn=is_not_adv) + rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertTrue(rx.haslayer(Ether)) self.assertTrue(rx.haslayer(IP)) @@ -827,7 +885,7 @@ class TestVRRP4(VppTestCase): class TestVRRP6(VppTestCase): - """ IPv6 VRRP Test Case """ + """IPv6 VRRP Test Case""" @classmethod def setUpClass(cls): @@ -849,7 +907,7 @@ class TestVRRP6(VppTestCase): i.configure_ipv6_neighbors() self._vrs = [] - self._default_flags = (VRRP_VR_FLAG_IPV6 | VRRP_VR_FLAG_PREEMPT) + self._default_flags = VRRP_VR_FLAG_IPV6 | VRRP_VR_FLAG_PREEMPT self._default_adv = 100 def tearDown(self): @@ -922,15 +980,15 @@ class TestVRRP6(VppTestCase): # become master and start advertising immediately. @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp6_master_adv(self): - """ IPv6 Master VR advertises """ + """IPv6 Master VR advertises""" self.pg_enable_capture(self.pg_interfaces) self.pg_start() prio = 255 intvl = self._default_adv - vr = VppVRRPVirtualRouter(self, self.pg0, 100, - prio=prio, intvl=intvl, - flags=self._default_flags) + vr = VppVRRPVirtualRouter( + self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags + ) self._vrs.append(vr) vr.add_vpp_config() @@ -956,25 +1014,30 @@ class TestVRRP6(VppTestCase): # of parameters to test that too @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp6_master_adv_update(self): - """ IPv6 Master VR adv + Update to Backup """ + """IPv6 Master VR adv + Update to Backup""" self.pg_enable_capture(self.pg_interfaces) self.pg_start() prio = 255 intvl = self._default_adv - vr = VppVRRPVirtualRouter(self, self.pg0, 100, - prio=prio, intvl=intvl, - flags=self._default_flags) + vr = VppVRRPVirtualRouter( + self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags + ) vr.update_vpp_config() vr.start_stop(is_start=1) self.logger.info(self.vapi.cli("show vrrp vr")) # Update VR with lower prio and larger interval # we need to keep old VR for the adv checks - upd_vr = VppVRRPVirtualRouter(self, self.pg0, 100, - prio=100, intvl=2*intvl, - flags=self._default_flags, - vips=[self.pg0.remote_ip6]) + upd_vr = VppVRRPVirtualRouter( + self, + self.pg0, + 100, + prio=100, + intvl=2 * intvl, + flags=self._default_flags, + vips=[self.pg0.remote_ip6], + ) upd_vr._vrrp_index = vr._vrrp_index upd_vr.update_vpp_config() start_time = time.time() @@ -1000,7 +1063,8 @@ class TestVRRP6(VppTestCase): end_time = start_time + 2 * upd_vr.master_down_seconds() while time.time() < end_time: self.send_and_assert_no_replies( - self.pg0, pkts, timeout=0.01*upd_vr._intvl) + self.pg0, pkts, timeout=0.01 * upd_vr._intvl + ) self.logger.info(self.vapi.cli("show trace")) vr.start_stop(is_start=0) @@ -1010,7 +1074,7 @@ class TestVRRP6(VppTestCase): # long as it receives higher priority advertisements @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp6_backup_noadv(self): - """ IPv6 Backup VR does not advertise """ + """IPv6 Backup VR does not advertise""" self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1018,10 +1082,15 @@ class TestVRRP6(VppTestCase): prio = 100 intvl = self._default_adv intvl_s = intvl * 0.01 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[self.pg0.remote_ip6]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[self.pg0.remote_ip6], + ) vr.add_vpp_config() self._vrs.append(vr) @@ -1037,7 +1106,7 @@ class TestVRRP6(VppTestCase): # send higher prio advertisements, should not see VPP send any src_ip = self.pg0.remote_ip6_ll num_advs = 5 - pkts = [vr.vrrp_adv_packet(prio=prio+10, src_ip=src_ip)] + pkts = [vr.vrrp_adv_packet(prio=prio + 10, src_ip=src_ip)] self.logger.info(self.vapi.cli("show vlib graph")) while time.time() < end_time: self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s) @@ -1050,16 +1119,16 @@ class TestVRRP6(VppTestCase): self._vrs = [] def test_vrrp6_master_nd(self): - """ IPv6 Master VR replies to NDP """ + """IPv6 Master VR replies to NDP""" self.pg_start() # VR virtual IP is the default, which is the pg local IP vr_id = 100 prio = 255 intvl = self._default_adv - vr = VppVRRPVirtualRouter(self, self.pg0, 100, - prio=prio, intvl=intvl, - flags=self._default_flags) + vr = VppVRRPVirtualRouter( + self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags + ) vr.add_vpp_config() self._vrs.append(vr) @@ -1082,7 +1151,7 @@ class TestVRRP6(VppTestCase): @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp6_backup_nond(self): - """ IPv6 Backup VR ignores NDP """ + """IPv6 Backup VR ignores NDP""" # We need an address for a virtual IP that is not the IP that # ARP requests will originate from @@ -1091,10 +1160,15 @@ class TestVRRP6(VppTestCase): intvl = self._default_adv intvl_s = intvl * 0.01 vip = self.pg0.remote_hosts[1].ip6 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[vip], + ) vr.add_vpp_config() self._vrs.append(vr) @@ -1102,36 +1176,43 @@ class TestVRRP6(VppTestCase): dmac = in6_getnsmac(nsma) dst_ip = inet_ntop(socket.AF_INET6, nsma) - ndp_req = (Ether(dst=dmac, src=self.pg0.remote_mac) / - IPv6(dst=dst_ip, src=self.pg0.remote_ip6) / - ICMPv6ND_NS(tgt=vip) / - ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)) + ndp_req = ( + Ether(dst=dmac, src=self.pg0.remote_mac) + / IPv6(dst=dst_ip, src=self.pg0.remote_ip6) + / ICMPv6ND_NS(tgt=vip) + / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac) + ) # Before the VR is started make sure no reply to request for VIP self.send_and_assert_no_replies(self.pg0, [ndp_req], timeout=1) # VR should start in backup state and still should not reply to NDP # send a higher priority adv to make sure it does not become master - adv = vr.vrrp_adv_packet(prio=prio+10, src_ip=self.pg0.remote_ip6) + adv = vr.vrrp_adv_packet(prio=prio + 10, src_ip=self.pg0.remote_ip6) pkts = [adv, ndp_req] vr.start_stop(is_start=1) - self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s) + self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s) vr.start_stop(is_start=0) @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp6_election(self): - """ IPv6 Backup VR becomes master if no advertisements received """ + """IPv6 Backup VR becomes master if no advertisements received""" vr_id = 100 prio = 100 intvl = self._default_adv intvl_s = intvl * 0.01 vip = self.pg0.remote_ip6 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[vip], + ) self._vrs.append(vr) vr.add_vpp_config() @@ -1158,17 +1239,22 @@ class TestVRRP6(VppTestCase): @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp6_backup_preempts(self): - """ IPv6 Backup VR preempts lower priority master """ + """IPv6 Backup VR preempts lower priority master""" vr_id = 100 prio = 100 intvl = self._default_adv intvl_s = intvl * 0.01 vip = self.pg0.remote_ip6 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[vip], + ) self._vrs.append(vr) vr.add_vpp_config() @@ -1184,7 +1270,7 @@ class TestVRRP6(VppTestCase): # send lower prio advertisements until timer expires src_ip = self.pg0.remote_ip6 - pkts = [vr.vrrp_adv_packet(prio=prio-10, src_ip=src_ip)] + pkts = [vr.vrrp_adv_packet(prio=prio - 10, src_ip=src_ip)] while (time.time() + intvl_s) < end_time: self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s) self.logger.info(self.vapi.cli("show trace")) @@ -1196,7 +1282,7 @@ class TestVRRP6(VppTestCase): @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp6_master_preempted(self): - """ IPv6 Master VR preempted by higher priority backup """ + """IPv6 Master VR preempted by higher priority backup""" # A prio 255 VR cannot be preempted so the prio has to be lower and # we have to wait for it to take over @@ -1204,10 +1290,15 @@ class TestVRRP6(VppTestCase): prio = 100 intvl = self._default_adv vip = self.pg0.remote_ip6 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[vip], + ) self._vrs.append(vr) vr.add_vpp_config() @@ -1233,7 +1324,7 @@ class TestVRRP6(VppTestCase): @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp6_accept_mode_disabled(self): - """ IPv6 Master VR does not reply for VIP w/ accept mode off """ + """IPv6 Master VR does not reply for VIP w/ accept mode off""" # accept mode only matters when prio < 255, so it will have to # come up as a backup and take over as master after the timeout @@ -1241,10 +1332,15 @@ class TestVRRP6(VppTestCase): prio = 100 intvl = self._default_adv vip = self.pg0.remote_hosts[4].ip6 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[vip], + ) self._vrs.append(vr) vr.add_vpp_config() @@ -1262,9 +1358,11 @@ class TestVRRP6(VppTestCase): vr.assert_state_equals(VRRP_VR_STATE_MASTER) # send an ICMPv6 echo to the VR virtual IP address - echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) / - IPv6(dst=vip, src=self.pg0.remote_ip6) / - ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index)) + echo = ( + Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) + / IPv6(dst=vip, src=self.pg0.remote_ip6) + / ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index) + ) self.pg_send(self.pg0, [echo]) # wait for an echo reply. none should be received @@ -1273,7 +1371,7 @@ class TestVRRP6(VppTestCase): @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp6_accept_mode_enabled(self): - """ IPv6 Master VR replies for VIP w/ accept mode on """ + """IPv6 Master VR replies for VIP w/ accept mode on""" # A prio 255 VR cannot be preempted so the prio has to be lower and # we have to wait for it to take over @@ -1281,11 +1379,10 @@ class TestVRRP6(VppTestCase): prio = 100 intvl = self._default_adv vip = self.pg0.remote_hosts[4].ip6 - flags = (self._default_flags | VRRP_VR_FLAG_ACCEPT) - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=flags, - vips=[vip]) + flags = self._default_flags | VRRP_VR_FLAG_ACCEPT + vr = VppVRRPVirtualRouter( + self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip] + ) self._vrs.append(vr) vr.add_vpp_config() @@ -1303,15 +1400,18 @@ class TestVRRP6(VppTestCase): vr.assert_state_equals(VRRP_VR_STATE_MASTER) # send an ICMP echo to the VR virtual IP address - echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) / - IPv6(dst=vip, src=self.pg0.remote_ip6) / - ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index)) + echo = ( + Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) + / IPv6(dst=vip, src=self.pg0.remote_ip6) + / ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index) + ) self.pg_send(self.pg0, [echo]) # wait for an echo reply. time.sleep(1) - rx_pkts = self.pg0.get_capture(expected_count=1, timeout=1, - filter_out_fn=is_not_echo_reply) + rx_pkts = self.pg0.get_capture( + expected_count=1, timeout=1, filter_out_fn=is_not_echo_reply + ) self.assertEqual(rx_pkts[0][IPv6].src, vip) self.assertEqual(rx_pkts[0][IPv6].dst, self.pg0.remote_ip6) @@ -1320,17 +1420,22 @@ class TestVRRP6(VppTestCase): @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp6_intf_tracking(self): - """ IPv6 Master VR adjusts priority based on tracked interface """ + """IPv6 Master VR adjusts priority based on tracked interface""" vr_id = 100 prio = 255 intvl = self._default_adv intvl_s = intvl * 0.01 vip = self.pg0.local_ip6 - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=self._default_flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, + self.pg0, + vr_id, + prio=prio, + intvl=intvl, + flags=self._default_flags, + vips=[vip], + ) self._vrs.append(vr) vr.add_vpp_config() @@ -1340,9 +1445,9 @@ class TestVRRP6(VppTestCase): # add pg1 as a tracked interface and start the VR adjustment = 50 adjusted_prio = prio - adjustment - vr.add_del_tracked_interface(is_add=1, - sw_if_index=self.pg1.sw_if_index, - prio=adjustment) + vr.add_del_tracked_interface( + is_add=1, sw_if_index=self.pg1.sw_if_index, prio=adjustment + ) vr.start_stop(is_start=1) vr.assert_state_equals(VRRP_VR_STATE_MASTER) @@ -1351,53 +1456,47 @@ class TestVRRP6(VppTestCase): # tracked intf is up -> advertised priority == configured priority self.pg0.enable_capture() - rx = self.pg0.wait_for_packet(timeout=intvl_s, - filter_out_fn=is_not_adv) + rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_configured) # take down pg1, verify priority is now being adjusted self.pg1.admin_down() self.pg0.enable_capture() - rx = self.pg0.wait_for_packet(timeout=intvl_s, - filter_out_fn=is_not_adv) + rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_adjusted) # bring up pg1, verify priority now matches configured value self.pg1.admin_up() self.pg0.enable_capture() - rx = self.pg0.wait_for_packet(timeout=intvl_s, - filter_out_fn=is_not_adv) + rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_configured) # remove IP address from pg1, verify priority now being adjusted self.pg1.unconfig_ip6() self.pg0.enable_capture() - rx = self.pg0.wait_for_packet(timeout=intvl_s, - filter_out_fn=is_not_adv) + rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_adjusted) # add IP address to pg1, verify priority now matches configured value self.pg1.config_ip6() self.pg0.enable_capture() - rx = self.pg0.wait_for_packet(timeout=intvl_s, - filter_out_fn=is_not_adv) + rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_configured) @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp6_master_adv_unicast(self): - """ IPv6 Master VR advertises (unicast) """ + """IPv6 Master VR advertises (unicast)""" vr_id = 100 prio = 255 intvl = self._default_adv intvl_s = intvl * 0.01 vip = self.pg0.local_ip6 - flags = (self._default_flags | VRRP_VR_FLAG_UNICAST) + flags = self._default_flags | VRRP_VR_FLAG_UNICAST unicast_peer = self.pg0.remote_hosts[4] - vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, - prio=prio, intvl=intvl, - flags=flags, - vips=[vip]) + vr = VppVRRPVirtualRouter( + self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip] + ) self._vrs.append(vr) vr.add_vpp_config() vr.set_unicast_peers([unicast_peer.ip6]) @@ -1410,23 +1509,22 @@ class TestVRRP6(VppTestCase): vr.assert_state_equals(VRRP_VR_STATE_MASTER) self.pg0.enable_capture() - rx = self.pg0.wait_for_packet(timeout=intvl_s, - filter_out_fn=is_not_adv) + rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertTrue(rx.haslayer(Ether)) self.assertTrue(rx.haslayer(IPv6)) self.assertTrue(rx.haslayer(VRRPv3)) self.assertEqual(rx[Ether].src, self.pg0.local_mac) self.assertEqual(rx[Ether].dst, unicast_peer.mac) - self.assertEqual(ip6_normalize(rx[IPv6].src), - ip6_normalize(self.pg0.local_ip6_ll)) - self.assertEqual(ip6_normalize(rx[IPv6].dst), - ip6_normalize(unicast_peer.ip6)) + self.assertEqual( + ip6_normalize(rx[IPv6].src), ip6_normalize(self.pg0.local_ip6_ll) + ) + self.assertEqual(ip6_normalize(rx[IPv6].dst), ip6_normalize(unicast_peer.ip6)) self.assertEqual(rx[VRRPv3].vrid, vr_id) self.assertEqual(rx[VRRPv3].priority, prio) self.assertEqual(rx[VRRPv3].ipcount, 1) self.assertEqual(rx[VRRPv3].addrlist, [vip]) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) |