#!/usr/bin/env python3 # Copyright (c) 2021 Graphiant, Inc. import unittest from scapy.layers.inet import IP, UDP from scapy.layers.l2 import Ether from scapy.packet import Raw from framework import VppTestCase from asfframework import VppTestRunner from vpp_papi import VppEnum from vpp_policer import VppPolicer, PolicerAction, Dir NUM_PKTS = 67 class TestPolicerInput(VppTestCase): """Policer on an interface""" vpp_worker_count = 2 def setUp(self): super(TestPolicerInput, self).setUp() self.create_pg_interfaces(range(2)) for i in self.pg_interfaces: i.admin_up() i.config_ip4() i.resolve_arp() self.pkt = ( Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / UDP(sport=1234, dport=1234) / Raw(b"\xa5" * 100) ) def tearDown(self): for i in self.pg_interfaces: i.unconfig_ip4() i.admin_down() super(TestPolicerInput, self).tearDown() def policer_interface_test(self, dir: Dir): pkts = self.pkt * NUM_PKTS action_tx = PolicerAction( VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0 ) policer = VppPolicer( self, "pol1", 80, 0, 1000, 0, conform_action=action_tx, exceed_action=action_tx, violate_action=action_tx, ) policer.add_vpp_config() sw_if_index = self.pg0.sw_if_index if dir == Dir.RX else self.pg1.sw_if_index # Start policing on pg0 policer.apply_vpp_config(sw_if_index, dir, True) rx = self.send_and_expect(self.pg0, pkts, self.pg1, worker=0) stats = policer.get_stats() # Single rate, 2 colour policer - expect conform, violate but no exceed self.assertGreater(stats["conform_packets"], 0) self.assertEqual(stats["exceed_packets"], 0) self.assertGreater(stats["violate_packets"], 0) # Stop policing on pg0 policer.apply_vpp_config(sw_if_index, dir, False) rx = self.send_and_expect(self.pg0, pkts, self.pg1, worker=0) statsnew = policer.get_stats() # No new packets counted self.assertEqual(stats, statsnew) policer.remove_vpp_config() def test_policer_input(self): """Input Policing""" self.policer_interface_test(Dir.RX) def test_policer_output(self): """Output Policing""" self.policer_interface_test(Dir.TX) def test_policer_reset(self): """Policer reset bucket""" pkts = self.pkt * NUM_PKTS action_tx = PolicerAction( VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0 ) policer = VppPolicer( self, "pol1", 1, 0, 10000, 0, conform_action=action_tx, exceed_action=action_tx, violate_action=action_tx, ) policer.add_vpp_config() # Start policing on pg0 policer.apply_vpp_config(self.pg0.sw_if_index, Dir.RX, True) self.send_and_expect(self.pg0, pkts, self.pg1, worker=0) details = policer.get_details() self.assertGreater(details.current_limit, details.current_bucket) self.send_and_expect(self.pg0, pkts, self.pg1, worker=0) self.vapi.policer_reset(policer_index=policer.policer_index) details = policer.get_details() self.assertEqual(details.current_limit, details.current_bucket) policer.apply_vpp_config(self.pg0.sw_if_index, Dir.RX, False) policer.remove_vpp_config() def test_policer_update(self): """Policer update""" pkts = self.pkt * NUM_PKTS action_tx = PolicerAction( VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0 ) policer = VppPolicer( self, "pol1", 1, 0, 10000, 0, conform_action=action_tx, exceed_action=action_tx, violate_action=action_tx, ) policer.add_vpp_config() # Start policing on pg0 policer.apply_vpp_config(self.pg0.sw_if_index, Dir.RX, True) self.send_and_expect(self.pg0, pkts, self.pg1, worker=0) details_before = policer.get_details() self.assertGreater(details_before.current_limit, details_before.current_bucket) policer.cir = 8000 policer.commited_burst = 100000 policer.update() details_after = policer.get_details() self.assertGreater(details_after.cir, details_before.cir) self.assertGreater(details_after.cb, details_before.cb) policer.apply_vpp_config(self.pg0.sw_if_index, Dir.RX, False) policer.remove_vpp_config() def policer_handoff_test(self, dir: Dir): pkts = self.pkt * NUM_PKTS action_tx = PolicerAction( VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0 ) policer = VppPolicer( self, "pol2", 80, 0, 1000, 0, conform_action=action_tx, exceed_action=action_tx, violate_action=action_tx, ) policer.add_vpp_config() sw_if_index = self.pg0.sw_if_index if dir == Dir.RX else self.pg1.sw_if_index # Bind the policer to worker 1 policer.bind_vpp_config(1, True) # Start policing on pg0 policer.apply_vpp_config(sw_if_index, dir, True) for worker in [0, 1]: self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker) self.logger.debug(self.vapi.cli("show trace max 100")) stats = policer.get_stats() stats0 = policer.get_stats(worker=0) stats1 = policer.get_stats(worker=1) # Worker 1, should have done all the policing self.assertEqual(stats, stats1) # Worker 0, should have handed everything off self.assertEqual(stats0["conform_packets"], 0) self.assertEqual(stats0["exceed_packets"], 0) self.assertEqual(stats0["violate_packets"], 0) # Unbind the policer from worker 1 and repeat policer.bind_vpp_config(1, False) for worker in [0, 1]: self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker) self.logger.debug(self.vapi.cli("show trace max 100")) # The policer should auto-bind to worker 0 when packets arrive stats = policer.get_stats() # The 2 workers should now have policed the same amount stats = policer.get_stats() stats0 = policer.get_stats(worker=0) stats1 = policer.get_stats(worker=1) self.assertGreater(stats0["conform_packets"], 0) self.assertEqual(stats0["exceed_packets"], 0) self.assertGreater(stats0["violate_packets"], 0) self.assertGreater(stats1["conform_packets"], 0) self.assertEqual(stats1["exceed_packets"], 0) self.assertGreater(stats1["violate_packets"], 0) self.assertEqual( stats0["conform_packets"] + stats1["conform_packets"], stats["conform_packets"], ) self.assertEqual( stats0["violate_packets"] + stats1["violate_packets"], stats["violate_packets"], ) # Stop policing on pg0 policer.apply_vpp_config(sw_if_index, dir, False) policer.remove_vpp_config() def test_policer_handoff_input(self): """Worker thread handoff policer input""" self.policer_handoff_test(Dir.RX) def test_policer_handoff_output(self): """Worker thread handoff policer output""" self.policer_handoff_test(Dir.TX) if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)