aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_vrrp.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_vrrp.py')
-rw-r--r--test/test_vrrp.py723
1 files changed, 481 insertions, 242 deletions
diff --git a/test/test_vrrp.py b/test/test_vrrp.py
index 241ca0d3cf4..8575016c326 100644
--- a/test/test_vrrp.py
+++ b/test/test_vrrp.py
@@ -12,18 +12,28 @@ import socket
from socket import inet_pton, inet_ntop
from vpp_object import VppObject
-from vpp_papi import VppEnum
from scapy.packet import raw
from scapy.layers.l2 import Ether, ARP
from scapy.layers.inet import IP, ICMP, icmptypes
-from scapy.layers.inet6 import IPv6, ipv6nh, IPv6ExtHdrHopByHop, \
- ICMPv6MLReport2, ICMPv6ND_NA, ICMPv6ND_NS, ICMPv6NDOptDstLLAddr, \
- ICMPv6NDOptSrcLLAddr, ICMPv6EchoRequest, ICMPv6EchoReply
-from scapy.contrib.igmpv3 import IGMPv3, IGMPv3mr, IGMPv3gr
+from scapy.layers.inet6 import (
+ IPv6,
+ ipv6nh,
+ IPv6ExtHdrHopByHop,
+ ICMPv6MLReport2,
+ ICMPv6ND_NA,
+ ICMPv6ND_NS,
+ ICMPv6NDOptDstLLAddr,
+ ICMPv6NDOptSrcLLAddr,
+ ICMPv6EchoRequest,
+ ICMPv6EchoReply,
+)
+from scapy.contrib.igmpv3 import IGMPv3, IGMPv3mr
from scapy.layers.vrrp import IPPROTO_VRRP, VRRPv3
from scapy.utils6 import in6_getnsma, in6_getnsmac
-from framework import VppTestCase, VppTestRunner, running_extended_tests
+from config import config
+from framework import VppTestCase
+from asfframework import VppTestRunner
from util import ip6_normalize
VRRP_VR_FLAG_PREEMPT = 1
@@ -36,9 +46,11 @@ VRRP_VR_STATE_BACKUP = 1
VRRP_VR_STATE_MASTER = 2
VRRP_VR_STATE_INTF_DOWN = 3
+VRRP_INDEX_INVALID = 0xFFFFFFFF
+
def is_non_arp(p):
- """ Want to filter out advertisements, igmp, etc"""
+ """Want to filter out advertisements, igmp, etc"""
if p.haslayer(ARP):
return False
@@ -46,7 +58,7 @@ def is_non_arp(p):
def is_not_adv(p):
- """ Filter out everything but advertisements. E.g. multicast RD/ND """
+ """Filter out everything but advertisements. E.g. multicast RD/ND"""
if p.haslayer(VRRPv3):
return False
@@ -54,7 +66,7 @@ def is_not_adv(p):
def is_not_echo_reply(p):
- """ filter out advertisements and other while waiting for echo reply """
+ """filter out advertisements and other while waiting for echo reply"""
if p.haslayer(IP) and p.haslayer(ICMP):
if icmptypes[p[ICMP].type] == "echo-reply":
return False
@@ -65,15 +77,16 @@ def is_not_echo_reply(p):
class VppVRRPVirtualRouter(VppObject):
-
- def __init__(self,
- test,
- intf,
- vr_id,
- prio=100,
- intvl=100,
- flags=VRRP_VR_FLAG_PREEMPT,
- vips=None):
+ def __init__(
+ self,
+ test,
+ intf,
+ vr_id,
+ prio=100,
+ intvl=100,
+ flags=VRRP_VR_FLAG_PREEMPT,
+ vips=None,
+ ):
self._test = test
self._intf = intf
self._sw_if_index = self._intf.sw_if_index
@@ -81,29 +94,48 @@ class VppVRRPVirtualRouter(VppObject):
self._prio = prio
self._intvl = intvl
self._flags = flags
- if (flags & VRRP_VR_FLAG_IPV6):
+ if flags & VRRP_VR_FLAG_IPV6:
self._is_ipv6 = 1
self._adv_dest_mac = "33:33:00:00:00:12"
self._virtual_mac = "00:00:5e:00:02:%02x" % vr_id
self._adv_dest_ip = "ff02::12"
- self._vips = ([intf.local_ip6] if vips is None else vips)
+ self._vips = [intf.local_ip6] if vips is None else vips
else:
self._is_ipv6 = 0
self._adv_dest_mac = "01:00:5e:00:00:12"
self._virtual_mac = "00:00:5e:00:01:%02x" % vr_id
self._adv_dest_ip = "224.0.0.18"
- self._vips = ([intf.local_ip4] if vips is None else vips)
+ self._vips = [intf.local_ip4] if vips is None else vips
self._tracked_ifs = []
+ self._vrrp_index = VRRP_INDEX_INVALID
def add_vpp_config(self):
- self._test.vapi.vrrp_vr_add_del(is_add=1,
- sw_if_index=self._intf.sw_if_index,
- vr_id=self._vr_id,
- priority=self._prio,
- interval=self._intvl,
- flags=self._flags,
- n_addrs=len(self._vips),
- addrs=self._vips)
+ self._test.vapi.vrrp_vr_add_del(
+ is_add=1,
+ sw_if_index=self._intf.sw_if_index,
+ vr_id=self._vr_id,
+ priority=self._prio,
+ interval=self._intvl,
+ flags=self._flags,
+ n_addrs=len(self._vips),
+ addrs=self._vips,
+ )
+
+ def update_vpp_config(self):
+ r = self._test.vapi.vrrp_vr_update(
+ vrrp_index=self._vrrp_index,
+ sw_if_index=self._intf.sw_if_index,
+ vr_id=self._vr_id,
+ priority=self._prio,
+ interval=self._intvl,
+ flags=self._flags,
+ n_addrs=len(self._vips),
+ addrs=self._vips,
+ )
+ self._vrrp_index = r.vrrp_index
+
+ def delete_vpp_config(self):
+ self._test.vapi.vrrp_vr_del(vrrp_index=self._vrrp_index)
def query_vpp_config(self):
vrs = self._test.vapi.vrrp_vr_dump(sw_if_index=self._intf.sw_if_index)
@@ -111,7 +143,7 @@ class VppVRRPVirtualRouter(VppObject):
if vr.config.vr_id != self._vr_id:
continue
- is_ipv6 = (1 if (vr.config.flags & VRRP_VR_FLAG_IPV6) else 0)
+ is_ipv6 = 1 if (vr.config.flags & VRRP_VR_FLAG_IPV6) else 0
if is_ipv6 != self._is_ipv6:
continue
@@ -120,41 +152,45 @@ class VppVRRPVirtualRouter(VppObject):
return None
def remove_vpp_config(self):
- self._test.vapi.vrrp_vr_add_del(is_add=0,
- sw_if_index=self._intf.sw_if_index,
- vr_id=self._vr_id,
- priority=self._prio,
- interval=self._intvl,
- flags=self._flags,
- n_addrs=len(self._vips),
- addrs=self._vips)
+ self._test.vapi.vrrp_vr_add_del(
+ is_add=0,
+ sw_if_index=self._intf.sw_if_index,
+ vr_id=self._vr_id,
+ priority=self._prio,
+ interval=self._intvl,
+ flags=self._flags,
+ n_addrs=len(self._vips),
+ addrs=self._vips,
+ )
def start_stop(self, is_start):
- self._test.vapi.vrrp_vr_start_stop(is_start=is_start,
- sw_if_index=self._intf.sw_if_index,
- vr_id=self._vr_id,
- is_ipv6=self._is_ipv6)
- self._start_time = (time.time() if is_start else None)
+ self._test.vapi.vrrp_vr_start_stop(
+ is_start=is_start,
+ sw_if_index=self._intf.sw_if_index,
+ vr_id=self._vr_id,
+ is_ipv6=self._is_ipv6,
+ )
+ self._start_time = time.time() if is_start else None
def add_del_tracked_interface(self, is_add, sw_if_index, prio):
args = {
- 'sw_if_index': self._intf.sw_if_index,
- 'is_ipv6': self._is_ipv6,
- 'vr_id': self._vr_id,
- 'is_add': is_add,
- 'n_ifs': 1,
- 'ifs': [{'sw_if_index': sw_if_index, 'priority': prio}]
+ "sw_if_index": self._intf.sw_if_index,
+ "is_ipv6": self._is_ipv6,
+ "vr_id": self._vr_id,
+ "is_add": is_add,
+ "n_ifs": 1,
+ "ifs": [{"sw_if_index": sw_if_index, "priority": prio}],
}
self._test.vapi.vrrp_vr_track_if_add_del(**args)
- self._tracked_ifs.append(args['ifs'][0])
+ self._tracked_ifs.append(args["ifs"][0])
def set_unicast_peers(self, addrs):
args = {
- 'sw_if_index': self._intf.sw_if_index,
- 'is_ipv6': self._is_ipv6,
- 'vr_id': self._vr_id,
- 'n_addrs': len(addrs),
- 'addrs': addrs
+ "sw_if_index": self._intf.sw_if_index,
+ "is_ipv6": self._is_ipv6,
+ "vr_id": self._vr_id,
+ "n_addrs": len(addrs),
+ "addrs": addrs,
}
self._test.vapi.vrrp_vr_set_peers(**args)
self._unicast_peers = addrs
@@ -192,21 +228,22 @@ class VppVRRPVirtualRouter(VppObject):
def master_down_seconds(self):
vr_details = self.query_vpp_config()
- return (vr_details.runtime.master_down_int * 0.01)
+ return vr_details.runtime.master_down_int * 0.01
def vrrp_adv_packet(self, prio=None, src_ip=None):
dst_ip = self._adv_dest_ip
if prio is None:
prio = self._prio
eth = Ether(dst=self._adv_dest_mac, src=self._virtual_mac)
- vrrp = VRRPv3(vrid=self._vr_id, priority=prio,
- ipcount=len(self._vips), adv=self._intvl)
+ vrrp = VRRPv3(
+ vrid=self._vr_id, priority=prio, ipcount=len(self._vips), adv=self._intvl
+ )
if self._is_ipv6:
- src_ip = (self._intf.local_ip6_ll if src_ip is None else src_ip)
+ src_ip = self._intf.local_ip6_ll if src_ip is None else src_ip
ip = IPv6(src=src_ip, dst=dst_ip, nh=IPPROTO_VRRP, hlim=255)
vrrp.addrlist = self._vips
else:
- src_ip = (self._intf.local_ip4 if src_ip is None else src_ip)
+ src_ip = self._intf.local_ip4 if src_ip is None else src_ip
ip = IP(src=src_ip, dst=dst_ip, proto=IPPROTO_VRRP, ttl=255, id=0)
vrrp.addrlist = self._vips
@@ -215,9 +252,8 @@ class VppVRRPVirtualRouter(VppObject):
return pkt
-@unittest.skipUnless(running_extended_tests, "part of extended tests")
class TestVRRP4(VppTestCase):
- """ IPv4 VRRP Test Case """
+ """IPv4 VRRP Test Case"""
@classmethod
def setUpClass(cls):
@@ -267,8 +303,7 @@ class TestVRRP4(VppTestCase):
self.assertEqual(ip.proto, 2)
igmp = pkt[IGMPv3]
- self.assertEqual(IGMPv3.igmpv3types[igmp.type],
- "Version 3 Membership Report")
+ self.assertEqual(IGMPv3.igmpv3types[igmp.type], "Version 3 Membership Report")
igmpmr = pkt[IGMPv3mr]
self.assertEqual(igmpmr.numgrp, 1)
@@ -311,16 +346,17 @@ class TestVRRP4(VppTestCase):
# VR with priority 255 owns the virtual address and should
# become master and start advertising immediately.
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_master_adv(self):
- """ IPv4 Master VR advertises """
+ """IPv4 Master VR advertises"""
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
prio = 255
intvl = self._default_adv
- vr = VppVRRPVirtualRouter(self, self.pg0, 100,
- prio=prio, intvl=intvl,
- flags=self._default_flags)
+ vr = VppVRRPVirtualRouter(
+ self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
+ )
vr.add_vpp_config()
vr.start_stop(is_start=1)
@@ -340,10 +376,67 @@ class TestVRRP4(VppTestCase):
vr.remove_vpp_config()
self._vrs = []
+ # Same as above but with the update API, and add a change
+ # of parameters to test that too
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ def test_vrrp4_master_adv_update(self):
+ """IPv4 Master VR adv + Update to Backup"""
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ prio = 255
+ intvl = self._default_adv
+ vr = VppVRRPVirtualRouter(
+ self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
+ )
+
+ vr.update_vpp_config()
+ vr.start_stop(is_start=1)
+ self.logger.info(self.vapi.cli("show vrrp vr"))
+ # Update VR with lower prio and larger interval
+ # we need to keep old VR for the adv checks
+ upd_vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ 100,
+ prio=100,
+ intvl=2 * intvl,
+ flags=self._default_flags,
+ vips=[self.pg0.remote_ip4],
+ )
+ upd_vr._vrrp_index = vr._vrrp_index
+ upd_vr.update_vpp_config()
+ start_time = time.time()
+ self.logger.info(self.vapi.cli("show vrrp vr"))
+ upd_vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
+ self._vrs = [upd_vr]
+
+ pkts = self.pg0.get_capture(5)
+ # Init -> Master: IGMP Join, VRRP adv, gratuitous ARP are sent
+ self.verify_vrrp4_igmp(pkts[0])
+ self.verify_vrrp4_adv(pkts[1], vr, prio=prio)
+ self.verify_vrrp4_garp(pkts[2], vr.virtual_ips()[0], vr.virtual_mac())
+ # Master -> Init: Adv with priority 0 sent to force an election
+ self.verify_vrrp4_adv(pkts[3], vr, prio=0)
+ # Init -> Backup: An IGMP join should be sent
+ self.verify_vrrp4_igmp(pkts[4])
+
+ # send higher prio advertisements, should not receive any
+ end_time = start_time + 2 * upd_vr.master_down_seconds()
+ src_ip = self.pg0.remote_ip4
+ pkts = [upd_vr.vrrp_adv_packet(prio=110, src_ip=src_ip)]
+ while time.time() < end_time:
+ self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl * 0.01)
+ self.logger.info(self.vapi.cli("show trace"))
+
+ upd_vr.start_stop(is_start=0)
+ self.logger.info(self.vapi.cli("show vrrp vr"))
+
# VR with priority < 255 enters backup state and does not advertise as
# long as it receives higher priority advertisements
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_backup_noadv(self):
- """ IPv4 Backup VR does not advertise """
+ """IPv4 Backup VR does not advertise"""
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -351,10 +444,15 @@ class TestVRRP4(VppTestCase):
prio = 100
intvl = self._default_adv
intvl_s = intvl * 0.01
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[self.pg0.remote_ip4])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[self.pg0.remote_ip4],
+ )
self._vrs.append(vr)
vr.add_vpp_config()
@@ -370,7 +468,7 @@ class TestVRRP4(VppTestCase):
# send higher prio advertisements, should not receive any
src_ip = self.pg0.remote_ip4
- pkts = [vr.vrrp_adv_packet(prio=prio+10, src_ip=src_ip)]
+ pkts = [vr.vrrp_adv_packet(prio=prio + 10, src_ip=src_ip)]
while time.time() < end_time:
self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
self.logger.info(self.vapi.cli("show trace"))
@@ -381,16 +479,16 @@ class TestVRRP4(VppTestCase):
self._vrs = []
def test_vrrp4_master_arp(self):
- """ IPv4 Master VR replies to ARP """
+ """IPv4 Master VR replies to ARP"""
self.pg_start()
# VR virtual IP is the default, which is the pg local IP
vr_id = 100
prio = 255
intvl = self._default_adv
- vr = VppVRRPVirtualRouter(self, self.pg0, 100,
- prio=prio, intvl=intvl,
- flags=self._default_flags)
+ vr = VppVRRPVirtualRouter(
+ self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
+ )
self._vrs.append(vr)
vr.add_vpp_config()
@@ -412,8 +510,9 @@ class TestVRRP4(VppTestCase):
vr.remove_vpp_config()
self._vrs = []
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_backup_noarp(self):
- """ IPv4 Backup VR ignores ARP """
+ """IPv4 Backup VR ignores ARP"""
# We need an address for a virtual IP that is not the IP that
# ARP requests will originate from
@@ -421,16 +520,24 @@ class TestVRRP4(VppTestCase):
prio = 100
intvl = self._default_adv
vip = self.pg0.remote_hosts[1].ip4
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[vip],
+ )
self._vrs.append(vr)
vr.add_vpp_config()
- arp_req = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) /
- ARP(op=ARP.who_has, pdst=vip,
- psrc=self.pg0.remote_ip4, hwsrc=self.pg0.remote_mac))
+ arp_req = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP(
+ op=ARP.who_has,
+ pdst=vip,
+ psrc=self.pg0.remote_ip4,
+ hwsrc=self.pg0.remote_mac,
+ )
# Before the VR is started make sure no reply to request for VIP
self.pg_start()
@@ -439,7 +546,7 @@ class TestVRRP4(VppTestCase):
# VR should start in backup state and still should not reply to ARP
# send a higher priority adv to make sure it does not become master
- adv = vr.vrrp_adv_packet(prio=prio+10, src_ip=self.pg0.remote_ip4)
+ adv = vr.vrrp_adv_packet(prio=prio + 10, src_ip=self.pg0.remote_ip4)
vr.start_stop(is_start=1)
self.send_and_assert_no_replies(self.pg0, [adv, arp_req], timeout=1)
@@ -447,18 +554,24 @@ class TestVRRP4(VppTestCase):
vr.remove_vpp_config()
self._vrs = []
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_election(self):
- """ IPv4 Backup VR becomes master if no advertisements received """
+ """IPv4 Backup VR becomes master if no advertisements received"""
vr_id = 100
prio = 100
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.remote_ip4
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[vip],
+ )
self._vrs.append(vr)
vr.add_vpp_config()
@@ -483,18 +596,24 @@ class TestVRRP4(VppTestCase):
self.pg0.wait_for_packet(intvl_s, is_not_adv)
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_backup_preempts(self):
- """ IPv4 Backup VR preempts lower priority master """
+ """IPv4 Backup VR preempts lower priority master"""
vr_id = 100
prio = 100
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.remote_ip4
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[vip],
+ )
self._vrs.append(vr)
vr.add_vpp_config()
@@ -510,7 +629,7 @@ class TestVRRP4(VppTestCase):
# send lower prio advertisements until timer expires
src_ip = self.pg0.remote_ip4
- pkts = [vr.vrrp_adv_packet(prio=prio-10, src_ip=src_ip)]
+ pkts = [vr.vrrp_adv_packet(prio=prio - 10, src_ip=src_ip)]
while time.time() + intvl_s < end_time:
self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
self.logger.info(self.vapi.cli("show trace"))
@@ -520,8 +639,9 @@ class TestVRRP4(VppTestCase):
self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_master_preempted(self):
- """ IPv4 Master VR preempted by higher priority backup """
+ """IPv4 Master VR preempted by higher priority backup"""
# A prio 255 VR cannot be preempted so the prio has to be lower and
# we have to wait for it to take over
@@ -529,10 +649,15 @@ class TestVRRP4(VppTestCase):
prio = 100
intvl = self._default_adv
vip = self.pg0.remote_ip4
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[vip],
+ )
self._vrs.append(vr)
vr.add_vpp_config()
@@ -556,8 +681,9 @@ class TestVRRP4(VppTestCase):
# VR should be in backup state again
vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_accept_mode_disabled(self):
- """ IPv4 Master VR does not reply for VIP w/ accept mode off """
+ """IPv4 Master VR does not reply for VIP w/ accept mode off"""
# accept mode only matters when prio < 255, so it will have to
# come up as a backup and take over as master after the timeout
@@ -565,10 +691,15 @@ class TestVRRP4(VppTestCase):
prio = 100
intvl = self._default_adv
vip = self.pg0.remote_hosts[4].ip4
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[vip],
+ )
self._vrs.append(vr)
vr.add_vpp_config()
@@ -586,17 +717,20 @@ class TestVRRP4(VppTestCase):
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
# send an ICMP echo to the VR virtual IP address
- echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) /
- IP(dst=vip, src=self.pg0.remote_ip4) /
- ICMP(seq=1, id=self.pg0.sw_if_index, type='echo-request'))
+ echo = (
+ Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac)
+ / IP(dst=vip, src=self.pg0.remote_ip4)
+ / ICMP(seq=1, id=self.pg0.sw_if_index, type="echo-request")
+ )
self.pg_send(self.pg0, [echo])
# wait for an echo reply. none should be received
time.sleep(1)
self.pg0.assert_nothing_captured(filter_out_fn=is_not_echo_reply)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_accept_mode_enabled(self):
- """ IPv4 Master VR replies for VIP w/ accept mode on """
+ """IPv4 Master VR replies for VIP w/ accept mode on"""
# A prio 255 VR cannot be preempted so the prio has to be lower and
# we have to wait for it to take over
@@ -604,11 +738,10 @@ class TestVRRP4(VppTestCase):
prio = 100
intvl = self._default_adv
vip = self.pg0.remote_hosts[4].ip4
- flags = (VRRP_VR_FLAG_PREEMPT | VRRP_VR_FLAG_ACCEPT)
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=flags,
- vips=[vip])
+ flags = VRRP_VR_FLAG_PREEMPT | VRRP_VR_FLAG_ACCEPT
+ vr = VppVRRPVirtualRouter(
+ self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip]
+ )
self._vrs.append(vr)
vr.add_vpp_config()
@@ -626,15 +759,18 @@ class TestVRRP4(VppTestCase):
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
# send an ICMP echo to the VR virtual IP address
- echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) /
- IP(dst=vip, src=self.pg0.remote_ip4) /
- ICMP(seq=1, id=self.pg0.sw_if_index, type='echo-request'))
+ echo = (
+ Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac)
+ / IP(dst=vip, src=self.pg0.remote_ip4)
+ / ICMP(seq=1, id=self.pg0.sw_if_index, type="echo-request")
+ )
self.pg_send(self.pg0, [echo])
# wait for an echo reply.
time.sleep(1)
- rx_pkts = self.pg0.get_capture(expected_count=1, timeout=1,
- filter_out_fn=is_not_echo_reply)
+ rx_pkts = self.pg0.get_capture(
+ expected_count=1, timeout=1, filter_out_fn=is_not_echo_reply
+ )
self.assertEqual(rx_pkts[0][IP].src, vip)
self.assertEqual(rx_pkts[0][IP].dst, self.pg0.remote_ip4)
@@ -642,18 +778,24 @@ class TestVRRP4(VppTestCase):
self.assertEqual(rx_pkts[0][ICMP].seq, 1)
self.assertEqual(rx_pkts[0][ICMP].id, self.pg0.sw_if_index)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_intf_tracking(self):
- """ IPv4 Master VR adjusts priority based on tracked interface """
+ """IPv4 Master VR adjusts priority based on tracked interface"""
vr_id = 100
prio = 255
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.local_ip4
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[vip],
+ )
self._vrs.append(vr)
vr.add_vpp_config()
@@ -663,9 +805,9 @@ class TestVRRP4(VppTestCase):
# add pg1 as a tracked interface and start the VR
adjustment = 50
adjusted_prio = prio - adjustment
- vr.add_del_tracked_interface(is_add=1,
- sw_if_index=self.pg1.sw_if_index,
- prio=adjustment)
+ vr.add_del_tracked_interface(
+ is_add=1, sw_if_index=self.pg1.sw_if_index, prio=adjustment
+ )
vr.start_stop(is_start=1)
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
@@ -674,52 +816,47 @@ class TestVRRP4(VppTestCase):
# tracked intf is up -> advertised priority == configured priority
self.pg0.enable_capture()
- rx = self.pg0.wait_for_packet(timeout=intvl_s,
- filter_out_fn=is_not_adv)
+ rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
self.assertEqual(rx, adv_configured)
# take down pg1, verify priority is now being adjusted
self.pg1.admin_down()
self.pg0.enable_capture()
- rx = self.pg0.wait_for_packet(timeout=intvl_s,
- filter_out_fn=is_not_adv)
+ rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
self.assertEqual(rx, adv_adjusted)
# bring up pg1, verify priority now matches configured value
self.pg1.admin_up()
self.pg0.enable_capture()
- rx = self.pg0.wait_for_packet(timeout=intvl_s,
- filter_out_fn=is_not_adv)
+ rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
self.assertEqual(rx, adv_configured)
# remove IP address from pg1, verify priority now being adjusted
self.pg1.unconfig_ip4()
self.pg0.enable_capture()
- rx = self.pg0.wait_for_packet(timeout=intvl_s,
- filter_out_fn=is_not_adv)
+ rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
self.assertEqual(rx, adv_adjusted)
# add IP address to pg1, verify priority now matches configured value
self.pg1.config_ip4()
self.pg0.enable_capture()
- rx = self.pg0.wait_for_packet(timeout=intvl_s,
- filter_out_fn=is_not_adv)
+ rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
self.assertEqual(rx, adv_configured)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp4_master_adv_unicast(self):
- """ IPv4 Master VR advertises (unicast) """
+ """IPv4 Master VR advertises (unicast)"""
vr_id = 100
prio = 255
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.local_ip4
- flags = (self._default_flags | VRRP_VR_FLAG_UNICAST)
+ flags = self._default_flags | VRRP_VR_FLAG_UNICAST
unicast_peer = self.pg0.remote_hosts[4]
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip]
+ )
self._vrs.append(vr)
vr.add_vpp_config()
vr.set_unicast_peers([unicast_peer.ip4])
@@ -732,8 +869,7 @@ class TestVRRP4(VppTestCase):
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
self.pg0.enable_capture()
- rx = self.pg0.wait_for_packet(timeout=intvl_s,
- filter_out_fn=is_not_adv)
+ rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
self.assertTrue(rx.haslayer(Ether))
self.assertTrue(rx.haslayer(IP))
@@ -748,9 +884,8 @@ class TestVRRP4(VppTestCase):
self.assertEqual(rx[VRRPv3].addrlist, [vip])
-@unittest.skipUnless(running_extended_tests, "part of extended tests")
class TestVRRP6(VppTestCase):
- """ IPv6 VRRP Test Case """
+ """IPv6 VRRP Test Case"""
@classmethod
def setUpClass(cls):
@@ -772,7 +907,7 @@ class TestVRRP6(VppTestCase):
i.configure_ipv6_neighbors()
self._vrs = []
- self._default_flags = (VRRP_VR_FLAG_IPV6 | VRRP_VR_FLAG_PREEMPT)
+ self._default_flags = VRRP_VR_FLAG_IPV6 | VRRP_VR_FLAG_PREEMPT
self._default_adv = 100
def tearDown(self):
@@ -843,16 +978,17 @@ class TestVRRP6(VppTestCase):
# VR with priority 255 owns the virtual address and should
# become master and start advertising immediately.
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_master_adv(self):
- """ IPv6 Master VR advertises """
+ """IPv6 Master VR advertises"""
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
prio = 255
intvl = self._default_adv
- vr = VppVRRPVirtualRouter(self, self.pg0, 100,
- prio=prio, intvl=intvl,
- flags=self._default_flags)
+ vr = VppVRRPVirtualRouter(
+ self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
+ )
self._vrs.append(vr)
vr.add_vpp_config()
@@ -874,10 +1010,71 @@ class TestVRRP6(VppTestCase):
vr.remove_vpp_config()
self._vrs = []
+ # Same as above but with the update API, and add a change
+ # of parameters to test that too
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ def test_vrrp6_master_adv_update(self):
+ """IPv6 Master VR adv + Update to Backup"""
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ prio = 255
+ intvl = self._default_adv
+ vr = VppVRRPVirtualRouter(
+ self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
+ )
+
+ vr.update_vpp_config()
+ vr.start_stop(is_start=1)
+ self.logger.info(self.vapi.cli("show vrrp vr"))
+ # Update VR with lower prio and larger interval
+ # we need to keep old VR for the adv checks
+ upd_vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ 100,
+ prio=100,
+ intvl=2 * intvl,
+ flags=self._default_flags,
+ vips=[self.pg0.remote_ip6],
+ )
+ upd_vr._vrrp_index = vr._vrrp_index
+ upd_vr.update_vpp_config()
+ start_time = time.time()
+ self.logger.info(self.vapi.cli("show vrrp vr"))
+ upd_vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
+ self._vrs = [upd_vr]
+
+ pkts = self.pg0.get_capture(5, filter_out_fn=None)
+
+ # Init -> Master: Multicast group Join, VRRP adv, gratuitous NAs sent
+ self.verify_vrrp6_mlr(pkts[0], vr)
+ self.verify_vrrp6_adv(pkts[1], vr, prio=prio)
+ self.verify_vrrp6_gna(pkts[2], vr)
+ # Master -> Init: Adv with priority 0 sent to force an election
+ self.verify_vrrp6_adv(pkts[3], vr, prio=0)
+ # Init -> Backup: A multicast listener report should be sent
+ # not actually verified in the test below, where I took this from
+
+ # send higher prio advertisements, should not see VPP send any
+ src_ip = self.pg0.remote_ip6_ll
+ pkts = [upd_vr.vrrp_adv_packet(prio=110, src_ip=src_ip)]
+ self.logger.info(self.vapi.cli("show vlib graph"))
+ end_time = start_time + 2 * upd_vr.master_down_seconds()
+ while time.time() < end_time:
+ self.send_and_assert_no_replies(
+ self.pg0, pkts, timeout=0.01 * upd_vr._intvl
+ )
+ self.logger.info(self.vapi.cli("show trace"))
+
+ vr.start_stop(is_start=0)
+ self.logger.info(self.vapi.cli("show vrrp vr"))
+
# VR with priority < 255 enters backup state and does not advertise as
# long as it receives higher priority advertisements
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_backup_noadv(self):
- """ IPv6 Backup VR does not advertise """
+ """IPv6 Backup VR does not advertise"""
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -885,10 +1082,15 @@ class TestVRRP6(VppTestCase):
prio = 100
intvl = self._default_adv
intvl_s = intvl * 0.01
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[self.pg0.remote_ip6])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[self.pg0.remote_ip6],
+ )
vr.add_vpp_config()
self._vrs.append(vr)
@@ -904,7 +1106,7 @@ class TestVRRP6(VppTestCase):
# send higher prio advertisements, should not see VPP send any
src_ip = self.pg0.remote_ip6_ll
num_advs = 5
- pkts = [vr.vrrp_adv_packet(prio=prio+10, src_ip=src_ip)]
+ pkts = [vr.vrrp_adv_packet(prio=prio + 10, src_ip=src_ip)]
self.logger.info(self.vapi.cli("show vlib graph"))
while time.time() < end_time:
self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
@@ -917,16 +1119,16 @@ class TestVRRP6(VppTestCase):
self._vrs = []
def test_vrrp6_master_nd(self):
- """ IPv6 Master VR replies to NDP """
+ """IPv6 Master VR replies to NDP"""
self.pg_start()
# VR virtual IP is the default, which is the pg local IP
vr_id = 100
prio = 255
intvl = self._default_adv
- vr = VppVRRPVirtualRouter(self, self.pg0, 100,
- prio=prio, intvl=intvl,
- flags=self._default_flags)
+ vr = VppVRRPVirtualRouter(
+ self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
+ )
vr.add_vpp_config()
self._vrs.append(vr)
@@ -947,8 +1149,9 @@ class TestVRRP6(VppTestCase):
vr.remove_vpp_config()
self._vrs = []
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_backup_nond(self):
- """ IPv6 Backup VR ignores NDP """
+ """IPv6 Backup VR ignores NDP"""
# We need an address for a virtual IP that is not the IP that
# ARP requests will originate from
@@ -957,10 +1160,15 @@ class TestVRRP6(VppTestCase):
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.remote_hosts[1].ip6
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[vip],
+ )
vr.add_vpp_config()
self._vrs.append(vr)
@@ -968,35 +1176,43 @@ class TestVRRP6(VppTestCase):
dmac = in6_getnsmac(nsma)
dst_ip = inet_ntop(socket.AF_INET6, nsma)
- ndp_req = (Ether(dst=dmac, src=self.pg0.remote_mac) /
- IPv6(dst=dst_ip, src=self.pg0.remote_ip6) /
- ICMPv6ND_NS(tgt=vip) /
- ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
+ ndp_req = (
+ Ether(dst=dmac, src=self.pg0.remote_mac)
+ / IPv6(dst=dst_ip, src=self.pg0.remote_ip6)
+ / ICMPv6ND_NS(tgt=vip)
+ / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
+ )
# Before the VR is started make sure no reply to request for VIP
self.send_and_assert_no_replies(self.pg0, [ndp_req], timeout=1)
# VR should start in backup state and still should not reply to NDP
# send a higher priority adv to make sure it does not become master
- adv = vr.vrrp_adv_packet(prio=prio+10, src_ip=self.pg0.remote_ip6)
+ adv = vr.vrrp_adv_packet(prio=prio + 10, src_ip=self.pg0.remote_ip6)
pkts = [adv, ndp_req]
vr.start_stop(is_start=1)
- self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
+ self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
vr.start_stop(is_start=0)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_election(self):
- """ IPv6 Backup VR becomes master if no advertisements received """
+ """IPv6 Backup VR becomes master if no advertisements received"""
vr_id = 100
prio = 100
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.remote_ip6
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[vip],
+ )
self._vrs.append(vr)
vr.add_vpp_config()
@@ -1021,18 +1237,24 @@ class TestVRRP6(VppTestCase):
self.pg0.wait_for_packet(intvl_s, is_not_adv)
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_backup_preempts(self):
- """ IPv6 Backup VR preempts lower priority master """
+ """IPv6 Backup VR preempts lower priority master"""
vr_id = 100
prio = 100
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.remote_ip6
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[vip],
+ )
self._vrs.append(vr)
vr.add_vpp_config()
@@ -1048,7 +1270,7 @@ class TestVRRP6(VppTestCase):
# send lower prio advertisements until timer expires
src_ip = self.pg0.remote_ip6
- pkts = [vr.vrrp_adv_packet(prio=prio-10, src_ip=src_ip)]
+ pkts = [vr.vrrp_adv_packet(prio=prio - 10, src_ip=src_ip)]
while (time.time() + intvl_s) < end_time:
self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
self.logger.info(self.vapi.cli("show trace"))
@@ -1058,8 +1280,9 @@ class TestVRRP6(VppTestCase):
self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_master_preempted(self):
- """ IPv6 Master VR preempted by higher priority backup """
+ """IPv6 Master VR preempted by higher priority backup"""
# A prio 255 VR cannot be preempted so the prio has to be lower and
# we have to wait for it to take over
@@ -1067,10 +1290,15 @@ class TestVRRP6(VppTestCase):
prio = 100
intvl = self._default_adv
vip = self.pg0.remote_ip6
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[vip],
+ )
self._vrs.append(vr)
vr.add_vpp_config()
@@ -1094,8 +1322,9 @@ class TestVRRP6(VppTestCase):
# VR should be in backup state again
vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_accept_mode_disabled(self):
- """ IPv6 Master VR does not reply for VIP w/ accept mode off """
+ """IPv6 Master VR does not reply for VIP w/ accept mode off"""
# accept mode only matters when prio < 255, so it will have to
# come up as a backup and take over as master after the timeout
@@ -1103,10 +1332,15 @@ class TestVRRP6(VppTestCase):
prio = 100
intvl = self._default_adv
vip = self.pg0.remote_hosts[4].ip6
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[vip],
+ )
self._vrs.append(vr)
vr.add_vpp_config()
@@ -1124,17 +1358,20 @@ class TestVRRP6(VppTestCase):
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
# send an ICMPv6 echo to the VR virtual IP address
- echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) /
- IPv6(dst=vip, src=self.pg0.remote_ip6) /
- ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index))
+ echo = (
+ Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac)
+ / IPv6(dst=vip, src=self.pg0.remote_ip6)
+ / ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index)
+ )
self.pg_send(self.pg0, [echo])
# wait for an echo reply. none should be received
time.sleep(1)
self.pg0.assert_nothing_captured(filter_out_fn=is_not_echo_reply)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_accept_mode_enabled(self):
- """ IPv6 Master VR replies for VIP w/ accept mode on """
+ """IPv6 Master VR replies for VIP w/ accept mode on"""
# A prio 255 VR cannot be preempted so the prio has to be lower and
# we have to wait for it to take over
@@ -1142,11 +1379,10 @@ class TestVRRP6(VppTestCase):
prio = 100
intvl = self._default_adv
vip = self.pg0.remote_hosts[4].ip6
- flags = (self._default_flags | VRRP_VR_FLAG_ACCEPT)
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=flags,
- vips=[vip])
+ flags = self._default_flags | VRRP_VR_FLAG_ACCEPT
+ vr = VppVRRPVirtualRouter(
+ self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip]
+ )
self._vrs.append(vr)
vr.add_vpp_config()
@@ -1164,33 +1400,42 @@ class TestVRRP6(VppTestCase):
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
# send an ICMP echo to the VR virtual IP address
- echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) /
- IPv6(dst=vip, src=self.pg0.remote_ip6) /
- ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index))
+ echo = (
+ Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac)
+ / IPv6(dst=vip, src=self.pg0.remote_ip6)
+ / ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index)
+ )
self.pg_send(self.pg0, [echo])
# wait for an echo reply.
time.sleep(1)
- rx_pkts = self.pg0.get_capture(expected_count=1, timeout=1,
- filter_out_fn=is_not_echo_reply)
+ rx_pkts = self.pg0.get_capture(
+ expected_count=1, timeout=1, filter_out_fn=is_not_echo_reply
+ )
self.assertEqual(rx_pkts[0][IPv6].src, vip)
self.assertEqual(rx_pkts[0][IPv6].dst, self.pg0.remote_ip6)
self.assertEqual(rx_pkts[0][ICMPv6EchoReply].seq, 1)
self.assertEqual(rx_pkts[0][ICMPv6EchoReply].id, self.pg0.sw_if_index)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_intf_tracking(self):
- """ IPv6 Master VR adjusts priority based on tracked interface """
+ """IPv6 Master VR adjusts priority based on tracked interface"""
vr_id = 100
prio = 255
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.local_ip6
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=self._default_flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self,
+ self.pg0,
+ vr_id,
+ prio=prio,
+ intvl=intvl,
+ flags=self._default_flags,
+ vips=[vip],
+ )
self._vrs.append(vr)
vr.add_vpp_config()
@@ -1200,9 +1445,9 @@ class TestVRRP6(VppTestCase):
# add pg1 as a tracked interface and start the VR
adjustment = 50
adjusted_prio = prio - adjustment
- vr.add_del_tracked_interface(is_add=1,
- sw_if_index=self.pg1.sw_if_index,
- prio=adjustment)
+ vr.add_del_tracked_interface(
+ is_add=1, sw_if_index=self.pg1.sw_if_index, prio=adjustment
+ )
vr.start_stop(is_start=1)
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
@@ -1211,52 +1456,47 @@ class TestVRRP6(VppTestCase):
# tracked intf is up -> advertised priority == configured priority
self.pg0.enable_capture()
- rx = self.pg0.wait_for_packet(timeout=intvl_s,
- filter_out_fn=is_not_adv)
+ rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
self.assertEqual(rx, adv_configured)
# take down pg1, verify priority is now being adjusted
self.pg1.admin_down()
self.pg0.enable_capture()
- rx = self.pg0.wait_for_packet(timeout=intvl_s,
- filter_out_fn=is_not_adv)
+ rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
self.assertEqual(rx, adv_adjusted)
# bring up pg1, verify priority now matches configured value
self.pg1.admin_up()
self.pg0.enable_capture()
- rx = self.pg0.wait_for_packet(timeout=intvl_s,
- filter_out_fn=is_not_adv)
+ rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
self.assertEqual(rx, adv_configured)
# remove IP address from pg1, verify priority now being adjusted
self.pg1.unconfig_ip6()
self.pg0.enable_capture()
- rx = self.pg0.wait_for_packet(timeout=intvl_s,
- filter_out_fn=is_not_adv)
+ rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
self.assertEqual(rx, adv_adjusted)
# add IP address to pg1, verify priority now matches configured value
self.pg1.config_ip6()
self.pg0.enable_capture()
- rx = self.pg0.wait_for_packet(timeout=intvl_s,
- filter_out_fn=is_not_adv)
+ rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
self.assertEqual(rx, adv_configured)
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_vrrp6_master_adv_unicast(self):
- """ IPv6 Master VR advertises (unicast) """
+ """IPv6 Master VR advertises (unicast)"""
vr_id = 100
prio = 255
intvl = self._default_adv
intvl_s = intvl * 0.01
vip = self.pg0.local_ip6
- flags = (self._default_flags | VRRP_VR_FLAG_UNICAST)
+ flags = self._default_flags | VRRP_VR_FLAG_UNICAST
unicast_peer = self.pg0.remote_hosts[4]
- vr = VppVRRPVirtualRouter(self, self.pg0, vr_id,
- prio=prio, intvl=intvl,
- flags=flags,
- vips=[vip])
+ vr = VppVRRPVirtualRouter(
+ self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip]
+ )
self._vrs.append(vr)
vr.add_vpp_config()
vr.set_unicast_peers([unicast_peer.ip6])
@@ -1269,23 +1509,22 @@ class TestVRRP6(VppTestCase):
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
self.pg0.enable_capture()
- rx = self.pg0.wait_for_packet(timeout=intvl_s,
- filter_out_fn=is_not_adv)
+ rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
self.assertTrue(rx.haslayer(Ether))
self.assertTrue(rx.haslayer(IPv6))
self.assertTrue(rx.haslayer(VRRPv3))
self.assertEqual(rx[Ether].src, self.pg0.local_mac)
self.assertEqual(rx[Ether].dst, unicast_peer.mac)
- self.assertEqual(ip6_normalize(rx[IPv6].src),
- ip6_normalize(self.pg0.local_ip6_ll))
- self.assertEqual(ip6_normalize(rx[IPv6].dst),
- ip6_normalize(unicast_peer.ip6))
+ self.assertEqual(
+ ip6_normalize(rx[IPv6].src), ip6_normalize(self.pg0.local_ip6_ll)
+ )
+ self.assertEqual(ip6_normalize(rx[IPv6].dst), ip6_normalize(unicast_peer.ip6))
self.assertEqual(rx[VRRPv3].vrid, vr_id)
self.assertEqual(rx[VRRPv3].priority, prio)
self.assertEqual(rx[VRRPv3].ipcount, 1)
self.assertEqual(rx[VRRPv3].addrlist, [vip])
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)