diff options
Diffstat (limited to 'test/test_neighbor.py')
-rw-r--r-- | test/test_neighbor.py | 162 |
1 files changed, 156 insertions, 6 deletions
diff --git a/test/test_neighbor.py b/test/test_neighbor.py index cc1357ce0d8..416241e8a64 100644 --- a/test/test_neighbor.py +++ b/test/test_neighbor.py @@ -13,7 +13,7 @@ from vpp_papi import VppEnum import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether, ARP, Dot1Q -from scapy.layers.inet import IP, UDP +from scapy.layers.inet import IP, UDP, TCP from scapy.layers.inet6 import IPv6 from scapy.contrib.mpls import MPLS from scapy.layers.inet6 import IPv6 @@ -777,11 +777,7 @@ class ARPTestCase(VppTestCase): # Send the ARP request with an originating address that # is VPP's own address # - self.pg2.add_stream(arp_req_from_me) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - - rx = self.pg2.get_capture(1) + rx = self.send_and_expect(self.pg2, [arp_req_from_me], self.pg2) self.verify_arp_resp(rx[0], self.pg2.local_mac, self.pg2.remote_mac, @@ -796,6 +792,48 @@ class ARPTestCase(VppTestCase): self.pg0.local_ip4)) # + # setup a punt redirect so packets from the uplink go to the tap + # + self.vapi.ip_punt_redirect(self.pg0.sw_if_index, + self.pg2.sw_if_index, + self.pg0.local_ip4) + + p_tcp = (Ether(src=self.pg0.remote_mac, + dst=self.pg0.local_mac,) / + IP(src=self.pg0.remote_ip4, + dst=self.pg0.local_ip4) / + TCP(sport=80, dport=80) / + Raw()) + rx = self.send_and_expect(self.pg0, [p_tcp], self.pg2) + + # there's no ARP entry so this is an ARP req + self.assertTrue(rx[0].haslayer(ARP)) + + # and ARP entry for VPP's pg0 address on the host interface + n1 = VppNeighbor(self, + self.pg2.sw_if_index, + self.pg2.remote_mac, + self.pg0.local_ip4, + is_no_fib_entry=True).add_vpp_config() + # now the packets shold forward + rx = self.send_and_expect(self.pg0, [p_tcp], self.pg2) + self.assertFalse(rx[0].haslayer(ARP)) + self.assertEqual(rx[0][Ether].dst, self.pg2.remote_mac) + + # + # flush the neighbor cache on the uplink + # + af = VppEnum.vl_api_address_family_t + self.vapi.ip_neighbor_flush(af.ADDRESS_IP4, self.pg0.sw_if_index) + + # ensure we can still resolve the ARPs on the uplink + self.pg0.resolve_arp() + + self.assertTrue(find_nbr(self, + self.pg0.sw_if_index, + self.pg0.remote_ip4)) + + # # cleanup # self.pg2.set_proxy_arp(0) @@ -1966,5 +2004,117 @@ class NeighborReplaceTestCase(VppTestCase): i.remote_hosts[h].ip6)) +class NeighborFlush(VppTestCase): + """ Neighbor Flush """ + + @classmethod + def setUpClass(cls): + super(NeighborFlush, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(NeighborFlush, cls).tearDownClass() + + def setUp(self): + super(NeighborFlush, self).setUp() + + self.create_pg_interfaces(range(2)) + + for i in self.pg_interfaces: + i.admin_up() + i.config_ip4() + i.config_ip6() + i.resolve_arp() + i.resolve_ndp() + + def tearDown(self): + super(NeighborFlush, self).tearDown() + + for i in self.pg_interfaces: + i.unconfig_ip4() + i.unconfig_ip6() + i.admin_down() + + def test_flush(self): + """ Neighbour Flush """ + + e = VppEnum + nf = e.vl_api_ip_neighbor_flags_t + af = e.vl_api_address_family_t + N_HOSTS = 16 + static = [False, True] + self.pg0.generate_remote_hosts(N_HOSTS) + self.pg1.generate_remote_hosts(N_HOSTS) + + for s in static: + # a few v4 and v6 dynamic neoghbors + for n in range(N_HOSTS): + VppNeighbor(self, + self.pg0.sw_if_index, + self.pg0.remote_hosts[n].mac, + self.pg0.remote_hosts[n].ip4, + is_static=s).add_vpp_config() + VppNeighbor(self, + self.pg1.sw_if_index, + self.pg1.remote_hosts[n].mac, + self.pg1.remote_hosts[n].ip6, + is_static=s).add_vpp_config() + + # flush the interfaces individually + self.vapi.ip_neighbor_flush(af.ADDRESS_IP4, self.pg0.sw_if_index) + + # check we haven't flushed that which we shouldn't + for n in range(N_HOSTS): + self.assertTrue(find_nbr(self, + self.pg1.sw_if_index, + self.pg1.remote_hosts[n].ip6, + is_static=s)) + + self.vapi.ip_neighbor_flush(af.ADDRESS_IP6, self.pg1.sw_if_index) + + for n in range(N_HOSTS): + self.assertFalse(find_nbr(self, + self.pg0.sw_if_index, + self.pg0.remote_hosts[n].ip4)) + self.assertFalse(find_nbr(self, + self.pg1.sw_if_index, + self.pg1.remote_hosts[n].ip6)) + + # add the nieghbours back + for n in range(N_HOSTS): + VppNeighbor(self, + self.pg0.sw_if_index, + self.pg0.remote_hosts[n].mac, + self.pg0.remote_hosts[n].ip4, + is_static=s).add_vpp_config() + VppNeighbor(self, + self.pg1.sw_if_index, + self.pg1.remote_hosts[n].mac, + self.pg1.remote_hosts[n].ip6, + is_static=s).add_vpp_config() + + self.logger.info(self.vapi.cli("sh ip neighbor")) + + # flush both interfaces at the same time + self.vapi.ip_neighbor_flush(af.ADDRESS_IP6, 0xffffffff) + + # check we haven't flushed that which we shouldn't + for n in range(N_HOSTS): + self.assertTrue(find_nbr(self, + self.pg0.sw_if_index, + self.pg0.remote_hosts[n].ip4, + is_static=s)) + + self.vapi.ip_neighbor_flush(af.ADDRESS_IP4, 0xffffffff) + + for n in range(N_HOSTS): + self.assertFalse(find_nbr(self, + self.pg0.sw_if_index, + self.pg0.remote_hosts[n].ip4)) + self.assertFalse(find_nbr(self, + self.pg1.sw_if_index, + self.pg1.remote_hosts[n].ip6)) + + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner) |