From abb2a42239430a1a67b259b931848a9195402d1a Mon Sep 17 00:00:00 2001 From: Benoît Ganne Date: Thu, 30 Sep 2021 13:41:00 +0200 Subject: ip: add classifier-based ACLs support on ip punt MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This feature allows one to add classifier-based ACLs on packets punted from the ip infra, eg. to only whitelist specific sender(s). Type: feature Change-Id: Idab37b188583efbca980038875fc3e540cb2e880 Signed-off-by: Benoît Ganne --- test/test_classifier.py | 77 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) (limited to 'test/test_classifier.py') diff --git a/test/test_classifier.py b/test/test_classifier.py index 1c7fb4c5a5c..74851f92377 100644 --- a/test/test_classifier.py +++ b/test/test_classifier.py @@ -13,6 +13,7 @@ from util import ppp from template_classifier import TestClassifier, VarMask, VarMatch from vpp_ip_route import VppIpRoute, VppRoutePath from vpp_ip import INVALID_INDEX +from vpp_papi import VppEnum # Tests split to different test case classes because of issue reported in @@ -843,5 +844,81 @@ class TestClassifierPBR(TestClassifier): # and the table should be gone. self.assertFalse(self.verify_vrf(self.pbr_vrfid)) + +class TestClassifierPunt(TestClassifier): + """ Classifier punt Test Case """ + + @classmethod + def setUpClass(cls): + super(TestClassifierPunt, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestClassifierPunt, cls).tearDownClass() + + def test_punt_udp(self): + """ IPv4/UDP protocol punt ACL test + + Test scenario for basic punt ACL with UDP protocol + - Create IPv4 stream for pg0 -> pg1 interface. + - Create punt ACL with UDP IP protocol. + - Send and verify received packets on pg1 interface. + """ + + sport = 6754 + dport = 17923 + + key = 'ip4_udp_punt' + self.create_classify_table( + key, + self.build_ip_mask( + src_ip='ffffffff', + proto='ff', + src_port='ffff')) + table_index = self.acl_tbl_idx.get(key) + self.vapi.punt_acl_add_del(ip4_table_index=table_index) + self.acl_active_table = key + + # punt udp packets to dport received on pg0 through pg1 + self.vapi.set_punt( + is_add=1, + punt={ + 'type': VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4, + 'punt': { + 'l4': { + 'af': VppEnum.vl_api_address_family_t.ADDRESS_IP4, + 'protocol': VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP, + 'port': dport, + }}}) + self.vapi.ip_punt_redirect(punt={ + 'rx_sw_if_index': self.pg0.sw_if_index, + 'tx_sw_if_index': self.pg1.sw_if_index, + 'nh': self.pg1.remote_ip4, + }) + + pkts = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / + UDP(sport=sport, dport=dport) / + Raw('\x17' * 100))] * 2 + + # allow a session but not matching the stream: expect to drop + self.create_classify_session( + table_index, + self.build_ip_match(src_ip=self.pg0.remote_ip4, + proto=socket.IPPROTO_UDP, src_port=sport + 10)) + self.send_and_assert_no_replies(self.pg0, pkts) + + # allow a session matching the stream: expect to pass + self.create_classify_session( + table_index, + self.build_ip_match(src_ip=self.pg0.remote_ip4, + proto=socket.IPPROTO_UDP, src_port=sport)) + self.send_and_expect_only(self.pg0, pkts, self.pg1) + + # cleanup + self.acl_active_table = '' + self.vapi.punt_acl_add_del(ip4_table_index=table_index, is_add=0) + + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner) -- cgit 1.2.3-korg