From 1976f36b09ead86874feb630277dfaeceed9c0fe Mon Sep 17 00:00:00 2001 From: Neale Ranns Date: Wed, 6 Nov 2019 13:13:01 +0000 Subject: tests: Add UT to test incomplete MPLS adjacencies send ARP requests Type: test Change-Id: I81e07233aec54c786e4e9beb8c4f06d0a3dca90f Signed-off-by: Neale Ranns --- test/test_mpls.py | 45 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/test/test_mpls.py b/test/test_mpls.py index 8ed047df5f4..32868c69157 100644 --- a/test/test_mpls.py +++ b/test/test_mpls.py @@ -14,13 +14,19 @@ from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface import scapy.compat from scapy.packet import Raw -from scapy.layers.l2 import Ether +from scapy.layers.l2 import Ether, ARP from scapy.layers.inet import IP, UDP, ICMP from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded from scapy.contrib.mpls import MPLS NUM_PKTS = 67 +# scapy removed these attributes. +# we asked that they be restored: https://github.com/secdev/scapy/pull/1878 +# semantic names have more meaning than numbers. so here they are. +ARP.who_has = 1 +ARP.is_at = 2 + def verify_filter(capture, sent): if not len(capture) == len(sent): @@ -2079,10 +2085,9 @@ class TestMPLSL2(VppTestCase): tbl.add_vpp_config() self.tables.append(tbl) - # use pg0 as the core facing interface + # use pg0 as the core facing interface, don't resolve ARP self.pg0.admin_up() self.pg0.config_ip4() - self.pg0.resolve_arp() self.pg0.enable_mpls() # use the other 2 for customer facing L2 links @@ -2116,6 +2121,22 @@ class TestMPLSL2(VppTestCase): self.assertEqual(rx_eth.src, tx_eth.src) self.assertEqual(rx_eth.dst, tx_eth.dst) + def verify_arp_req(self, rx, smac, sip, dip): + ether = rx[Ether] + self.assertEqual(ether.dst, "ff:ff:ff:ff:ff:ff") + self.assertEqual(ether.src, smac) + + arp = rx[ARP] + self.assertEqual(arp.hwtype, 1) + self.assertEqual(arp.ptype, 0x800) + self.assertEqual(arp.hwlen, 6) + self.assertEqual(arp.plen, 4) + self.assertEqual(arp.op, ARP.who_has) + self.assertEqual(arp.hwsrc, smac) + self.assertEqual(arp.hwdst, "00:00:00:00:00:00") + self.assertEqual(arp.psrc, sip) + self.assertEqual(arp.pdst, dip) + def test_vpws(self): """ Virtual Private Wire Service """ @@ -2176,7 +2197,21 @@ class TestMPLSL2(VppTestCase): # # Inject a packet from the customer/L2 side + # there's no resolved ARP entry so the first packet we see should be + # an ARP request + # + tx1 = pcore[MPLS].payload + rx1 = self.send_and_expect(self.pg1, [tx1], self.pg0) + + self.verify_arp_req(rx1[0], + self.pg0.local_mac, + self.pg0.local_ip4, + self.pg0.remote_ip4) + # + # resolve the ARP entries and send again + # + self.pg0.resolve_arp() tx1 = pcore[MPLS].payload * NUM_PKTS rx1 = self.send_and_expect(self.pg1, tx1, self.pg0) @@ -2184,6 +2219,10 @@ class TestMPLSL2(VppTestCase): def test_vpls(self): """ Virtual Private LAN Service """ + + # we skipped this in the setup + self.pg0.resolve_arp() + # # Create a L2 MPLS tunnels # -- cgit 1.2.3-korg