#!/usr/bin/env python3 import unittest from scapy.layers.l2 import Ether from scapy.packet import Raw from scapy.layers.inet import IP, IPOption from scapy.contrib.igmpv3 import IGMPv3, IGMPv3gr, IGMPv3mq, IGMPv3mr from framework import VppTestCase from asfframework import VppTestRunner, tag_fixme_vpp_workers from vpp_igmp import ( find_igmp_state, IGMP_FILTER, IgmpRecord, IGMP_MODE, IgmpSG, VppHostState, wait_for_igmp_event, ) from vpp_ip_route import find_mroute, VppIpTable class IgmpMode: HOST = 1 ROUTER = 0 @tag_fixme_vpp_workers class TestIgmp(VppTestCase): """IGMP Test Case""" @classmethod def setUpClass(cls): super(TestIgmp, cls).setUpClass() @classmethod def tearDownClass(cls): super(TestIgmp, cls).tearDownClass() def setUp(self): super(TestIgmp, self).setUp() self.create_pg_interfaces(range(4)) self.sg_list = [] self.config_list = [] self.ip_addr = [] self.ip_table = VppIpTable(self, 1) self.ip_table.add_vpp_config() for pg in self.pg_interfaces[2:]: pg.set_table_ip4(1) for pg in self.pg_interfaces: pg.admin_up() pg.config_ip4() pg.resolve_arp() def tearDown(self): for pg in self.pg_interfaces: self.vapi.igmp_clear_interface(pg.sw_if_index) pg.unconfig_ip4() pg.set_table_ip4(0) pg.admin_down() super(TestIgmp, self).tearDown() def send(self, ti, pkts): ti.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() def test_igmp_flush(self): """IGMP Link Up/down and Flush""" # # FIX THIS. Link down. # def test_igmp_enable(self): """IGMP enable/disable on an interface check for the addition/removal of the IGMP mroutes""" self.vapi.igmp_enable_disable(self.pg0.sw_if_index, 1, IGMP_MODE.HOST) self.vapi.igmp_enable_disable(self.pg1.sw_if_index, 1, IGMP_MODE.HOST) self.assertTrue(find_mroute(self, "224.0.0.1", "0.0.0.0", 32)) self.assertTrue(find_mroute(self, "224.0.0.22", "0.0.0.0", 32)) self.vapi.igmp_enable_disable(self.pg2.sw_if_index, 1, IGMP_MODE.HOST) self.vapi.igmp_enable_disable(self.pg3.sw_if_index, 1, IGMP_MODE.HOST) self.assertTrue(find_mroute(self, "224.0.0.1", "0.0.0.0", 32, table_id=1)) self.assertTrue(find_mroute(self, "224.0.0.22", "0.0.0.0", 32, table_id=1)) self.vapi.igmp_enable_disable(self.pg0.sw_if_index, 0, IGMP_MODE.HOST) self.vapi.igmp_enable_disable(self.pg1.sw_if_index, 0, IGMP_MODE.HOST) self.vapi.igmp_enable_disable(self.pg2.sw_if_index, 0, IGMP_MODE.HOST) self.vapi.igmp_enable_disable(self.pg3.sw_if_index, 0, IGMP_MODE.HOST) self.assertTrue(find_mroute(self, "224.0.0.1", "0.0.0.0", 32)) self.assertFalse(find_mroute(self, "224.0.0.22", "0.0.0.0", 32)) self.assertTrue(find_mroute(self, "224.0.0.1", "0.0.0.0", 32, table_id=1)) self.assertFalse(find_mroute(self, "224.0.0.22", "0.0.0.0", 32, table_id=1)) def verify_general_query(self, p): ip = p[IP] self.assertEqual(len(ip.options), 1) self.assertEqual(ip.options[0].option, 20) self.assertEqual(ip.dst, "224.0.0.1") self.assertEqual(ip.proto, 2) igmp = p[IGMPv3] self.assertEqual(igmp.type, 0x11) self.assertEqual(igmp.gaddr, "0.0.0.0") def verify_group_query(self, p, grp, srcs): ip = p[IP] self.assertEqual(ip.dst, grp) self.assertEqual(ip.proto, 2) self.assertEqual(len(ip.options), 1) self.assertEqual(ip.options[0].option, 20) self.assertEqual(ip.proto, 2) igmp = p[IGMPv3] self.assertEqual(igmp.type, 0x11) self.assertEqual(igmp.gaddr, grp) def verify_report(self, rx, records): ip = rx[IP] self.assertEqual(rx[IP].dst, "224.0.0.22") self.assertEqual(len(ip.options), 1) self.assertEqual(ip.options[0].option, 20) self.assertEqual(ip.proto, 2) self.assertEqual( IGMPv3.igmpv3types[rx[IGMPv3].type], "Version 3 Membership Report" ) self.assertEqual(rx[IGMPv3mr].numgrp, len(records)) received = rx[IGMPv3mr].records for ii in range(len(records)): gr = received[ii] r = records[ii] self.assertEqual(IGMPv3gr.igmpv3grtypes[gr.rtype], r.type) self.assertEqual(gr.numsrc, len(r.sg.saddrs)) self.assertEqual(gr.maddr, r.sg.gaddr) self.assertEqual(len(gr.srcaddrs), len(r.sg.saddrs)) self.assertEqual(sorted(gr.srcaddrs), sorted(r.sg.saddrs)) def add_group(self, itf, sg, n_pkts=2): self.pg_enable_capture(self.pg_interfaces) self.pg_start() hs = VppHostState(self, IGMP_FILTER.INCLUDE, itf.sw_if_index, sg) hs.add_vpp_config() capture = itf.get_capture(n_pkts, timeout=10) # reports are transmitted twice due to default rebostness value=2 self.verify_report(capture[0], [IgmpRecord(sg, "Allow New Sources")]), self.verify_report(capture[1], [IgmpRecord(sg, "Allow New Sources")]), return hs def remove_group(self, hs): self.pg_enable_capture(self.pg_interfaces) self.pg_start() hs.remove_vpp_config() capture = self.pg0.get_capture(1, timeout=10) self.verify_report(capture[0], [IgmpRecord(hs.sg, "Block Old Sources")]) def test_igmp_host(self): """IGMP Host functions""" # # Enable interface for host functions # self.vapi.igmp_enable_disable(self.pg0.sw_if_index, 1, IGMP_MODE.HOST) # # Add one S,G of state and expect a state-change event report # indicating the addition of the S,G # h1 = self.add_group(self.pg0, IgmpSG("239.1.1.1", ["1.1.1.1"])) # search for the corresponding state created in VPP dump = self.vapi.igmp_dump(self.pg0.sw_if_index) self.assertEqual(len(dump), 1) self.assertTrue(find_igmp_state(dump, self.pg0, "239.1.1.1", "1.1.1.1")) # # Send a general query (to the all router's address) # expect VPP to respond with a membership report. # Pad the query with 0 - some devices in the big wild # internet are prone to this. # p_g = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP(src=self.pg0.remote_ip4, dst="224.0.0.1", tos=0xC0) / IGMPv3(type="Membership Query", mrcode=100) / IGMPv3mq(gaddr="0.0.0.0") / Raw(b"\x00" * 10) ) self.send(self.pg0, p_g) capture = self.pg0.get_capture(1, timeout=10) self.verify_report(capture[0], [IgmpRecord(h1.sg, "Mode Is Include")]) # # Group specific query # p_gs = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP( src=self.pg0.remote_ip4, dst="239.1.1.1", tos=0xC0, options=[ IPOption( copy_flag=1, optclass="control", option="router_alert", length=4 ) ], ) / IGMPv3(type="Membership Query
#!/usr/bin/env python
import unittest
from framework import VppTestCase, VppTestRunner
from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
class TestSession(VppTestCase):
""" Session Test Case """
@classmethod
def setUpClass(cls):
super(TestSession, cls).setUpClass()
def setUp(self):
super(TestSession, self).setUp()
self.vapi.session_enable_disable(is_enabled=1)
self.create_loopback_interfaces(range(2))
table_id = 0
for i in self.lo_interfaces:
i.admin_up()
if table_id != 0:
tbl = VppIpTable(self, table_id)
tbl.add_vpp_config()
i.set_table_ip4(table_id)
i.config_ip4()
table_id += 1
# Configure namespaces
self.vapi.app_namespace_add(namespace_id="0",
sw_if_index=self.loop0.sw_if_index)
self.vapi.app_namespace_add(namespace_id="1",
sw_if_index=self.loop1.sw_if_index)
def tearDown(self):
for i in self.lo_interfaces:
i.unconfig_ip4()
i.set_table_ip4(0)
i.admin_down()
super(TestSession, self).tearDown()
self.vapi.session_enable_disable(is_enabled=1)
def test_session(self):
""" Session Unit Tests """
error = self.vapi.cli("test session all")
if error:
self.logger.critical(error)
self.assertEqual(error.find("failed"), -1)
def test_segment_manager_alloc(self):
""" Session Segment Manager Multiple Segment Allocation """
# Add inter-table routes
ip_t01 = VppIpRoute(self, self.loop1.local_ip4, 32,
[VppRoutePath("0.0.0.0",
0xffffffff,
nh_table_id=1)])
ip_t10 = VppIpRoute(self, self.loop0.local_ip4, 32,
[VppRoutePath("0.0.0.0",
0xffffffff,
nh_table_id=0)], table_id=1)
ip_t01.add_vpp_config()
ip_t10.add_vpp_config()
# Start builtin server and client with small private segments
uri = "tcp://" + self.loop0.local_ip4 + "/1234"
error = self.vapi.cli("test echo server appns 0 fifo-size 64 " +
"private-segment-size 1m uri " + uri)
if error:
self.logger.critical(error)
self.assertEqual(error.find("failed"), -1)
error = self.vapi.cli("test echo client nclients 100 appns 1 " +
"no-output fifo-size 64 syn-timeout 2 " +
"private-segment-size 1m uri " + uri)
if error:
self.logger.critical(error)
self.assertEqual(error.find("failed"), -1)
if self.vpp_dead:
self.assert_equal(0)
# Delete inter-table routes
ip_t01.remove_vpp_config()
ip_t10.remove_vpp_config()
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)