diff options
author | Benoît Ganne <bganne@cisco.com> | 2021-09-30 13:41:00 +0200 |
---|---|---|
committer | Neale Ranns <neale@graphiant.com> | 2021-10-06 11:58:43 +0000 |
commit | abb2a42239430a1a67b259b931848a9195402d1a (patch) | |
tree | ebc0c6ed52424f0bea5130090a00b96053e4b451 /test | |
parent | 7b3a3df263c7a5bf549f350553cbd9bce7ee40b3 (diff) |
ip: add classifier-based ACLs support on ip punt
This feature allows one to add classifier-based ACLs on packets punted
from the ip infra, eg. to only whitelist specific sender(s).
Type: feature
Change-Id: Idab37b188583efbca980038875fc3e540cb2e880
Signed-off-by: Benoît Ganne <bganne@cisco.com>
Diffstat (limited to 'test')
-rw-r--r-- | test/template_classifier.py | 3 | ||||
-rw-r--r-- | test/test_classifier.py | 77 |
2 files changed, 78 insertions, 2 deletions
diff --git a/test/template_classifier.py b/test/template_classifier.py index b9a8ca9d7f3..7ce69d436f5 100644 --- a/test/template_classifier.py +++ b/test/template_classifier.py @@ -116,11 +116,10 @@ class TestClassifier(VppTestCase): if self.acl_active_table.endswith('out'): self.output_acl_set_interface( self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0) - self.acl_active_table = '' elif self.acl_active_table != '': self.input_acl_set_interface( self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0) - self.acl_active_table = '' + self.acl_active_table = '' for intf in self.interfaces: if self.af == AF_INET: diff --git a/test/test_classifier.py b/test/test_classifier.py index 1c7fb4c5a5c..74851f92377 100644 --- a/test/test_classifier.py +++ b/test/test_classifier.py @@ -13,6 +13,7 @@ from util import ppp from template_classifier import TestClassifier, VarMask, VarMatch from vpp_ip_route import VppIpRoute, VppRoutePath from vpp_ip import INVALID_INDEX +from vpp_papi import VppEnum # Tests split to different test case classes because of issue reported in @@ -843,5 +844,81 @@ class TestClassifierPBR(TestClassifier): # and the table should be gone. self.assertFalse(self.verify_vrf(self.pbr_vrfid)) + +class TestClassifierPunt(TestClassifier): + """ Classifier punt Test Case """ + + @classmethod + def setUpClass(cls): + super(TestClassifierPunt, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestClassifierPunt, cls).tearDownClass() + + def test_punt_udp(self): + """ IPv4/UDP protocol punt ACL test + + Test scenario for basic punt ACL with UDP protocol + - Create IPv4 stream for pg0 -> pg1 interface. + - Create punt ACL with UDP IP protocol. + - Send and verify received packets on pg1 interface. + """ + + sport = 6754 + dport = 17923 + + key = 'ip4_udp_punt' + self.create_classify_table( + key, + self.build_ip_mask( + src_ip='ffffffff', + proto='ff', + src_port='ffff')) + table_index = self.acl_tbl_idx.get(key) + self.vapi.punt_acl_add_del(ip4_table_index=table_index) + self.acl_active_table = key + + # punt udp packets to dport received on pg0 through pg1 + self.vapi.set_punt( + is_add=1, + punt={ + 'type': VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4, + 'punt': { + 'l4': { + 'af': VppEnum.vl_api_address_family_t.ADDRESS_IP4, + 'protocol': VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP, + 'port': dport, + }}}) + self.vapi.ip_punt_redirect(punt={ + 'rx_sw_if_index': self.pg0.sw_if_index, + 'tx_sw_if_index': self.pg1.sw_if_index, + 'nh': self.pg1.remote_ip4, + }) + + pkts = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / + UDP(sport=sport, dport=dport) / + Raw('\x17' * 100))] * 2 + + # allow a session but not matching the stream: expect to drop + self.create_classify_session( + table_index, + self.build_ip_match(src_ip=self.pg0.remote_ip4, + proto=socket.IPPROTO_UDP, src_port=sport + 10)) + self.send_and_assert_no_replies(self.pg0, pkts) + + # allow a session matching the stream: expect to pass + self.create_classify_session( + table_index, + self.build_ip_match(src_ip=self.pg0.remote_ip4, + proto=socket.IPPROTO_UDP, src_port=sport)) + self.send_and_expect_only(self.pg0, pkts, self.pg1) + + # cleanup + self.acl_active_table = '' + self.vapi.punt_acl_add_del(ip4_table_index=table_index, is_add=0) + + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner) |