#!/usr/bin/env python3 import unittest from scapy.layers.l2 import Ether from scapy.packet import Raw from scapy.layers.inet import IP, IPOption from scapy.contrib.igmpv3 import IGMPv3, IGMPv3gr, IGMPv3mq, IGMPv3mr from framework import VppTestCase from asfframework import VppTestRunner, tag_fixme_vpp_workers from vpp_igmp import ( find_igmp_state, IGMP_FILTER, IgmpRecord, IGMP_MODE, IgmpSG, VppHostState, wait_for_igmp_event, ) from vpp_ip_route import find_mroute, VppIpTable class IgmpMode: HOST = 1 ROUTER = 0 @tag_fixme_vpp_workers class TestIgmp(VppTestCase): """IGMP Test Case""" @classmethod def setUpClass(cls): super(TestIgmp, cls).setUpClass() @classmethod def tearDownClass(cls): super(TestIgmp, cls).tearDownClass() def setUp(self): super(TestIgmp, self).setUp() self.create_pg_interfaces(range(4)) self.sg_list = [] self.config_list = [] self.ip_addr = [] self.ip_table = VppIpTable(self, 1) self.ip_table.add_vpp_config() for pg in self.pg_interfaces[2:]: pg.set_table_ip4(1) for pg in self.pg_interfaces: pg.admin_up() pg.config_ip4() pg.resolve_arp() def tearDown(self): for pg in self.pg_interfaces: self.vapi.igmp_clear_interface(pg.sw_if_index) pg.unconfig_ip4() pg.set_table_ip4(0) pg.admin_down() super(TestIgmp, self).tearDown() def send(self, ti, pkts): ti.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() def test_igmp_flush(self): """IGMP Link Up/down and Flush""" # # FIX THIS. Link down. 
# def test_igmp_enable(self): """IGMP enable/disable on an interface check for the addition/removal of the IGMP mroutes""" self.vapi.igmp_enable_disable(self.pg0.sw_if_index, 1, IGMP_MODE.HOST) self.vapi.igmp_enable_disable(self.pg1.sw_if_index, 1, IGMP_MODE.HOST) self.assertTrue(find_mroute(self, "224.0.0.1", "0.0.0.0", 32)) self.assertTrue(find_mroute(self, "224.0.0.22", "0.0.0.0", 32)) self.vapi.igmp_enable_disable(self.pg2.sw_if_index, 1, IGMP_MODE.HOST) self.vapi.igmp_enable_disable(self.pg3.sw_if_index, 1, IGMP_MODE.HOST) self.assertTrue(find_mroute(self, "224.0.0.1", "0.0.0.0", 32, table_id=1)) self.assertTrue(find_mroute(self, "224.0.0.22", "0.0.0.0", 32, table_id=1)) self.vapi.igmp_enable_disable(self.pg0.sw_if_index, 0, IGMP_MODE.HOST) self.vapi.igmp_enable_disable(self.pg1.sw_if_index, 0, IGMP_MODE.HOST) self.vapi.igmp_enable_disable(self.pg2.sw_if_index, 0, IGMP_MODE.HOST) self.vapi.igmp_enable_disable(self.pg3.sw_if_index, 0, IGMP_MODE.HOST) self.assertTrue(find_mroute(self, "224.0.0.1", "0.0.0.0", 32)) self.assertFalse(find_mroute(self, "224.0.0.22", "0.0.0.0", 32)) self.assertTrue(find_mroute(self, "224.0.0.1", "0.0.0.0", 32, table_id=1)) self.assertFalse(find_mroute(self, "224.0.0.22", "0.0.0.0", 32, table_id=1)) def verify_general_query(self, p): ip = p[IP] self.assertEqual(len(ip.options), 1) self.assertEqual(ip.options[0].option, 20) self.assertEqual(ip.dst, "224.0.0.1") self.assertEqual(ip.proto, 2) igmp = p[IGMPv3] self.assertEqual(igmp.type, 0x11) self.assertEqual(igmp.gaddr, "0.0.0.0") def verify_group_query(self, p, grp, srcs): ip = p[IP] self.assertEqual(ip.dst, grp) self.assertEqual(ip.proto, 2) self.assertEqual(len(ip.options), 1) self.assertEqual(ip.options[0].option, 20) self.assertEqual(ip.proto, 2) igmp = p[IGMPv3] self.assertEqual(igmp.type, 0x11) self.assertEqual(igmp.gaddr, grp) def verify_report(self, rx, records): ip = rx[IP] self.assertEqual(rx[IP].dst, "224.0.0.22") self.assertEqual(len(ip.options), 1) 
self.assertEqual(ip.options[0].option, 20) self.assertEqual(ip.proto, 2) self.assertEqual( IGMPv3.igmpv3types[rx[IGMPv3].type], "Version 3 Membership Report" ) self.assertEqual(rx[IGMPv3mr].numgrp, len(records)) received = rx[IGMPv3mr].records for ii in range(len(records)): gr = received[ii] r = records[ii] self.assertEqual(IGMPv3gr.igmpv3grtypes[gr.rtype], r.type) self.assertEqual(gr.numsrc, len(r.sg.saddrs)) self.assertEqual(gr.maddr, r.sg.gaddr) self.assertEqual(len(gr.srcaddrs), len(r.sg.saddrs)) self.assertEqual(sorted(gr.srcaddrs), sorted(r.sg.saddrs)) def add_group(self, itf, sg, n_pkts=2): self.pg_enable_capture(self.pg_interfaces) self.pg_start() hs = VppHostState(self, IGMP_FILTER.INCLUDE, itf.sw_if_index, sg) hs.add_vpp_config() capture = itf.get_capture(n_pkts, timeout=10) # reports are transmitted twice due to default rebostness value=2 self.verify_report(capture[0], [IgmpRecord(sg, "Allow New Sources")]), self.verify_report(capture[1], [IgmpRecord(sg, "Allow New Sources")]), return hs def remove_group(self, hs): self.pg_enable_capture(self.pg_interfaces) self.pg_start() hs.remove_vpp_config() capture = self.pg0.get_capture(1, timeout=10) self.verify_report(capture[0], [IgmpRecord(hs.sg, "Block Old Sources")]) def test_igmp_host(self): """IGMP Host functions""" # # Enable interface for host functions # self.vapi.igmp_enable_disable(self.pg0.sw_if_index, 1, IGMP_MODE.HOST) # # Add one S,G of state and expect a state-change event report # indicating the addition of the S,G # h1 = self.add_group(self.pg0, IgmpSG("239.1.1.1", ["1.1.1.1"])) # search for the corresponding state created in VPP dump = self.vapi.igmp_dump(self.pg0.sw_if_index) self.assertEqual(len(dump), 1) self.assertTrue(find_igmp_state(dump, self.pg0, "239.1.1.1", "1.1.1.1")) # # Send a general query (to the all router's address) # expect VPP to respond with a membership report. # Pad the query with 0 - some devices in the big wild # internet are prone to this. 
# p_g = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP(src=self.pg0.remote_ip4, dst="224.0.0.1", tos=0xC0) / IGMPv3(type="Membership Query", mrcode=100) / IGMPv3mq(gaddr="0.0.0.0") / Raw(b"\x00" * 10) ) self.send(self.pg0, p_g) capture = self.pg0.get_capture(1, timeout=10) self.verify_report(capture[0], [IgmpRecord(h1.sg, "Mode Is Include")]) # # Group specific query # p_gs = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP( src=self.pg0.remote_ip4, dst="239.1.1.1", tos=0xC0, options=[ IPOption( copy_flag=1, optclass="control", option="router_alert", length=4 ) ], ) / IGMPv3(type="Membership Query
#!/usr/bin/env python

import unittest

from framework import VppTestCase, VppTestRunner
from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath


# Session-layer tests driven through the VPP CLI.
#
# Fixture built by setUp(): two loopback interfaces, loop0 in IPv4 table 0
# and loop1 in IPv4 table 1, each attached to its own application namespace
# ("0" and "1" respectively).
#
# The class and test_* docstrings are kept verbatim on purpose.
# NOTE(review): the test runner presumably shows them as the test names -
# confirm before rewording them.
class TestSession(VppTestCase):
    """ Session Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestSession, cls).setUpClass()

    def setUp(self):
        """Enable the session layer and build the two-namespace fixture."""
        super(TestSession, self).setUp()

        self.vapi.session_enable_disable(is_enabled=1)
        self.create_loopback_interfaces(range(2))

        # The N-th loopback is placed in IPv4 table N.  Table 0 is not
        # created here; every other table is created before it is used.
        for table_id, lo_if in enumerate(self.lo_interfaces):
            lo_if.admin_up()

            if table_id != 0:
                VppIpTable(self, table_id).add_vpp_config()

            lo_if.set_table_ip4(table_id)
            lo_if.config_ip4()

        # Configure namespaces: one per loopback interface.
        self.vapi.app_namespace_add(namespace_id="0",
                                    sw_if_index=self.loop0.sw_if_index)
        self.vapi.app_namespace_add(namespace_id="1",
                                    sw_if_index=self.loop1.sw_if_index)

    def tearDown(self):
        """Unconfigure the loopbacks and move them back to table 0."""
        for lo_if in self.lo_interfaces:
            lo_if.unconfig_ip4()
            lo_if.set_table_ip4(0)
            lo_if.admin_down()

        super(TestSession, self).tearDown()
        # NOTE(review): this passes is_enabled=1, exactly like setUp(), so
        # it does not request a disable - confirm this is intentional.
        self.vapi.session_enable_disable(is_enabled=1)

    def _cli_expect_no_failure(self, cmd):
        """Run CLI command ``cmd`` and fail if its output reports a failure.

        Any non-empty output is logged at critical level.  The test fails
        when that output contains the substring "failed".  Empty (or
        missing) output counts as success.
        """
        reply = self.vapi.cli(cmd)
        if reply:
            self.logger.critical(reply)
            self.assertNotIn("failed", reply)

    def test_session(self):
        """ Session Unit Tests """
        self._cli_expect_no_failure("test session all")

    def test_segment_manager_alloc(self):
        """ Session Segment Manager Multiple Segment Allocation """

        # Add inter-table routes: each loopback address becomes reachable
        # from the other table, with nh_table_id naming the table that
        # holds the address.
        ip_t01 = VppIpRoute(self, self.loop1.local_ip4, 32,
                            [VppRoutePath("0.0.0.0",
                                          0xffffffff,
                                          nh_table_id=1)])
        ip_t10 = VppIpRoute(self, self.loop0.local_ip4, 32,
                            [VppRoutePath("0.0.0.0",
                                          0xffffffff,
                                          nh_table_id=0)], table_id=1)
        ip_t01.add_vpp_config()
        ip_t10.add_vpp_config()

        # Start builtin server and client with small private segments.
        # The server runs in namespace 0, the client in namespace 1.
        uri = "tcp://" + self.loop0.local_ip4 + "/1234"
        self._cli_expect_no_failure(
            "test echo server appns 0 fifo-size 64 " +
            "private-segment-size 1m uri " + uri)
        self._cli_expect_no_failure(
            "test echo client nclients 100 appns 1 " +
            "no-output fifo-size 64 syn-timeout 2 " +
            "private-segment-size 1m uri " + uri)

        # Report a dead VPP as an ordinary test failure with a message.
        self.assertFalse(self.vpp_dead,
                         "VPP died while running the echo server/client")

        # Delete inter-table routes
        ip_t01.remove_vpp_config()
        ip_t10.remove_vpp_config()

# Script entry point: when this module is executed directly, hand its
# tests to unittest with VppTestRunner as the runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)
( src=self.pg0.remote_ip4, dst="224.0.0.22", tos=0xC0, ttl=1, options=[ IPOption( copy_flag=1, optclass="control", option="router_alert", length=4 ) ], ) / IGMPv3(type="Version 3 Membership Report") / IGMPv3mr(numgrp=1) / IGMPv3gr(rtype="Mode Is Exclude", maddr="239.1.1.3") ) self.send(self.pg0, p_j) self.assertTrue( wait_for_igmp_event(self, 1, self.pg0, "239.1.1.3", "0.0.0.0", 1) ) # # A 'allow sources' for {} should be ignored as it should # never be sent. # p_j = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP( src=self.pg0.remote_ip4, dst="224.0.0.22", tos=0xC0, ttl=1, options=[ IPOption( copy_flag=1, optclass="control", option="router_alert", length=4 ) ], ) / IGMPv3(type="Version 3 Membership Report") / IGMPv3mr(numgrp=1) / IGMPv3gr(rtype="Allow New Sources", maddr="239.1.1.4") ) self.send(self.pg0, p_j) dump = self.vapi.igmp_dump(self.pg0.sw_if_index) self.assertTrue(find_igmp_state(dump, self.pg0, "239.1.1.2", "0.0.0.0")) self.assertTrue(find_igmp_state(dump, self.pg0, "239.1.1.3", "0.0.0.0")) self.assertFalse(find_igmp_state(dump, self.pg0, "239.1.1.4", "0.0.0.0")) # # a TO_IN({}) and IS_IN({}) are treated like a (*,G) leave # self.vapi.cli("set logging class igmp level debug") p_l = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP( src=self.pg0.remote_ip4, dst="224.0.0.22", tos=0xC0, ttl=1, options=[ IPOption( copy_flag=1, optclass="control", option="router_alert", length=4 ) ], ) / IGMPv3(type="Version 3 Membership Report") / IGMPv3mr(numgrp=1) / IGMPv3gr(rtype="Change To Include Mode", maddr="239.1.1.2") ) self.send(self.pg0, p_l) self.assertTrue( wait_for_igmp_event(self, 2, self.pg0, "239.1.1.2", "0.0.0.0", 0) ) p_l = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP( src=self.pg0.remote_ip4, dst="224.0.0.22", tos=0xC0, ttl=1, options=[ IPOption( copy_flag=1, optclass="control", option="router_alert", length=4 ) ], ) / IGMPv3(type="Version 3 Membership Report") / IGMPv3mr(numgrp=1) / IGMPv3gr(rtype="Mode Is 
Include", maddr="239.1.1.3") ) self.send(self.pg0, p_l) self.assertTrue( wait_for_igmp_event(self, 2, self.pg0, "239.1.1.3", "0.0.0.0", 0) ) self.assertFalse(self.vapi.igmp_dump(self.pg0.sw_if_index)) # # disable router config # self.vapi.igmp_enable_disable(self.pg0.sw_if_index, 0, IGMP_MODE.ROUTER) def _create_igmpv3_pck(self, itf, rtype, maddr, srcaddrs): p = ( Ether(dst=itf.local_mac, src=itf.remote_mac) / IP( src=itf.remote_ip4, dst="224.0.0.22", tos=0xC0, ttl=1, options=[ IPOption( copy_flag=1, optclass="control", option="router_alert", length=4 ) ], ) / IGMPv3(type="Version 3 Membership Report") / IGMPv3mr(numgrp=1) / IGMPv3gr(rtype=rtype, maddr=maddr, srcaddrs=srcaddrs) ) return p def test_igmp_proxy_device(self): """IGMP proxy device""" self.pg2.admin_down() self.pg2.unconfig_ip4() self.pg2.set_table_ip4(0) self.pg2.config_ip4() self.pg2.admin_up() self.vapi.cli("test igmp timers query 10 src 3 leave 1") # enable IGMP self.vapi.igmp_enable_disable(self.pg0.sw_if_index, 1, IGMP_MODE.HOST) self.vapi.igmp_enable_disable(self.pg1.sw_if_index, 1, IGMP_MODE.ROUTER) self.vapi.igmp_enable_disable(self.pg2.sw_if_index, 1, IGMP_MODE.ROUTER) # create IGMP proxy device self.vapi.igmp_proxy_device_add_del(0, self.pg0.sw_if_index, 1) self.vapi.igmp_proxy_device_add_del_interface(0, self.pg1.sw_if_index, 1) self.vapi.igmp_proxy_device_add_del_interface(0, self.pg2.sw_if_index, 1) # send join on pg1. join should be proxied by pg0 p_j = self._create_igmpv3_pck( self.pg1, "Allow New Sources", "239.1.1.1", ["10.1.1.1", "10.1.1.2"] ) self.send(self.pg1, p_j) capture = self.pg0.get_capture(1, timeout=1) self.verify_report( capture[0], [ IgmpRecord( IgmpSG("239.1.1.1", ["10.1.1.1", "10.1.1.2"]), "Allow New Sources" ) ], ) self.assertTrue(find_mroute(self, "239.1.1.1", "0.0.0.0", 32)) # send join on pg2. join should be proxied by pg0. 
# the group should contain only 10.1.1.3 as # 10.1.1.1 was already reported p_j = self._create_igmpv3_pck( self.pg2, "Allow New Sources", "239.1.1.1", ["10.1.1.1", "10.1.1.3"] ) self.send(self.pg2, p_j) capture = self.pg0.get_capture(1, timeout=1) self.verify_report( capture[0], [IgmpRecord(IgmpSG("239.1.1.1", ["10.1.1.3"]), "Allow New Sources")], ) self.assertTrue(find_mroute(self, "239.1.1.1", "0.0.0.0", 32)) # send leave on pg2. leave for 10.1.1.3 should be proxyed # as pg2 was the only interface interested in 10.1.1.3 p_l = self._create_igmpv3_pck( self.pg2, "Block Old Sources", "239.1.1.1", ["10.1.1.3"] ) self.send(self.pg2, p_l) capture = self.pg0.get_capture(1, timeout=2) self.verify_report( capture[0], [IgmpRecord(IgmpSG("239.1.1.1", ["10.1.1.3"]), "Block Old Sources")], ) self.assertTrue(find_mroute(self, "239.1.1.1", "0.0.0.0", 32)) # disable igmp on pg1 (also removes interface from proxy device) # proxy leave for 10.1.1.2. pg2 is still interested in 10.1.1.1 self.pg_enable_capture(self.pg_interfaces) self.vapi.igmp_enable_disable(self.pg1.sw_if_index, 0, IGMP_MODE.ROUTER) capture = self.pg0.get_capture(1, timeout=1) self.verify_report( capture[0], [IgmpRecord(IgmpSG("239.1.1.1", ["10.1.1.2"]), "Block Old Sources")], ) self.assertTrue(find_mroute(self, "239.1.1.1", "0.0.0.0", 32)) # disable IGMP on pg0 and pg1. # disabling IGMP on pg0 (proxy device upstream interface) # removes this proxy device self.vapi.igmp_enable_disable(self.pg0.sw_if_index, 0, IGMP_MODE.HOST) self.vapi.igmp_enable_disable(self.pg2.sw_if_index, 0, IGMP_MODE.ROUTER) self.assertFalse(find_mroute(self, "239.1.1.1", "0.0.0.0", 32)) if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)