#!/usr/bin/env python3 # # Copyright 2019-2020 Rubicon Communications, LLC (Netgate) # # SPDX-License-Identifier: Apache-2.0 # import unittest import time import socket from socket import inet_pton, inet_ntop from vpp_object import VppObject from scapy.packet import raw from scapy.layers.l2 import Ether, ARP from scapy.layers.inet import IP, ICMP, icmptypes from scapy.layers.inet6 import ( IPv6, ipv6nh, IPv6ExtHdrHopByHop, ICMPv6MLReport2, ICMPv6ND_NA, ICMPv6ND_NS, ICMPv6NDOptDstLLAddr, ICMPv6NDOptSrcLLAddr, ICMPv6EchoRequest, ICMPv6EchoReply, ) from scapy.contrib.igmpv3 import IGMPv3, IGMPv3mr from scapy.layers.vrrp import IPPROTO_VRRP, VRRPv3 from scapy.utils6 import in6_getnsma, in6_getnsmac from config import config from framework import VppTestCase from asfframework import VppTestRunner from util import ip6_normalize VRRP_VR_FLAG_PREEMPT = 1 VRRP_VR_FLAG_ACCEPT = 2 VRRP_VR_FLAG_UNICAST = 4 VRRP_VR_FLAG_IPV6 = 8 VRRP_VR_STATE_INIT = 0 VRRP_VR_STATE_BACKUP = 1 VRRP_VR_STATE_MASTER = 2 VRRP_VR_STATE_INTF_DOWN = 3 VRRP_INDEX_INVALID = 0xFFFFFFFF def is_non_arp(p): """Want to filter out advertisements, igmp, etc""" if p.haslayer(ARP): return False return True def is_not_adv(p): """Filter out everything but advertisements. E.g. multicast RD/ND""" if p.haslayer(VRRPv3): return False return True def is_not_echo_reply(p): """filter out advertisements and other while waiting for echo reply""" if p.haslayer(IP) and p.haslayer(ICMP): if icmptypes[p[ICMP].type] == "echo-reply": return False elif p.haslayer(IPv6) and p.haslayer(ICMPv6EchoReply): return False return True class VppVRRPVirtualRouter(VppObject): def __init__( self, test, intf, vr_id, prio=100, intvl=100, flags=VRRP_VR_FLAG_PREEMPT, vips=None, ): self._test = test self._intf = intf self._sw_if_index = self._intf.sw_if_index self._vr_id = vr_id self._prio = prio self._intvl = intvl self._flags = flags if flags & VRRP_VR_FLAG_IPV6: self._is_ipv6 = 1 self._adv_dest_mac = "33:33:00:00:00:12" self._virtual_mac = "00:00:5e:00:02:%02x" % vr_id self._adv_dest_ip = "ff02::12" self._vips = [intf.local_ip6] if vips is None else vips else: self._is_ipv6 = 0 self._adv_dest_mac = "01:00:5e:00:00:12" self._virtual_mac = "00:00:5e:00:01:%02x" % vr_id self._adv_dest_ip = "224.0.0.18" self._vips = [intf.local_ip4] if vips is None else vips self._tracked_ifs = [] self._vrrp_index = VRRP_INDEX_INVALID def add_vpp_config(self): self._test.vapi.vrrp_vr_add_del( is_add=1, sw_if_index=self._intf.sw_if_index, vr_id=self._vr_id, priority=self._prio, interval=self._intvl, flags=self._flags, n_addrs=len(self._vips), addrs=self._vips, ) def update_vpp_config(self): r = self._test.vapi.vrrp_vr_update( vrrp_index=self._vrrp_index, sw_if_index=self._intf.sw_if_index, vr_id=self._vr_id, priority=self._prio, interval=self._intvl, flags=self._flags, n_addrs=len(self._vips), addrs=self._vips, ) self._vrrp_index = r.vrrp_index def delete_vpp_config(self): self._test.vapi.vrrp_vr_del(vrrp_index=self._vrrp_index) def query_vpp_config(self): vrs = self._test.vapi.vrrp_vr_dump(sw_if_index=self._intf.sw_if_index) for vr in vrs: if vr.config.vr_id != self._vr_id: continue is_ipv6 = 1 if (vr.config.flags & VRRP_VR_FLAG_IPV6) else 0 if is_ipv6 != self._is_ipv6: continue return vr return None def remove_vpp_config(self): self._test.vapi.vrrp_vr_add_del( is_add=0, sw_if_index=self._intf.sw_if_index, vr_id=self._vr_id, priority=self._prio, interval=self._intvl, flags=self._flags, n_addrs=len(self._vips), addrs=self._vips, ) def start_stop(self, is_start): self._test.vapi.vrrp_vr_start_stop( is_start=is_start, sw_if_index=self._intf.sw_if_index, vr_id=self._vr_id, is_ipv6=self._is_ipv6, ) self._start_time = time.time() if is_start else None def add_del_tracked_interface(self, is_add, sw_if_index, prio): args = { "sw_if_index": self._intf.sw_if_index, "is_ipv6": self._is_ipv6, "vr_id": self._vr_id, "is_add": is_add, "n_ifs": 1, "ifs": [{"sw_if_index": sw_if_index, "priority": prio}], } self._test.vapi.vrrp_vr_track_if_add_del(**args) self._tracked_ifs.append(args["ifs"][0]) def set_unicast_peers(self, addrs): args = { "sw_if_index": self._intf.sw_if_index, "is_ipv6": self._is_ipv6, "vr_id": self._vr_id, "n_addrs": len(addrs), "addrs": addrs, } self._test.vapi.vrrp_vr_set_peers(**args) self._unicast_peers = addrs def start_time(self): return self._start_time def virtual_mac(self): return self._virtual_mac def virtual_ips(self): return self._vips def adv_dest_mac(self): return self._adv_dest_mac def adv_dest_ip(self): return self._adv_dest_ip def priority(self): return self._prio def vr_id(self): return self._vr_id def adv_interval(self): return self._intvl def interface(self): return self._intf def assert_state_equals(self, state): vr_details = self.query_vpp_config() self._test.assertEqual(vr_details.runtime.state, state) def master_down_seconds(self): vr_details = self.query_vpp_config() return vr_details.runtime.master_down_int * 0.01 def vrrp_adv_packet(self, prio=None, src_ip=None): dst_ip = self._adv_dest_ip if prio is None: prio = self._prio eth = Ether(dst=self._adv_dest_mac, src=self._virtual_mac) vrrp = VRRPv3( vrid=self._vr_id, priority=prio, ipcount=len(self._vips), adv=self._intvl ) if self._is_ipv6: src_ip = self._intf.local_ip6_ll if src_ip is None else src_ip ip = IPv6(src=src_ip, dst=dst_ip, nh=IPPROTO_VRRP, hlim=255) vrrp.addrlist = self._vips else: src_ip = self._intf.local_ip4 if src_ip is None else src_ip ip = IP(src=src_ip, dst=dst_ip, proto=IPPROTO_VRRP, ttl=255, id=0) vrrp.addrlist = self._vips # Fill in default values & checksums pkt = Ether(raw(eth / ip / vrrp)) return pkt @unittest.skipIf("vrrp" in config.excluded_plugins, "Exclude VRRP plugin tests") class TestVRRP4(VppTestCase): """IPv4 VRRP Test Case""" @classmethod def setUpClass(cls): super(TestVRRP4, cls).setUpClass() @classmethod def tearDownClass(cls): super(TestVRRP4, cls).tearDownClass() def setUp(self): super(TestVRRP4, self).setUp() self.create_pg_interfaces(range(2)) for i in self.pg_interfaces: i.admin_up() i.config_ip4() i.generate_remote_hosts(5) i.configure_ipv4_neighbors() self._vrs = [] self._default_flags = VRRP_VR_FLAG_PREEMPT self._default_adv = 100 def tearDown(self): for vr in self._vrs: try: vr_api = vr.query_vpp_config() if vr_api.runtime.state != VRRP_VR_STATE_INIT: vr.start_stop(is_start=0) vr.remove_vpp_config() except: self.logger.error("Error cleaning up") for i in self.pg_interfaces: i.admin_down() i.unconfig_ip4() i.unconfig_ip6() self._vrs = [] super(TestVRRP4, self).tearDown() def verify_vrrp4_igmp(self, pkt): ip = pkt[IP] self.assertEqual(ip.dst, "224.0.0.22") self.assertEqual(ip.proto, 2) igmp = pkt[IGMPv3] self.assertEqual(IGMPv3.igmpv3types[igmp.type], "Version 3 Membership Report") igmpmr = pkt[IGMPv3mr] self.assertEqual(igmpmr.numgrp, 1) self.assertEqual(igmpmr.records[0].maddr, "224.0.0.18") def verify_vrrp4_garp(self, pkt, vip, vmac): arp = pkt[ARP] # ARP "who-has" op == 1 self.assertEqual(arp.op, 1) self.assertEqual(arp.pdst, arp.psrc) self.assertEqual(arp.pdst, vip) self.assertEqual(arp.hwsrc, vmac) def verify_vrrp4_adv(self, rx_pkt, vr, prio=None): vips = vr.virtual_ips() eth = rx_pkt[Ether] ip = rx_pkt[IP] vrrp = rx_pkt[VRRPv3] pkt = vr.vrrp_adv_packet(prio=prio) # Source MAC is virtual MAC, destination is multicast MAC self.assertEqual(eth.src, vr.virtual_mac()) self.assertEqual(eth.dst, vr.adv_dest_mac()) self.assertEqual(ip.dst, "224.0.0.18") self.assertEqual(ip.ttl, 255) self.assertEqual(ip.proto, IPPROTO_VRRP) self.assertEqual(vrrp.version, 3) self.assertEqual(vrrp.type, 1) self.assertEqual(vrrp.vrid, vr.vr_id()) if prio is None: prio = vr.priority() self.assertEqual(vrrp.priority, prio) self.assertEqual(vrrp.ipcount, len(vips)) self.assertEqual(vrrp.adv, vr.adv_interval()) self.assertListEqual(vrrp.addrlist, vips) # VR with priority 255 owns the virtual address and should # become master and start advertising immediately. @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_master_adv(self): """IPv4 Master VR advertises""" self.pg_enable_capture(self.pg_interfaces) self.pg_start() prio = 255 intvl = self._default_adv vr = VppVRRPVirtualRouter( self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags ) vr.add_vpp_config() vr.start_stop(is_start=1) self.logger.info(self.vapi.cli("show vrrp vr")) vr.start_stop(is_start=0) self.logger.info(self.vapi.cli("show vrrp vr")) pkts = self.pg0.get_capture(4) # Init -> Master: IGMP Join, VRRP adv, gratuitous ARP are sent self.verify_vrrp4_igmp(pkts[0]) self.verify_vrrp4_adv(pkts[1], vr, prio=prio) self.verify_vrrp4_garp(pkts[2], vr.virtual_ips()[0], vr.virtual_mac()) # Master -> Init: Adv with priority 0 sent to force an election self.verify_vrrp4_adv(pkts[3], vr, prio=0) vr.remove_vpp_config() self._vrs = [] # Same as above but with the update API, and add a change # of parameters to test that too @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_master_adv_update(self): """IPv4 Master VR adv + Update to Backup""" self.pg_enable_capture(self.pg_interfaces) self.pg_start() prio = 255 intvl = self._default_adv vr = VppVRRPVirtualRouter( self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags ) vr.update_vpp_config() vr.start_stop(is_start=1) self.logger.info(self.vapi.cli("show vrrp vr")) # Update VR with lower prio and larger interval # we need to keep old VR for the adv checks upd_vr = VppVRRPVirtualRouter( self, self.pg0, 100, prio=100, intvl=2 * intvl, flags=self._default_flags, vips=[self.pg0.remote_ip4], ) upd_vr._vrrp_index = vr._vrrp_index upd_vr.update_vpp_config() start_time = time.time() self.logger.info(self.vapi.cli("show vrrp vr")) upd_vr.assert_state_equals(VRRP_VR_STATE_BACKUP) self._vrs = [upd_vr] pkts = self.pg0.get_capture(5) # Init -> Master: IGMP Join, VRRP adv, gratuitous ARP are sent self.verify_vrrp4_igmp(pkts[0]) self.verify_vrrp4_adv(pkts[1], vr, prio=prio) self.verify_vrrp4_garp(pkts[2], vr.virtual_ips()[0], vr.virtual_mac()) # Master -> Init: Adv with priority 0 sent to force an election self.verify_vrrp4_adv(pkts[3], vr, prio=0) # Init -> Backup: An IGMP join should be sent self.verify_vrrp4_igmp(pkts[4]) # send higher prio advertisements, should not receive any end_time = start_time + 2 * upd_vr.master_down_seconds() src_ip = self.pg0.remote_ip4 pkts = [upd_vr.vrrp_adv_packet(prio=110, src_ip=src_ip)] while time.time() < end_time: self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl * 0.01) self.logger.info(self.vapi.cli("show trace")) upd_vr.start_stop(is_start=0) self.logger.info(self.vapi.cli("show vrrp vr")) # VR with priority < 255 enters backup state and does not advertise as # long as it receives higher priority advertisements @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_backup_noadv(self): """IPv4 Backup VR does not advertise""" self.pg_enable_capture(self.pg_interfaces) self.pg_start() vr_id = 100 prio = 100 intvl = self._default_adv intvl_s = intvl * 0.01 vr = VppVRRPVirtualRouter( self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=self._default_flags, vips=[self.pg0.remote_ip4], ) self._vrs.append(vr) vr.add_vpp_config() vr.start_stop(is_start=1) vr.assert_state_equals(VRRP_VR_STATE_BACKUP) # watch for advertisements for 2x the master down preemption timeout end_time = vr.start_time() + 2 * vr.master_down_seconds() # Init -> Backup: An IGMP join should be sent pkts = self.pg0.get_capture(1) self.verify_vrrp4_igmp(pkts[0]) # send higher prio advertisements, should not receive any src_ip = self.pg0.remote_ip4 pkts = [vr.vrrp_adv_packet(prio=prio + 10, src_ip=src_ip)] while time.time() < end_time: self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s) self.logger.info(self.vapi.cli("show trace")) vr.start_stop(is_start=0) self.logger.info(self.vapi.cli("show vrrp vr")) vr.remove_vpp_config() self._vrs = [] def test_vrrp4_master_arp(self): """IPv4 Master VR replies to ARP""" self.pg_start() # VR virtual IP is the default, which is the pg local IP vr_id = 100 prio = 255 intvl = self._default_adv vr = VppVRRPVirtualRouter( self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags ) self._vrs.append(vr) vr.add_vpp_config() # before the VR is up, ARP should resolve to interface MAC self.pg0.resolve_arp() self.assertNotEqual(self.pg0.local_mac, vr.virtual_mac()) # start the VR, ARP should now resolve to virtual MAC vr.start_stop(is_start=1) self.pg0.resolve_arp() self.assertEqual(self.pg0.local_mac, vr.virtual_mac()) # stop the VR, ARP should resolve to interface MAC again vr.start_stop(is_start=0) self.pg0.resolve_arp() self.assertNotEqual(self.pg0.local_mac, vr.virtual_mac()) vr.remove_vpp_config() self._vrs = [] @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_backup_noarp(self): """IPv4 Backup VR ignores ARP""" # We need an address for a virtual IP that is not the IP that # ARP requests will originate from vr_id = 100 prio = 100 intvl = self._default_adv vip = self.pg0.remote_hosts[1].ip4 vr = VppVRRPVirtualRouter( self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=self._default_flags, vips=[vip], ) self._vrs.append(vr) vr.add_vpp_config() arp_req = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP( op=ARP.who_has, pdst=vip, psrc=self.pg0.remote_ip4, hwsrc=self.pg0.remote_mac, ) # Before the VR is started make sure no reply to request for VIP self.pg_start() self.pg_enable_capture(self.pg_interfaces) self.send_and_assert_no_replies(self.pg0, [arp_req], timeout=1) # VR should start in backup state and still should not reply to ARP # send a higher priority adv to make sure it does not become master adv = vr.vrrp_adv_packet(prio=prio + 10, src_ip=self.pg0.remote_ip4) vr.start_stop(is_start=1) self.send_and_assert_no_replies(self.pg0, [adv, arp_req], timeout=1) vr.start_stop(is_start=0) vr.remove_vpp_config() self._vrs = [] @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_election(self): """IPv4 Backup VR becomes master if no advertisements received""" vr_id = 100 prio = 100 intvl = self._default_adv intvl_s = intvl * 0.01 vip = self.pg0.remote_ip4 vr = VppVRRPVirtualRouter( self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=self._default_flags, vips=[vip], ) self._vrs.append(vr) vr.add_vpp_config() # After adding the VR, it should be in the init state vr.assert_state_equals(VRRP_VR_STATE_INIT) self.pg_start() vr.start_stop(is_start=1) # VR should be in backup state after starting vr.assert_state_equals(VRRP_VR_STATE_BACKUP) end_time = vr.start_time() + vr.master_down_seconds() # should not receive adverts until timer expires & state transition self.pg_enable_capture(self.pg_interfaces) while (time.time() + intvl_s) < end_time: time.sleep(intvl_s) self.pg0.assert_nothing_captured(filter_out_fn=is_not_adv) # VR should be in master state, should send an adv self.pg0.enable_capture() self.pg0.wait_for_packet(intvl_s, is_not_adv) vr.assert_state_equals(VRRP_VR_STATE_MASTER) @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_backup_preempts(self): """IPv4 Backup VR preempts lower priority master""" vr_id = 100 prio = 100 intvl = self._default_adv intvl_s = intvl * 0.01 vip = self.pg0.remote_ip4 vr = VppVRRPVirtualRouter( self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=self._default_flags, vips=[vip], ) self._vrs.append(vr) vr.add_vpp_config() # After adding the VR, it should be in the init state vr.assert_state_equals(VRRP_VR_STATE_INIT) self.pg_start() vr.start_stop(is_start=1) # VR should be in backup state after starting vr.assert_state_equals(VRRP_VR_STATE_BACKUP) end_time = vr.start_time() + vr.master_down_seconds() # send lower prio advertisements until timer expires src_ip = self.pg0.remote_ip4 pkts = [vr.vrrp_adv_packet(prio=prio - 10, src_ip=src_ip)] while time.time() + intvl_s < end_time: self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s) self.logger.info(self.vapi.cli("show trace")) # when timer expires, VR should take over as master self.pg0.enable_capture() self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) vr.assert_state_equals(VRRP_VR_STATE_MASTER) @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_master_preempted(self): """IPv4 Master VR preempted by higher priority backup""" # A prio 255 VR cannot be preempted so the prio has to be lower and # we have to wait for it to take over vr_id = 100 prio = 100 intvl = self._default_adv vip = self.pg0.remote_ip4 vr = VppVRRPVirtualRouter( self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=self._default_flags, vips=[vip], ) self._vrs.append(vr) vr.add_vpp_config() # After adding the VR, it should be in the init state vr.assert_state_equals(VRRP_VR_STATE_INIT) # start VR vr.start_stop(is_start=1) vr.assert_state_equals(VRRP_VR_STATE_BACKUP) # wait for VR to take over as master end_time = vr.start_time() + vr.master_down_seconds() sleep_s = end_time - time.time() time.sleep(sleep_s) vr.assert_state_equals(VRRP_VR_STATE_MASTER) # Build advertisement packet and send it pkts = [vr.vrrp_adv_packet(prio=255, src_ip=self.pg0.remote_ip4)] self.pg_send(self.pg0, pkts) # VR should be in backup state again vr.assert_state_equals(VRRP_VR_STATE_BACKUP) @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_accept_mode_disabled(self): """IPv4 Master VR does not reply for VIP w/ accept mode off""" # accept mode only matters when prio < 255, so it will have to # come up as a backup and take over as master after the timeout vr_id = 100 prio = 100 intvl = self._default_adv vip = self.pg0.remote_hosts[4].ip4 vr = VppVRRPVirtualRouter( self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=self._default_flags, vips=[vip], ) self._vrs.append(vr) vr.add_vpp_config() # After adding the VR, it should be in the init state vr.assert_state_equals(VRRP_VR_STATE_INIT) # start VR vr.start_stop(is_start=1) vr.assert_state_equals(VRRP_VR_STATE_BACKUP) # wait for VR to take over as master end_time = vr.start_time() + vr.master_down_seconds() sleep_s = end_time - time.time() time.sleep(sleep_s) vr.assert_state_equals(VRRP_VR_STATE_MASTER) # send an ICMP echo to the VR virtual IP address echo = ( Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) / IP(dst=vip, src=self.pg0.remote_ip4) / ICMP(seq=1, id=self.pg0.sw_if_index, type="echo-request") ) self.pg_send(self.pg0, [echo]) # wait for an echo reply. none should be received time.sleep(1) self.pg0.assert_nothing_captured(filter_out_fn=is_not_echo_reply) @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_accept_mode_enabled(self): """IPv4 Master VR replies for VIP w/ accept mode on""" # A prio 255 VR cannot be preempted so the prio has to be lower and # we have to wait for it to take over vr_id = 100 prio = 100 intvl = self._default_adv vip = self.pg0.remote_hosts[4].ip4 flags = VRRP_VR_FLAG_PREEMPT | VRRP_VR_FLAG_ACCEPT vr = VppVRRPVirtualRouter( self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip] ) self._vrs.append(vr) vr.add_vpp_config() # After adding the VR, it should be in the init state vr.assert_state_equals(VRRP_VR_STATE_INIT) # start VR vr.start_stop(is_start=1) vr.assert_state_equals(VRRP_VR_STATE_BACKUP) # wait for VR to take over as master end_time = vr.start_time() + vr.master_down_seconds() sleep_s = end_time - time.time() time.sleep(sleep_s) vr.assert_state_equals(VRRP_VR_STATE_MASTER) # send an ICMP echo to the VR virtual IP address echo = ( Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) / IP(dst=vip, src=self.pg0.remote_ip4) / ICMP(seq=1, id=self.pg0.sw_if_index, type="echo-request") ) self.pg_send(self.pg0, [echo]) # wait for an echo reply. time.sleep(1) rx_pkts = self.pg0.get_capture( expected_count=1, timeout=1, filter_out_fn=is_not_echo_reply ) self.assertEqual(rx_pkts[0][IP].src, vip) self.assertEqual(rx_pkts[0][IP].dst, self.pg0.remote_ip4) self.assertEqual(icmptypes[rx_pkts[0][ICMP].type], "echo-reply") self.assertEqual(rx_pkts[0][ICMP].seq, 1) self.assertEqual(rx_pkts[0][ICMP].id, self.pg0.sw_if_index) @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_intf_tracking(self): """IPv4 Master VR adjusts priority based on tracked interface""" vr_id = 100 prio = 255 intvl = self._default_adv intvl_s = intvl * 0.01 vip = self.pg0.local_ip4 vr = VppVRRPVirtualRouter( self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=self._default_flags, vips=[vip], ) self._vrs.append(vr) vr.add_vpp_config() # After adding the VR, it should be in the init state vr.assert_state_equals(VRRP_VR_STATE_INIT) # add pg1 as a tracked interface and start the VR adjustment = 50 adjusted_prio = prio - adjustment vr.add_del_tracked_interface( is_add=1, sw_if_index=self.pg1.sw_if_index, prio=adjustment ) vr.start_stop(is_start=1) vr.assert_state_equals(VRRP_VR_STATE_MASTER) adv_configured = vr.vrrp_adv_packet(prio=prio) adv_adjusted = vr.vrrp_adv_packet(prio=adjusted_prio) # tracked intf is up -> advertised priority == configured priority self.pg0.enable_capture() rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_configured) # take down pg1, verify priority is now being adjusted self.pg1.admin_down() self.pg0.enable_capture() rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_adjusted) # bring up pg1, verify priority now matches configured value self.pg1.admin_up() self.pg0.enable_capture() rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_configured) # remove IP address from pg1, verify priority now being adjusted self.pg1.unconfig_ip4() self.pg0.enable_capture() rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_adjusted) # add IP address to pg1, verify priority now matches configured value self.pg1.config_ip4() self.pg0.enable_capture() rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) self.assertEqual(rx, adv_configured) @unittest.skipUnless(config.extended, "part of extended tests") def test_vrrp4_master_adv_uni
/*
* Copyright (c) 2018-2019 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <vcl/vcl_private.h>
#include <vlibmemory/api.h>
#include <vpp/api/vpe_msg_enum.h>
#define vl_typedefs /* define message structures */
#include <vpp/api/vpe_all_api_h.h>
#undef vl_typedefs
/* declare message handlers for each api */
#define vl_endianfun /* define message structures */
#include <vpp/api/vpe_all_api_h.h>
#undef vl_endianfun
/* instantiate all the print functions we know about */
#define vl_print(handle, ...)
#define vl_printfun
#include <vpp/api/vpe_all_api_h.h>
#undef vl_printfun
u8 *
format_api_error (u8 * s, va_list * args)
{
i32 error = va_arg (*args, u32);
uword *p;
p = hash_get (vcm->error_string_by_error_number, -error);
if (p)
s = format (s, "%s (%d)", p[0], error);
else
s = format (s, "%d", error);
return s;
}
static void
vl_api_session_enable_disable_reply_t_handler
(vl_api_session_enable_disable_reply_t * mp)
{
if (mp->retval)
{
clib_warning ("VCL<%d>: session_enable_disable failed: %U", getpid (),
format_api_error, ntohl (mp->retval));
}
else
vcm->app_state = STATE_APP_ENABLED;
}
static int
vcl_segment_attach (u64 segment_handle, char *name, ssvm_segment_type_t type,
int fd)
{
fifo_segment_create_args_t _a, *a = &_a;
int rv;
memset (a, 0, sizeof (*a));
a->segment_name = (char *) name;
a->segment_type = type;
if (type == SSVM_SEGMENT_MEMFD)
a->memfd_fd = fd;
if ((rv = fifo_segment_attach (&vcm->segment_main, a)))
{
clib_warning ("svm_fifo_segment_attach ('%s') failed", name);
return rv;
}
vcl_segment_table_add (segment_handle, a->new_segment_indices[0]);
vec_reset_length (a->new_segment_indices);
return 0;
}
static void
vcl_segment_detach (u64 segment_handle)
{
fifo_segment_main_t *sm = &vcm->segment_main;
fifo_segment_t *segment;
u32 segment_index;
segment_index = vcl_segment_table_lookup (segment_handle);
if (segment_index == (u32) ~ 0)
return;
segment = fifo_segment_get_segment (sm, segment_index);
fifo_segment_delete (sm, segment);
vcl_segment_table_del (segment_handle);
VDBG (0, "detached segment %u handle %u", segment_index, segment_handle);
}
static u64
vcl_vpp_worker_segment_handle (u32 wrk_index)
{
return (VCL_INVALID_SEGMENT_HANDLE - wrk_index - 1);
}
static void
vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t *
mp)
{
vcl_worker_t *wrk = vcl_worker_get (0);
u64 segment_handle;
int *fds = 0, i;
u32 n_fds = 0;
if (mp->retval)
{
VERR ("attach failed: %U", format_api_error, ntohl (mp->retval));
goto failed;
}
wrk->app_event_queue = uword_to_pointer (mp->app_event_queue_address,
svm_msg_q_t *);
segment_handle = clib_net_to_host_u64 (mp->segment_handle);
if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
{
VERR ("invalid segment handle");
goto failed;
}
if (mp->n_fds)
{
vec_validate (fds, mp->n_fds);
if (vl_socket_client_recv_fd_msg (fds, mp->n_fds, 5))
goto failed;
if (mp->fd_flags & SESSION_FD_F_VPP_MQ_SEGMENT)
if (vcl_segment_attach (vcl_vpp_worker_segment_handle (0),
"vpp-mq-seg", SSVM_SEGMENT_MEMFD,
fds[n_fds++]))
goto failed;
if