summaryrefslogtreecommitdiffstats
path: root/test/test_adl.py
blob: 4a996fc5c90f2ed857fec2a4e1dafabacea43dc9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#!/usr/bin/env python3

import unittest

from framework import VppTestCase, VppTestRunner, running_gcov_tests
from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath


class TestAdl(VppTestCase):
    """ Allow/Deny Plugin Unit Test Cases """

    # The four fixture hooks below add no behavior of their own; each one
    # only forwards to the parent class implementation.

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

    def tearDown(self):
        super().tearDown()

    def test_adl1_unittest(self):
        """ Plugin API Test """
        # Step 1: create loop0, give it an IPv4 and an IPv6 address and
        # bring it up.
        interface_setup = [
            "loop create\n",
            "set int ip address loop0 192.168.1.1/24\n",
            "set int ip6 table loop0 0\n",
            "set int ip address loop0 2001:db01::1/64\n",
            "set int state loop0 up\n",
        ]

        # Step 2: three packet-generator streams, all injected at the
        # adl-input node on loop0: 100 IPv4 packets over a range of source
        # addresses, then two IPv6 streams of 50 packets each that differ
        # only in name and source address.
        ip4_stream = (
            "packet-generator new {\n"
            " name ip4\n"
            " limit 100\n"
            " rate 0\n"
            " size 128-128\n"
            " interface loop0\n"
            " node adl-input\n"
            " data { IP4: 1.2.40 -> 3cfd.fed0.b6c8\n"
            "        UDP: 192.168.1.2-192.168.1.10 -> 192.168.2.1\n"
            "        UDP: 1234 -> 2345\n"
            "        incrementing 114\n"
            "       }\n"
            " }\n"
        )
        ip6_allow_stream = (
            "packet-generator new {\n"
            " name ip6-allow\n"
            " limit 50\n"
            " rate 0\n"
            " size 128-128\n"
            " interface loop0\n"
            " node adl-input\n"
            " data { IP6: 1.2.40 -> 3cfd.fed0.b6c8\n"
            "        UDP: 2001:db01::2 -> 2001:db01::1\n"
            "        UDP: 1234 -> 2345\n"
            "        incrementing 80\n"
            "      }\n"
            " }\n"
        )
        ip6_drop_stream = (
            "packet-generator new {\n"
            " name ip6-drop\n"
            " limit 50\n"
            " rate 0\n"
            " size 128-128\n"
            " interface loop0\n"
            " node adl-input\n"
            " data { IP6: 1.2.40 -> 3cfd.fed0.b6c8\n"
            "        UDP: 2001:db01::3 -> 2001:db01::1\n"
            "        UDP: 1234 -> 2345\n"
            "        incrementing 80\n"
            "      }\n"
            " }\n"
        )

        # Step 3: routes in the default table and in table 1, then enable
        # the plugin on loop0 with table 1 (fib-id 1) as the allowlist for
        # both address families.
        fib_and_adl_setup = [
            "ip table 1\n",
            "ip route add 192.168.2.1/32 via drop\n",
            "ip route add table 1 192.168.1.2/32 via local\n",
            "ip6 table 1\n",
            "ip route add 2001:db01::1/128 via drop\n",
            "ip route add table 1 2001:db01::2/128 via local\n",
            "bin adl_interface_enable_disable loop0\n",
            "bin adl_allowlist_enable_disable loop0 fib-id 1 ip4 ip6\n",
            # NOTE(review): presumably an abbreviation of
            # "packet-generator enable" -- confirm.
            "pa en\n",
        ]

        cli_script = (
            interface_setup
            + [ip4_stream, ip6_allow_stream, ip6_drop_stream]
            + fib_and_adl_setup
        )

        # Run the script one command at a time. A non-zero retval is only
        # logged; it does not abort the test, so a failed command shows up
        # later as a counter mismatch.
        for command in cli_script:
            response = self.vapi.cli_return_response(command)
            if response.retval == 0:
                continue
            if hasattr(response, 'reply'):
                failure = " FAIL reply " + response.reply
            else:
                failure = " FAIL retval " + str(response.retval)
            self.logger.info(command + failure)

        # Counters are read and checked strictly in this order. 200 is the
        # sum of the three stream limits (100 + 50 + 50) and 50 equals the
        # ip6-allow stream limit.
        # NOTE(review): 12 is presumably the share of the 100 IPv4 packets
        # whose source is 192.168.1.2 -- confirm against the generator.
        expected_counters = (
            ("/err/adl-input/Allow/Deny packets processed", 200),
            ("/err/ip4-adl-allowlist/ip4 allowlist allowed", 12),
            ("/err/ip6-adl-allowlist/ip6 allowlist allowed", 50),
        )
        for counter_path, expected in expected_counters:
            observed = self.statistics.get_err_counter(counter_path)
            self.assertEqual(observed, expected)

# Script entry point: when this file is executed directly (rather than
# imported), run its tests through unittest with VppTestRunner as the runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)
pan> cls.pg2.resolve_arp() # BVI interface has remote hosts, one half of hosts are behind # pg0 second behind pg1 half = cls.remote_hosts_count // 2 cls.pg0.remote_hosts = cls.bvi0.remote_hosts[:half] cls.pg1.remote_hosts = cls.bvi0.remote_hosts[half:] @classmethod def tearDownClass(cls): super(TestIpIrb, cls).tearDownClass() def tearDown(self): """Run standard test teardown and log ``show l2patch``, ``show l2fib verbose``,``show bridge-domain <bd_id> detail``, ``show ip arp``. """ super(TestIpIrb, self).tearDown() def show_commands_at_teardown(self): self.logger.info(self.vapi.cli("show l2patch")) self.logger.info(self.vapi.cli("show l2fib verbose")) self.logger.info(self.vapi.cli("show bridge-domain %s detail" % self.bd_id)) self.logger.info(self.vapi.cli("show ip arp")) def create_stream(self, src_ip_if, dst_ip_if, packet_sizes): pkts = [] for i in range(0, 257): remote_dst_host = choice(dst_ip_if.remote_hosts) info = self.create_packet_info(src_ip_if, dst_ip_if) payload = self.info_to_payload(info) p = (Ether(dst=src_ip_if.local_mac, src=src_ip_if.remote_mac) / IP(src=src_ip_if.remote_ip4, dst=remote_dst_host.ip4) / UDP(sport=1234, dport=1234) / Raw(payload)) info.data = p.copy() size = packet_sizes[(i // 2) % len(packet_sizes)] self.extend_packet(p, size) pkts.append(p) return pkts def create_stream_l2_to_ip(self, src_l2_if, src_ip_if, dst_ip_if, packet_sizes): pkts = [] for i in range(0, 257): info = self.create_packet_info(src_ip_if, dst_ip_if) payload = self.info_to_payload(info) host = choice(src_l2_if.remote_hosts) p = (Ether(src=host.mac, dst=src_ip_if.local_mac) / IP(src=host.ip4, dst=dst_ip_if.remote_ip4) / UDP(sport=1234, dport=1234) / Raw(payload)) info.data = p.copy() size = packet_sizes[(i // 2) % len(packet_sizes)] self.extend_packet(p, size) pkts.append(p) return pkts def verify_capture_l2_to_ip(self, dst_ip_if, src_ip_if, capture): last_info = dict() for i in self.interfaces: last_info[i.sw_if_index] = None dst_ip_sw_if_index = 
dst_ip_if.sw_if_index for packet in capture: ip = packet[IP] udp = packet[IP][UDP] payload_info = self.payload_to_info(packet[IP][UDP][Raw]) self.assertEqual(payload_info.dst, dst_ip_sw_if_index) next_info = self.get_next_packet_info_for_interface2( payload_info.src, dst_ip_sw_if_index, last_info[payload_info.src]) last_info[payload_info.src] = next_info self.assertTrue(next_info is not None) saved_packet = next_info.data self.assertTrue(next_info is not None) # MAC: src, dst self.assertEqual(packet.src, dst_ip_if.local_mac) self.assertEqual(packet.dst, dst_ip_if.remote_mac) # IP: src, dst host = src_ip_if.host_by_ip4(ip.src) self.assertIsNotNone(host) self.assertEqual(ip.dst, saved_packet[IP].dst) self.assertEqual(ip.dst, dst_ip_if.remote_ip4) # UDP: self.assertEqual(udp.sport, saved_packet[UDP].sport) self.assertEqual(udp.dport, saved_packet[UDP].dport) def verify_capture(self, dst_ip_if, src_ip_if, capture): last_info = dict() for i in self.interfaces: last_info[i.sw_if_index] = None dst_ip_sw_if_index = dst_ip_if.sw_if_index for packet in capture: ip = packet[IP] udp = packet[IP][UDP] payload_info = self.payload_to_info(packet[IP][UDP][Raw]) packet_index = payload_info.index self.assertEqual(payload_info.dst, dst_ip_sw_if_index) next_info = self.get_next_packet_info_for_interface2( payload_info.src, dst_ip_sw_if_index, last_info[payload_info.src]) last_info[payload_info.src] = next_info self.assertTrue(next_info is not None) self.assertEqual(packet_index, next_info.index) saved_packet = next_info.data self.assertTrue(next_info is not None) # MAC: src, dst self.assertEqual(packet.src, dst_ip_if.local_mac) host = dst_ip_if.host_by_mac(packet.dst) # IP: src, dst self.assertEqual(ip.src, src_ip_if.remote_ip4) self.assertEqual(ip.dst, saved_packet[IP].dst) self.assertEqual(ip.dst, host.ip4) # UDP: self.assertEqual(udp.sport, saved_packet[UDP].sport) self.assertEqual(udp.dport, saved_packet[UDP].dport) def test_ip4_irb_1(self): """ IPv4 IRB test 1 Test scenario: - ip 
traffic from pg2 interface must ends in both pg0 and pg1 - arp entry present in bvi0 interface for destination IP - no l2 entry configured, pg0 and pg1 are same """ stream = self.create_stream( self.pg2, self.bvi0, self.pg_if_packet_sizes) self.pg2.add_stream(stream) self.pg_enable_capture(self.pg_interfaces) self.pg_start() packet_count = self.get_packet_count_for_if_idx(self.bvi0.sw_if_index) rcvd1 = self.pg0.get_capture(packet_count) rcvd2 = self.pg1.get_capture(packet_count) self.verify_capture(self.bvi0, self.pg2, rcvd1) self.verify_capture(self.bvi0, self.pg2, rcvd2) self.assertListEqual(rcvd1.res, rcvd2.res) def send_and_verify_l2_to_ip(self): stream1 = self.create_stream_l2_to_ip( self.pg0, self.bvi0, self.pg2, self.pg_if_packet_sizes) stream2 = self.create_stream_l2_to_ip( self.pg1, self.bvi0, self.pg2, self.pg_if_packet_sizes) self.vapi.cli("clear trace") self.pg0.add_stream(stream1) self.pg1.add_stream(stream2) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rcvd = self.pg2.get_capture(514) self.verify_capture_l2_to_ip(self.pg2, self.bvi0, rcvd) def test_ip4_irb_2(self): """ IPv4 IRB test 2 Test scenario: - ip traffic from pg0 and pg1 ends on pg2 """ self.send_and_verify_l2_to_ip() # change the BVI's mac and resed traffic self.bvi0.set_mac(MACAddress("00:00:00:11:11:33")) self.send_and_verify_l2_to_ip() # check it wasn't flooded self.pg1.assert_nothing_captured(remark="UU Flood") if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)