diff options
Diffstat (limited to 'test/test_qos.py')
-rw-r--r-- | test/test_qos.py | 366 |
1 files changed, 366 insertions, 0 deletions
diff --git a/test/test_qos.py b/test/test_qos.py new file mode 100644 index 00000000000..a940bd3a64c --- /dev/null +++ b/test/test_qos.py @@ -0,0 +1,366 @@ +#!/usr/bin/env python + +import unittest +import socket +import struct + +from framework import VppTestCase, VppTestRunner +from vpp_object import VppObject +from vpp_papi_provider import QOS_SOURCE +from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute +from vpp_sub_interface import VppSubInterface, VppDot1QSubint + +from scapy.packet import Raw +from scapy.layers.l2 import Ether, Dot1Q +from scapy.layers.inet import IP, UDP +from scapy.layers.inet6 import IPv6 +from scapy.contrib.mpls import MPLS + + +class TestQOS(VppTestCase): + """ QOS Test Case """ + + def setUp(self): + super(TestQOS, self).setUp() + + self.create_pg_interfaces(range(5)) + + for i in self.pg_interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + i.config_ip6() + i.resolve_ndp() + + def tearDown(self): + for i in self.pg_interfaces: + i.unconfig_ip4() + i.unconfig_ip6() + + super(TestQOS, self).tearDown() + + def test_qos_ip(self): + """ QoS Mark IP """ + + # + # for table 1 map the n=0xff possible values of input QoS mark, + # n to 1-n + # + output = [chr(0)] * 256 + for i in range(0, 255): + output[i] = chr(255 - i) + os = ''.join(output) + rows = [{'outputs': os}, + {'outputs': os}, + {'outputs': os}, + {'outputs': os}] + + self.vapi.qos_egress_map_update(1, rows) + + # + # For table 2 (and up) use the value n for everything + # + output = [chr(2)] * 256 + os = ''.join(output) + rows = [{'outputs': os}, + {'outputs': os}, + {'outputs': os}, + {'outputs': os}] + + self.vapi.qos_egress_map_update(2, rows) + + output = [chr(3)] * 256 + os = ''.join(output) + rows = [{'outputs': os}, + {'outputs': os}, + {'outputs': os}, + {'outputs': os}] + + self.vapi.qos_egress_map_update(3, rows) + + output = [chr(4)] * 256 + os = ''.join(output) + rows = [{'outputs': os}, + {'outputs': os}, + {'outputs': os}, + {'outputs': os}] + self.vapi.qos_egress_map_update(4, rows) + self.vapi.qos_egress_map_update(5, rows) + self.vapi.qos_egress_map_update(6, rows) + self.vapi.qos_egress_map_update(7, rows) + + self.logger.info(self.vapi.cli("sh qos eg map")) + + # + # Bind interface pgN to table n + # + self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index, + QOS_SOURCE.IP, + 1, + 1) + self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index, + QOS_SOURCE.IP, + 2, + 1) + self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index, + QOS_SOURCE.IP, + 3, + 1) + self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index, + QOS_SOURCE.IP, + 4, + 1) + + # + # packets ingress on Pg0 + # + p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) / + UDP(sport=1234, dport=1234) / + Raw(chr(100) * 65)) + p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6, + tc=1) / + UDP(sport=1234, dport=1234) / + Raw(chr(100) * 65)) + + # + # Since we have not yet enabled the recording of the input QoS + # from the input iP header, the egress packet's ToS will be unchanged + # + rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1) + for p in rx: + self.assertEqual(p[IP].tos, 1) + rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1) + for p in rx: + self.assertEqual(p[IPv6].tc, 1) + + # + # Enable QoS recrding on IP input for pg0 + # + self.vapi.qos_record_enable_disable(self.pg0.sw_if_index, + QOS_SOURCE.IP, + 1) + + # + # send the same packets, this time expect the input TOS of 1 + # to be mapped to pg1's egress value of 254 + # + rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1) + for p in rx: + self.assertEqual(p[IP].tos, 254) + rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1) + for p in rx: + self.assertEqual(p[IPv6].tc, 254) + + # + # different input ToS to test the mapping + # + p_v4[IP].tos = 127 + rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1) + for p in rx: + self.assertEqual(p[IP].tos, 128) + p_v6[IPv6].tc = 127 + rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1) + for p in rx: + self.assertEqual(p[IPv6].tc, 128) + + p_v4[IP].tos = 254 + rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1) + for p in rx: + self.assertEqual(p[IP].tos, 1) + p_v6[IPv6].tc = 254 + rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1) + for p in rx: + self.assertEqual(p[IPv6].tc, 1) + + # + # send packets out the other interfaces to test the maps are + # correctly applied + # + p_v4[IP].dst = self.pg2.remote_ip4 + rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2) + for p in rx: + self.assertEqual(p[IP].tos, 2) + + p_v4[IP].dst = self.pg3.remote_ip4 + rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3) + for p in rx: + self.assertEqual(p[IP].tos, 3) + + p_v6[IPv6].dst = self.pg3.remote_ip6 + rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg3) + for p in rx: + self.assertEqual(p[IPv6].tc, 3) + + # + # remove the map on pg2 and pg3, now expect an unchanged IP tos + # + self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index, + QOS_SOURCE.IP, + 2, + 0) + self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index, + QOS_SOURCE.IP, + 3, + 0) + self.logger.info(self.vapi.cli("sh int feat pg2")) + + p_v4[IP].dst = self.pg2.remote_ip4 + rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2) + for p in rx: + self.assertEqual(p[IP].tos, 254) + + p_v4[IP].dst = self.pg3.remote_ip4 + rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3) + for p in rx: + self.assertEqual(p[IP].tos, 254) + + # + # still mapping out of pg1 + # + p_v4[IP].dst = self.pg1.remote_ip4 + rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1) + for p in rx: + self.assertEqual(p[IP].tos, 1) + + # + # disable the input recording on pg0 + # + self.vapi.qos_record_enable_disable(self.pg0.sw_if_index, + QOS_SOURCE.IP, + 0) + + # + # back to an unchanged TOS value + # + rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1) + for p in rx: + self.assertEqual(p[IP].tos, 254) + + # + # disable the egress map on pg1 and pg4 + # + self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index, + QOS_SOURCE.IP, + 1, + 0) + self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index, + QOS_SOURCE.IP, + 4, + 0) + + # + # unchanged Tos on pg1 + # + rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1) + for p in rx: + self.assertEqual(p[IP].tos, 254) + + # + # clean-up the masp + # + self.vapi.qos_egress_map_delete(1) + self.vapi.qos_egress_map_delete(4) + self.vapi.qos_egress_map_delete(2) + self.vapi.qos_egress_map_delete(3) + self.vapi.qos_egress_map_delete(5) + self.vapi.qos_egress_map_delete(6) + self.vapi.qos_egress_map_delete(7) + + def test_qos_mpls(self): + """ QoS Mark MPLS """ + + # + # 255 QoS for all input values + # + output = [chr(255)] * 256 + os = ''.join(output) + rows = [{'outputs': os}, + {'outputs': os}, + {'outputs': os}, + {'outputs': os}] + + self.vapi.qos_egress_map_update(1, rows) + + # + # a route with 1 MPLS label + # + route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32, + [VppRoutePath(self.pg1.remote_ip4, + self.pg1.sw_if_index, + labels=[32])]) + route_10_0_0_1.add_vpp_config() + + # + # a route with 3 MPLS labels + # + route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32, + [VppRoutePath(self.pg1.remote_ip4, + self.pg1.sw_if_index, + labels=[63, 33, 34])]) + route_10_0_0_3.add_vpp_config() + + # + # enable IP QoS recording on the input Pg0 and MPLS egress marking + # on Pg1 + # + self.vapi.qos_record_enable_disable(self.pg0.sw_if_index, + QOS_SOURCE.IP, + 1) + self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index, + QOS_SOURCE.MPLS, + 1, + 1) + + # + # packet that will get one label added and 3 labels added resp. + # + p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) / + UDP(sport=1234, dport=1234) / + Raw(chr(100) * 65)) + p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) / + UDP(sport=1234, dport=1234) / + Raw(chr(100) * 65)) + + rx = self.send_and_expect(self.pg0, p_1 * 65, self.pg1) + + # + # only 3 bits of ToS value in MPLS make sure tos is correct + # and the label and EOS bit have not been corrupted + # + for p in rx: + self.assertEqual(p[MPLS].cos, 7) + self.assertEqual(p[MPLS].label, 32) + self.assertEqual(p[MPLS].s, 1) + rx = self.send_and_expect(self.pg0, p_3 * 65, self.pg1) + for p in rx: + self.assertEqual(p[MPLS].cos, 7) + self.assertEqual(p[MPLS].label, 63) + self.assertEqual(p[MPLS].s, 0) + h = p[MPLS].payload + self.assertEqual(h[MPLS].cos, 7) + self.assertEqual(h[MPLS].label, 33) + self.assertEqual(h[MPLS].s, 0) + h = h[MPLS].payload + self.assertEqual(h[MPLS].cos, 7) + self.assertEqual(h[MPLS].label, 34) + self.assertEqual(h[MPLS].s, 1) + + # + # cleanup + # + self.vapi.qos_record_enable_disable(self.pg0.sw_if_index, + QOS_SOURCE.IP, + 0) + self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index, + QOS_SOURCE.MPLS, + 1, + 0) + self.vapi.qos_egress_map_delete(1) + + +if __name__ == '__main__': + unittest.main(testRunner=VppTestRunner) |