#!/usr/bin/env python3 import unittest import os from framework import VppTestCase from asfframework import VppTestRunner, tag_fixme_vpp_workers, tag_fixme_ubuntu2204 from vpp_neighbor import VppNeighbor, find_nbr from vpp_ip_route import ( VppIpRoute, VppRoutePath, find_route, VppIpTable, DpoProto, FibPathType, VppIpInterfaceAddress, ) from vpp_papi import VppEnum, MACAddress from vpp_ip import VppIpPuntRedirect from vpp_sub_interface import VppDot1ADSubint from scapy.packet import Raw from scapy.layers.l2 import Ether, ARP, Dot1Q from scapy.layers.inet import IP, UDP, TCP from scapy.layers.inet6 import IPv6 from scapy.contrib.mpls import MPLS from scapy.layers.inet6 import IPv6 NUM_PKTS = 67 # not exported by scapy, so redefined here arp_opts = {"who-has": 1, "is-at": 2} class ARPTestCase(VppTestCase): """ARP Test Case""" @classmethod def setUpClass(cls): super(ARPTestCase, cls).setUpClass() @classmethod def tearDownClass(cls): super(ARPTestCase, cls).tearDownClass() def setUp(self): super(ARPTestCase, self).setUp() # create 3 pg interfaces self.create_pg_interfaces(range(4)) # pg0 configured with ip4 and 6 addresses used for input # pg1 configured with ip4 and 6 addresses used for output # pg2 is unnumbered to pg0 for i in self.pg_interfaces: i.admin_up() self.pg0.config_ip4() self.pg0.config_ip6() self.pg0.resolve_arp() self.pg1.config_ip4() self.pg1.config_ip6() # pg3 in a different VRF self.tbl = VppIpTable(self, 1) self.tbl.add_vpp_config() self.pg3.set_table_ip4(1) self.pg3.config_ip4() def tearDown(self): self.pg0.unconfig_ip4() self.pg0.unconfig_ip6() self.pg1.unconfig_ip4() self.pg1.unconfig_ip6() self.pg3.unconfig_ip4() self.pg3.set_table_ip4(0) for i in self.pg_interfaces: i.admin_down() super(ARPTestCase, self).tearDown() def verify_arp_req(self, rx, smac, sip, dip, etype=0x0806): ether = rx[Ether] self.assertEqual(ether.dst, "ff:ff:ff:ff:ff:ff") self.assertEqual(ether.src, smac) self.assertEqual(ether.type, etype) arp = rx[ARP] self.assertEqual(arp.hwtype, 1) self.assertEqual(arp.ptype, 0x800) self.assertEqual(arp.hwlen, 6) self.assertEqual(arp.plen, 4) self.assertEqual(arp.op, arp_opts["who-has"]) self.assertEqual(arp.hwsrc, smac) self.assertEqual(arp.hwdst, "00:00:00:00:00:00") self.assertEqual(arp.psrc, sip) self.assertEqual(arp.pdst, dip) def verify_arp_resp(self, rx, smac, dmac, sip, dip): ether = rx[Ether] self.assertEqual(ether.dst, dmac) self.assertEqual(ether.src, smac) self.assertEqual(ether.type, 0x0806) arp = rx[ARP] self.assertEqual(arp.hwtype, 1) self.assertEqual(arp.ptype, 0x800) self.assertEqual(arp.hwlen, 6) self.assertEqual(arp.plen, 4) self.assertEqual(arp.op, arp_opts["is-at"]) self.assertEqual(arp.hwsrc, smac) self.assertEqual(arp.hwdst, dmac) self.assertEqual(arp.psrc, sip) self.assertEqual(arp.pdst, dip) def verify_arp_vrrp_resp(self, rx, smac, dmac, sip, dip): ether = rx[Ether] self.assertEqual(ether.dst, dmac) self.assertEqual(ether.src, smac) arp = rx[ARP] self.assertEqual(arp.hwtype, 1) self.assertEqual(arp.ptype, 0x800) self.assertEqual(arp.hwlen, 6) self.assertEqual(arp.plen, 4) self.assertEqual(arp.op, arp_opts["is-at"]) self.assertNotEqual(arp.hwsrc, smac) self.assertTrue("00:00:5e:00:01" in arp.hwsrc or "00:00:5E:00:01" in arp.hwsrc) self.assertEqual(arp.hwdst, dmac) self.assertEqual(arp.psrc, sip) self.assertEqual(arp.pdst, dip) def verify_ip(self, rx, smac, dmac, sip, dip): ether = rx[Ether] self.assertEqual(ether.dst, dmac) self.assertEqual(ether.src, smac) self.assertEqual(ether.type, 0x0800) ip = rx[IP] self.assertEqual(ip.src, sip) self.assertEqual(ip.dst, dip) def verify_ip_o_mpls(self, rx, smac, dmac, label, sip, dip): ether = rx[Ether] self.assertEqual(ether.dst, dmac) self.assertEqual(ether.src, smac) self.assertEqual(ether.type, 0x8847) mpls = rx[MPLS] self.assertTrue(mpls.label, label) ip = rx[IP] self.assertEqual(ip.src, sip) self.assertEqual(ip.dst, dip) def get_arp_rx_requests(self, itf): """Get ARP RX request stats for and interface""" return self.statistics["/net/arp/rx/requests"][:, itf.sw_if_index].sum() def get_arp_tx_requests(self, itf): """Get ARP TX request stats for and interface""" return self.statistics["/net/arp/tx/requests"][:, itf.sw_if_index].sum() def get_arp_rx_replies(self, itf): """Get ARP RX replies stats for and interface""" return self.statistics["/net/arp/rx/replies"][:, itf.sw_if_index].sum() def get_arp_tx_replies(self, itf): """Get ARP TX replies stats for and interface""" return self.statistics["/net/arp/tx/replies"][:, itf.sw_if_index].sum() def get_arp_rx_garp(self, itf): """Get ARP RX grat stats for and interface""" return self.statistics["/net/arp/rx/gratuitous"][:, itf.sw_if_index].sum() def get_arp_tx_garp(self, itf): """Get ARP RX grat stats for and interface""" return self.statistics["/net/arp/tx/gratuitous"][:, itf.sw_if_index].sum() def test_arp(self): """ARP""" # # Generate some hosts on the LAN # self.pg1.generate_remote_hosts(11) # # watch for: # - all neighbour events # - all neighbor events on pg1 # - neighbor events for host[1] on pg1 # self.vapi.want_ip_neighbor_events(enable=1, pid=os.getpid()) self.vapi.want_ip_neighbor_events( enable=1, pid=os.getpid(), sw_if_index=self.pg1.sw_if_index ) self.vapi.want_ip_neighbor_events( enable=1, pid=os.getpid(), sw_if_index=self.pg1.sw_if_index, ip=self.pg1.remote_hosts[1].ip4, ) self.logger.info(self.vapi.cli("sh ip neighbor-watcher")) # # Send IP traffic to one of these unresolved hosts. # expect the generation of an ARP request # p = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1._remote_hosts[1].ip4) / UDP(sport=1234, dport=1234) / Raw() ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg1.get_capture(1) self.verify_arp_req( rx[0], self.pg1.local_mac, self.pg1.local_ip4, self.pg1._remote_hosts[1].ip4 ) self.logger.info(self.vapi.cli("sh ip neighbor-stats")) self.logger.info(self.vapi.cli("sh ip neighbor-stats pg1")) self.assert_equal(self.get_arp_tx_requests(self.pg1), 1) # # And a dynamic ARP entry for host 1 # dyn_arp = VppNeighbor( self, self.pg1.sw_if_index, self.pg1.remote_hosts[1].mac, self.pg1.remote_hosts[1].ip4, ) dyn_arp.add_vpp_config() self.assertTrue(dyn_arp.query_vpp_config()) self.logger.info(self.vapi.cli("show ip neighbor-watcher")) # this matches all of the listnerers es = [self.vapi.wait_for_event(1, "ip_neighbor_event") for i in range(3)] for e in es: self.assertEqual(str(e.neighbor.ip_address), self.pg1.remote_hosts[1].ip4) # # now we expect IP traffic forwarded # dyn_p = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1._remote_hosts[1].ip4) / UDP(sport=1234, dport=1234) / Raw() ) self.pg0.add_stream(dyn_p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg1.get_capture(1) self.verify_ip( rx[0], self.pg1.local_mac, self.pg1.remote_hosts[1].mac, self.pg0.remote_ip4, self.pg1._remote_hosts[1].ip4, ) # # And a Static ARP entry for host 2 # static_arp = VppNeighbor( self, self.pg1.sw_if_index, self.pg1.remote_hosts[2].mac, self.pg1.remote_hosts[2].ip4, is_static=1, ) static_arp.add_vpp_config() es = [self.vapi.wait_for_event(1, "ip_neighbor_event") for i in range(2)] for e in es: self.assertEqual(str(e.neighbor.ip_address), self.pg1.remote_hosts[2].ip4) static_p = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1._remote_hosts[2].ip4) / UDP(sport=1234, dport=1234) / Raw() ) self.pg0.add_stream(static_p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg1.get_capture(1) self.verify_ip( rx[0], self.pg1.local_mac, self.pg1.remote_hosts[2].mac, self.pg0.remote_ip4, self.pg1._remote_hosts[2].ip4, ) # # remove all the listeners # self.vapi.want_ip_neighbor_events(enable=0, pid=os.getpid()) self.vapi.want_ip_neighbor_events( enable=0, pid=os.getpid(), sw_if_index=self.pg1.sw_if_index ) self.vapi.want_ip_neighbor_events( enable=0, pid=os.getpid(), sw_if_index=self.pg1.sw_if_index, ip=self.pg1.remote_hosts[1].ip4, ) # # flap the link. dynamic ARPs get flush, statics don't # self.pg1.admin_down() self.pg1.admin_up() self.pg0.add_stream(static_p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg1.get_capture(1) self.verify_ip( rx[0], self.pg1.local_mac, self.pg1.remote_hosts[2].mac, self.pg0.remote_ip4, self.pg1._remote_hosts[2].ip4, ) self.pg0.add_stream(dyn_p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg1.get_capture(1) self.verify_arp_req( rx[0], self.pg1.local_mac, self.pg1.local_ip4, self.pg1._remote_hosts[1].ip4 ) self.assert_equal(self.get_arp_tx_requests(self.pg1), 2) self.assertFalse(dyn_arp.query_vpp_config()) self.assertTrue(static_arp.query_vpp_config()) # # Send an ARP request from one of the so-far unlearned remote hosts # p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg1._remote_hosts[3].mac) / ARP( op="who-has", hwsrc=self.pg1._remote_hosts[3].mac, pdst=self.pg1.local_ip4, psrc=self.pg1._remote_hosts[3].ip4, ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg1.get_capture(1) self.verify_arp_resp( rx[0], self.pg1.local_mac, self.pg1._remote_hosts[3].mac, self.pg1.local_ip4, self.pg1._remote_hosts[3].ip4, ) self.logger.info(self.vapi.cli("sh ip neighbor-stats pg1")) self.assert_equal(self.get_arp_rx_requests(self.pg1), 1) self.assert_equal(self.get_arp_tx_replies(self.pg1), 1) # # VPP should have learned the mapping for the remote host # self.assertTrue( find_nbr(self, self.pg1.sw_if_index, self.pg1._remote_hosts[3].ip4) ) # # Fire in an ARP request before the interface becomes IP enabled # self.pg2.generate_remote_hosts(4) p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_mac) / ARP( op="who-has", hwsrc=self.pg2.remote_mac, pdst=self.pg1.local_ip4, psrc=self.pg2.remote_hosts[3].ip4, ) pt = ( Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_mac) / Dot1Q(vlan=0) / ARP( op="who-has", hwsrc=self.pg2.remote_mac, pdst=self.pg1.local_ip4, psrc=self.pg2.remote_hosts[3].ip4, ) ) self.send_and_assert_no_replies(self.pg2, p, "interface not IP enabled") # # Make pg2 un-numbered to pg1 # self.pg2.set_unnumbered(self.pg1.sw_if_index) # # test the unnumbered dump both by all interfaces and just the enabled # one # unnum = self.vapi.ip_unnumbered_dump() self.assertTrue(len(unnum)) self.assertEqual(unnum[0].ip_sw_if_index, self.pg1.sw_if_index) self.assertEqual(unnum[0].sw_if_index, self.pg2.sw_if_index) unnum = self.vapi.ip_unnumbered_dump(self.pg2.sw_if_index) self.assertTrue(len(unnum)) self.assertEqual(unnum[0].ip_sw_if_index, self.pg1.sw_if_index) self.assertEqual(unnum[0].sw_if_index, self.pg2.sw_if_index) # Allow for ARP requests from point-to-point ethernet neighbors # without an attached route on pg2 self.pg2.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg2.get_capture(1) self.verify_arp_resp( rx[0], self.pg2.local_mac, self.pg2.remote_mac, self.pg1.local_ip4, self.pg2.remote_hosts[3].ip4, ) # # Allow for ARP requests from neighbors on unnumbered with # an attached route on pg2 attached_host = VppIpRoute( self, self.pg2.remote_hosts[3].ip4, 32, [VppRoutePath("0.0.0.0", self.pg2.sw_if_index)], ) attached_host.add_vpp_config() self.pg2.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg2.get_capture(1) self.verify_arp_resp( rx[0], self.pg2.local_mac, self.pg2.remote_mac, self.pg1.local_ip4, self.pg2.remote_hosts[3].ip4, ) self.pg2.add_stream(pt) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg2.get_capture(1) self.verify_arp_resp( rx[0], self.pg2.local_mac, self.pg2.remote_mac, self.pg1.local_ip4, self.pg2.remote_hosts[3].ip4, ) # # A neighbor entry that has no associated FIB-entry # arp_no_fib = VppNeighbor( self, self.pg1.sw_if_index, self.pg1.remote_hosts[4].mac, self.pg1.remote_hosts[4].ip4, is_no_fib_entry=1, ) arp_no_fib.add_vpp_config() # # check we have the neighbor, but no route # self.assertTrue( find_nbr(self, self.pg1.sw_if_index, self.pg1._remote_hosts[4].ip4) ) self.assertFalse(find_route(self, self.pg1._remote_hosts[4].ip4, 32)) # # pg2 is unnumbered to pg1, so we can form adjacencies out of pg2 # from within pg1's subnet # arp_unnum = VppNeighbor( self, self.pg2.sw_if_index, self.pg1.remote_hosts[5].mac, self.pg1.remote_hosts[5].ip4, ) arp_unnum.add_vpp_config() p = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1._remote_hosts[5].ip4) / UDP(sport=1234, dport=1234) / Raw() ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg2.get_capture(1) self.verify_ip( rx[0], self.pg2.local_mac, self.pg1.remote_hosts[5].mac, self.pg0.remote_ip4, self.pg1._remote_hosts[5].ip4, ) # # ARP requests from hosts in pg1's subnet sent on pg2 are replied to # with the unnumbered interface's address as the source # p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_mac) / ARP( op="who-has", hwsrc=self.pg2.remote_mac, pdst=self.pg1.local_ip4, psrc=self.pg1.remote_hosts[6].ip4, ) self.pg2.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg2.get_capture(1) self.verify_arp_resp( rx[0], self.pg2.local_mac, self.pg2.remote_mac, self.pg1.local_ip4, self.pg1.remote_hosts[6].ip4, ) # # An attached host route out of pg2 for an undiscovered hosts generates # an ARP request with the unnumbered address as the source # att_unnum = VppIpRoute( self, self.pg1.remote_hosts[7].ip4, 32, [VppRoutePath("0.0.0.0", self.pg2.sw_if_index)], ) att_unnum.add_vpp_config() p = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1._remote_hosts[7].ip4) / UDP(sport=1234, dport=1234) / Raw() ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg2.get_capture(1) self.verify_arp_req( rx[0], self.pg2.local_mac, self.pg1.local_ip4, self.pg1._remote_hosts[7].ip4 ) p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_mac) / ARP( op="who-has", hwsrc=self.pg2.remote_mac, pdst=self.pg1.local_ip4, psrc=self.pg1.remote_hosts[7].ip4, ) self.pg2.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg2.get_capture(1) self.verify_arp_resp( rx[0], self.pg2.local_mac, self.pg2.remote_mac, self.pg1.local_ip4, self.pg1.remote_hosts[7].ip4, ) # # An attached host route as yet unresolved out of pg2 for an # undiscovered host, an ARP requests begets a response. # att_unnum1 = VppIpRoute( self, self.pg1.remote_hosts[8].ip4, 32, [VppRoutePath("0.0.0.0", self.pg2.sw_if_index)], ) att_unnum1.add_vpp_config() p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_mac) / ARP( op="who-has", hwsrc=self.pg2.remote_mac, pdst=self.pg1.local_ip4, psrc=self.pg1.remote_hosts[8].ip4, ) self.pg2.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg2.get_capture(1) self.verify_arp_resp( rx[0], self.pg2.local_mac, self.pg2.remote_mac, self.pg1.local_ip4, self.pg1.remote_hosts[8].ip4, ) # # Send an ARP request from one of the so-far unlearned remote hosts # with a VLAN0 tag # p = ( Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg1._remote_hosts[9].mac) / Dot1Q(vlan=0) / ARP( op="who-has", hwsrc=self.pg1._remote_hosts[9].mac, pdst=self.pg1.local_ip4, psrc=self.pg1._remote_hosts[9].ip4, ) ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg1.get_capture(1) self.verify_arp_resp( rx[0], self.pg1.local_mac, self.pg1._remote_hosts[9].mac, self.pg1.local_ip4, self.pg1._remote_hosts[9].ip4, ) # # Add a hierarchy of routes for a host in the sub-net. # Should still get an ARP resp since the cover is attached # p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg1.remote_mac) / ARP( op="who-has", hwsrc=self.pg1.remote_mac, pdst=self.pg1.local_ip4, psrc=self.pg1.remote_hosts[10].ip4, ) r1 = VppIpRoute( self, self.pg1.remote_hosts[10].ip4, 30, [VppRoutePath(self.pg1.remote_hosts[10].ip4, self.pg1.sw_if_index)], ) r1.add_vpp_config() self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg1.get_capture(1) self.verify_arp_resp( rx[0], self.pg1.local_mac, self.pg1.remote_mac, self.pg1.local_ip4, self.pg1.remote_hosts[10].ip4, ) r2 = VppIpRoute( self, self.pg1.remote_hosts[10].ip4, 32, [VppRoutePath(self.pg1.remote_hosts[10].ip4, self.pg1.sw_if_index)], ) r2.add_vpp_config() self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg1.get_capture(1) self.verify_arp_resp( rx[0], self.pg1.local_mac, self.pg1.remote_mac, self.pg1.local_ip4, self.pg1.remote_hosts[10].ip4, ) # # add an ARP entry that's not on the sub-net and so whose # adj-fib fails the refinement check. then send an ARP request # from that source # a1 = VppNeighbor( self, self.pg0.sw_if_index, self.pg0.remote_mac, "100.100.100.50" ) a1.add_vpp_config() p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP( op="who-has", hwsrc=self.pg0.remote_mac, psrc="100.100.100.50", pdst=self.pg0.remote_ip4, ) self.send_and_assert_no_replies(self.pg0, p, "ARP req for from failed adj-fib") # # ERROR Cases # 1 - don't respond to ARP request for address not within the # interface's sub-net # 1b - nor within the unnumbered subnet # 1c - nor within the subnet of a different interface # p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP( op="who-has", hwsrc=self.pg0.remote_mac, pdst="10.10.10.3", psrc=self.pg0.remote_ip4, ) self.send_and_assert_no_replies( self.pg0, p, "ARP req for non-local destination" ) self.assertFalse(find_nbr(self, self.pg0.sw_if_index, "10.10.10.3")) p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_mac) / ARP( op="who-has", hwsrc=self.pg2.remote_mac, pdst="10.10.10.3", psrc=self.pg1.remote_hosts[7].ip4, ) self.send_and_assert_no_replies( self.pg0, p, "ARP req for non-local destination - unnum" ) p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP( op="who-has", hwsrc=self.pg0.remote_mac, pdst=self.pg1.local_ip4, psrc=self.pg1.remote_ip4, ) self.send_and_assert_no_replies(self.pg0, p, "ARP req diff sub-net") self.assertFalse(find_nbr(self, self.pg0.sw_if_index, self.pg1.remote_ip4)) # # 2 - don't respond to ARP request from an address not within the # interface's sub-net # 2b - to a proxied address # 2c - not within a different interface's sub-net p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP( op="who-has", hwsrc=self.pg0.remote_mac, psrc="10.10.10.3", pdst=self.pg0.local_ip4, ) self.send_and_assert_no_replies(self.pg0, p, "ARP req for non-local source") p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_mac) / ARP( op="who-has", hwsrc=self.pg2.remote_mac, psrc="10.10.10.3", pdst=self.pg0.local_ip4, ) self.send_and_assert_no_replies( self.pg0, p, "ARP req for non-local source - unnum" ) p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP( op="who-has", hwsrc=self.pg0.remote_mac, psrc=self.pg1.remote_ip4, pdst=self.pg0.local_ip4, ) self.send_and_assert_no_replies(self.pg0, p, "ARP req for non-local source 2c") # # 3 - don't respond to ARP request from an address that belongs to # the router # p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP( op="who-has", hwsrc=self.pg0.remote_mac, psrc=self.pg0.local_ip4, pdst=self.pg0.local_ip4, ) self.send_and_assert_no_replies(self.pg0, p, "ARP req for non-local source") # # 4 - don't respond to ARP requests that has mac source different # from ARP request HW source # p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP( op="who-has", hwsrc="00:00:00:DE:AD:BE", psrc=self.pg0.remote_ip4, pdst=self.pg0.local_ip4, ) self.send_and_assert_no_replies(self.pg0, p, "ARP req for non-local source") # # 5 - don't respond to ARP requests for address within the # interface's sub-net but not the interface's address # self.pg0.generate_remote_hosts(2) p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP( op="who-has", hwsrc=self.pg0.remote_mac, psrc=self.pg0.remote_hosts[0].ip4, pdst=self.pg0.remote_hosts[1].ip4, ) self.send_and_assert_no_replies( self.pg0, p, "ARP req for non-local destination" ) # # cleanup # static_arp.remove_vpp_config() self.pg2.unset_unnumbered(self.pg1.sw_if_index) # need this to flush the adj-fibs self.pg2.unset_unnumbered(self.pg1.sw_if_index) self.pg2.admin_down() self.pg1.admin_down() def test_arp_after_mac_change(self): """ARP (after MAC address change)""" # # Prepare a subinterface # subif0 = VppDot1ADSubint(self, self.pg1, 0, 300, 400) subif0.admin_up() subif0.config_ip4() # # Send a packet to cause ARP generation for the parent interface's remote host # p1 = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / UDP(sport=1234, dport=1234) / Raw() ) self.pg0.add_stream(p1) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg1.get_capture(1) self.verify_arp_req( rx[0], self.pg1.local_mac, self.pg1.local_ip4, self.pg1.remote_ip4 ) # # Send a packet to cause ARP generation for the subinterface's remote host # p2 = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP(src=self.pg0.remote_ip4, dst=subif0.remote_ip4) / UDP(sport=1234, dport=1234) / Raw() ) self.pg0.add_stream(p2) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg1.get_capture(1) self.verify_arp_req( rx[0], self.pg1.local_mac, subif0.local_ip4, subif0.remote_ip4, subif0.DOT1AD_TYPE, ) # # Change MAC address of the parent interface # pg1_mac_saved = self.pg1.local_mac self.pg1.set_mac(MACAddress("00:00:00:11:22:33")) # # Send a packet to cause ARP generation for the parent interface's remote host # - expect new MAC address is used as the source # self.pg0.add_stream(p1) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg1.get_capture(1) self.verify_arp_req( rx[0], self.pg1.local_mac, self.pg1.local_ip4, self.pg1.remote_ip4 ) # # Send a packet to cause ARP generation for the subinterface's remote host # - expect new MAC address is used as the source # self.pg0.add_stream(p2) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg1.get_capture(1) self.verify_arp_req( rx[0], self.pg1.local_mac, subif0.local_ip4, subif0.remote_ip4, subif0.DOT1AD_TYPE, ) # # Cleanup # subif0.remove_vpp_config() self.pg1.set_mac(MACAddress(pg1_mac_saved)) def test_proxy_mirror_arp(self): """Interface Mirror Proxy ARP""" # # When VPP has an interface whose address is also applied to a TAP # interface on the host, then VPP's TAP interface will be unnumbered # to the 'real' interface and do proxy ARP from the host. # the curious aspect of this setup is that ARP requests from the host # will come from the VPP's own address. # self.pg0.generate_remote_hosts(2) arp_req_from_me = Ether(src=self.pg2.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP( op="who-has", hwsrc=self.pg2.remote_mac, pdst=self.pg0.remote_hosts[1].ip4, psrc=self.pg0.local_ip4, ) # # Configure Proxy ARP for the subnet on PG0addresses on pg0 # self.vapi.proxy_arp_add_del( proxy={ "table_id": 0, "low": self.pg0._local_ip4_subnet, "hi": self.pg0._local_ip4_bcast, }, is_add=1, ) # Make pg2 un-numbered to pg0 # self.pg2.set_unnumbered(self.pg0.sw_if_index) # # Enable pg2 for proxy ARP # self.pg2.set_proxy_arp() # # Send the ARP request with an originating address that # is VPP's own address # rx = self.send_and_expect(self.pg2, [arp_req_from_me], self.pg2) self.verify_arp_resp( rx[0], self.pg2.local_mac, self.pg2.remote_mac, self.pg0.remote_hosts[1].ip4, self.pg0.local_ip4, ) # # validate we have not learned an ARP entry as a result of this # self.assertFalse(find_nbr(self, self.pg2.sw_if_index, self.pg0.local_ip4)) # # setup a punt redirect so packets from the uplink go to the tap # redirect = VppIpPuntRedirect( self, self.pg0.sw_if_index, self.pg2.sw_if_index, self.pg0.local_ip4 ) redirect.add_vpp_config() p_tcp = ( Ether( src=self.pg0.remote_mac, dst=self.pg0.local_mac, ) / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / TCP(sport=80, dport=80) / Raw() ) rx = self.send_and_expect(self.pg0, [p_tcp], self.pg2) # there's no ARP entry so this is an ARP req self.assertTrue(rx[0].haslayer(ARP)) # and ARP entry for VPP's pg0 address on the host interface n1 = VppNeighbor( self, self.pg2.sw_if_index, self.pg2.remote_mac, self.pg0.local_ip4, is_no_fib_entry=True, ).add_vpp_config() # now the packets shold forward rx = self.send_and_expect(self.pg0, [p_tcp], self.pg2) self.assertFalse(rx[0].haslayer(ARP)) self.assertEqual(rx[0][Ether].dst, self.pg2.remote_mac) # # flush the neighbor cache on the uplink # af = VppEnum.vl_api_address_family_t self.vapi.ip_neighbor_flush(af.ADDRESS_IP4, self.pg0.sw_if_index) # ensure we can still resolve the ARPs on the uplink self.pg0.resolve_arp() self.assertTrue(find_nbr(self, self.pg0.sw_if_index, self.pg0.remote_ip4)) # # cleanup # self.vapi.proxy_arp_add_del( proxy={ "table_id": 0, "low": self.pg0._local_ip4_subnet, "hi": self.pg0._local_ip4_bcast, }, is_add=0, ) redirect.remove_vpp_config() def test_proxy_arp(self): """Proxy ARP""" self.pg1.generate_remote_hosts(2) # # Proxy ARP request packets for each interface # arp_req_pg0 = Ether(src=self.pg0.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP( op="who-has", hwsrc=self.pg0.remote_mac, pdst="10.10.10.3", psrc=self.pg0.remote_ip4, ) arp_req_pg0_tagged = ( Ether(src=self.pg0.remote_mac, dst="ff:ff:ff:ff:ff:ff") / Dot1Q(vlan=0) / ARP( op="who-has", hwsrc=self.pg0.remote_mac, pdst="10.10.10.3", psrc=self.pg0.remote_ip4, ) ) arp_req_pg1 = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP( op="who-has", hwsrc=self.pg1.remote_mac, pdst="10.10.10.3", psrc=self.pg1.remote_ip4, ) arp_req_pg2 = Ether(src=self.pg2.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP( op="who-has", hwsrc=self.pg2.remote_mac, pdst="10.10.10.3", psrc=self.pg1.remote_hosts[1].ip4, ) arp_req_pg3 = Ether(src=self.pg3.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP( op="who-has", hwsrc=self.pg3.remote_mac, pdst="10.10.10.3", psrc=self.pg3.remote_ip4, ) # # Configure Proxy ARP for 10.10.10.0 -> 10.10.10.124 # self.vapi.proxy_arp_add_del( proxy={"table_id": 0, "low": "10.10.10.2", "hi": "10.10.10.124"}, is_add=1 ) # # No responses are sent when the interfaces are not enabled for proxy # ARP # self.send_and_assert_no_replies( self.pg0, arp_req_pg0, "ARP req from unconfigured interface" ) self.send_and_assert_no_replies( self.pg2, arp_req_pg2, "ARP req from unconfigured interface" ) # # Make pg2 un-numbered to pg1 # still won't reply. # self.pg2.set_unnumbered(self.pg1.sw_if_index) self.send_and_assert_no_replies( self.pg2, arp_req_pg2, "ARP req from unnumbered interface" ) # # Enable each interface to reply to proxy ARPs # for i in self.pg_interfaces: i.set_proxy_arp() # # Now each of the interfaces should reply to a request to a proxied # address # self.pg0.add_stream(arp_req_pg0) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg0.get_capture(1) self.verify_arp_resp( rx[0], self.pg0.local_mac, self.pg0.remote_mac, "10.10.10.3", self.pg0.remote_ip4, ) self.pg0.add_stream(arp_req_pg0_tagged) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg0.get_capture(1) self.verify_arp_resp( rx[0], self.pg0.local_mac, self.pg0.remote_mac, "10.10.10.3", self.pg0.remote_ip4, ) self.pg1.add_stream(arp_req_pg1) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg1.get_capture(1) self.verify_arp_resp( rx[0], self.pg1.local_mac, self.pg1.remote_mac, "10.10.10.3", self.pg1.remote_ip4, ) self.pg2.add_stream(arp_req_pg2) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg2.get_capture(1) self.verify_arp_resp( rx[0], self.pg2.local_mac, self.pg2.remote_mac, "10.10.10.3", self.pg1.remote_hosts[1].ip4, ) # # A request for an address out of the configured range # arp_req_pg1_hi = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP( op="who-has", hwsrc=self.pg1.remote_mac, pdst="10.10.10.125", psrc=self.pg1.remote_ip4, ) self.send_and_assert_no_replies( self.pg1, arp_req_pg1_hi, "ARP req out of range HI" ) arp_req_pg1_low = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP( op="who-has", hwsrc=self.pg1.remote_mac, pdst="10.10.10.1", psrc=self.pg1.remote_ip4, ) self.send_and_assert_no_replies( self.pg1, arp_req_pg1_low, "ARP req out of range Low" ) # # Request for an address in the proxy range but from an interface # in a different VRF # self.send_and_assert_no_replies( self.pg3, arp_req_pg3, "ARP req from different VRF" ) # # Disable Each interface for proxy ARP # - expect none to respond # for i in self.pg_interfaces: i.set_proxy_arp(0) self.send_and_assert_no_replies(self.pg0, arp_req_pg0, "ARP req from disable") self.send_and_assert_no_replies(self.pg1, arp_req_pg1, "ARP req from disable") self.send_and_assert_no_replies(self.pg2, arp_req_pg2, "ARP req from disable") # # clean up on interface 2 # self.pg2.unset_unnumbered(self.pg1.sw_if_index) def test_mpls(self): """MPLS""" # # Interface 2 does not yet have ip4 config # self.pg2.config_ip4() self.pg2.generate_remote_hosts(2) # # Add a route with out going label via an ARP unresolved next-hop # ip_10_0_0_1 = VppIpRoute( self, "10.0.0.1", 32, [ VppRoutePath( self.pg2.remote_hosts[1].ip4, self.pg2.sw_if_index, labels=[55] ) ], ) ip_10_0_0_1.add_vpp_config() # # packets should generate an ARP request # p = ( Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst="10.0.0.1") / UDP(sport=1234, dport=1234) / Raw(b"\xa5" * 100) ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg2.get_capture(1) self.verify_arp_req( rx[0], self.pg2.local_mac, self.pg2.local_ip4, self.pg2._remote_hosts[1].ip4 ) # # now resolve the neighbours # self.pg2.configure_ipv4_neighbors() # # Now packet should be properly MPLS encapped. # This verifies that MPLS link-type adjacencies are completed # when the ARP entry resolves # self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg2.get_capture(1) self.verify_ip_o_mpls( rx[0], self.pg2.local_mac, self.pg2.remote_hosts[1].mac, 55, self.pg0.remote_ip4, "10.0.0.1", ) self.pg2.unconfig_ip4() def test_arp_vrrp(self): """ARP reply with VRRP virtual src hw addr""" # # IP packet destined for pg1 remote host arrives on pg0 resulting # in an ARP request for the address of the remote host on pg1 # p0 = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / UDP(sport=1234, dport=1234) / Raw() ) rx1 = self.send_and_expect(self.pg0, [p0], self.pg1) self.verify_arp_req( rx1[0], self.pg1.local_mac, self.pg1.local_ip4, self.pg1.remote_ip4 ) # # ARP reply for address of pg1 remote host arrives on pg1 with # the hw src addr set to a value in the VRRP IPv4 range of # MAC addresses # p1 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / ARP( op="is-at", hwdst=self.pg1.local_mac, hwsrc="00:00:5e:00:01:09", pdst=self.pg1.local_ip4, psrc=self.pg1.remote_ip4, ) self.send_and_assert_no_replies(self.pg1, p1, "ARP reply") # # IP packet destined for pg1 remote host arrives on pg0 again. # VPP should have an ARP entry for that address now and the packet # should be sent out pg1. # rx1 = self.send_and_expect(self.pg0, [p0], self.pg1) self.verify_ip( rx1[0], self.pg1.local_mac, "00:00:5e:00:01:09", self.pg0.remote_ip4, self.pg1.remote_ip4, ) self.pg1.admin_down() self.pg1.admin_up() def test_arp_duplicates(self): """ARP Duplicates""" # # Generate some hosts on the LAN # self.pg1.generate_remote_hosts(3) # # Add host 1 on pg1 and pg2 # arp_pg1 = VppNeighbor( self, self.pg1.sw_if_index, self.pg1.remote_hosts[1].mac, self.pg1.remote_hosts[1].ip4, ) arp_pg1.add_vpp_config() arp_pg2 = VppNeighbor( self, self.pg2.sw_if_index, self.pg2.remote_mac, self.pg1.remote_hosts[1].ip4, ) arp_pg2.add_vpp_config() # # IP packet destined for pg1 remote host arrives on pg1 again. # p = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_hosts[1].ip4) / UDP(sport=1234, dport=1234) / Raw() ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx1 = self.pg1.get_capture(1) self.verify_ip( rx1[0], self.pg1.local_mac, self.pg1.remote_hosts[1].mac, self.pg0.remote_ip4, self.pg1.remote_hosts[1].ip4, ) # # remove the duplicate on pg1 # packet stream should generate ARPs out of pg1 # arp_pg1.remove_vpp_config() self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx1 = self.pg1.get_capture(1) self.verify_arp_req( rx1[0], self.pg1.local_mac, self.pg1.local_ip4, self.pg1.remote_hosts[1].ip4 ) # # Add it back # arp_pg1.add_vpp_config() self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx1 = self.pg1.get_capture(1) self.verify_ip( rx1[0], self.pg1.local_mac, self.pg1.remote_hosts[1].mac, self.pg0.remote_ip4, self.pg1.remote_hosts[1].ip4, ) def test_arp_static(self): """ARP Static""" self.pg2.generate_remote_hosts(3) # # Add a static ARP entry # static_arp = VppNeighbor( self, self.pg2.sw_if_index, self.pg2.remote_hosts[1].mac, self.pg2.remote_hosts[1].ip4, is_static=1, ) static_arp.add_vpp_config() # # Add the connected prefix to the interface # self.pg2.config_ip4() # # We should now find the adj-fib # self.assertTrue( find_nbr( self, self.pg2.sw_if_index, self.pg2.remote_hosts[1].ip4, is_static=1 ) ) self.assertTrue(find_route(self, self.pg2.remote_hosts[1].ip4, 32)) # # remove the connected # self.pg2.unconfig_ip4() # # put the interface into table 1 # self.pg2.set_table_ip4(1) # # configure the same connected and expect to find the # adj fib in the new table # self.pg2.config_ip4() self.assertTrue(find_route(self, self.pg2.remote_hosts[1].ip4, 32, table_id=1)) # # clean-up # self.pg2.unconfig_ip4() static_arp.remove_vpp_config() self.pg2.set_table_ip4(0) def test_arp_static_replace_dynamic_same_mac(self): """ARP Static can replace Dynamic (same mac)""" self.pg2.generate_remote_hosts(1) dyn_arp = VppNeighbor( self, self.pg2.sw_if_index, self.pg2.remote_hosts[0].mac, self.pg2.remote_hosts[0].ip4, ) static_arp = VppNeighbor( self, self.pg2.sw_if_index, self.pg2.remote_hosts[0].mac, self.pg2.remote_hosts[0].ip4, is_static=1, ) # # Add a dynamic ARP entry # dyn_arp.add_vpp_config() # # We should find the dynamic nbr # self.assertFalse( find_nbr( self, self.pg2.sw_if_index, self.pg2.remote_hosts[0].ip4, is_static=1 ) ) self.assertTrue( find_nbr( self, self.pg2.sw_if_index, self.pg2.remote_hosts[0].ip4, is_static=0, mac=self.pg2.remote_hosts[0].mac, ) ) # # Add a static ARP entry with the same mac # static_arp.add_vpp_config() # # We should now find the static nbr with the same mac # self.assertFalse( find_nbr( self, self.pg2.sw_if_index, self.pg2.remote_hosts[0].ip4, is_static=0 ) ) self.assertTrue( find_nbr( self, self.pg2.sw_if_index, self.pg2.remote_hosts[0].ip4, is_static=1, mac=self.pg2.remote_hosts[0].mac, ) ) # # clean-up # static_arp.remove_vpp_config() def test_arp_static_replace_dynamic_diff_mac(self): """ARP Static can replace Dynamic (diff mac)""" self.pg2.generate_remote_hosts(2) dyn_arp = VppNeighbor( self, self.pg2.sw_if_index, self.pg2.remote_hosts[0].mac, self.pg2.remote_hosts[0].ip4, ) static_arp = VppNeighbor( self, self.pg2.sw_if_index, self.pg2.remote_hosts[1].mac, self.pg2.remote_hosts[0].ip4, is_static=1, ) # # Add a dynamic ARP entry # dyn_arp.add_vpp_config() # # We should find the dynamic nbr # self.assertFalse( find_nbr( self, self.pg2.sw_if_index, self.pg2.remote_hosts[0].ip4, is_static=1 ) ) self.assertTrue( find_nbr( self, self.pg2.sw_if_index, self.pg2.remote_hosts[0].ip4, is_static=0, mac=self.pg2.remote_hosts[0].mac, ) ) # # Add a static ARP entry with a changed mac # static_arp.add_vpp_config() # # We should now find the static nbr with a changed mac # self.assertFalse( find_nbr( self, self.pg2.sw_if_index, self.pg2.remote_hosts[0].ip4, is_static=0 ) ) self.assertTrue( find_nbr( self, self.pg2.sw_if_index, self.pg2.remote_hosts[0].ip4, is_static=1, mac=self.pg2.remote_hosts[1].mac, ) ) # # clean-up # static_arp.remove_vpp_config() def test_arp_incomplete(self): """ARP Incomplete""" self.pg1.generate_remote_hosts(4) p0 = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_hosts[1].ip4) / UDP(sport=1234, dport=1234) / Raw() ) p1 = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_hosts[2].ip4) / UDP(sport=1234, dport=1234) / Raw() ) p2 = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP(src=self.pg0.remote_ip4, dst="1.1.1.1") / UDP(sport=1234, dport=1234) / Raw() ) # # a packet to an unresolved destination generates an ARP request # rx = self.send_and_expect(self.pg0, [p0], self.pg1) self.verify_arp_req( rx[0], self.pg1.local_mac, self.pg1.local_ip4, self.pg1._remote_hosts[1].ip4 ) # # add a neighbour for remote host 1 # static_arp = VppNeighbor( self, self.pg1.sw_if_index, self.pg1.remote_hosts[1].mac, self.pg1.remote_hosts[1].ip4, is_static=1, ) static_arp.add_vpp_config() # # add a route through remote host 3 hence we get an incomplete # VppIpRoute( self, "1.1.1.1", 32, [VppRoutePath(self.pg1.remote_hosts[3].ip4, self.pg1.sw_if_index)], ).add_vpp_config() rx = self.send_and_expect(self.pg0, [p2], self.pg1) self.verify_arp_req( rx[0], self.pg1.local_mac, self.pg1.local_ip4, self.pg1._remote_hosts[3].ip4 ) # # change the interface's MAC # self.vapi.sw_interface_set_mac_address( self.pg1.sw_if_index, "00:00:00:33:33:33" ) # # now ARP requests come from the new source mac # rx = self.send_and_expect(self.pg0, [p1], self.pg1) self.verify_arp_req( rx[0], "00:00:00:33:33:33", self.pg1.local_ip4, self.pg1._remote_hosts[2].ip4, ) rx = self.send_and_expect(self.pg0, [p2], self.pg1) self.verify_arp_req( rx[0], "00:00:00:33:33:33", self.pg1.local_ip4, self.pg1._remote_hosts[3].ip4, ) # # packets to the resolved host also have the new source mac # rx = self.send_and_expect(self.pg0, [p0], self.pg1) self.verify_ip( rx[0], "00:00:00:33:33:33", self.pg1.remote_hosts[1].mac, self.pg0.remote_ip4, self.pg1.remote_hosts[1].ip4, ) # # set the mac address on the interface that does not have a # configured subnet and thus no glean # self.vapi.sw_interface_set_mac_address( self.pg2.sw_if_index, "00:00:00:33:33:33" ) def test_garp(self): """GARP""" # # Generate some hosts on the LAN # self.pg1.generate_remote_hosts(4) self.pg2.generate_remote_hosts(4) # # And an ARP entry # arp = VppNeighbor( self, self.pg1.sw_if_index, self.pg1.remote_hosts[1].mac, self.pg1.remote_hosts[1].ip4, ) arp.add_vpp_config() self.assertTrue( find_nbr( self, self.pg1.sw_if_index, self.pg1.remote_hosts[1].ip4, mac=self.pg1.remote_hosts[1].mac, ) ) # # Send a GARP (request) to swap the host 1's address to that of host 2 # p1 = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg1.remote_hosts[2].mac) / ARP( op="who-has", hwdst=self.pg1.local_mac, hwsrc=self.pg1.remote_hosts[2].mac, pdst=self.pg1.remote_hosts[1].ip4, psrc=self.pg1.remote_hosts[1].ip4, ) self.pg1.add_stream(p1) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.assertTrue( find_nbr( self, self.pg1.sw_if_index, self.pg1.remote_hosts[1].ip4, mac=self.pg1.remote_hosts[2].mac, ) ) self.assert_equal(self.get_arp_rx_garp(self.pg1), 1) # # Send a GARP (reply) to swap the host 1's address to that of host 3 # p1 = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg1.remote_hosts[3].mac) / ARP( op="is-at", hwdst=self.pg1.local_mac, hwsrc=self.pg1.remote_hosts[3].mac, pdst=self.pg1.remote_hosts[1].ip4, psrc=self.pg1.remote_hosts[1].ip4, ) self.pg1.add_stream(p1) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.assertTrue( find_nbr( self, self.pg1.sw_if_index, self.pg1.remote_hosts[1].ip4, mac=self.pg1.remote_hosts[3].mac, ) ) self.assert_equal(self.get_arp_rx_garp(self.pg1), 2) # # GARPs (request nor replies) for host we don't know yet # don't result in new neighbour entries # p1 = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg1.remote_hosts[3].mac) / ARP( op="who-has", hwdst=self.pg1.local_mac, hwsrc=self.pg1.remote_hosts[3].mac, pdst=self.pg1.remote_hosts[2].ip4, psrc=self.pg1.remote_hosts[2].ip4, ) self.pg1.add_stream(p1) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.assertFalse( find_nbr(self, self.pg1.sw_if_index, self.pg1.remote_hosts[2].ip4) ) p1 = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg1.remote_hosts[3].mac) / ARP( op="is-at", hwdst=self.pg1.local_mac, hwsrc=self.pg1.remote_hosts[3].mac, pdst=self.pg1.remote_hosts[2].ip4, psrc=self.pg1.remote_hosts[2].ip4, ) self.pg1.add_stream(p1) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.assertFalse( find_nbr(self, self.pg1.sw_if_index, self.pg1.remote_hosts[2].ip4) ) # # IP address in different subnets are not learnt # self.pg2.configure_ipv4_neighbors() cntr = self.statistics.get_err_counter( "/err/arp-reply/l3_dst_address_not_local" ) for op in ["is-at", "who-has"]: p1 = [ ( Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_hosts[1].mac) / ARP( op=op, hwdst=self.pg2.local_mac, hwsrc=self.pg2.remote_hosts[1].mac, pdst=self.pg2.remote_hosts[1].ip4, psrc=self.pg2.remote_hosts[1].ip4, ) ), ( Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_hosts[1].mac) / ARP( op=op, hwdst="ff:ff:ff:ff:ff:ff", hwsrc=self.pg2.remote_hosts[1].mac, pdst=self.pg2.remote_hosts[1].ip4, psrc=self.pg2.remote_hosts[1].ip4, ) ), ] self.send_and_assert_no_replies(self.pg1, p1) self.assertFalse( find_nbr(self, self.pg1.sw_if_index, self.pg2.remote_hosts[1].ip4) ) # they are all dropped because the subnet's don't match self.assertEqual( cntr + 4, self.statistics.get_err_counter("/err/arp-reply/l3_dst_address_not_local"), ) def test_arp_incomplete2(self): """Incomplete Entries""" # # ensure that we throttle the ARP and ND requests # self.pg0.generate_remote_hosts(2) # # IPv4/ARP # ip_10_0_0_1 = VppIpRoute( self, "10.0.0.1", 32, [VppRoutePath(self.pg0.remote_hosts[1].ip4, self.pg0.sw_if_index)], ) ip_10_0_0_1.add_vpp_config() p1 = ( Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / IP(src=self.pg1.remote_ip4, dst="10.0.0.1") / UDP(sport=1234, dport=1234) / Raw() ) self.pg1.add_stream(p1 * 257) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg0._get_capture(1) # # how many we get is going to be dependent on the time for packet # processing but it should be small # self.assertLess(len(rx), 64) # # IPv6/ND # ip_10_1 = VppIpRoute( self, "10::1", 128, [ VppRoutePath( self.pg0.remote_hosts[1].ip6, self.pg0.sw_if_index, proto=DpoProto.DPO_PROTO_IP6, ) ], ) ip_10_1.add_vpp_config() p1 = ( Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / IPv6(src=self.pg1.remote_ip6, dst="10::1") / UDP(sport=1234, dport=1234) / Raw() ) self.pg1.add_stream(p1 * 257) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg0._get_capture(1) # # how many we get is going to be dependent on the time for packet # processing but it should be small # self.assertLess(len(rx), 64) def test_arp_forus(self): """ARP for for-us""" # # Test that VPP responds with ARP requests to addresses that # are connected and local routes. # Use one of the 'remote' addresses in the subnet as a local address # The intention of this route is that it then acts like a secondary # address added to an interface # self.pg0.generate_remote_hosts(2) forus = VppIpRoute( self, self.pg0.remote_hosts[1].ip4, 32, [ VppRoutePath( "0.0.0.0", self.pg0.sw_if_index, type=FibPathType.FIB_PATH_TYPE_LOCAL, ) ], ) forus.add_vpp_config() p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP( op="who-has", hwdst=self.pg0.local_mac, hwsrc=self.pg0.remote_mac, pdst=self.pg0.remote_hosts[1].ip4, psrc=self.pg0.remote_ip4, ) rx = self.send_and_expect(self.pg0, [p], self.pg0) self.verify_arp_resp( rx[0], self.pg0.local_mac, self.pg0.remote_mac, self.pg0.remote_hosts[1].ip4, self.pg0.remote_ip4, ) def test_arp_table_swap(self): # # Generate some hosts on the LAN # N_NBRS = 4 self.pg1.generate_remote_hosts(N_NBRS) for n in range(N_NBRS): # a route thru each neighbour VppIpRoute( self, "10.0.0.%d" % n, 32, [VppRoutePath(self.pg1.remote_hosts[n].ip4, self.pg1.sw_if_index)], ).add_vpp_config() # resolve each neighbour p1 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / ARP( op="is-at", hwdst=self.pg1.local_mac, hwsrc="00:00:5e:00:01:09", pdst=self.pg1.local_ip4, psrc=self.pg1.remote_hosts[n].ip4, ) self.send_and_assert_no_replies(self.pg1, p1, "ARP reply") self.logger.info(self.vapi.cli("sh ip neighbors")) # # swap the table pg1 is in # table = VppIpTable(self, 100).add_vpp_config() self.pg1.unconfig_ip4() self.pg1.set_table_ip4(100) self.pg1.config_ip4() # # all neighbours are cleared # for n in range(N_NBRS): self.assertFalse( find_nbr(self, self.pg1.sw_if_index, self.pg1.remote_hosts[n].ip4) ) # # packets to all neighbours generate ARP requests # for n in range(N_NBRS): # a route thru each neighbour VppIpRoute( self, "10.0.0.%d" % n, 32, [VppRoutePath(self.pg1.remote_hosts[n].ip4, self.pg1.sw_if_index)], table_id=100, ).add_vpp_config() p = ( Ether(src=self.pg1.remote_hosts[n].mac, dst=self.pg1.local_mac) / IP(src=self.pg1.remote_hosts[n].ip4, dst="10.0.0.%d" % n) / Raw(b"0x5" * 100) ) rxs = self.send_and_expect(self.pg1, [p], self.pg1) for rx in rxs: self.verify_arp_req( rx, self.pg1.local_mac, self.pg1.local_ip4, self.pg1.remote_hosts[n].ip4, ) self.pg1.unconfig_ip4() self.pg1.set_table_ip4(0) def test_glean_src_select(self): """Multi Connecteds""" # # configure multiple connected subnets on an interface # and ensure that ARP requests for hosts on those subnets # pick up the correct source address # conn1 = VppIpInterfaceAddress(self, self.pg1, "10.0.0.1", 24).add_vpp_config() conn2 = VppIpInterfaceAddress(self, self.pg1, "10.0.1.1", 24).add_vpp_config() p1 = ( Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg1.remote_ip4, dst="10.0.0.128") / Raw(b"0x5" * 100) ) rxs = self.send_and_expect(self.pg0, [p1], self.pg1) for rx in rxs: self.verify_arp_req(rx, self.pg1.local_mac, "10.0.0.1", "10.0.0.128") p2 = ( Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg1.remote_ip4, dst="10.0.1.128") / Raw(b"0x5" * 100) ) rxs = self.send_and_expect(self.pg0, [p2], self.pg1) for rx in rxs: self.verify_arp_req(rx, self.pg1.local_mac, "10.0.1.1", "10.0.1.128") # # add a local address in the same subnet # the source addresses are equivalent. # VPP leaves the glean address being used for a prefix # in place until that address is deleted. # conn3 = VppIpInterfaceAddress(self, self.pg1, "10.0.1.2", 24).add_vpp_config() rxs = self.send_and_expect(self.pg0, [p2], self.pg1) for rx in rxs: self.verify_arp_req(rx, self.pg1.local_mac, "10.0.1.1", "10.0.1.128") # # remove first address, which is currently in use # the second address should be used now # conn2.remove_vpp_config() rxs = self.send_and_expect(self.pg0, [p2], self.pg1) for rx in rxs: self.verify_arp_req(rx, self.pg1.local_mac, "10.0.1.2", "10.0.1.128") # # add first address back. Second address should continue # being used. # conn2 = VppIpInterfaceAddress(self, self.pg1, "10.0.1.1", 24).add_vpp_config() rxs = self.send_and_expect(self.pg0, [p2], self.pg1) for rx in rxs: self.verify_arp_req(rx, self.pg1.local_mac, "10.0.1.2", "10.0.1.128") conn1.remove_vpp_config() rxs = self.send_and_expect(self.pg0, [p2], self.pg1) for rx in rxs: self.verify_arp_req(rx, self.pg1.local_mac, "10.0.1.2", "10.0.1.128") # apply a connected prefix to an interface in a different table VppIpRoute( self, "10.0.1.0", 24, [VppRoutePath("0.0.0.0", self.pg1.sw_if_index)], table_id=1, ).add_vpp_config() rxs = self.send_and_expect(self.pg3, [p2], self.pg1) for rx in rxs: self.verify_arp_req(rx, self.pg1.local_mac, "10.0.1.2", "10.0.1.128") # apply an attached prefix to the interface # since there's no local address in this prefix, # any other address is used p3 = ( Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg1.remote_ip4, dst="10.0.2.128") / Raw(b"0x5" * 100) ) VppIpRoute( self, "10.0.2.0", 24, [VppRoutePath("0.0.0.0", self.pg1.sw_if_index)], ).add_vpp_config() rxs = self.send_and_expect(self.pg0, [p3], self.pg1) for rx in rxs: self.verify_arp_req( rx, self.pg1.local_mac, self.pg1.local_ip4, "10.0.2.128" ) # cleanup conn3.remove_vpp_config() conn2.remove_vpp_config() @tag_fixme_vpp_workers class NeighborStatsTestCase(VppTestCase): """ARP/ND Counters""" @classmethod def setUpClass(cls): super(NeighborStatsTestCase, cls).setUpClass() @classmethod def tearDownClass(cls): super(NeighborStatsTestCase, cls).tearDownClass() def setUp(self): super(NeighborStatsTestCase, self).setUp() self.create_pg_interfaces(range(2)) # pg0 configured with ip4 and 6 addresses used for input # pg1 configured with ip4 and 6 addresses used for output # pg2 is unnumbered to pg0 for i in self.pg_interfaces: i.admin_up() i.config_ip4() i.config_ip6() i.resolve_arp() i.resolve_ndp() def tearDown(self): super(NeighborStatsTestCase, self).tearDown() for i in self.pg_interfaces: i.unconfig_ip4() i.unconfig_ip6() i.admin_down() def test_arp_stats(self): """ARP Counters""" self.vapi.cli("adj counters enable") self.pg1.generate_remote_hosts(2) arp1 = VppNeighbor( self, self.pg1.sw_if_index, self.pg1.remote_hosts[0].mac, self.pg1.remote_hosts[0].ip4, ) arp1.add_vpp_config() arp2 = VppNeighbor( self, self.pg1.sw_if_index, self.pg1.remote_hosts[1].mac, self.pg1.remote_hosts[1].ip4, ) arp2.add_vpp_config() p1 = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_hosts[0].ip4) / UDP(sport=1234, dport=1234) / Raw() ) p2 = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_hosts[1].ip4) / UDP(sport=1234, dport=1234) / Raw() ) rx = self.send_and_expect(self.pg0, p1 * NUM_PKTS, self.pg1) rx = self.send_and_expect(self.pg0, p2 * NUM_PKTS, self.pg1) self.assertEqual(NUM_PKTS, arp1.get_stats()["packets"]) self.assertEqual(NUM_PKTS, arp2.get_stats()["packets"]) rx = self.send_and_expect(self.pg0, p1 * NUM_PKTS, self.pg1) self.assertEqual(NUM_PKTS * 2, arp1.get_stats()["packets"]) def test_nd_stats(self): """ND Counters""" self.vapi.cli("adj counters enable") self.pg0.generate_remote_hosts(3) nd1 = VppNeighbor( self, self.pg0.sw_if_index, self.pg0.remote_hosts[1].mac, self.pg0.remote_hosts[1].ip6, ) nd1.add_vpp_config() nd2 = VppNeighbor( self, self.pg0.sw_if_index, self.pg0.remote_hosts[2].mac, self.pg0.remote_hosts[2].ip6, ) nd2.add_vpp_config() p1 = ( Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / IPv6(src=self.pg1.remote_ip6, dst=self.pg0.remote_hosts[1].ip6) / UDP(sport=1234, dport=1234) / Raw() ) p2 = ( Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / IPv6(src=self.pg1.remote_ip6, dst=self.pg0.remote_hosts[2].ip6) / UDP(sport=1234, dport=1234) / Raw() ) rx = self.send_and_expect(self.pg1, p1 * 16, self.pg0) rx = self.send_and_expect(self.pg1, p2 * 16, self.pg0) self.assertEqual(16, nd1.get_stats()["packets"]) self.assertEqual(16, nd2.get_stats()["packets"]) rx = self.send_and_expect(self.pg1, p1 * NUM_PKTS, self.pg0) self.assertEqual(NUM_PKTS + 16, nd1.get_stats()["packets"]) @tag_fixme_ubuntu2204 class NeighborAgeTestCase(VppTestCase): """ARP/ND Aging""" @classmethod def setUpClass(cls): super(NeighborAgeTestCase, cls).setUpClass() @classmethod def tearDownClass(cls): super(NeighborAgeTestCase, cls).tearDownClass() def setUp(self): super(NeighborAgeTestCase, self).setUp() self.create_pg_interfaces(range(1)) # pg0 configured with ip4 and 6 addresses used for input # pg1 configured with ip4 and 6 addresses used for output # pg2 is unnumbered to pg0 for i in self.pg_interfaces: i.admin_up() i.config_ip4() i.config_ip6() i.resolve_arp() i.resolve_ndp() def tearDown(self): super(NeighborAgeTestCase, self).tearDown() for i in self.pg_interfaces: i.unconfig_ip4() i.unconfig_ip6() i.admin_down() def verify_arp_req(self, rx, smac, sip, dip): ether = rx[Ether] self.assertEqual(ether.dst, "ff:ff:ff:ff:ff:ff") self.assertEqual(ether.src, smac) arp = rx[ARP] self.assertEqual(arp.hwtype, 1) self.assertEqual(arp.ptype, 0x800) self.assertEqual(arp.hwlen, 6) self.assertEqual(arp.plen, 4) self.assertEqual(arp.op, arp_opts["who-has"]) self.assertEqual(arp.hwsrc, smac) self.assertEqual(arp.hwdst, "00:00:00:00:00:00") self.assertEqual(arp.psrc, sip) self.assertEqual(arp.pdst, dip) def verify_ip_neighbor_config(self, af, max_number, max_age, recycle): config = self.vapi.ip_neighbor_config_get(af) self.assertEqual(config.af, af) self.assertEqual(config.max_number, max_number) self.assertEqual(config.max_age, max_age) self.assertEqual(config.recycle, recycle) def test_age(self): """Aging/Recycle""" self.vapi.cli("set logging unthrottle 0") self.vapi.cli("set logging size %d" % 0xFFFF) self.pg0.generate_remote_hosts(201) vaf = VppEnum.vl_api_address_family_t # # start listening on all interfaces # self.pg_enable_capture(self.pg_interfaces) # # Verify neighbor configuration defaults # self.verify_ip_neighbor_config( af=vaf.ADDRESS_IP4, max_number=50000, max_age=0, recycle=False ) # # Set the neighbor configuration: # limi = 200 # age = 0 seconds # recycle = false # self.vapi.ip_neighbor_config( af=vaf.ADDRESS_IP4, max_number=200, max_age=0, recycle=False ) self.verify_ip_neighbor_config( af=vaf.ADDRESS_IP4, max_number=200, max_age=0, recycle=False ) self.vapi.cli("sh ip neighbor-config") # add the 198 neighbours that should pass (-1 for one created in setup) for ii in range(200): VppNeighbor( self, self.pg0.sw_if_index, self.pg0.remote_hosts[ii].mac, self.pg0.remote_hosts[ii].ip4, ).add_vpp_config() # one more neighbor over the limit should fail with self.vapi.assert_negative_api_retval(): VppNeighbor( self, self.pg0.sw_if_index, self.pg0.remote_hosts[200].mac, self.pg0.remote_hosts[200].ip4, ).add_vpp_config() # # change the config to allow recycling the old neighbors # self.vapi.ip_neighbor_config( af=vaf.ADDRESS_IP4, max_number=200, max_age=0, recycle=True ) self.verify_ip_neighbor_config( af=vaf.ADDRESS_IP4, max_number=200, max_age=0, recycle=True ) # now new additions are allowed VppNeighbor( self, self.pg0.sw_if_index, self.pg0.remote_hosts[200].mac, self.pg0.remote_hosts[200].ip4, ).add_vpp_config() # add the first neighbor we configured has been re-used self.assertFalse( find_nbr(self, self.pg0.sw_if_index, self.pg0.remote_hosts[0].ip4) ) self.assertTrue( find_nbr(self, self.pg0.sw_if_index, self.pg0.remote_hosts[200].ip4) ) # # change the config to age old neighbors # self.vapi.ip_neighbor_config( af=vaf.ADDRESS_IP4, max_number=200, max_age=2, recycle=True ) self.verify_ip_neighbor_config( af=vaf.ADDRESS_IP4, max_number=200, max_age=2, recycle=True ) self.vapi.cli("sh ip4 neighbor-sorted") # age out neighbors self.virtual_sleep(3) # # expect probes from all these ARP entries as they age # 3 probes for each neighbor 3*200 = 600 rxs = self.pg0.get_capture(600, timeout=2) for ii in range(3): for jj in range(200): rx = rxs[ii * 200 + jj] # rx.show() # # 3 probes sent then 1 more second to see if a reply comes, before # they age out # self.virtual_sleep(1) self.assertFalse( self.vapi.ip_neighbor_dump(sw_if_index=0xFFFFFFFF, af=vaf.ADDRESS_IP4) ) # # load up some neighbours again with 2s aging enabled # they should be removed after 10s (2s age + 4s for probes + gap) # check for the add and remove events # enum = VppEnum.vl_api_ip_neighbor_event_flags_t self.vapi.want_ip_neighbor_events_v2(enable=1) for ii in range(10): VppNeighbor( self, self.pg0.sw_if_index, self.pg0.remote_hosts[ii].mac, self.pg0.remote_hosts[ii].ip4, ).add_vpp_config() e = self.vapi.wait_for_event(1, "ip_neighbor_event_v2") self.assertEqual(e.flags, enum.IP_NEIGHBOR_API_EVENT_FLAG_ADDED) self.assertEqual(str(e.neighbor.ip_address), self.pg0.remote_hosts[ii].ip4) self.assertEqual(e.neighbor.mac_address, self.pg0.remote_hosts[ii].mac) self.virtual_sleep(10) self.assertFalse( self.vapi.ip_neighbor_dump(sw_if_index=0xFFFFFFFF, af=vaf.ADDRESS_IP4) ) evs = [] for ii in range(10): e = self.vapi.wait_for_event(1, "ip_neighbor_event_v2") self.assertEqual(e.flags, enum.IP_NEIGHBOR_API_EVENT_FLAG_REMOVED) evs.append(e) # check we got the correct mac/ip pairs - done separately # because we don't care about the order the remove notifications # arrive for ii in range(10): found = False mac = self.pg0.remote_hosts[ii].mac ip = self.pg0.remote_hosts[ii].ip4 for e in evs: if e.neighbor.mac_address == mac and str(e.neighbor.ip_address) == ip: found = True break self.assertTrue(found) # # check if we can set age and recycle with empty neighbor list # self.vapi.ip_neighbor_config( af=vaf.ADDRESS_IP4, max_number=200, max_age=1000, recycle=True ) self.verify_ip_neighbor_config( af=vaf.ADDRESS_IP4, max_number=200, max_age=1000, recycle=True ) # # load up some neighbours again, then disable the aging # they should still be there in 10 seconds time # for ii in range(10): VppNeighbor( self, self.pg0.sw_if_index, self.pg0.remote_hosts[ii].mac, self.pg0.remote_hosts[ii].ip4, ).add_vpp_config() self.vapi.ip_neighbor_config( af=vaf.ADDRESS_IP4, max_number=200, max_age=0, recycle=False ) self.verify_ip_neighbor_config( af=vaf.ADDRESS_IP4, max_number=200, max_age=0, recycle=False ) self.virtual_sleep(10) self.assertTrue( find_nbr(self, self.pg0.sw_if_index, self.pg0.remote_hosts[0].ip4) ) class NeighborReplaceTestCase(VppTestCase): """ARP/ND Replacement""" @classmethod def setUpClass(cls): super(NeighborReplaceTestCase, cls).setUpClass() @classmethod def tearDownClass(cls): super(NeighborReplaceTestCase, cls).tearDownClass() def setUp(self): super(NeighborReplaceTestCase, self).setUp() self.create_pg_interfaces(range(4)) # pg0 configured with ip4 and 6 addresses used for input # pg1 configured with ip4 and 6 addresses used for output # pg2 is unnumbered to pg0 for i in self.pg_interfaces: i.admin_up() i.config_ip4() i.config_ip6() i.resolve_arp() i.resolve_ndp() def tearDown(self): super(NeighborReplaceTestCase, self).tearDown() for i in self.pg_interfaces: i.unconfig_ip4() i.unconfig_ip6() i.admin_down() def test_replace(self): """replace""" N_HOSTS = 16 for i in self.pg_interfaces: i.generate_remote_hosts(N_HOSTS) i.configure_ipv4_neighbors() i.configure_ipv6_neighbors() # replace them all self.vapi.ip_neighbor_replace_begin() self.vapi.ip_neighbor_replace_end() for i in self.pg_interfaces: for h in range(N_HOSTS): self.assertFalse( find_nbr(self, self.pg0.sw_if_index, self.pg0.remote_hosts[h].ip4) ) self.assertFalse( find_nbr(self, self.pg0.sw_if_index, self.pg0.remote_hosts[h].ip6) ) # # and them all back via the API # for i in self.pg_interfaces: for h in range(N_HOSTS): VppNeighbor( self, i.sw_if_index, i.remote_hosts[h].mac, i.remote_hosts[h].ip4 ).add_vpp_config() VppNeighbor( self, i.sw_if_index, i.remote_hosts[h].mac, i.remote_hosts[h].ip6 ).add_vpp_config() # # begin the replacement again, this time touch some # the neighbours on pg1 so they are not deleted # self.vapi.ip_neighbor_replace_begin() # update from the API all neighbours on pg1 for h in range(N_HOSTS): VppNeighbor( self, self.pg1.sw_if_index, self.pg1.remote_hosts[h].mac, self.pg1.remote_hosts[h].ip4, ).add_vpp_config() VppNeighbor( self, self.pg1.sw_if_index, self.pg1.remote_hosts[h].mac, self.pg1.remote_hosts[h].ip6, ).add_vpp_config() # update from the data-plane all neighbours on pg3 self.pg3.configure_ipv4_neighbors() self.pg3.configure_ipv6_neighbors() # complete the replacement self.logger.info(self.vapi.cli("sh ip neighbors")) self.vapi.ip_neighbor_replace_end() for i in self.pg_interfaces: if i == self.pg1 or i == self.pg3: # neighbours on pg1 and pg3 are still present for h in range(N_HOSTS): self.assertTrue( find_nbr(self, i.sw_if_index, i.remote_hosts[h].ip4) ) self.assertTrue( find_nbr(self, i.sw_if_index, i.remote_hosts[h].ip6) ) else: # all other neighbours are toast for h in range(N_HOSTS): self.assertFalse( find_nbr(self, i.sw_if_index, i.remote_hosts[h].ip4) ) self.assertFalse( find_nbr(self, i.sw_if_index, i.remote_hosts[h].ip6) ) class NeighborFlush(VppTestCase): """Neighbor Flush""" @classmethod def setUpClass(cls): super(NeighborFlush, cls).setUpClass() @classmethod def tearDownClass(cls): super(NeighborFlush, cls).tearDownClass() def setUp(self): super(NeighborFlush, self).setUp() self.create_pg_interfaces(range(2)) for i in self.pg_interfaces: i.admin_up() i.config_ip4() i.config_ip6() i.resolve_arp() i.resolve_ndp() def tearDown(self): super(NeighborFlush, self).tearDown() for i in self.pg_interfaces: i.unconfig_ip4() i.unconfig_ip6() i.admin_down() def test_flush(self): """Neighbour Flush""" e = VppEnum nf = e.vl_api_ip_neighbor_flags_t af = e.vl_api_address_family_t N_HOSTS = 16 static = [False, True] self.pg0.generate_remote_hosts(N_HOSTS) self.pg1.generate_remote_hosts(N_HOSTS) for s in static: # a few v4 and v6 dynamic neoghbors for n in range(N_HOSTS): VppNeighbor( self, self.pg0.sw_if_index, self.pg0.remote_hosts[n].mac, self.pg0.remote_hosts[n].ip4, is_static=s, ).add_vpp_config() VppNeighbor( self, self.pg1.sw_if_index, self.pg1.remote_hosts[n].mac, self.pg1.remote_hosts[n].ip6, is_static=s, ).add_vpp_config() # flush the interfaces individually self.vapi.ip_neighbor_flush(af.ADDRESS_IP4, self.pg0.sw_if_index) # check we haven't flushed that which we shouldn't for n in range(N_HOSTS): self.assertTrue( find_nbr( self, self.pg1.sw_if_index, self.pg1.remote_hosts[n].ip6, is_static=s, ) ) self.vapi.ip_neighbor_flush(af.ADDRESS_IP6, self.pg1.sw_if_index) for n in range(N_HOSTS): self.assertFalse( find_nbr(self, self.pg0.sw_if_index, self.pg0.remote_hosts[n].ip4) ) self.assertFalse( find_nbr(self, self.pg1.sw_if_index, self.pg1.remote_hosts[n].ip6) ) # add the nieghbours back for n in range(N_HOSTS): VppNeighbor( self, self.pg0.sw_if_index, self.pg0.remote_hosts[n].mac, self.pg0.remote_hosts[n].ip4, is_static=s, ).add_vpp_config() VppNeighbor( self, self.pg1.sw_if_index, self.pg1.remote_hosts[n].mac, self.pg1.remote_hosts[n].ip6, is_static=s, ).add_vpp_config() self.logger.info(self.vapi.cli("sh ip neighbor")) # flush both interfaces at the same time self.vapi.ip_neighbor_flush(af.ADDRESS_IP6, 0xFFFFFFFF) # check we haven't flushed that which we shouldn't for n in range(N_HOSTS): self.assertTrue( find_nbr( self, self.pg0.sw_if_index, self.pg0.remote_hosts[n].ip4, is_static=s, ) ) self.vapi.ip_neighbor_flush(af.ADDRESS_IP4, 0xFFFFFFFF) for n in range(N_HOSTS): self.assertFalse( find_nbr(self, self.pg0.sw_if_index, self.pg0.remote_hosts[n].ip4) ) self.assertFalse( find_nbr(self, self.pg1.sw_if_index, self.pg1.remote_hosts[n].ip6) ) if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)