aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_ping.py
blob: 4f3921e992e9de2af9ea9c854bbb03518ca0e50f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import socket

from scapy.layers.inet import IP, UDP, ICMP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether, GRE
from scapy.packet import Raw

from framework import VppTestCase
from util import ppp

""" TestPing is a subclass of  VPPTestCase classes.

Basic sanity tests for the VPP ping command.

"""


class TestPing(VppTestCase):
    """ Ping Test Case """

    @classmethod
    def setUpClass(cls):
        """ Create two pg interfaces and configure IPv4/IPv6 on each.

        Every interface is brought admin-up, given IPv4 and IPv6 addresses,
        has IPv6 router advertisements disabled and has ARP/NDP resolved.
        If any step raises, the parent tearDownClass is invoked before the
        exception is re-raised.
        """
        super(TestPing, cls).setUpClass()
        try:
            cls.create_pg_interfaces(range(2))
            cls.interfaces = list(cls.pg_interfaces)

            for i in cls.interfaces:
                i.admin_up()
                i.config_ip4()
                i.config_ip6()
                i.disable_ipv6_ra()
                i.resolve_arp()
                i.resolve_ndp()
        except Exception:
            super(TestPing, cls).tearDownClass()
            raise

    def tearDown(self):
        """ Run the parent tearDown, then log hardware state if VPP is alive.
        """
        super(TestPing, self).tearDown()
        if not self.vpp_dead:
            # Log the reply like the other CLI calls in this class do;
            # calling the CLI without using its return value drops the text.
            self.logger.info(self.vapi.cli("show hardware"))

    def _verify_echo_request(self, pkt, expected_seq, expected_id=None):
        """ Assert that a captured packet is a well-formed echo request.

        :param pkt: captured packet holding IP and ICMP layers.
        :param expected_seq: ICMP sequence number the packet must carry.
        :param expected_id: ICMP identifier the packet must carry, or None
            to accept whatever identifier the packet has (first packet).
        :returns: the ICMP identifier found in the packet.
        """
        ip = pkt[IP]
        self.assertEqual(ip.version, 4)
        self.assertEqual(ip.flags, 0)
        self.assertEqual(ip.src, self.pg1.local_ip4)
        self.assertEqual(ip.dst, self.pg1.remote_ip4)
        self.assertEqual(ip.proto, 1)
        self.assertEqual(len(ip.options), 0)
        self.assertGreaterEqual(ip.ttl, 254)
        icmp = pkt[ICMP]
        self.assertEqual(icmp.type, 8)
        self.assertEqual(icmp.code, 0)
        self.assertEqual(icmp.seq, expected_seq)
        if expected_id is not None:
            self.assertEqual(icmp.id, expected_id)
        return icmp.id

    def _ping_and_verify(self, ping_options, expected_count, burst=1):
        """ Ping pg1's remote IPv4 address from VPP and check the capture.

        :param ping_options: text appended to "ping <remote_ip4> " to form
            the CLI command.
        :param expected_count: number of packets expected on pg1.
        :param burst: number of consecutive packets that share one ICMP
            sequence number; sequence numbers start at 1.

        All packets must carry the same ICMP identifier as the first one.
        "show error" is run and logged even when a check fails.
        """
        try:
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.logger.info(self.vapi.cli("show ip arp"))
            self.logger.info(self.vapi.cli("show ip6 neighbors"))

            ping_cmd = "ping " + self.pg1.remote_ip4 + " " + ping_options
            self.logger.info(self.vapi.cli(ping_cmd))
            out = self.pg1.get_capture(expected_count)
            icmp_id = None
            for index, p in enumerate(out):
                # Packets index 0..burst-1 carry seq 1, the next burst
                # carries seq 2, and so on.
                icmp_id = self._verify_echo_request(
                    p, 1 + index // burst, icmp_id)
        finally:
            self.logger.info(self.vapi.cli("show error"))

    def test_ping_basic(self):
        """ basic ping test """
        repeat = 10
        self._ping_and_verify("interval 0.01 repeat %d" % repeat,
                              expected_count=repeat)

    def test_ping_burst(self):
        """ burst ping test """
        burst = 3
        # No "repeat" is passed on the command line; the expected capture
        # size of burst * 5 presumably reflects ping's default repeat count
        # - TODO confirm against the VPP ping CLI.
        default_repeat = 5
        self._ping_and_verify("interval 0.01 burst %d" % burst,
                              expected_count=burst * default_repeat,
                              burst=burst)
lass="o">.pg0.get_capture(1) pkt = out[0] self.check_encapsulation(pkt, self.single_tunnel_bd) # payload = self.decapsulate(pkt) # self.assert_eq_pkts(payload, self.frame_reply) def test_ucast_flood(self): """ Unicast flood test Send frames from pg3 Verify receipt of encapsulated frames on pg0 """ self.pg3.add_stream([self.frame_reply]) self.pg0.enable_capture() self.pg_start() # Get packet from each tunnel and assert it's corectly encapsulated. out = self.pg0.get_capture(self.n_ucast_tunnels) for pkt in out: self.check_encapsulation(pkt, self.ucast_flood_bd, True) # payload = self.decapsulate(pkt) # self.assert_eq_pkts(payload, self.frame_reply) def test_mcast_flood(self): """ Multicast flood test Send frames from pg2 Verify receipt of encapsulated frames on pg0 """ self.pg2.add_stream([self.frame_reply]) self.pg0.enable_capture() self.pg_start() # Pick first received frame and check if it's corectly encapsulated. out = self.pg0.get_capture(1) pkt = out[0] self.check_encapsulation(pkt, self.mcast_flood_bd, local_only=False, mcast_pkt=True) # payload = self.decapsulate(pkt) # self.assert_eq_pkts(payload, self.frame_reply) @classmethod def create_gtpu_flood_test_bd(cls, teid, n_ucast_tunnels): # Create 10 ucast gtpu tunnels under bd ip_range_start = 10 ip_range_end = ip_range_start + n_ucast_tunnels next_hop_address = cls.pg0.remote_ip4n for dest_ip4n in ip4n_range(next_hop_address, ip_range_start, ip_range_end): # add host route so dest_ip4n will not be resolved cls.vapi.ip_add_del_route(dst_address=dest_ip4n, dst_address_length=32, next_hop_address=next_hop_address) r = cls.vapi.gtpu_add_del_tunnel( src_addr=cls.pg0.local_ip4n, dst_addr=dest_ip4n, teid=teid) cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=r.sw_if_index, bd_id=teid) @classmethod def add_del_shared_mcast_dst_load(cls, is_add): """ add or del tunnels sharing the same mcast dst to test gtpu ref_count mechanism """ n_shared_dst_tunnels = 20 teid_start = 1000 teid_end = teid_start + 
n_shared_dst_tunnels for teid in range(teid_start, teid_end): r = cls.vapi.gtpu_add_del_tunnel( src_addr=cls.pg0.local_ip4n, dst_addr=cls.mcast_ip4n, mcast_sw_if_index=1, teid=teid, is_add=is_add) if r.sw_if_index == 0xffffffff: raise ValueError("bad sw_if_index: ~0") @classmethod def add_shared_mcast_dst_load(cls): cls.add_del_shared_mcast_dst_load(is_add=1) @classmethod def del_shared_mcast_dst_load(cls): cls.add_del_shared_mcast_dst_load(is_add=0) @classmethod def add_del_mcast_tunnels_load(cls, is_add): """ add or del tunnels to test gtpu stability """ n_distinct_dst_tunnels = 20 ip_range_start = 10 ip_range_end = ip_range_start + n_distinct_dst_tunnels for dest_ip4n in ip4n_range(cls.mcast_ip4n, ip_range_start, ip_range_end): teid = bytearray(dest_ip4n)[3] cls.vapi.gtpu_add_del_tunnel( src_addr=cls.pg0.local_ip4n, dst_addr=dest_ip4n, mcast_sw_if_index=1, teid=teid, is_add=is_add) @classmethod def add_mcast_tunnels_load(cls): cls.add_del_mcast_tunnels_load(is_add=1) @classmethod def del_mcast_tunnels_load(cls): cls.add_del_mcast_tunnels_load(is_add=0) # Class method to start the GTPU test case. # Overrides setUpClass method in VppTestCase class. # Python try..except statement is used to ensure that the tear down of # the class will be executed even if exception is raised. # @param cls The class pointer. @classmethod def setUpClass(cls): super(TestGtpu, cls).setUpClass() try: cls.dport = 2152 cls.gtp_type = 0xff # Create 2 pg interfaces. cls.create_pg_interfaces(range(4)) for pg in cls.pg_interfaces: pg.admin_up() # Configure IPv4 addresses on VPP pg0. cls.pg0.config_ip4() # Resolve MAC address for VPP's IP address on pg0. cls.pg0.resolve_arp() # Our Multicast address cls.mcast_ip4 = '239.1.1.1' cls.mcast_ip4n = socket.inet_pton(socket.AF_INET, cls.mcast_ip4) iplong = atol(cls.mcast_ip4) cls.mcast_mac = "01:00:5e:%02x:%02x:%02x" % ( (iplong >> 16) & 0x7F, (iplong >> 8) & 0xFF, iplong & 0xFF) # Create GTPU VTEP on VPP pg0, and put gtpu_tunnel0 and pg1 # into BD. 
cls.single_tunnel_bd = 11 r = cls.vapi.gtpu_add_del_tunnel( src_addr=cls.pg0.local_ip4n, dst_addr=cls.pg0.remote_ip4n, teid=cls.single_tunnel_bd) cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=r.sw_if_index, bd_id=cls.single_tunnel_bd) cls.vapi.sw_interface_set_l2_bridge( rx_sw_if_index=cls.pg1.sw_if_index, bd_id=cls.single_tunnel_bd) # Setup teid 2 to test multicast flooding cls.n_ucast_tunnels = 10 cls.mcast_flood_bd = 12 cls.create_gtpu_flood_test_bd(cls.mcast_flood_bd, cls.n_ucast_tunnels) r = cls.vapi.gtpu_add_del_tunnel( src_addr=cls.pg0.local_ip4n, dst_addr=cls.mcast_ip4n, mcast_sw_if_index=1, teid=cls.mcast_flood_bd) cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=r.sw_if_index, bd_id=cls.mcast_flood_bd) cls.vapi.sw_interface_set_l2_bridge( rx_sw_if_index=cls.pg2.sw_if_index, bd_id=cls.mcast_flood_bd) # Add and delete mcast tunnels to check stability cls.add_shared_mcast_dst_load() cls.add_mcast_tunnels_load() cls.del_shared_mcast_dst_load() cls.del_mcast_tunnels_load() # Setup teid 3 to test unicast flooding cls.ucast_flood_bd = 13 cls.create_gtpu_flood_test_bd(cls.ucast_flood_bd, cls.n_ucast_tunnels) cls.vapi.sw_interface_set_l2_bridge( rx_sw_if_index=cls.pg3.sw_if_index, bd_id=cls.ucast_flood_bd) except Exception: super(TestGtpu, cls).tearDownClass() raise # Method to define VPP actions before tear down of the test case. # Overrides tearDown method in VppTestCase class. # @param self The object pointer. def tearDown(self): super(TestGtpu, self).tearDown() if not self.vpp_dead: self.logger.info(self.vapi.cli("show bridge-domain 11 detail")) self.logger.info(self.vapi.cli("show bridge-domain 12 detail")) self.logger.info(self.vapi.cli("show bridge-domain 13 detail")) self.logger.info(self.vapi.cli("show int")) self.logger.info(self.vapi.cli("show gtpu tunnel")) self.logger.info(self.vapi.cli("show trace")) if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)