summaryrefslogtreecommitdiffstats
path: root/test/test_ping.py
blob: 87cb45c3211c59ca67fbff99a3c6e9bf254d57d4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import socket

from scapy.layers.inet import IP, UDP, ICMP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether, GRE
from scapy.packet import Raw

from framework import VppTestCase
from util import ppp

""" TestPing is a subclass of  VPPTestCase classes.

Basic test for sanity check of the ping.

"""


class TestPing(VppTestCase):
    """ Ping Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestPing, cls).setUpClass()
        try:
            cls.create_pg_interfaces(range(2))
            cls.interfaces = list(cls.pg_interfaces)

            for i in cls.interfaces:
                i.admin_up()
                i.config_ip4()
                i.config_ip6()
                i.disable_ipv6_ra()
                i.resolve_arp()
                i.resolve_ndp()
        except Exception:
            super(TestPing, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(TestPing, cls).tearDownClass()

    def tearDown(self):
        super(TestPing, self).tearDown()

    def show_commands_at_teardown(self):
        self.logger.info(self.vapi.cli("show hardware"))

    def test_ping_basic(self):
        """ basic ping test """
        try:
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.logger.info(self.vapi.cli("show ip4 neighbors"))
            self.logger.info(self.vapi.cli("show ip6 neighbors"))

            remote_ip4 = self.pg1.remote_ip4
            ping_cmd = "ping " + remote_ip4 + " interval 0.01 repeat 10"
            ret = self.vapi.cli(ping_cmd)
            self.logger.info(ret)
            out = self.pg1.get_capture(10)
            icmp_id = None
            icmp_seq = 1
            for p in out:
                ip = p[IP]
                self.assertEqual(ip.version, 4)
                self.assertEqual(ip.flags, 0)
                self.assertEqual(ip.src, self.pg1.local_ip4)
                self.assertEqual(ip.dst, self.pg1.remote_ip4)
                self.assertEqual(ip.proto, 1)
                self.assertEqual(len(ip.options), 0)
                self.assertGreaterEqual(ip.ttl, 254)
                icmp = p[ICMP]
                self.assertEqual(icmp.type, 8)
                self.assertEqual(icmp.code, 0)
                self.assertEqual(icmp.seq, icmp_seq)
                icmp_seq = icmp_seq + 1
                if icmp_id is None:
                    icmp_id = icmp.id
                else:
                    self.assertEqual(icmp.id, icmp_id)
        finally:
            self.vapi.cli("show error")

    def test_ping_burst(self):
        """ burst ping test """
        try:
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.logger.info(self.vapi.cli("show ip neighbors"))

            remote_ip4 = self.pg1.remote_ip4
            ping_cmd = "ping " + remote_ip4 + " interval 0.01 burst 3"
            ret = self.vapi.cli(ping_cmd)
            self.logger.info(ret)
            out = self.pg1.get_capture(3*5)
            icmp_id = None
            icmp_seq = 1
            count = 0
            for p in out:
                ip = p[IP]
                self.assertEqual(ip.version, 4)
                self.assertEqual(ip.flags, 0)
                self.assertEqual(ip.src, self.pg1.local_ip4)
                self.assertEqual(ip.dst, self.pg1.remote_ip4)
                self.assertEqual(ip.proto, 1)
                self.assertEqual(len(ip.options), 0)
                self.assertGreaterEqual(ip.ttl, 254)
                icmp = p[ICMP]
                self.assertEqual(icmp.type, 8)
                self.assertEqual(icmp.code, 0)
                self.assertEqual(icmp.seq, icmp_seq)
                count = count + 1
                if count >= 3:
                    icmp_seq = icmp_seq + 1
                    count = 0
                if icmp_id is None:
                    icmp_id = icmp.id
                else:
                    self.assertEqual(icmp.id, icmp_id)
        finally:
            self.vapi.cli("show error")
cls.pg_interfaces) macs_per_if = count / n_int i = -1 for pg_if in cls.pg_interfaces: i += 1 start_nr = macs_per_if * i end_nr = count if i == (n_int - 1) else macs_per_if * (i + 1) cls.hosts_by_pg_idx[pg_if.sw_if_index] = [] hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index] packets = [] for j in range(start_nr, end_nr): host = Host( "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j), "172.17.1%02x.%u" % (pg_if.sw_if_index, j)) packet = (Ether(dst="ff:ff:ff:ff:ff:ff", src=host.mac)) hosts.append(host) if hasattr(pg_if, 'sub_if'): packet = pg_if.sub_if.add_dot1_layer(packet) packets.append(packet) pg_if.add_stream(packets) cls.logger.info("Sending broadcast eth frames for MAC learning") cls.pg_start() def create_stream(self, src_if, packet_sizes, packets_per_burst): """ Create input packet stream for defined interface. :param object src_if: Interface to create packet stream for. :param list packet_sizes: List of required packet sizes. :param int packets_per_burst: Number of packets in burst. :return: Stream of packets. """ pkts = [] for i in range(0, packets_per_burst): dst_if = self.flows[src_if][i % 2] dst_host = random.choice(self.hosts_by_pg_idx[dst_if.sw_if_index]) src_host = random.choice(self.hosts_by_pg_idx[src_if.sw_if_index]) pkt_info = self.create_packet_info(src_if, dst_if) payload = self.info_to_payload(pkt_info) p = (Ether(dst=dst_host.mac, src=src_host.mac) / IP(src=src_host.ip4, dst=dst_host.ip4) / UDP(sport=1234, dport=1234) / Raw(payload)) pkt_info.data = p.copy() if hasattr(src_if, 'sub_if'): p = src_if.sub_if.add_dot1_layer(p) size = random.choice(packet_sizes) self.extend_packet(p, size) pkts.append(p) return pkts def verify_capture(self, pg_if, capture): """ Verify captured input packet stream for defined interface. :param object pg_if: Interface to verify captured packet stream for. :param list capture: Captured packet stream. """ last_info = dict() for i in self.pg_interfaces: last_info[i.sw_if_index] = None dst_sw_if_index = pg_if.sw_if_index for packet in capture: payload_info = self.payload_to_info(str(packet[Raw])) src_sw_if_index = payload_info.src src_if = None for ifc in self.pg_interfaces: if ifc != pg_if: if ifc.sw_if_index == src_sw_if_index: src_if = ifc break if hasattr(src_if, 'sub_if'): # Check VLAN tags and Ethernet header packet = src_if.sub_if.remove_dot1_layer(packet) self.assertTrue(Dot1Q not in packet) try: ip = packet[IP] udp = packet[UDP] packet_index = payload_info.index self.assertEqual(payload_info.dst, dst_sw_if_index) self.logger.debug("Got packet on port %s: src=%u (id=%u)" % (pg_if.name, payload_info.src, packet_index)) next_info = self.get_next_packet_info_for_interface2( payload_info.src, dst_sw_if_index, last_info[payload_info.src]) last_info[payload_info.src] = next_info self.assertTrue(next_info is not None) self.assertEqual(packet_index, next_info.index) saved_packet = next_info.data # Check standard fields self.assertEqual(ip.src, saved_packet[IP].src) self.assertEqual(ip.dst, saved_packet[IP].dst) self.assertEqual(udp.sport, saved_packet[UDP].sport) self.assertEqual(udp.dport, saved_packet[UDP].dport) except: self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise for i in self.pg_interfaces: remaining_packet = self.get_next_packet_info_for_interface2( i, dst_sw_if_index, last_info[i.sw_if_index]) self.assertTrue( remaining_packet is None, "Port %u: Packet expected from source %u didn't arrive" % (dst_sw_if_index, i.sw_if_index)) def run_l2bd_test(self, pkts_per_burst): """ L2BD MAC learning test """ # Create incoming packet streams for packet-generator interfaces for i in self.pg_interfaces: packet_sizes = self.sub_if_packet_sizes if hasattr(i, 'sub_if') \ else self.pg_if_packet_sizes pkts = self.create_stream(i, packet_sizes, pkts_per_burst) i.add_stream(pkts) # Enable packet capture and start packet sending self.pg_enable_capture(self.pg_interfaces) self.pg_start() # Verify outgoing packet streams per packet-generator interface for i in self.pg_interfaces: capture = i.get_capture() self.logger.info("Verifying capture on interface %s" % i.name) self.verify_capture(i, capture) def test_l2bd_sl(self): """ L2BD MAC learning single-loop test Test scenario: 1.config MAC learning enabled learn 100 MAC enries 3 interfaces: untagged, dot1q, dot1ad (dot1q used instead of dot1ad in the first version) 2.sending l2 eth pkts between 3 interface 64B, 512B, 1518B, 9200B (ether_size) burst of 2 pkts per interface """ self.run_l2bd_test(self.sl_pkts_per_burst) def test_l2bd_dl(self): """ L2BD MAC learning dual-loop test Test scenario: 1.config MAC learning enabled learn 100 MAC enries 3 interfaces: untagged, dot1q, dot1ad (dot1q used instead of dot1ad in the first version) 2.sending l2 eth pkts between 3 interface 64B, 512B, 1518B, 9200B (ether_size) burst of 257 pkts per interface """ self.run_l2bd_test(self.dl_pkts_per_burst) if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)