summaryrefslogtreecommitdiffstats
path: root/test/test_policer.py
blob: 6b15a0234a3ee560fdfc95e24ecbae7c01da6dc0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#!/usr/bin/env python3
# Copyright (c) 2021 Graphiant, Inc.

import unittest

from framework import VppTestCase, VppTestRunner
from vpp_policer import VppPolicer, PolicerAction

# Default for the tests is 10 s of "Green" packets at 8 Mbps, i.e. 10 MB
# (8 Mbit/s * 10 s / 8 bits per byte).
# The policer helper CLI "sends" 500-byte packets, so the default packet
# count is 10 MB / 500 B = 20000 (see NUM_PKTS).
#
# NOTE(review): TEST_RATE and TEST_BURST are not referenced by the test class
# in this file; run_policer_test() repeats the same literal values as its
# `rate` and `burst` defaults. Keep the two in sync.

TEST_RATE = 8000    # offered traffic rate in kbps
TEST_BURST = 10000  # ms (the 10 s of traffic described above)

CIR_OK = 8500       # CIR in kbps, above test rate
CIR_LOW = 7000      # CIR in kbps, below test rate
EIR_OK = 9000       # EIR in kbps, above test rate
EIR_LOW = 7500      # EIR in kbps, below test rate

# Packets offered per run with the default rate and burst (10 MB / 500 B).
NUM_PKTS = 20000

CBURST = 100000     # Committed burst in bytes
EBURST = 200000     # Excess burst in bytes


class TestPolicer(VppTestCase):
    """ Policer Test Case """

    # Maps the policer type names used by the tests below to the numeric
    # type value handed to VppPolicer.
    # NOTE(review): '2R3C' maps to 3 rather than 2 - presumably this selects
    # one specific two-rate variant of the VPP policer type enum; confirm
    # against the policer API definition.
    _POLICER_TYPES = {
        '1R2C': 0,
        '1R3C': 1,
        '2R3C': 3,
    }

    def run_policer_test(self, type, cir, cb, eir, eb, rate=8000, burst=10000,
                         colour=0):
        """
        Configure a Policer and push traffic through it.

        A policer named "pol1" is created, the "test policing" debug CLI is
        run against it, its statistics are read back and the policer is
        removed again. The policer is removed even when the CLI call or the
        statistics read raises, so a failure cannot leave "pol1" behind for
        the next invocation.

        :param type: policer type name: '1R2C', '1R3C' or '2R3C'. (The name
            shadows the builtin; it is kept for caller compatibility.)
        :param cir: committed information rate (kbps in the tests here).
        :param cb: committed burst (bytes in the tests here).
        :param eir: excess information rate (kbps in the tests here).
        :param eb: excess burst (bytes in the tests here).
        :param rate: value passed as "rate" to the CLI; the default matches
            the module-level TEST_RATE.
        :param burst: value passed as "burst" to the CLI; the default matches
            the module-level TEST_BURST.
        :param colour: value passed as "colour" to the CLI and also as
            ``color_aware`` to the policer, so any non-zero value makes the
            policer colour aware. Presumably 0/1/2 mean green/yellow/red -
            confirm against the CLI implementation.
        :returns: whatever ``policer.get_stats()`` returns; the tests index
            it with 'conform_packets', 'exceed_packets' and
            'violate_packets'.
        :raises ValueError: if ``type`` is not a known policer type name.
        """
        try:
            pol_type = self._POLICER_TYPES[type]
        except KeyError:
            # Fail loudly instead of silently configuring type=None.
            known = ", ".join(sorted(self._POLICER_TYPES))
            raise ValueError(
                f"unknown policer type {type!r}, expected one of: {known}"
            ) from None

        # VppPolicer is given both rates first and then both bursts, which
        # differs from this method's (cir, cb, eir, eb) parameter order.
        policer = VppPolicer(self, "pol1", cir, eir, cb, eb, rate_type=0,
                             type=pol_type, color_aware=colour)
        policer.add_vpp_config()

        try:
            self.vapi.cli(
                f"test policing index {policer.policer_index} rate {rate} "
                f"burst {burst} colour {colour}")
            # Statistics must be collected before the policer is deleted.
            return policer.get_stats()
        finally:
            policer.remove_vpp_config()

    def test_policer_1r2c(self):
        """ Single rate, 2 colour policer """
        # CIR above the offered rate: every packet conforms.
        stats = self.run_policer_test("1R2C", CIR_OK, CBURST, 0, 0)
        self.assertEqual(stats['conform_packets'], NUM_PKTS)

        # CIR below the offered rate: a two colour policer never reports
        # "exceed", so the overflow must show up as "violate".
        stats = self.run_policer_test("1R2C", CIR_LOW, CBURST, 0, 0)
        self.assertLess(stats['conform_packets'], NUM_PKTS)
        self.assertEqual(stats['exceed_packets'], 0)
        self.assertGreater(stats['violate_packets'], 0)

        # Input pre-coloured with colour=2: everything violates.
        stats = self.run_policer_test("1R2C", CIR_LOW, CBURST, 0, 0, colour=2)
        self.assertEqual(stats['violate_packets'], NUM_PKTS)

    def test_policer_1r3c(self):
        """ Single rate, 3 colour policer """
        # CIR above the offered rate: every packet conforms.
        stats = self.run_policer_test("1R3C", CIR_OK, CBURST, 0, 0)
        self.assertEqual(stats['conform_packets'], NUM_PKTS)

        # CIR below the offered rate with an excess burst configured: all
        # three outcomes are expected to occur.
        stats = self.run_policer_test("1R3C", CIR_LOW, CBURST, 0, EBURST)
        self.assertLess(stats['conform_packets'], NUM_PKTS)
        self.assertGreater(stats['exceed_packets'], 0)
        self.assertGreater(stats['violate_packets'], 0)

        # Input pre-coloured with colour=1: nothing may conform.
        stats = self.run_policer_test("1R3C", CIR_LOW, CBURST, 0, EBURST,
                                      colour=1)
        self.assertEqual(stats['conform_packets'], 0)
        self.assertGreater(stats['exceed_packets'], 0)
        self.assertGreater(stats['violate_packets'], 0)

        # Input pre-coloured with colour=2: everything violates.
        stats = self.run_policer_test("1R3C", CIR_LOW, CBURST, 0, EBURST,
                                      colour=2)
        self.assertEqual(stats['violate_packets'], NUM_PKTS)

    def test_policer_2r3c(self):
        """ Dual rate, 3 colour policer """
        # Both CIR and EIR above the offered rate: every packet conforms.
        stats = self.run_policer_test("2R3C", CIR_OK, CBURST, EIR_OK, EBURST)
        self.assertEqual(stats['conform_packets'], NUM_PKTS)

        # CIR too low but EIR sufficient: overflow is "exceed" only.
        stats = self.run_policer_test("2R3C", CIR_LOW, CBURST, EIR_OK, EBURST)
        self.assertLess(stats['conform_packets'], NUM_PKTS)
        self.assertGreater(stats['exceed_packets'], 0)
        self.assertEqual(stats['violate_packets'], 0)

        # Both CIR and EIR too low: "exceed" and "violate" both occur.
        stats = self.run_policer_test("2R3C", CIR_LOW, CBURST, EIR_LOW, EBURST)
        self.assertLess(stats['conform_packets'], NUM_PKTS)
        self.assertGreater(stats['exceed_packets'], 0)
        self.assertGreater(stats['violate_packets'], 0)

        # colour=1 input with sufficient EIR: every packet is "exceed".
        stats = self.run_policer_test("2R3C", CIR_LOW, CBURST, EIR_OK, EBURST,
                                      colour=1)
        self.assertEqual(stats['exceed_packets'], NUM_PKTS)

        # colour=1 input with EIR too low: nothing conforms, the rest is
        # split between "exceed" and "violate".
        stats = self.run_policer_test("2R3C", CIR_LOW, CBURST, EIR_LOW, EBURST,
                                      colour=1)
        self.assertEqual(stats['conform_packets'], 0)
        self.assertGreater(stats['exceed_packets'], 0)
        self.assertGreater(stats['violate_packets'], 0)

        # Input pre-coloured with colour=2: everything violates.
        stats = self.run_policer_test("2R3C", CIR_LOW, CBURST, EIR_OK, EBURST,
                                      colour=2)
        self.assertEqual(stats['violate_packets'], NUM_PKTS)

# Allow running this file directly as a script; the tests are then executed
# through unittest using the VPP framework's test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)
n> 1): host = Host( "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j), "172.17.1%02x.%u" % (pg_if.sw_if_index, j)) packet = (Ether(dst="ff:ff:ff:ff:ff:ff", src=host.mac)) hosts.append(host) if hasattr(pg_if, 'sub_if'): packet = pg_if.sub_if.add_dot1_layer(packet) packets.append(packet) pg_if.add_stream(packets) cls.logger.info("Sending broadcast eth frames for MAC learning") cls.pg_enable_capture(cls.pg_interfaces) cls.pg_start() def create_packet(self, src_if, dst_if, do_dot1=True): packet_sizes = [64, 512, 1518, 9018] dst_host = random.choice(self.hosts_by_pg_idx[dst_if.sw_if_index]) src_host = random.choice(self.hosts_by_pg_idx[src_if.sw_if_index]) pkt_info = self.create_packet_info(src_if, dst_if) payload = self.info_to_payload(pkt_info) p = (Ether(dst=dst_host.mac, src=src_host.mac) / IP(src=src_host.ip4, dst=dst_host.ip4) / UDP(sport=1234, dport=1234) / Raw(payload)) pkt_info.data = p.copy() if do_dot1 and hasattr(src_if, 'sub_if'): p = src_if.sub_if.add_dot1_layer(p) size = random.choice(packet_sizes) self.extend_packet(p, size) return p def _add_tag(self, packet, vlan, tag_type): payload = packet.payload inner_type = packet.type packet.remove_payload() packet.add_payload(Dot1Q(vlan=vlan) / payload) packet.payload.type = inner_type packet.payload.vlan = vlan packet.type = tag_type return packet def _remove_tag(self, packet, vlan=None, tag_type=None): if tag_type: self.assertEqual(packet.type, tag_type) payload = packet.payload if vlan: self.assertEqual(payload.vlan, vlan) inner_type = payload.type payload = payload.payload packet.remove_payload() packet.add_payload(payload) packet.type = inner_type def add_tags(self, packet, tags): for t in reversed(tags): self._add_tag(packet, t.vlan, t.dot1) def remove_tags(self, packet, tags): for t in tags: self._remove_tag(packet, t.vlan, t.dot1) def vtr_test(self, swif, tags): p = self.create_packet(swif, self.pg0) swif.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg0.get_capture(1) 
if tags: self.remove_tags(rx[0], tags) self.assertTrue(Dot1Q not in rx[0]) if not tags: return i = VppDot1QSubint(self, self.pg0, tags[0].vlan) self.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=i.sw_if_index, bd_id=self.bd_id, enable=1) i.admin_up() p = self.create_packet(self.pg0, swif, do_dot1=False) self.add_tags(p, tags) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = swif.get_capture(1) swif.sub_if.remove_dot1_layer(rx[0]) self.assertTrue(Dot1Q not in rx[0]) self.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=i.sw_if_index, bd_id=self.bd_id, enable=0) i.remove_vpp_config() def test_1ad_vtr_pop_1(self): """ 1AD VTR pop 1 test """ self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_POP_1) self.vtr_test(self.pg1, [Tag(dot1=DOT1Q, vlan=100)]) def test_1ad_vtr_pop_2(self): """ 1AD VTR pop 2 test """ self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_POP_2) self.vtr_test(self.pg1, []) def test_1ad_vtr_push_1ad(self): """ 1AD VTR push 1 1AD test """ self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_PUSH_1, tag=300) self.vtr_test(self.pg1, [Tag(dot1=DOT1AD, vlan=300), Tag(dot1=DOT1AD, vlan=200), Tag(dot1=DOT1Q, vlan=100)]) def test_1ad_vtr_push_2ad(self): """ 1AD VTR push 2 1AD test """ self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_PUSH_2, outer=400, inner=300) self.vtr_test(self.pg1, [Tag(dot1=DOT1AD, vlan=400), Tag(dot1=DOT1Q, vlan=300), Tag(dot1=DOT1AD, vlan=200), Tag(dot1=DOT1Q, vlan=100)]) def test_1ad_vtr_push_1q(self): """ 1AD VTR push 1 1Q test """ self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_PUSH_1, tag=300, push1q=1) self.vtr_test(self.pg1, [Tag(dot1=DOT1Q, vlan=300), Tag(dot1=DOT1AD, vlan=200), Tag(dot1=DOT1Q, vlan=100)]) def test_1ad_vtr_push_2q(self): """ 1AD VTR push 2 1Q test """ self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_PUSH_2, outer=400, inner=300, push1q=1) self.vtr_test(self.pg1, [Tag(dot1=DOT1Q, vlan=400), Tag(dot1=DOT1Q, vlan=300), Tag(dot1=DOT1AD, vlan=200), Tag(dot1=DOT1Q, vlan=100)]) def test_1ad_vtr_translate_1_1ad(self): """ 1AD VTR translate 1 -> 1 1AD test """ 
self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_TRANSLATE_1_1, tag=300) self.vtr_test(self.pg1, [Tag(dot1=DOT1AD, vlan=300), Tag(dot1=DOT1Q, vlan=100)]) def test_1ad_vtr_translate_1_2ad(self): """ 1AD VTR translate 1 -> 2 1AD test """ self.pg1.sub_if.set_vtr( L2_VTR_OP.L2_TRANSLATE_1_2, inner=300, outer=400) self.vtr_test(self.pg1, [Tag(dot1=DOT1AD, vlan=400), Tag(dot1=DOT1Q, vlan=300), Tag(dot1=DOT1Q, vlan=100)]) def test_1ad_vtr_translate_2_1ad(self): """ 1AD VTR translate 2 -> 1 1AD test """ self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_TRANSLATE_2_1, tag=300) self.vtr_test(self.pg1, [Tag(dot1=DOT1AD, vlan=300)]) def test_1ad_vtr_translate_2_2ad(self): """ 1AD VTR translate 2 -> 2 1AD test """ self.pg1.sub_if.set_vtr( L2_VTR_OP.L2_TRANSLATE_2_2, inner=300, outer=400) self.vtr_test(self.pg1, [Tag(dot1=DOT1AD, vlan=400), Tag(dot1=DOT1Q, vlan=300)]) def test_1ad_vtr_translate_1_1q(self): """ 1AD VTR translate 1 -> 1 1Q test """ self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_TRANSLATE_1_1, tag=300, push1q=1) self.vtr_test(self.pg1, [Tag(dot1=DOT1Q, vlan=300), Tag(dot1=DOT1Q, vlan=100)]) def test_1ad_vtr_translate_1_2q(self): """ 1AD VTR translate 1 -> 2 1Q test """ self.pg1.sub_if.set_vtr( L2_VTR_OP.L2_TRANSLATE_1_2, inner=300, outer=400, push1q=1) self.vtr_test(self.pg1, [Tag(dot1=DOT1Q, vlan=400), Tag(dot1=DOT1Q, vlan=300), Tag(dot1=DOT1Q, vlan=100)]) def test_1ad_vtr_translate_2_1q(self): """ 1AD VTR translate 2 -> 1 1Q test """ self.pg1.sub_if.set_vtr(L2_VTR_OP.L2_TRANSLATE_2_1, tag=300, push1q=1) self.vtr_test(self.pg1, [Tag(dot1=DOT1Q, vlan=300)]) def test_1ad_vtr_translate_2_2q(self): """ 1AD VTR translate 2 -> 2 1Q test """ self.pg1.sub_if.set_vtr( L2_VTR_OP.L2_TRANSLATE_2_2, inner=300, outer=400, push1q=1) self.vtr_test(self.pg1, [Tag(dot1=DOT1Q, vlan=400), Tag(dot1=DOT1Q, vlan=300)]) def test_1q_vtr_pop_1(self): """ 1Q VTR pop 1 test """ self.pg2.sub_if.set_vtr(L2_VTR_OP.L2_POP_1) self.vtr_test(self.pg2, []) def test_1q_vtr_push_1(self): """ 1Q VTR push 1 test """ 
self.pg2.sub_if.set_vtr(L2_VTR_OP.L2_PUSH_1, tag=300) self.vtr_test(self.pg2, [Tag(dot1=DOT1AD, vlan=300), Tag(dot1=DOT1Q, vlan=200)]) def test_1q_vtr_push_2(self): """ 1Q VTR push 2 test """ self.pg2.sub_if.set_vtr(L2_VTR_OP.L2_PUSH_2, outer=400, inner=300) self.vtr_test(self.pg2, [Tag(dot1=DOT1AD, vlan=400), Tag(dot1=DOT1Q, vlan=300), Tag(dot1=DOT1Q, vlan=200)]) def test_1q_vtr_translate_1_1(self): """ 1Q VTR translate 1 -> 1 test """ self.pg2.sub_if.set_vtr(L2_VTR_OP.L2_TRANSLATE_1_1, tag=300) self.vtr_test(self.pg2, [Tag(dot1=DOT1AD, vlan=300)]) def test_1q_vtr_translate_1_2(self): """ 1Q VTR translate 1 -> 2 test """ self.pg2.sub_if.set_vtr( L2_VTR_OP.L2_TRANSLATE_1_2, inner=300, outer=400) self.vtr_test(self.pg2, [Tag(dot1=DOT1AD, vlan=400), Tag(dot1=DOT1Q, vlan=300)]) def test_if_vtr_disable(self): """ Disable VTR on non-sub-interfaces """ # First set the VTR fields to junk self.vapi.l2_interface_vlan_tag_rewrite( sw_if_index=self.pg0.sw_if_index, vtr_op=L2_VTR_OP.L2_PUSH_2, push_dot1q=1, tag1=19, tag2=630) if_state = self.vapi.sw_interface_dump( sw_if_index=self.pg0.sw_if_index) self.assertEqual(if_state[0].sw_if_index, self.pg0.sw_if_index) self.assertNotEqual(if_state[0].vtr_op, L2_VTR_OP.L2_DISABLED) # Then ensure that a request to disable VTR is honored. 
self.vapi.l2_interface_vlan_tag_rewrite( sw_if_index=self.pg0.sw_if_index, vtr_op=L2_VTR_OP.L2_DISABLED) if_state = self.vapi.sw_interface_dump( sw_if_index=self.pg0.sw_if_index) self.assertEqual(if_state[0].sw_if_index, self.pg0.sw_if_index) self.assertEqual(if_state[0].vtr_op, L2_VTR_OP.L2_DISABLED) def test_if_vtr_push_1q(self): """ 1Q VTR push 1 on non-sub-interfaces """ self.vapi.l2_interface_vlan_tag_rewrite( sw_if_index=self.pg0.sw_if_index, vtr_op=L2_VTR_OP.L2_PUSH_1, push_dot1q=1, tag1=150) if_state = self.vapi.sw_interface_dump( sw_if_index=self.pg0.sw_if_index) self.assertEqual(if_state[0].sw_if_index, self.pg0.sw_if_index) self.assertEqual(if_state[0].vtr_op, L2_VTR_OP.L2_PUSH_1) self.assertEqual(if_state[0].vtr_tag1, 150) self.assertNotEqual(if_state[0].vtr_push_dot1q, 0) def test_if_vtr_push_2ad(self): """ 1AD VTR push 2 on non-sub-interfaces """ self.vapi.l2_interface_vlan_tag_rewrite( sw_if_index=self.pg0.sw_if_index, vtr_op=L2_VTR_OP.L2_PUSH_2, push_dot1q=0, tag1=450, tag2=350) if_state = self.vapi.sw_interface_dump( sw_if_index=self.pg0.sw_if_index) self.assertEqual(if_state[0].sw_if_index, self.pg0.sw_if_index) self.assertEqual(if_state[0].vtr_op, L2_VTR_OP.L2_PUSH_2) self.assertEqual(if_state[0].vtr_tag1, 450) # outer self.assertEqual(if_state[0].vtr_tag2, 350) # inner self.assertEqual(if_state[0].vtr_push_dot1q, 0) if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)