aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_l2bd_arp_term.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_l2bd_arp_term.py')
-rw-r--r--test/test_l2bd_arp_term.py176
1 files changed, 87 insertions, 89 deletions
diff --git a/test/test_l2bd_arp_term.py b/test/test_l2bd_arp_term.py
index c64d5d4f01c..598203950d9 100644
--- a/test/test_l2bd_arp_term.py
+++ b/test/test_l2bd_arp_term.py
@@ -10,19 +10,38 @@ from socket import AF_INET, AF_INET6, inet_pton, inet_ntop
from scapy.packet import Raw
from scapy.layers.l2 import Ether, ARP
from scapy.layers.inet import IP
-from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
- in6_mactoifaceid, in6_ismaddr
-from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \
- ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
- ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
- ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types
+from scapy.utils6 import (
+ in6_getnsma,
+ in6_getnsmac,
+ in6_ptop,
+ in6_islladdr,
+ in6_mactoifaceid,
+ in6_ismaddr,
+)
+from scapy.layers.inet6 import (
+ IPv6,
+ UDP,
+ ICMPv6ND_NS,
+ ICMPv6ND_RS,
+ ICMPv6ND_RA,
+ ICMPv6NDOptSrcLLAddr,
+ getmacbyip6,
+ ICMPv6MRD_Solicitation,
+ ICMPv6NDOptMTU,
+ ICMPv6NDOptSrcLLAddr,
+ ICMPv6NDOptPrefixInfo,
+ ICMPv6ND_NA,
+ ICMPv6NDOptDstLLAddr,
+ ICMPv6DestUnreach,
+ icmp6types,
+)
from framework import VppTestCase, VppTestRunner
from util import Host, ppp
class TestL2bdArpTerm(VppTestCase):
- """ L2BD arp termination Test Case """
+ """L2BD arp termination Test Case"""
@classmethod
def setUpClass(cls):
@@ -77,11 +96,9 @@ class TestL2bdArpTerm(VppTestCase):
def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1, is_ipv6=0):
for e in entries:
ip = e.ip4 if is_ipv6 == 0 else e.ip6
- self.vapi.bd_ip_mac_add_del(is_add=is_add,
- entry={
- 'bd_id': bd_id,
- 'ip': ip,
- 'mac': e.mac})
+ self.vapi.bd_ip_mac_add_del(
+ is_add=is_add, entry={"bd_id": bd_id, "ip": ip, "mac": e.mac}
+ )
@classmethod
def mac_list(cls, b6_range):
@@ -89,23 +106,23 @@ class TestL2bdArpTerm(VppTestCase):
@classmethod
def ip4_host(cls, subnet, host, mac):
- return Host(mac=mac,
- ip4="172.17.1%02u.%u" % (subnet, host))
+ return Host(mac=mac, ip4="172.17.1%02u.%u" % (subnet, host))
@classmethod
def ip4_hosts(cls, subnet, start, mac_list):
- return {cls.ip4_host(subnet, start + j, mac_list[j])
- for j in range(len(mac_list))}
+ return {
+ cls.ip4_host(subnet, start + j, mac_list[j]) for j in range(len(mac_list))
+ }
@classmethod
def ip6_host(cls, subnet, host, mac):
- return Host(mac=mac,
- ip6="fd01:%x::%x" % (subnet, host))
+ return Host(mac=mac, ip6="fd01:%x::%x" % (subnet, host))
@classmethod
def ip6_hosts(cls, subnet, start, mac_list):
- return {cls.ip6_host(subnet, start + j, mac_list[j])
- for j in range(len(mac_list))}
+ return {
+ cls.ip6_host(subnet, start + j, mac_list[j]) for j in range(len(mac_list))
+ }
@classmethod
def bd_swifs(cls, b):
@@ -118,18 +135,17 @@ class TestL2bdArpTerm(VppTestCase):
self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
for swif in self.bd_swifs(bd_id):
swif_idx = swif.sw_if_index
- self.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=swif_idx,
- bd_id=bd_id, enable=is_add)
+ self.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=swif_idx, bd_id=bd_id, enable=is_add
+ )
if not is_add:
self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
@classmethod
def arp_req(cls, src_host, host):
- return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
- ARP(op="who-has",
- hwsrc=src_host.bin_mac,
- pdst=host.ip4,
- psrc=src_host.ip4))
+ return Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) / ARP(
+ op="who-has", hwsrc=src_host.bin_mac, pdst=host.ip4, psrc=src_host.ip4
+ )
@classmethod
def arp_reqs(cls, src_host, entries):
@@ -167,7 +183,7 @@ class TestL2bdArpTerm(VppTestCase):
o2 = int(ip / 65536) % 256
o3 = int(ip / 256) % 256
o4 = int(ip) % 256
- return '%s.%s.%s.%s' % (o1, o2, o3, o4)
+ return "%s.%s.%s.%s" % (o1, o2, o3, o4)
def arp_event_host(self, e):
return Host(str(e.mac), ip4=str(e.ip))
@@ -185,10 +201,12 @@ class TestL2bdArpTerm(VppTestCase):
def ns_req(cls, src_host, host):
nsma = in6_getnsma(inet_pton(AF_INET6, "fd10::ffff"))
d = inet_ntop(AF_INET6, nsma)
- return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
- IPv6(dst=d, src=src_host.ip6) /
- ICMPv6ND_NS(tgt=host.ip6) /
- ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac))
+ return (
+ Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac)
+ / IPv6(dst=d, src=src_host.ip6)
+ / ICMPv6ND_NS(tgt=host.ip6)
+ / ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac)
+ )
@classmethod
def ns_reqs_dst(cls, entries, dst_host):
@@ -200,8 +218,7 @@ class TestL2bdArpTerm(VppTestCase):
def na_resp_host(self, src_host, rx):
self.assertEqual(rx[Ether].dst, src_host.mac)
- self.assertEqual(in6_ptop(rx[IPv6].dst),
- in6_ptop(src_host.ip6))
+ self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(src_host.ip6))
self.assertTrue(rx.haslayer(ICMPv6ND_NA))
self.assertTrue(rx.haslayer(ICMPv6NDOptDstLLAddr))
@@ -236,8 +253,7 @@ class TestL2bdArpTerm(VppTestCase):
else:
raise ValueError("Unknown feature used: %s" % flag)
is_set = 1 if args[flag] else 0
- self.vapi.bridge_flags(bd_id=bd_id, is_set=is_set,
- flags=feature_bitmap)
+ self.vapi.bridge_flags(bd_id=bd_id, is_set=is_set, flags=feature_bitmap)
self.logger.info("Bridge domain ID %d updated" % bd_id)
def verify_arp(self, src_host, req_hosts, resp_hosts, bd_id=1):
@@ -269,12 +285,10 @@ class TestL2bdArpTerm(VppTestCase):
self.assertEqual(len(resps ^ resp_hosts), 0)
def test_l2bd_arp_term_01(self):
- """ L2BD arp term - add 5 hosts, verify arp responses
- """
+ """L2BD arp term - add 5 hosts, verify arp responses"""
src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
self.bd_add_del(1, is_add=1)
- self.set_bd_flags(1, arp_term=True, flood=False,
- uu_flood=False, learn=False)
+ self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
macs = self.mac_list(range(1, 5))
hosts = self.ip4_hosts(4, 1, macs)
self.add_del_arp_term_hosts(hosts, is_add=1)
@@ -283,8 +297,7 @@ class TestL2bdArpTerm(VppTestCase):
type(self).hosts = hosts
def test_l2bd_arp_term_02(self):
- """ L2BD arp term - delete 3 hosts, verify arp responses
- """
+ """L2BD arp term - delete 3 hosts, verify arp responses"""
src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
macs = self.mac_list(range(1, 3))
deleted = self.ip4_hosts(4, 1, macs)
@@ -295,12 +308,10 @@ class TestL2bdArpTerm(VppTestCase):
self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_03(self):
- """ L2BD arp term - recreate BD1, readd 3 hosts, verify arp responses
- """
+ """L2BD arp term - recreate BD1, readd 3 hosts, verify arp responses"""
src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
self.bd_add_del(1, is_add=1)
- self.set_bd_flags(1, arp_term=True, flood=False,
- uu_flood=False, learn=False)
+ self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
macs = self.mac_list(range(1, 3))
readded = self.ip4_hosts(4, 1, macs)
self.add_del_arp_term_hosts(readded, is_add=1)
@@ -308,8 +319,7 @@ class TestL2bdArpTerm(VppTestCase):
type(self).hosts = readded
def test_l2bd_arp_term_04(self):
- """ L2BD arp term - 2 IP4 addrs per host
- """
+ """L2BD arp term - 2 IP4 addrs per host"""
src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
macs = self.mac_list(range(1, 3))
sub5_hosts = self.ip4_hosts(5, 1, macs)
@@ -320,12 +330,10 @@ class TestL2bdArpTerm(VppTestCase):
self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_05(self):
- """ L2BD arp term - create and update 10 IP4-mac pairs
- """
+ """L2BD arp term - create and update 10 IP4-mac pairs"""
src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
self.bd_add_del(1, is_add=1)
- self.set_bd_flags(1, arp_term=True, flood=False,
- uu_flood=False, learn=False)
+ self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
macs1 = self.mac_list(range(10, 20))
hosts1 = self.ip4_hosts(5, 1, macs1)
self.add_del_arp_term_hosts(hosts1, is_add=1)
@@ -337,14 +345,12 @@ class TestL2bdArpTerm(VppTestCase):
self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_06(self):
- """ L2BD arp/ND term - hosts with both ip4/ip6
- """
+ """L2BD arp/ND term - hosts with both ip4/ip6"""
src_host4 = self.ip4_host(50, 50, "00:00:11:22:33:44")
src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
self.bd_add_del(1, is_add=1)
# enable flood to make sure requests are not flooded
- self.set_bd_flags(1, arp_term=True, flood=True,
- uu_flood=False, learn=False)
+ self.set_bd_flags(1, arp_term=True, flood=True, uu_flood=False, learn=False)
macs = self.mac_list(range(10, 20))
hosts6 = self.ip6_hosts(5, 1, macs)
hosts4 = self.ip4_hosts(5, 1, macs)
@@ -355,12 +361,10 @@ class TestL2bdArpTerm(VppTestCase):
self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_07(self):
- """ L2BD ND term - Add and Del hosts, verify ND replies
- """
+ """L2BD ND term - Add and Del hosts, verify ND replies"""
src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
self.bd_add_del(1, is_add=1)
- self.set_bd_flags(1, arp_term=True, flood=False,
- uu_flood=False, learn=False)
+ self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
macs = self.mac_list(range(10, 20))
hosts6 = self.ip6_hosts(5, 1, macs)
self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
@@ -372,12 +376,10 @@ class TestL2bdArpTerm(VppTestCase):
self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_08(self):
- """ L2BD ND term - Add and update IP+mac, verify ND replies
- """
+ """L2BD ND term - Add and update IP+mac, verify ND replies"""
src_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
self.bd_add_del(1, is_add=1)
- self.set_bd_flags(1, arp_term=True, flood=False,
- uu_flood=False, learn=False)
+ self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
macs1 = self.mac_list(range(10, 20))
hosts = self.ip6_hosts(5, 1, macs1)
self.add_del_arp_term_hosts(hosts, is_add=1, is_ipv6=1)
@@ -389,12 +391,10 @@ class TestL2bdArpTerm(VppTestCase):
self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_09(self):
- """ L2BD arp term - send garps, verify arp event reports
- """
+ """L2BD arp term - send garps, verify arp event reports"""
self.vapi.want_l2_arp_term_events(enable=1)
self.bd_add_del(1, is_add=1)
- self.set_bd_flags(1, arp_term=True, flood=False,
- uu_flood=False, learn=False)
+ self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
macs = self.mac_list(range(90, 95))
hosts = self.ip4_hosts(5, 1, macs)
@@ -403,14 +403,14 @@ class TestL2bdArpTerm(VppTestCase):
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- evs = [self.vapi.wait_for_event(1, "l2_arp_term_event")
- for i in range(len(hosts))]
+ evs = [
+ self.vapi.wait_for_event(1, "l2_arp_term_event") for i in range(len(hosts))
+ ]
ev_hosts = self.arp_event_hosts(evs)
self.assertEqual(len(ev_hosts ^ hosts), 0)
def test_l2bd_arp_term_10(self):
- """ L2BD arp term - send duplicate garps, verify suppression
- """
+ """L2BD arp term - send duplicate garps, verify suppression"""
macs = self.mac_list(range(70, 71))
hosts = self.ip4_hosts(6, 1, macs)
@@ -421,14 +421,14 @@ class TestL2bdArpTerm(VppTestCase):
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- evs = [self.vapi.wait_for_event(1, "l2_arp_term_event")
- for i in range(len(hosts))]
+ evs = [
+ self.vapi.wait_for_event(1, "l2_arp_term_event") for i in range(len(hosts))
+ ]
ev_hosts = self.arp_event_hosts(evs)
self.assertEqual(len(ev_hosts ^ hosts), 0)
def test_l2bd_arp_term_11(self):
- """ L2BD arp term - disable ip4 arp events,send garps, verify no events
- """
+ """L2BD arp term - disable ip4 arp events,send garps, verify no events"""
self.vapi.want_l2_arp_term_events(enable=0)
macs = self.mac_list(range(90, 95))
hosts = self.ip4_hosts(5, 1, macs)
@@ -443,13 +443,11 @@ class TestL2bdArpTerm(VppTestCase):
self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_12(self):
- """ L2BD ND term - send NS packets verify reports
- """
+ """L2BD ND term - send NS packets verify reports"""
self.vapi.want_l2_arp_term_events(enable=1)
dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
self.bd_add_del(1, is_add=1)
- self.set_bd_flags(1, arp_term=True, flood=False,
- uu_flood=False, learn=False)
+ self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
macs = self.mac_list(range(10, 15))
hosts = self.ip6_hosts(5, 1, macs)
reqs = self.ns_reqs_dst(hosts, dst_host)
@@ -457,14 +455,14 @@ class TestL2bdArpTerm(VppTestCase):
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- evs = [self.vapi.wait_for_event(2, "l2_arp_term_event")
- for i in range(len(hosts))]
+ evs = [
+ self.vapi.wait_for_event(2, "l2_arp_term_event") for i in range(len(hosts))
+ ]
ev_hosts = self.nd_event_hosts(evs)
self.assertEqual(len(ev_hosts ^ hosts), 0)
def test_l2bd_arp_term_13(self):
- """ L2BD ND term - send duplicate ns, verify suppression
- """
+ """L2BD ND term - send duplicate ns, verify suppression"""
dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
macs = self.mac_list(range(16, 17))
hosts = self.ip6_hosts(5, 1, macs)
@@ -473,14 +471,14 @@ class TestL2bdArpTerm(VppTestCase):
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- evs = [self.vapi.wait_for_event(2, "l2_arp_term_event")
- for i in range(len(hosts))]
+ evs = [
+ self.vapi.wait_for_event(2, "l2_arp_term_event") for i in range(len(hosts))
+ ]
ev_hosts = self.nd_event_hosts(evs)
self.assertEqual(len(ev_hosts ^ hosts), 0)
def test_l2bd_arp_term_14(self):
- """ L2BD ND term - disable ip4 arp events,send ns, verify no events
- """
+ """L2BD ND term - disable ip4 arp events,send ns, verify no events"""
self.vapi.want_l2_arp_term_events(enable=0)
dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
macs = self.mac_list(range(10, 15))
@@ -495,5 +493,5 @@ class TestL2bdArpTerm(VppTestCase):
self.bd_add_del(1, is_add=0)
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)