From d9b0c6fbf7aa5bd9af84264105b39c82028a4a29 Mon Sep 17 00:00:00 2001 From: Klement Sekera Date: Tue, 26 Apr 2022 19:02:15 +0200 Subject: tests: replace pycodestyle with black Drop pycodestyle for code style checking in favor of black. Black is much faster, stable PEP8 compliant code style checker offering also automatic formatting. It aims to be very stable and produce smallest diffs. It's used by many small and big projects. Running checkstyle with black takes a few seconds with a terse output. Thus, test-checkstyle-diff is no longer necessary. Expand scope of checkstyle to all python files in the repo, replacing test-checkstyle with checkstyle-python. Also, fixstyle-python is now available for automatic style formatting. Note: python virtualenv has been consolidated in test/Makefile, test/requirements*.txt which will eventually be moved to a central location. This is required to simply the automated generation of docker executor images in the CI. Type: improvement Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8 Signed-off-by: Klement Sekera Signed-off-by: Dave Wallace --- test/test_classifier.py | 607 +++++++++++++++++++++++++++--------------------- 1 file changed, 340 insertions(+), 267 deletions(-) (limited to 'test/test_classifier.py') diff --git a/test/test_classifier.py b/test/test_classifier.py index 9f0fdbf587a..068561230ea 100644 --- a/test/test_classifier.py +++ b/test/test_classifier.py @@ -19,7 +19,7 @@ from vpp_papi import VppEnum # Tests split to different test case classes because of issue reported in # ticket VPP-1336 class TestClassifierIP(TestClassifier): - """ Classifier IP Test Case """ + """Classifier IP Test Case""" @classmethod def setUpClass(cls): @@ -30,7 +30,7 @@ class TestClassifierIP(TestClassifier): super(TestClassifierIP, cls).tearDownClass() def test_iacl_src_ip(self): - """ Source IP iACL test + """Source IP iACL test Test scenario for basic IP ACL with source IP - Create IPv4 stream for pg0 -> pg1 interface. @@ -42,11 +42,11 @@ class TestClassifierIP(TestClassifier): pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes) self.pg0.add_stream(pkts) - key = 'ip_src' - self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff')) + key = "ip_src" + self.create_classify_table(key, self.build_ip_mask(src_ip="ffffffff")) self.create_classify_session( - self.acl_tbl_idx.get(key), - self.build_ip_match(src_ip=self.pg0.remote_ip4)) + self.acl_tbl_idx.get(key), self.build_ip_match(src_ip=self.pg0.remote_ip4) + ) self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key @@ -60,7 +60,7 @@ class TestClassifierIP(TestClassifier): self.pg3.assert_nothing_captured(remark="packets forwarded") def test_iacl_dst_ip(self): - """ Destination IP iACL test + """Destination IP iACL test Test scenario for basic IP ACL with destination IP - Create IPv4 stream for pg0 -> pg1 interface. @@ -72,11 +72,11 @@ class TestClassifierIP(TestClassifier): pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes) self.pg0.add_stream(pkts) - key = 'ip_dst' - self.create_classify_table(key, self.build_ip_mask(dst_ip='ffffffff')) + key = "ip_dst" + self.create_classify_table(key, self.build_ip_mask(dst_ip="ffffffff")) self.create_classify_session( - self.acl_tbl_idx.get(key), - self.build_ip_match(dst_ip=self.pg1.remote_ip4)) + self.acl_tbl_idx.get(key), self.build_ip_match(dst_ip=self.pg1.remote_ip4) + ) self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key @@ -90,7 +90,7 @@ class TestClassifierIP(TestClassifier): self.pg3.assert_nothing_captured(remark="packets forwarded") def test_iacl_src_dst_ip(self): - """ Source and destination IP iACL test + """Source and destination IP iACL test Test scenario for basic IP ACL with source and destination IP - Create IPv4 stream for pg0 -> pg1 interface. @@ -102,13 +102,14 @@ class TestClassifierIP(TestClassifier): pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes) self.pg0.add_stream(pkts) - key = 'ip' + key = "ip" self.create_classify_table( - key, self.build_ip_mask(src_ip='ffffffff', dst_ip='ffffffff')) + key, self.build_ip_mask(src_ip="ffffffff", dst_ip="ffffffff") + ) self.create_classify_session( self.acl_tbl_idx.get(key), - self.build_ip_match(src_ip=self.pg0.remote_ip4, - dst_ip=self.pg1.remote_ip4)) + self.build_ip_match(src_ip=self.pg0.remote_ip4, dst_ip=self.pg1.remote_ip4), + ) self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key @@ -123,7 +124,7 @@ class TestClassifierIP(TestClassifier): class TestClassifierUDP(TestClassifier): - """ Classifier UDP proto Test Case """ + """Classifier UDP proto Test Case""" @classmethod def setUpClass(cls): @@ -134,7 +135,7 @@ class TestClassifierUDP(TestClassifier): super(TestClassifierUDP, cls).tearDownClass() def test_iacl_proto_udp(self): - """ UDP protocol iACL test + """UDP protocol iACL test Test scenario for basic protocol ACL with UDP protocol - Create IPv4 stream for pg0 -> pg1 interface. @@ -146,13 +147,12 @@ class TestClassifierUDP(TestClassifier): pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes) self.pg0.add_stream(pkts) - key = 'proto_udp' - self.create_classify_table(key, self.build_ip_mask(proto='ff')) + key = "proto_udp" + self.create_classify_table(key, self.build_ip_mask(proto="ff")) self.create_classify_session( - self.acl_tbl_idx.get(key), - self.build_ip_match(proto=socket.IPPROTO_UDP)) - self.input_acl_set_interface( - self.pg0, self.acl_tbl_idx.get(key)) + self.acl_tbl_idx.get(key), self.build_ip_match(proto=socket.IPPROTO_UDP) + ) + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.pg_enable_capture(self.pg_interfaces) @@ -165,7 +165,7 @@ class TestClassifierUDP(TestClassifier): self.pg3.assert_nothing_captured(remark="packets forwarded") def test_iacl_proto_udp_sport(self): - """ UDP source port iACL test + """UDP source port iACL test Test scenario for basic protocol ACL with UDP and sport - Create IPv4 stream for pg0 -> pg1 interface. @@ -175,18 +175,18 @@ class TestClassifierUDP(TestClassifier): # Basic iACL testing with UDP and sport sport = 38 - pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, - UDP(sport=sport, dport=5678)) + pkts = self.create_stream( + self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=5678) + ) self.pg0.add_stream(pkts) - key = 'proto_udp_sport' - self.create_classify_table( - key, self.build_ip_mask(proto='ff', src_port='ffff')) + key = "proto_udp_sport" + self.create_classify_table(key, self.build_ip_mask(proto="ff", src_port="ffff")) self.create_classify_session( self.acl_tbl_idx.get(key), - self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport)) - self.input_acl_set_interface( - self.pg0, self.acl_tbl_idx.get(key)) + self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport), + ) + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.pg_enable_capture(self.pg_interfaces) @@ -199,7 +199,7 @@ class TestClassifierUDP(TestClassifier): self.pg3.assert_nothing_captured(remark="packets forwarded") def test_iacl_proto_udp_dport(self): - """ UDP destination port iACL test + """UDP destination port iACL test Test scenario for basic protocol ACL with UDP and dport - Create IPv4 stream for pg0 -> pg1 interface. @@ -209,18 +209,18 @@ class TestClassifierUDP(TestClassifier): # Basic iACL testing with UDP and dport dport = 427 - pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, - UDP(sport=1234, dport=dport)) + pkts = self.create_stream( + self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=1234, dport=dport) + ) self.pg0.add_stream(pkts) - key = 'proto_udp_dport' - self.create_classify_table( - key, self.build_ip_mask(proto='ff', dst_port='ffff')) + key = "proto_udp_dport" + self.create_classify_table(key, self.build_ip_mask(proto="ff", dst_port="ffff")) self.create_classify_session( self.acl_tbl_idx.get(key), - self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport)) - self.input_acl_set_interface( - self.pg0, self.acl_tbl_idx.get(key)) + self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport), + ) + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.pg_enable_capture(self.pg_interfaces) @@ -233,7 +233,7 @@ class TestClassifierUDP(TestClassifier): self.pg3.assert_nothing_captured(remark="packets forwarded") def test_iacl_proto_udp_sport_dport(self): - """ UDP source and destination ports iACL test + """UDP source and destination ports iACL test Test scenario for basic protocol ACL with UDP and sport and dport - Create IPv4 stream for pg0 -> pg1 interface. @@ -244,20 +244,22 @@ class TestClassifierUDP(TestClassifier): # Basic iACL testing with UDP and sport and dport sport = 13720 dport = 9080 - pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, - UDP(sport=sport, dport=dport)) + pkts = self.create_stream( + self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport) + ) self.pg0.add_stream(pkts) - key = 'proto_udp_ports' + key = "proto_udp_ports" self.create_classify_table( - key, - self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff')) + key, self.build_ip_mask(proto="ff", src_port="ffff", dst_port="ffff") + ) self.create_classify_session( self.acl_tbl_idx.get(key), - self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport, - dst_port=dport)) - self.input_acl_set_interface( - self.pg0, self.acl_tbl_idx.get(key)) + self.build_ip_match( + proto=socket.IPPROTO_UDP, src_port=sport, dst_port=dport + ), + ) + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.pg_enable_capture(self.pg_interfaces) @@ -271,7 +273,7 @@ class TestClassifierUDP(TestClassifier): class TestClassifierTCP(TestClassifier): - """ Classifier TCP proto Test Case """ + """Classifier TCP proto Test Case""" @classmethod def setUpClass(cls): @@ -282,7 +284,7 @@ class TestClassifierTCP(TestClassifier): super(TestClassifierTCP, cls).tearDownClass() def test_iacl_proto_tcp(self): - """ TCP protocol iACL test + """TCP protocol iACL test Test scenario for basic protocol ACL with TCP protocol - Create IPv4 stream for pg0 -> pg1 interface. @@ -291,17 +293,17 @@ class TestClassifierTCP(TestClassifier): """ # Basic iACL testing with TCP protocol - pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, - TCP(sport=1234, dport=5678)) + pkts = self.create_stream( + self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=1234, dport=5678) + ) self.pg0.add_stream(pkts) - key = 'proto_tcp' - self.create_classify_table(key, self.build_ip_mask(proto='ff')) + key = "proto_tcp" + self.create_classify_table(key, self.build_ip_mask(proto="ff")) self.create_classify_session( - self.acl_tbl_idx.get(key), - self.build_ip_match(proto=socket.IPPROTO_TCP)) - self.input_acl_set_interface( - self.pg0, self.acl_tbl_idx.get(key)) + self.acl_tbl_idx.get(key), self.build_ip_match(proto=socket.IPPROTO_TCP) + ) + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.pg_enable_capture(self.pg_interfaces) @@ -314,7 +316,7 @@ class TestClassifierTCP(TestClassifier): self.pg3.assert_nothing_captured(remark="packets forwarded") def test_iacl_proto_tcp_sport(self): - """ TCP source port iACL test + """TCP source port iACL test Test scenario for basic protocol ACL with TCP and sport - Create IPv4 stream for pg0 -> pg1 interface. @@ -324,18 +326,18 @@ class TestClassifierTCP(TestClassifier): # Basic iACL testing with TCP and sport sport = 38 - pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, - TCP(sport=sport, dport=5678)) + pkts = self.create_stream( + self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=sport, dport=5678) + ) self.pg0.add_stream(pkts) - key = 'proto_tcp_sport' - self.create_classify_table( - key, self.build_ip_mask(proto='ff', src_port='ffff')) + key = "proto_tcp_sport" + self.create_classify_table(key, self.build_ip_mask(proto="ff", src_port="ffff")) self.create_classify_session( self.acl_tbl_idx.get(key), - self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport)) - self.input_acl_set_interface( - self.pg0, self.acl_tbl_idx.get(key)) + self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport), + ) + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.pg_enable_capture(self.pg_interfaces) @@ -348,7 +350,7 @@ class TestClassifierTCP(TestClassifier): self.pg3.assert_nothing_captured(remark="packets forwarded") def test_iacl_proto_tcp_dport(self): - """ TCP destination port iACL test + """TCP destination port iACL test Test scenario for basic protocol ACL with TCP and dport - Create IPv4 stream for pg0 -> pg1 interface. @@ -358,18 +360,18 @@ class TestClassifierTCP(TestClassifier): # Basic iACL testing with TCP and dport dport = 427 - pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, - TCP(sport=1234, dport=dport)) + pkts = self.create_stream( + self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=1234, dport=dport) + ) self.pg0.add_stream(pkts) - key = 'proto_tcp_sport' - self.create_classify_table( - key, self.build_ip_mask(proto='ff', dst_port='ffff')) + key = "proto_tcp_sport" + self.create_classify_table(key, self.build_ip_mask(proto="ff", dst_port="ffff")) self.create_classify_session( self.acl_tbl_idx.get(key), - self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport)) - self.input_acl_set_interface( - self.pg0, self.acl_tbl_idx.get(key)) + self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport), + ) + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.pg_enable_capture(self.pg_interfaces) @@ -382,7 +384,7 @@ class TestClassifierTCP(TestClassifier): self.pg3.assert_nothing_captured(remark="packets forwarded") def test_iacl_proto_tcp_sport_dport(self): - """ TCP source and destination ports iACL test + """TCP source and destination ports iACL test Test scenario for basic protocol ACL with TCP and sport and dport - Create IPv4 stream for pg0 -> pg1 interface. @@ -393,20 +395,22 @@ class TestClassifierTCP(TestClassifier): # Basic iACL testing with TCP and sport and dport sport = 13720 dport = 9080 - pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, - TCP(sport=sport, dport=dport)) + pkts = self.create_stream( + self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=sport, dport=dport) + ) self.pg0.add_stream(pkts) - key = 'proto_tcp_ports' + key = "proto_tcp_ports" self.create_classify_table( - key, - self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff')) + key, self.build_ip_mask(proto="ff", src_port="ffff", dst_port="ffff") + ) self.create_classify_session( self.acl_tbl_idx.get(key), - self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport, - dst_port=dport)) - self.input_acl_set_interface( - self.pg0, self.acl_tbl_idx.get(key)) + self.build_ip_match( + proto=socket.IPPROTO_TCP, src_port=sport, dst_port=dport + ), + ) + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.pg_enable_capture(self.pg_interfaces) @@ -420,7 +424,7 @@ class TestClassifierTCP(TestClassifier): class TestClassifierIPOut(TestClassifier): - """ Classifier output IP Test Case """ + """Classifier output IP Test Case""" @classmethod def setUpClass(cls): @@ -431,7 +435,7 @@ class TestClassifierIPOut(TestClassifier): super(TestClassifierIPOut, cls).tearDownClass() def test_acl_ip_out(self): - """ Output IP ACL test + """Output IP ACL test Test scenario for basic IP ACL with source IP - Create IPv4 stream for pg1 -> pg0 interface. @@ -443,12 +447,13 @@ class TestClassifierIPOut(TestClassifier): pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes) self.pg1.add_stream(pkts) - key = 'ip_out' + key = "ip_out" self.create_classify_table( - key, self.build_ip_mask(src_ip='ffffffff'), data_offset=0) + key, self.build_ip_mask(src_ip="ffffffff"), data_offset=0 + ) self.create_classify_session( - self.acl_tbl_idx.get(key), - self.build_ip_match(src_ip=self.pg1.remote_ip4)) + self.acl_tbl_idx.get(key), self.build_ip_match(src_ip=self.pg1.remote_ip4) + ) self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key @@ -463,7 +468,7 @@ class TestClassifierIPOut(TestClassifier): class TestClassifierMAC(TestClassifier): - """ Classifier MAC Test Case """ + """Classifier MAC Test Case""" @classmethod def setUpClass(cls): @@ -474,7 +479,7 @@ class TestClassifierMAC(TestClassifier): super(TestClassifierMAC, cls).tearDownClass() def test_acl_mac(self): - """ MAC ACL test + """MAC ACL test Test scenario for basic MAC ACL with source MAC - Create IPv4 stream for pg0 -> pg2 interface. @@ -486,12 +491,13 @@ class TestClassifierMAC(TestClassifier): pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes) self.pg0.add_stream(pkts) - key = 'mac' + key = "mac" self.create_classify_table( - key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14) + key, self.build_mac_mask(src_mac="ffffffffffff"), data_offset=-14 + ) self.create_classify_session( - self.acl_tbl_idx.get(key), - self.build_mac_match(src_mac=self.pg0.remote_mac)) + self.acl_tbl_idx.get(key), self.build_mac_match(src_mac=self.pg0.remote_mac) + ) self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key @@ -506,7 +512,7 @@ class TestClassifierMAC(TestClassifier): class TestClassifierComplex(TestClassifier): - """ Large & Nested Classifiers Test Cases """ + """Large & Nested Classifiers Test Cases""" @classmethod def setUpClass(cls): @@ -517,7 +523,7 @@ class TestClassifierComplex(TestClassifier): super(TestClassifierComplex, cls).tearDownClass() def test_iacl_large(self): - """ Large input ACL test + """Large input ACL test Test scenario for Large ACL matching on ethernet+ip+udp headers - Create IPv4 stream for pg0 -> pg1 interface. @@ -527,7 +533,7 @@ class TestClassifierComplex(TestClassifier): # 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b) # + 4b as build_ip_ma*() func, do not match against UDP Len & Chksum - msk = VarMask(offset=40, spec='ffff') + msk = VarMask(offset=40, spec="ffff") mth = VarMatch(offset=40, value=0x1234, length=2) payload_msk = self.build_payload_mask([msk]) @@ -537,38 +543,49 @@ class TestClassifierComplex(TestClassifier): dport = 9080 # 36b offset = 80bytes - (sizeof(UDP/IP/ETH)) - packet_ex = bytes.fromhex(('0' * 36) + '1234') - pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, - UDP(sport=sport, dport=dport), - packet_ex) + packet_ex = bytes.fromhex(("0" * 36) + "1234") + pkts = self.create_stream( + self.pg0, + self.pg1, + self.pg_if_packet_sizes, + UDP(sport=sport, dport=dport), + packet_ex, + ) self.pg0.add_stream(pkts) - key = 'large_in' + key = "large_in" self.create_classify_table( key, - self.build_mac_mask(src_mac='ffffffffffff', - dst_mac='ffffffffffff', - ether_type='ffff') + - self.build_ip_mask(proto='ff', - src_ip='ffffffff', - dst_ip='ffffffff', - src_port='ffff', - dst_port='ffff') + - payload_msk, - data_offset=-14) + self.build_mac_mask( + src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff" + ) + + self.build_ip_mask( + proto="ff", + src_ip="ffffffff", + dst_ip="ffffffff", + src_port="ffff", + dst_port="ffff", + ) + + payload_msk, + data_offset=-14, + ) self.create_classify_session( self.acl_tbl_idx.get(key), - self.build_mac_match(src_mac=self.pg0.remote_mac, - dst_mac=self.pg0.local_mac, - # ipv4 next header - ether_type='0800') + - self.build_ip_match(proto=socket.IPPROTO_UDP, - src_ip=self.pg0.remote_ip4, - dst_ip=self.pg1.remote_ip4, - src_port=sport, - dst_port=dport) + - payload_match + self.build_mac_match( + src_mac=self.pg0.remote_mac, + dst_mac=self.pg0.local_mac, + # ipv4 next header + ether_type="0800", + ) + + self.build_ip_match( + proto=socket.IPPROTO_UDP, + src_ip=self.pg0.remote_ip4, + dst_ip=self.pg1.remote_ip4, + src_port=sport, + dst_port=dport, + ) + + payload_match, ) self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) @@ -584,7 +601,7 @@ class TestClassifierComplex(TestClassifier): self.pg3.assert_nothing_captured(remark="packets forwarded") def test_oacl_large(self): - """ Large output ACL test + """Large output ACL test Test scenario for Large ACL matching on ethernet+ip+udp headers - Create IPv4 stream for pg1 -> pg0 interface. - Create large acl matching on ethernet+ip+udp header fields @@ -593,7 +610,7 @@ class TestClassifierComplex(TestClassifier): # 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b) # + 4b as build_ip_ma*() func, do not match against UDP Len & Chksum - msk = VarMask(offset=40, spec='ffff') + msk = VarMask(offset=40, spec="ffff") mth = VarMatch(offset=40, value=0x1234, length=2) payload_msk = self.build_payload_mask([msk]) @@ -603,38 +620,50 @@ class TestClassifierComplex(TestClassifier): dport = 9080 # 36b offset = 80bytes - (sizeof(UDP/IP/ETH)) - packet_ex = bytes.fromhex(('0' * 36) + '1234') - pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes, - UDP(sport=sport, dport=dport), - packet_ex) + packet_ex = bytes.fromhex(("0" * 36) + "1234") + pkts = self.create_stream( + self.pg1, + self.pg0, + self.pg_if_packet_sizes, + UDP(sport=sport, dport=dport), + packet_ex, + ) self.pg1.add_stream(pkts) - key = 'large_out' + key = "large_out" self.create_classify_table( key, - self.build_mac_mask(src_mac='ffffffffffff', - dst_mac='ffffffffffff', - ether_type='ffff') + - self.build_ip_mask(proto='ff', - src_ip='ffffffff', - dst_ip='ffffffff', - src_port='ffff', - dst_port='ffff') + - payload_msk, - data_offset=-14) + self.build_mac_mask( + src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff" + ) + + self.build_ip_mask( + proto="ff", + src_ip="ffffffff", + dst_ip="ffffffff", + src_port="ffff", + dst_port="ffff", + ) + + payload_msk, + data_offset=-14, + ) self.create_classify_session( self.acl_tbl_idx.get(key), - self.build_mac_match(src_mac=self.pg0.local_mac, - dst_mac=self.pg0.remote_mac, - # ipv4 next header - ether_type='0800') + - self.build_ip_match(proto=socket.IPPROTO_UDP, - src_ip=self.pg1.remote_ip4, - dst_ip=self.pg0.remote_ip4, - src_port=sport, - dst_port=dport) + - payload_match) + self.build_mac_match( + src_mac=self.pg0.local_mac, + dst_mac=self.pg0.remote_mac, + # ipv4 next header + ether_type="0800", + ) + + self.build_ip_match( + proto=socket.IPPROTO_UDP, + src_ip=self.pg1.remote_ip4, + dst_ip=self.pg0.remote_ip4, + src_port=sport, + dst_port=dport, + ) + + payload_match, + ) self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key @@ -649,7 +678,7 @@ class TestClassifierComplex(TestClassifier): self.pg3.assert_nothing_captured(remark="packets forwarded") def test_iacl_nested(self): - """ Nested input ACL test + """Nested input ACL test Test scenario for Large ACL matching on ethernet+ip+udp headers - Create IPv4 stream for pg0 -> pg1 interface. @@ -660,48 +689,60 @@ class TestClassifierComplex(TestClassifier): sport = 13720 dport = 9080 - pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, - UDP(sport=sport, dport=dport)) + pkts = self.create_stream( + self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport) + ) self.pg0.add_stream(pkts) - subtable_key = 'subtable_in' + subtable_key = "subtable_in" self.create_classify_table( subtable_key, - self.build_mac_mask(src_mac='ffffffffffff', - dst_mac='ffffffffffff', - ether_type='ffff') + - self.build_ip_mask(proto='ff', - src_ip='ffffffff', - dst_ip='ffffffff', - src_port='ffff', - dst_port='ffff'), - data_offset=-14) - - key = 'nested_in' + self.build_mac_mask( + src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff" + ) + + self.build_ip_mask( + proto="ff", + src_ip="ffffffff", + dst_ip="ffffffff", + src_port="ffff", + dst_port="ffff", + ), + data_offset=-14, + ) + + key = "nested_in" self.create_classify_table( key, - self.build_mac_mask(src_mac='ffffffffffff', - dst_mac='ffffffffffff', - ether_type='ffff') + - self.build_ip_mask(proto='ff', - src_ip='ffffffff', - dst_ip='ffffffff', - src_port='ffff', - dst_port='ffff'), - next_table_index=self.acl_tbl_idx.get(subtable_key)) + self.build_mac_mask( + src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff" + ) + + self.build_ip_mask( + proto="ff", + src_ip="ffffffff", + dst_ip="ffffffff", + src_port="ffff", + dst_port="ffff", + ), + next_table_index=self.acl_tbl_idx.get(subtable_key), + ) self.create_classify_session( self.acl_tbl_idx.get(subtable_key), - self.build_mac_match(src_mac=self.pg0.remote_mac, - dst_mac=self.pg0.local_mac, - # ipv4 next header - ether_type='0800') + - self.build_ip_match(proto=socket.IPPROTO_UDP, - src_ip=self.pg0.remote_ip4, - dst_ip=self.pg1.remote_ip4, - src_port=sport, - dst_port=dport)) + self.build_mac_match( + src_mac=self.pg0.remote_mac, + dst_mac=self.pg0.local_mac, + # ipv4 next header + ether_type="0800", + ) + + self.build_ip_match( + proto=socket.IPPROTO_UDP, + src_ip=self.pg0.remote_ip4, + dst_ip=self.pg1.remote_ip4, + src_port=sport, + dst_port=dport, + ), + ) self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key @@ -716,7 +757,7 @@ class TestClassifierComplex(TestClassifier): self.pg3.assert_nothing_captured(remark="packets forwarded") def test_oacl_nested(self): - """ Nested output ACL test + """Nested output ACL test Test scenario for Large ACL matching on ethernet+ip+udp headers - Create IPv4 stream for pg1 -> pg0 interface. @@ -727,48 +768,60 @@ class TestClassifierComplex(TestClassifier): sport = 13720 dport = 9080 - pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes, - UDP(sport=sport, dport=dport)) + pkts = self.create_stream( + self.pg1, self.pg0, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport) + ) self.pg1.add_stream(pkts) - subtable_key = 'subtable_out' + subtable_key = "subtable_out" self.create_classify_table( subtable_key, - self.build_mac_mask(src_mac='ffffffffffff', - dst_mac='ffffffffffff', - ether_type='ffff') + - self.build_ip_mask(proto='ff', - src_ip='ffffffff', - dst_ip='ffffffff', - src_port='ffff', - dst_port='ffff'), - data_offset=-14) - - key = 'nested_out' + self.build_mac_mask( + src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff" + ) + + self.build_ip_mask( + proto="ff", + src_ip="ffffffff", + dst_ip="ffffffff", + src_port="ffff", + dst_port="ffff", + ), + data_offset=-14, + ) + + key = "nested_out" self.create_classify_table( key, - self.build_mac_mask(src_mac='ffffffffffff', - dst_mac='ffffffffffff', - ether_type='ffff') + - self.build_ip_mask(proto='ff', - src_ip='ffffffff', - dst_ip='ffffffff', - src_port='ffff', - dst_port='ffff'), + self.build_mac_mask( + src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff" + ) + + self.build_ip_mask( + proto="ff", + src_ip="ffffffff", + dst_ip="ffffffff", + src_port="ffff", + dst_port="ffff", + ), next_table_index=self.acl_tbl_idx.get(subtable_key), - data_offset=-14) + data_offset=-14, + ) self.create_classify_session( self.acl_tbl_idx.get(subtable_key), - self.build_mac_match(src_mac=self.pg0.local_mac, - dst_mac=self.pg0.remote_mac, - # ipv4 next header - ether_type='0800') + - self.build_ip_match(proto=socket.IPPROTO_UDP, - src_ip=self.pg1.remote_ip4, - dst_ip=self.pg0.remote_ip4, - src_port=sport, - dst_port=dport)) + self.build_mac_match( + src_mac=self.pg0.local_mac, + dst_mac=self.pg0.remote_mac, + # ipv4 next header + ether_type="0800", + ) + + self.build_ip_match( + proto=socket.IPPROTO_UDP, + src_ip=self.pg1.remote_ip4, + dst_ip=self.pg0.remote_ip4, + src_port=sport, + dst_port=dport, + ), + ) self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key @@ -784,7 +837,7 @@ class TestClassifierComplex(TestClassifier): class TestClassifierPBR(TestClassifier): - """ Classifier PBR Test Case """ + """Classifier PBR Test Case""" @classmethod def setUpClass(cls): @@ -795,7 +848,7 @@ class TestClassifierPBR(TestClassifier): super(TestClassifierPBR, cls).tearDownClass() def test_acl_pbr(self): - """ IP PBR test + """IP PBR test Test scenario for PBR with source IP - Create IPv4 stream for pg0 -> pg3 interface. @@ -807,19 +860,24 @@ class TestClassifierPBR(TestClassifier): pkts = self.create_stream(self.pg0, self.pg3, self.pg_if_packet_sizes) self.pg0.add_stream(pkts) - key = 'pbr' - self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff')) + key = "pbr" + self.create_classify_table(key, self.build_ip_mask(src_ip="ffffffff")) pbr_option = 1 # this will create the VRF/table in which we will insert the route self.create_classify_session( self.acl_tbl_idx.get(key), self.build_ip_match(src_ip=self.pg0.remote_ip4), - pbr_option, self.pbr_vrfid) + pbr_option, + self.pbr_vrfid, + ) self.assertTrue(self.verify_vrf(self.pbr_vrfid)) - r = VppIpRoute(self, self.pg3.local_ip4, 24, - [VppRoutePath(self.pg3.remote_ip4, - INVALID_INDEX)], - table_id=self.pbr_vrfid) + r = VppIpRoute( + self, + self.pg3.local_ip4, + 24, + [VppRoutePath(self.pg3.remote_ip4, INVALID_INDEX)], + table_id=self.pbr_vrfid, + ) r.add_vpp_config() self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) @@ -839,14 +897,17 @@ class TestClassifierPBR(TestClassifier): self.create_classify_session( self.acl_tbl_idx.get(key), self.build_ip_match(src_ip=self.pg0.remote_ip4), - pbr_option, self.pbr_vrfid, is_add=0) + pbr_option, + self.pbr_vrfid, + is_add=0, + ) # and the table should be gone. self.assertFalse(self.verify_vrf(self.pbr_vrfid)) class TestClassifierPunt(TestClassifier): - """ Classifier punt Test Case """ + """Classifier punt Test Case""" @classmethod def setUpClass(cls): @@ -857,7 +918,7 @@ class TestClassifierPunt(TestClassifier): super(TestClassifierPunt, cls).tearDownClass() def test_punt_udp(self): - """ IPv4/UDP protocol punt ACL test + """IPv4/UDP protocol punt ACL test Test scenario for basic punt ACL with UDP protocol - Create IPv4 stream for pg0 -> pg1 interface. @@ -868,13 +929,10 @@ class TestClassifierPunt(TestClassifier): sport = 6754 dport = 17923 - key = 'ip4_udp_punt' + key = "ip4_udp_punt" self.create_classify_table( - key, - self.build_ip_mask( - src_ip='ffffffff', - proto='ff', - src_port='ffff')) + key, self.build_ip_mask(src_ip="ffffffff", proto="ff", src_port="ffff") + ) table_index = self.acl_tbl_idx.get(key) self.vapi.punt_acl_add_del(ip4_table_index=table_index) self.acl_active_table = key @@ -883,52 +941,67 @@ class TestClassifierPunt(TestClassifier): self.vapi.set_punt( is_add=1, punt={ - 'type': VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4, - 'punt': { - 'l4': { - 'af': VppEnum.vl_api_address_family_t.ADDRESS_IP4, - 'protocol': VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP, - 'port': dport, - }}}) - self.vapi.ip_punt_redirect(punt={ - 'rx_sw_if_index': self.pg0.sw_if_index, - 'tx_sw_if_index': self.pg1.sw_if_index, - 'nh': self.pg1.remote_ip4, - }) - - pkts = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / - UDP(sport=sport, dport=dport) / - Raw('\x17' * 100))] * 2 + "type": VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4, + "punt": { + "l4": { + "af": VppEnum.vl_api_address_family_t.ADDRESS_IP4, + "protocol": VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP, + "port": dport, + } + }, + }, + ) + self.vapi.ip_punt_redirect( + punt={ + "rx_sw_if_index": self.pg0.sw_if_index, + "tx_sw_if_index": self.pg1.sw_if_index, + "nh": self.pg1.remote_ip4, + } + ) + + pkts = [ + ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) + / UDP(sport=sport, dport=dport) + / Raw("\x17" * 100) + ) + ] * 2 # allow a session but not matching the stream: expect to drop self.create_classify_session( table_index, - self.build_ip_match(src_ip=self.pg0.remote_ip4, - proto=socket.IPPROTO_UDP, src_port=sport + 10)) + self.build_ip_match( + src_ip=self.pg0.remote_ip4, + proto=socket.IPPROTO_UDP, + src_port=sport + 10, + ), + ) self.send_and_assert_no_replies(self.pg0, pkts) # allow a session matching the stream: expect to pass self.create_classify_session( table_index, - self.build_ip_match(src_ip=self.pg0.remote_ip4, - proto=socket.IPPROTO_UDP, src_port=sport)) + self.build_ip_match( + src_ip=self.pg0.remote_ip4, proto=socket.IPPROTO_UDP, src_port=sport + ), + ) self.send_and_expect_only(self.pg0, pkts, self.pg1) # test dump api: ip4 is set, ip6 is not r = self.vapi.punt_acl_get() self.assertEqual(r.ip4_table_index, table_index) - self.assertEqual(r.ip6_table_index, 0xffffffff) + self.assertEqual(r.ip6_table_index, 0xFFFFFFFF) # cleanup - self.acl_active_table = '' + self.acl_active_table = "" self.vapi.punt_acl_add_del(ip4_table_index=table_index, is_add=0) # test dump api: nothing set r = self.vapi.punt_acl_get() - self.assertEqual(r.ip4_table_index, 0xffffffff) - self.assertEqual(r.ip6_table_index, 0xffffffff) + self.assertEqual(r.ip4_table_index, 0xFFFFFFFF) + self.assertEqual(r.ip6_table_index, 0xFFFFFFFF) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) -- cgit 1.2.3-korg