#!/usr/bin/env python import unittest import socket from framework import VppTestCase, VppTestRunner, running_extended_tests from vpp_igmp import * from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP from scapy.contrib.igmpv3 import * from scapy.contrib.igmp import * class TestIgmp(VppTestCase): """ IGMP Test Case """ def setUp(self): super(TestIgmp, self).setUp() self.create_pg_interfaces(range(2)) self.sg_list = [] self.config_list = [] self.ip_addr = [] for pg in self.pg_interfaces: pg.admin_up() pg.config_ip4() pg.resolve_arp() def tearDown(self): for pg in self.pg_interfaces: self.vapi.igmp_clear_interface(pg.sw_if_index) pg.unconfig_ip4() pg.admin_down() super(TestIgmp, self).tearDown() def send(self, ti, pkts): ti.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() def test_igmp_parse_report(self): """ IGMP parse Membership Report """ # # VPP acts as a router # self.vapi.want_igmp_events(1) # hos sends join IGMP 'join' p_join = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP(src=self.pg0.remote_ip4, dst='224.0.0.22', tos=0xc0, ttl=1, options=IPOption(copy_flag=1, optclass=0, option="router_alert", length=2, value=0)) / IGMPv3() / IGMPv3mr(numgrp=1) / IGMPv3gr(rtype=3, maddr="224.1.1.1", srcaddrs=["10.1.1.1"])) self.send(self.pg0, p_join) # search for the corresponding state created in VPP dump = self.vapi.igmp_dump() self.assertEqual(len(dump), 1) self.assertEqual(dump[0].sw_if_index, self.pg0.sw_if_index) self.assertEqual(dump[0].gaddr, socket.inet_pton(socket.AF_INET, "224.1.1.1")) self.assertEqual(dump[0].saddr, socket.inet_pton(socket.AF_INET, "10.1.1.1")) # VPP sends a notification that a new group has been joined ev = self.vapi.wait_for_event(2, "igmp_event") self.assertEqual(ev.saddr, socket.inet_pton(socket.AF_INET, "10.1.1.1")) self.assertEqual(ev.gaddr, socket.inet_pton(socket.AF_INET, "224.1.1.1")) self.assertEqual(ev.is_join, 1) # host sends IGMP leave p_leave = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP(src=self.pg0.remote_ip4, dst='224.0.0.22', tos=0xc0) / IGMPv3() / IGMPv3mr(numgrp=1) / IGMPv3gr(rtype=4, maddr="224.1.1.1", srcaddrs=["10.1.1.1"])) self.send(self.pg0, p_leave) # VPP sends a notification that a new group has been left ev = self.vapi.wait_for_event(2, "igmp_event") self.assertEqual(ev.saddr, socket.inet_pton(socket.AF_INET, "10.1.1.1")) self.assertEqual(ev.gaddr, socket.inet_pton(socket.AF_INET, "224.1.1.1")) self.assertEqual(ev.is_join, 0) # state gone dump = self.vapi.igmp_dump() self.assertFalse(dump) # resend the join self.send(self.pg0, p_join) dump = self.vapi.igmp_dump() self.assertEqual(len(dump), 1) self.assertEqual(dump[0].sw_if_index, self.pg0.sw_if_index) self.assertEqual(dump[0].gaddr, socket.inet_pton(socket.AF_INET, "224.1.1.1")) self.assertEqual(dump[0].saddr, socket.inet_pton(socket.AF_INET, "10.1.1.1")) # IGMP block p_block = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP(src=self.pg0.remote_ip4, dst='224.0.0.22', tos=0xc0) / IGMPv3() / IGMPv3mr(numgrp=1) / IGMPv3gr(rtype=6, maddr="224.1.1.1", srcaddrs=["10.1.1.1"])) self.send(self.pg0, p_block) dump = self.vapi.igmp_dump() self.assertFalse(dump) def verify_general_query(self, p): ip = p[IP] self.assertEqual(ip.dst, "224.0.0.1") self.assertEqual(ip.proto, 2) igmp = p[IGMPv3] self.assertEqual(igmp.type, 0x11) self.assertEqual(igmp.gaddr, "0.0.0.0") def test_igmp_send_query(self): """ IGMP send General Query """ # # VPP acts as a router. # Send a membership report so VPP builds state # p_mr = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP(src=self.pg0.remote_ip4, dst='224.0.0.22') / IGMPv3() / IGMPv3mr(numgrp=1) / IGMPv3gr(rtype=3, maddr="224.1.1.1", srcaddrs=["10.1.1.1"])) self.send(self.pg0, p_mr) self.logger.info(self.vapi.cli("sh igmp config")) # # wait for VPP to send out the General Query # capture = self.pg0.get_capture(1, timeout=61) self.verify_general_query(capture[0]) # # the state will expire in 10 more seconds # self.sleep(10)