summaryrefslogtreecommitdiffstats
path: root/test/test_classifier.py
diff options
context:
space:
mode:
authorKlement Sekera <klement.sekera@gmail.com>2022-04-26 19:02:15 +0200
committerOle Tr�an <otroan@employees.org>2022-05-10 18:52:08 +0000
commitd9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch)
tree4f786cfd8ebc2443cb11e11b74c8657204068898 /test/test_classifier.py
parentf90348bcb4afd0af2611cefc43b17ef3042b511c (diff)
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is much faster, stable PEP8 compliant code style checker offering also automatic formatting. It aims to be very stable and produce smallest diffs. It's used by many small and big projects. Running checkstyle with black takes a few seconds with a terse output. Thus, test-checkstyle-diff is no longer necessary. Expand scope of checkstyle to all python files in the repo, replacing test-checkstyle with checkstyle-python. Also, fixstyle-python is now available for automatic style formatting. Note: python virtualenv has been consolidated in test/Makefile, test/requirements*.txt which will eventually be moved to a central location. This is required to simply the automated generation of docker executor images in the CI. Type: improvement Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8 Signed-off-by: Klement Sekera <klement.sekera@gmail.com> Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/test_classifier.py')
-rw-r--r--test/test_classifier.py607
1 files changed, 340 insertions, 267 deletions
diff --git a/test/test_classifier.py b/test/test_classifier.py
index 9f0fdbf587a..068561230ea 100644
--- a/test/test_classifier.py
+++ b/test/test_classifier.py
@@ -19,7 +19,7 @@ from vpp_papi import VppEnum
# Tests split to different test case classes because of issue reported in
# ticket VPP-1336
class TestClassifierIP(TestClassifier):
- """ Classifier IP Test Case """
+ """Classifier IP Test Case"""
@classmethod
def setUpClass(cls):
@@ -30,7 +30,7 @@ class TestClassifierIP(TestClassifier):
super(TestClassifierIP, cls).tearDownClass()
def test_iacl_src_ip(self):
- """ Source IP iACL test
+ """Source IP iACL test
Test scenario for basic IP ACL with source IP
- Create IPv4 stream for pg0 -> pg1 interface.
@@ -42,11 +42,11 @@ class TestClassifierIP(TestClassifier):
pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
self.pg0.add_stream(pkts)
- key = 'ip_src'
- self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
+ key = "ip_src"
+ self.create_classify_table(key, self.build_ip_mask(src_ip="ffffffff"))
self.create_classify_session(
- self.acl_tbl_idx.get(key),
- self.build_ip_match(src_ip=self.pg0.remote_ip4))
+ self.acl_tbl_idx.get(key), self.build_ip_match(src_ip=self.pg0.remote_ip4)
+ )
self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
@@ -60,7 +60,7 @@ class TestClassifierIP(TestClassifier):
self.pg3.assert_nothing_captured(remark="packets forwarded")
def test_iacl_dst_ip(self):
- """ Destination IP iACL test
+ """Destination IP iACL test
Test scenario for basic IP ACL with destination IP
- Create IPv4 stream for pg0 -> pg1 interface.
@@ -72,11 +72,11 @@ class TestClassifierIP(TestClassifier):
pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
self.pg0.add_stream(pkts)
- key = 'ip_dst'
- self.create_classify_table(key, self.build_ip_mask(dst_ip='ffffffff'))
+ key = "ip_dst"
+ self.create_classify_table(key, self.build_ip_mask(dst_ip="ffffffff"))
self.create_classify_session(
- self.acl_tbl_idx.get(key),
- self.build_ip_match(dst_ip=self.pg1.remote_ip4))
+ self.acl_tbl_idx.get(key), self.build_ip_match(dst_ip=self.pg1.remote_ip4)
+ )
self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
@@ -90,7 +90,7 @@ class TestClassifierIP(TestClassifier):
self.pg3.assert_nothing_captured(remark="packets forwarded")
def test_iacl_src_dst_ip(self):
- """ Source and destination IP iACL test
+ """Source and destination IP iACL test
Test scenario for basic IP ACL with source and destination IP
- Create IPv4 stream for pg0 -> pg1 interface.
@@ -102,13 +102,14 @@ class TestClassifierIP(TestClassifier):
pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
self.pg0.add_stream(pkts)
- key = 'ip'
+ key = "ip"
self.create_classify_table(
- key, self.build_ip_mask(src_ip='ffffffff', dst_ip='ffffffff'))
+ key, self.build_ip_mask(src_ip="ffffffff", dst_ip="ffffffff")
+ )
self.create_classify_session(
self.acl_tbl_idx.get(key),
- self.build_ip_match(src_ip=self.pg0.remote_ip4,
- dst_ip=self.pg1.remote_ip4))
+ self.build_ip_match(src_ip=self.pg0.remote_ip4, dst_ip=self.pg1.remote_ip4),
+ )
self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
@@ -123,7 +124,7 @@ class TestClassifierIP(TestClassifier):
class TestClassifierUDP(TestClassifier):
- """ Classifier UDP proto Test Case """
+ """Classifier UDP proto Test Case"""
@classmethod
def setUpClass(cls):
@@ -134,7 +135,7 @@ class TestClassifierUDP(TestClassifier):
super(TestClassifierUDP, cls).tearDownClass()
def test_iacl_proto_udp(self):
- """ UDP protocol iACL test
+ """UDP protocol iACL test
Test scenario for basic protocol ACL with UDP protocol
- Create IPv4 stream for pg0 -> pg1 interface.
@@ -146,13 +147,12 @@ class TestClassifierUDP(TestClassifier):
pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
self.pg0.add_stream(pkts)
- key = 'proto_udp'
- self.create_classify_table(key, self.build_ip_mask(proto='ff'))
+ key = "proto_udp"
+ self.create_classify_table(key, self.build_ip_mask(proto="ff"))
self.create_classify_session(
- self.acl_tbl_idx.get(key),
- self.build_ip_match(proto=socket.IPPROTO_UDP))
- self.input_acl_set_interface(
- self.pg0, self.acl_tbl_idx.get(key))
+ self.acl_tbl_idx.get(key), self.build_ip_match(proto=socket.IPPROTO_UDP)
+ )
+ self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
self.pg_enable_capture(self.pg_interfaces)
@@ -165,7 +165,7 @@ class TestClassifierUDP(TestClassifier):
self.pg3.assert_nothing_captured(remark="packets forwarded")
def test_iacl_proto_udp_sport(self):
- """ UDP source port iACL test
+ """UDP source port iACL test
Test scenario for basic protocol ACL with UDP and sport
- Create IPv4 stream for pg0 -> pg1 interface.
@@ -175,18 +175,18 @@ class TestClassifierUDP(TestClassifier):
# Basic iACL testing with UDP and sport
sport = 38
- pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
- UDP(sport=sport, dport=5678))
+ pkts = self.create_stream(
+ self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=5678)
+ )
self.pg0.add_stream(pkts)
- key = 'proto_udp_sport'
- self.create_classify_table(
- key, self.build_ip_mask(proto='ff', src_port='ffff'))
+ key = "proto_udp_sport"
+ self.create_classify_table(key, self.build_ip_mask(proto="ff", src_port="ffff"))
self.create_classify_session(
self.acl_tbl_idx.get(key),
- self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport))
- self.input_acl_set_interface(
- self.pg0, self.acl_tbl_idx.get(key))
+ self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport),
+ )
+ self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
self.pg_enable_capture(self.pg_interfaces)
@@ -199,7 +199,7 @@ class TestClassifierUDP(TestClassifier):
self.pg3.assert_nothing_captured(remark="packets forwarded")
def test_iacl_proto_udp_dport(self):
- """ UDP destination port iACL test
+ """UDP destination port iACL test
Test scenario for basic protocol ACL with UDP and dport
- Create IPv4 stream for pg0 -> pg1 interface.
@@ -209,18 +209,18 @@ class TestClassifierUDP(TestClassifier):
# Basic iACL testing with UDP and dport
dport = 427
- pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
- UDP(sport=1234, dport=dport))
+ pkts = self.create_stream(
+ self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=1234, dport=dport)
+ )
self.pg0.add_stream(pkts)
- key = 'proto_udp_dport'
- self.create_classify_table(
- key, self.build_ip_mask(proto='ff', dst_port='ffff'))
+ key = "proto_udp_dport"
+ self.create_classify_table(key, self.build_ip_mask(proto="ff", dst_port="ffff"))
self.create_classify_session(
self.acl_tbl_idx.get(key),
- self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport))
- self.input_acl_set_interface(
- self.pg0, self.acl_tbl_idx.get(key))
+ self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport),
+ )
+ self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
self.pg_enable_capture(self.pg_interfaces)
@@ -233,7 +233,7 @@ class TestClassifierUDP(TestClassifier):
self.pg3.assert_nothing_captured(remark="packets forwarded")
def test_iacl_proto_udp_sport_dport(self):
- """ UDP source and destination ports iACL test
+ """UDP source and destination ports iACL test
Test scenario for basic protocol ACL with UDP and sport and dport
- Create IPv4 stream for pg0 -> pg1 interface.
@@ -244,20 +244,22 @@ class TestClassifierUDP(TestClassifier):
# Basic iACL testing with UDP and sport and dport
sport = 13720
dport = 9080
- pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
- UDP(sport=sport, dport=dport))
+ pkts = self.create_stream(
+ self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport)
+ )
self.pg0.add_stream(pkts)
- key = 'proto_udp_ports'
+ key = "proto_udp_ports"
self.create_classify_table(
- key,
- self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
+ key, self.build_ip_mask(proto="ff", src_port="ffff", dst_port="ffff")
+ )
self.create_classify_session(
self.acl_tbl_idx.get(key),
- self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport,
- dst_port=dport))
- self.input_acl_set_interface(
- self.pg0, self.acl_tbl_idx.get(key))
+ self.build_ip_match(
+ proto=socket.IPPROTO_UDP, src_port=sport, dst_port=dport
+ ),
+ )
+ self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
self.pg_enable_capture(self.pg_interfaces)
@@ -271,7 +273,7 @@ class TestClassifierUDP(TestClassifier):
class TestClassifierTCP(TestClassifier):
- """ Classifier TCP proto Test Case """
+ """Classifier TCP proto Test Case"""
@classmethod
def setUpClass(cls):
@@ -282,7 +284,7 @@ class TestClassifierTCP(TestClassifier):
super(TestClassifierTCP, cls).tearDownClass()
def test_iacl_proto_tcp(self):
- """ TCP protocol iACL test
+ """TCP protocol iACL test
Test scenario for basic protocol ACL with TCP protocol
- Create IPv4 stream for pg0 -> pg1 interface.
@@ -291,17 +293,17 @@ class TestClassifierTCP(TestClassifier):
"""
# Basic iACL testing with TCP protocol
- pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
- TCP(sport=1234, dport=5678))
+ pkts = self.create_stream(
+ self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=1234, dport=5678)
+ )
self.pg0.add_stream(pkts)
- key = 'proto_tcp'
- self.create_classify_table(key, self.build_ip_mask(proto='ff'))
+ key = "proto_tcp"
+ self.create_classify_table(key, self.build_ip_mask(proto="ff"))
self.create_classify_session(
- self.acl_tbl_idx.get(key),
- self.build_ip_match(proto=socket.IPPROTO_TCP))
- self.input_acl_set_interface(
- self.pg0, self.acl_tbl_idx.get(key))
+ self.acl_tbl_idx.get(key), self.build_ip_match(proto=socket.IPPROTO_TCP)
+ )
+ self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
self.pg_enable_capture(self.pg_interfaces)
@@ -314,7 +316,7 @@ class TestClassifierTCP(TestClassifier):
self.pg3.assert_nothing_captured(remark="packets forwarded")
def test_iacl_proto_tcp_sport(self):
- """ TCP source port iACL test
+ """TCP source port iACL test
Test scenario for basic protocol ACL with TCP and sport
- Create IPv4 stream for pg0 -> pg1 interface.
@@ -324,18 +326,18 @@ class TestClassifierTCP(TestClassifier):
# Basic iACL testing with TCP and sport
sport = 38
- pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
- TCP(sport=sport, dport=5678))
+ pkts = self.create_stream(
+ self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=sport, dport=5678)
+ )
self.pg0.add_stream(pkts)
- key = 'proto_tcp_sport'
- self.create_classify_table(
- key, self.build_ip_mask(proto='ff', src_port='ffff'))
+ key = "proto_tcp_sport"
+ self.create_classify_table(key, self.build_ip_mask(proto="ff", src_port="ffff"))
self.create_classify_session(
self.acl_tbl_idx.get(key),
- self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport))
- self.input_acl_set_interface(
- self.pg0, self.acl_tbl_idx.get(key))
+ self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport),
+ )
+ self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
self.pg_enable_capture(self.pg_interfaces)
@@ -348,7 +350,7 @@ class TestClassifierTCP(TestClassifier):
self.pg3.assert_nothing_captured(remark="packets forwarded")
def test_iacl_proto_tcp_dport(self):
- """ TCP destination port iACL test
+ """TCP destination port iACL test
Test scenario for basic protocol ACL with TCP and dport
- Create IPv4 stream for pg0 -> pg1 interface.
@@ -358,18 +360,18 @@ class TestClassifierTCP(TestClassifier):
# Basic iACL testing with TCP and dport
dport = 427
- pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
- TCP(sport=1234, dport=dport))
+ pkts = self.create_stream(
+ self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=1234, dport=dport)
+ )
self.pg0.add_stream(pkts)
- key = 'proto_tcp_sport'
- self.create_classify_table(
- key, self.build_ip_mask(proto='ff', dst_port='ffff'))
+ key = "proto_tcp_sport"
+ self.create_classify_table(key, self.build_ip_mask(proto="ff", dst_port="ffff"))
self.create_classify_session(
self.acl_tbl_idx.get(key),
- self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport))
- self.input_acl_set_interface(
- self.pg0, self.acl_tbl_idx.get(key))
+ self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport),
+ )
+ self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
self.pg_enable_capture(self.pg_interfaces)
@@ -382,7 +384,7 @@ class TestClassifierTCP(TestClassifier):
self.pg3.assert_nothing_captured(remark="packets forwarded")
def test_iacl_proto_tcp_sport_dport(self):
- """ TCP source and destination ports iACL test
+ """TCP source and destination ports iACL test
Test scenario for basic protocol ACL with TCP and sport and dport
- Create IPv4 stream for pg0 -> pg1 interface.
@@ -393,20 +395,22 @@ class TestClassifierTCP(TestClassifier):
# Basic iACL testing with TCP and sport and dport
sport = 13720
dport = 9080
- pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
- TCP(sport=sport, dport=dport))
+ pkts = self.create_stream(
+ self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=sport, dport=dport)
+ )
self.pg0.add_stream(pkts)
- key = 'proto_tcp_ports'
+ key = "proto_tcp_ports"
self.create_classify_table(
- key,
- self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff'))
+ key, self.build_ip_mask(proto="ff", src_port="ffff", dst_port="ffff")
+ )
self.create_classify_session(
self.acl_tbl_idx.get(key),
- self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport,
- dst_port=dport))
- self.input_acl_set_interface(
- self.pg0, self.acl_tbl_idx.get(key))
+ self.build_ip_match(
+ proto=socket.IPPROTO_TCP, src_port=sport, dst_port=dport
+ ),
+ )
+ self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
self.pg_enable_capture(self.pg_interfaces)
@@ -420,7 +424,7 @@ class TestClassifierTCP(TestClassifier):
class TestClassifierIPOut(TestClassifier):
- """ Classifier output IP Test Case """
+ """Classifier output IP Test Case"""
@classmethod
def setUpClass(cls):
@@ -431,7 +435,7 @@ class TestClassifierIPOut(TestClassifier):
super(TestClassifierIPOut, cls).tearDownClass()
def test_acl_ip_out(self):
- """ Output IP ACL test
+ """Output IP ACL test
Test scenario for basic IP ACL with source IP
- Create IPv4 stream for pg1 -> pg0 interface.
@@ -443,12 +447,13 @@ class TestClassifierIPOut(TestClassifier):
pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
self.pg1.add_stream(pkts)
- key = 'ip_out'
+ key = "ip_out"
self.create_classify_table(
- key, self.build_ip_mask(src_ip='ffffffff'), data_offset=0)
+ key, self.build_ip_mask(src_ip="ffffffff"), data_offset=0
+ )
self.create_classify_session(
- self.acl_tbl_idx.get(key),
- self.build_ip_match(src_ip=self.pg1.remote_ip4))
+ self.acl_tbl_idx.get(key), self.build_ip_match(src_ip=self.pg1.remote_ip4)
+ )
self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
@@ -463,7 +468,7 @@ class TestClassifierIPOut(TestClassifier):
class TestClassifierMAC(TestClassifier):
- """ Classifier MAC Test Case """
+ """Classifier MAC Test Case"""
@classmethod
def setUpClass(cls):
@@ -474,7 +479,7 @@ class TestClassifierMAC(TestClassifier):
super(TestClassifierMAC, cls).tearDownClass()
def test_acl_mac(self):
- """ MAC ACL test
+ """MAC ACL test
Test scenario for basic MAC ACL with source MAC
- Create IPv4 stream for pg0 -> pg2 interface.
@@ -486,12 +491,13 @@ class TestClassifierMAC(TestClassifier):
pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
self.pg0.add_stream(pkts)
- key = 'mac'
+ key = "mac"
self.create_classify_table(
- key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
+ key, self.build_mac_mask(src_mac="ffffffffffff"), data_offset=-14
+ )
self.create_classify_session(
- self.acl_tbl_idx.get(key),
- self.build_mac_match(src_mac=self.pg0.remote_mac))
+ self.acl_tbl_idx.get(key), self.build_mac_match(src_mac=self.pg0.remote_mac)
+ )
self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
@@ -506,7 +512,7 @@ class TestClassifierMAC(TestClassifier):
class TestClassifierComplex(TestClassifier):
- """ Large & Nested Classifiers Test Cases """
+ """Large & Nested Classifiers Test Cases"""
@classmethod
def setUpClass(cls):
@@ -517,7 +523,7 @@ class TestClassifierComplex(TestClassifier):
super(TestClassifierComplex, cls).tearDownClass()
def test_iacl_large(self):
- """ Large input ACL test
+ """Large input ACL test
Test scenario for Large ACL matching on ethernet+ip+udp headers
- Create IPv4 stream for pg0 -> pg1 interface.
@@ -527,7 +533,7 @@ class TestClassifierComplex(TestClassifier):
# 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b)
# + 4b as build_ip_ma*() func, do not match against UDP Len & Chksum
- msk = VarMask(offset=40, spec='ffff')
+ msk = VarMask(offset=40, spec="ffff")
mth = VarMatch(offset=40, value=0x1234, length=2)
payload_msk = self.build_payload_mask([msk])
@@ -537,38 +543,49 @@ class TestClassifierComplex(TestClassifier):
dport = 9080
# 36b offset = 80bytes - (sizeof(UDP/IP/ETH))
- packet_ex = bytes.fromhex(('0' * 36) + '1234')
- pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
- UDP(sport=sport, dport=dport),
- packet_ex)
+ packet_ex = bytes.fromhex(("0" * 36) + "1234")
+ pkts = self.create_stream(
+ self.pg0,
+ self.pg1,
+ self.pg_if_packet_sizes,
+ UDP(sport=sport, dport=dport),
+ packet_ex,
+ )
self.pg0.add_stream(pkts)
- key = 'large_in'
+ key = "large_in"
self.create_classify_table(
key,
- self.build_mac_mask(src_mac='ffffffffffff',
- dst_mac='ffffffffffff',
- ether_type='ffff') +
- self.build_ip_mask(proto='ff',
- src_ip='ffffffff',
- dst_ip='ffffffff',
- src_port='ffff',
- dst_port='ffff') +
- payload_msk,
- data_offset=-14)
+ self.build_mac_mask(
+ src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
+ )
+ + self.build_ip_mask(
+ proto="ff",
+ src_ip="ffffffff",
+ dst_ip="ffffffff",
+ src_port="ffff",
+ dst_port="ffff",
+ )
+ + payload_msk,
+ data_offset=-14,
+ )
self.create_classify_session(
self.acl_tbl_idx.get(key),
- self.build_mac_match(src_mac=self.pg0.remote_mac,
- dst_mac=self.pg0.local_mac,
- # ipv4 next header
- ether_type='0800') +
- self.build_ip_match(proto=socket.IPPROTO_UDP,
- src_ip=self.pg0.remote_ip4,
- dst_ip=self.pg1.remote_ip4,
- src_port=sport,
- dst_port=dport) +
- payload_match
+ self.build_mac_match(
+ src_mac=self.pg0.remote_mac,
+ dst_mac=self.pg0.local_mac,
+ # ipv4 next header
+ ether_type="0800",
+ )
+ + self.build_ip_match(
+ proto=socket.IPPROTO_UDP,
+ src_ip=self.pg0.remote_ip4,
+ dst_ip=self.pg1.remote_ip4,
+ src_port=sport,
+ dst_port=dport,
+ )
+ + payload_match,
)
self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
@@ -584,7 +601,7 @@ class TestClassifierComplex(TestClassifier):
self.pg3.assert_nothing_captured(remark="packets forwarded")
def test_oacl_large(self):
- """ Large output ACL test
+ """Large output ACL test
Test scenario for Large ACL matching on ethernet+ip+udp headers
- Create IPv4 stream for pg1 -> pg0 interface.
- Create large acl matching on ethernet+ip+udp header fields
@@ -593,7 +610,7 @@ class TestClassifierComplex(TestClassifier):
# 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b)
# + 4b as build_ip_ma*() func, do not match against UDP Len & Chksum
- msk = VarMask(offset=40, spec='ffff')
+ msk = VarMask(offset=40, spec="ffff")
mth = VarMatch(offset=40, value=0x1234, length=2)
payload_msk = self.build_payload_mask([msk])
@@ -603,38 +620,50 @@ class TestClassifierComplex(TestClassifier):
dport = 9080
# 36b offset = 80bytes - (sizeof(UDP/IP/ETH))
- packet_ex = bytes.fromhex(('0' * 36) + '1234')
- pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes,
- UDP(sport=sport, dport=dport),
- packet_ex)
+ packet_ex = bytes.fromhex(("0" * 36) + "1234")
+ pkts = self.create_stream(
+ self.pg1,
+ self.pg0,
+ self.pg_if_packet_sizes,
+ UDP(sport=sport, dport=dport),
+ packet_ex,
+ )
self.pg1.add_stream(pkts)
- key = 'large_out'
+ key = "large_out"
self.create_classify_table(
key,
- self.build_mac_mask(src_mac='ffffffffffff',
- dst_mac='ffffffffffff',
- ether_type='ffff') +
- self.build_ip_mask(proto='ff',
- src_ip='ffffffff',
- dst_ip='ffffffff',
- src_port='ffff',
- dst_port='ffff') +
- payload_msk,
- data_offset=-14)
+ self.build_mac_mask(
+ src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
+ )
+ + self.build_ip_mask(
+ proto="ff",
+ src_ip="ffffffff",
+ dst_ip="ffffffff",
+ src_port="ffff",
+ dst_port="ffff",
+ )
+ + payload_msk,
+ data_offset=-14,
+ )
self.create_classify_session(
self.acl_tbl_idx.get(key),
- self.build_mac_match(src_mac=self.pg0.local_mac,
- dst_mac=self.pg0.remote_mac,
- # ipv4 next header
- ether_type='0800') +
- self.build_ip_match(proto=socket.IPPROTO_UDP,
- src_ip=self.pg1.remote_ip4,
- dst_ip=self.pg0.remote_ip4,
- src_port=sport,
- dst_port=dport) +
- payload_match)
+ self.build_mac_match(
+ src_mac=self.pg0.local_mac,
+ dst_mac=self.pg0.remote_mac,
+ # ipv4 next header
+ ether_type="0800",
+ )
+ + self.build_ip_match(
+ proto=socket.IPPROTO_UDP,
+ src_ip=self.pg1.remote_ip4,
+ dst_ip=self.pg0.remote_ip4,
+ src_port=sport,
+ dst_port=dport,
+ )
+ + payload_match,
+ )
self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
@@ -649,7 +678,7 @@ class TestClassifierComplex(TestClassifier):
self.pg3.assert_nothing_captured(remark="packets forwarded")
def test_iacl_nested(self):
- """ Nested input ACL test
+ """Nested input ACL test
Test scenario for Large ACL matching on ethernet+ip+udp headers
- Create IPv4 stream for pg0 -> pg1 interface.
@@ -660,48 +689,60 @@ class TestClassifierComplex(TestClassifier):
sport = 13720
dport = 9080
- pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
- UDP(sport=sport, dport=dport))
+ pkts = self.create_stream(
+ self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport)
+ )
self.pg0.add_stream(pkts)
- subtable_key = 'subtable_in'
+ subtable_key = "subtable_in"
self.create_classify_table(
subtable_key,
- self.build_mac_mask(src_mac='ffffffffffff',
- dst_mac='ffffffffffff',
- ether_type='ffff') +
- self.build_ip_mask(proto='ff',
- src_ip='ffffffff',
- dst_ip='ffffffff',
- src_port='ffff',
- dst_port='ffff'),
- data_offset=-14)
-
- key = 'nested_in'
+ self.build_mac_mask(
+ src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
+ )
+ + self.build_ip_mask(
+ proto="ff",
+ src_ip="ffffffff",
+ dst_ip="ffffffff",
+ src_port="ffff",
+ dst_port="ffff",
+ ),
+ data_offset=-14,
+ )
+
+ key = "nested_in"
self.create_classify_table(
key,
- self.build_mac_mask(src_mac='ffffffffffff',
- dst_mac='ffffffffffff',
- ether_type='ffff') +
- self.build_ip_mask(proto='ff',
- src_ip='ffffffff',
- dst_ip='ffffffff',
- src_port='ffff',
- dst_port='ffff'),
- next_table_index=self.acl_tbl_idx.get(subtable_key))
+ self.build_mac_mask(
+ src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
+ )
+ + self.build_ip_mask(
+ proto="ff",
+ src_ip="ffffffff",
+ dst_ip="ffffffff",
+ src_port="ffff",
+ dst_port="ffff",
+ ),
+ next_table_index=self.acl_tbl_idx.get(subtable_key),
+ )
self.create_classify_session(
self.acl_tbl_idx.get(subtable_key),
- self.build_mac_match(src_mac=self.pg0.remote_mac,
- dst_mac=self.pg0.local_mac,
- # ipv4 next header
- ether_type='0800') +
- self.build_ip_match(proto=socket.IPPROTO_UDP,
- src_ip=self.pg0.remote_ip4,
- dst_ip=self.pg1.remote_ip4,
- src_port=sport,
- dst_port=dport))
+ self.build_mac_match(
+ src_mac=self.pg0.remote_mac,
+ dst_mac=self.pg0.local_mac,
+ # ipv4 next header
+ ether_type="0800",
+ )
+ + self.build_ip_match(
+ proto=socket.IPPROTO_UDP,
+ src_ip=self.pg0.remote_ip4,
+ dst_ip=self.pg1.remote_ip4,
+ src_port=sport,
+ dst_port=dport,
+ ),
+ )
self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
@@ -716,7 +757,7 @@ class TestClassifierComplex(TestClassifier):
self.pg3.assert_nothing_captured(remark="packets forwarded")
def test_oacl_nested(self):
- """ Nested output ACL test
+ """Nested output ACL test
Test scenario for Large ACL matching on ethernet+ip+udp headers
- Create IPv4 stream for pg1 -> pg0 interface.
@@ -727,48 +768,60 @@ class TestClassifierComplex(TestClassifier):
sport = 13720
dport = 9080
- pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes,
- UDP(sport=sport, dport=dport))
+ pkts = self.create_stream(
+ self.pg1, self.pg0, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport)
+ )
self.pg1.add_stream(pkts)
- subtable_key = 'subtable_out'
+ subtable_key = "subtable_out"
self.create_classify_table(
subtable_key,
- self.build_mac_mask(src_mac='ffffffffffff',
- dst_mac='ffffffffffff',
- ether_type='ffff') +
- self.build_ip_mask(proto='ff',
- src_ip='ffffffff',
- dst_ip='ffffffff',
- src_port='ffff',
- dst_port='ffff'),
- data_offset=-14)
-
- key = 'nested_out'
+ self.build_mac_mask(
+ src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
+ )
+ + self.build_ip_mask(
+ proto="ff",
+ src_ip="ffffffff",
+ dst_ip="ffffffff",
+ src_port="ffff",
+ dst_port="ffff",
+ ),
+ data_offset=-14,
+ )
+
+ key = "nested_out"
self.create_classify_table(
key,
- self.build_mac_mask(src_mac='ffffffffffff',
- dst_mac='ffffffffffff',
- ether_type='ffff') +
- self.build_ip_mask(proto='ff',
- src_ip='ffffffff',
- dst_ip='ffffffff',
- src_port='ffff',
- dst_port='ffff'),
+ self.build_mac_mask(
+ src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
+ )
+ + self.build_ip_mask(
+ proto="ff",
+ src_ip="ffffffff",
+ dst_ip="ffffffff",
+ src_port="ffff",
+ dst_port="ffff",
+ ),
next_table_index=self.acl_tbl_idx.get(subtable_key),
- data_offset=-14)
+ data_offset=-14,
+ )
self.create_classify_session(
self.acl_tbl_idx.get(subtable_key),
- self.build_mac_match(src_mac=self.pg0.local_mac,
- dst_mac=self.pg0.remote_mac,
- # ipv4 next header
- ether_type='0800') +
- self.build_ip_match(proto=socket.IPPROTO_UDP,
- src_ip=self.pg1.remote_ip4,
- dst_ip=self.pg0.remote_ip4,
- src_port=sport,
- dst_port=dport))
+ self.build_mac_match(
+ src_mac=self.pg0.local_mac,
+ dst_mac=self.pg0.remote_mac,
+ # ipv4 next header
+ ether_type="0800",
+ )
+ + self.build_ip_match(
+ proto=socket.IPPROTO_UDP,
+ src_ip=self.pg1.remote_ip4,
+ dst_ip=self.pg0.remote_ip4,
+ src_port=sport,
+ dst_port=dport,
+ ),
+ )
self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
@@ -784,7 +837,7 @@ class TestClassifierComplex(TestClassifier):
class TestClassifierPBR(TestClassifier):
- """ Classifier PBR Test Case """
+ """Classifier PBR Test Case"""
@classmethod
def setUpClass(cls):
@@ -795,7 +848,7 @@ class TestClassifierPBR(TestClassifier):
super(TestClassifierPBR, cls).tearDownClass()
def test_acl_pbr(self):
- """ IP PBR test
+ """IP PBR test
Test scenario for PBR with source IP
- Create IPv4 stream for pg0 -> pg3 interface.
@@ -807,19 +860,24 @@ class TestClassifierPBR(TestClassifier):
pkts = self.create_stream(self.pg0, self.pg3, self.pg_if_packet_sizes)
self.pg0.add_stream(pkts)
- key = 'pbr'
- self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff'))
+ key = "pbr"
+ self.create_classify_table(key, self.build_ip_mask(src_ip="ffffffff"))
pbr_option = 1
# this will create the VRF/table in which we will insert the route
self.create_classify_session(
self.acl_tbl_idx.get(key),
self.build_ip_match(src_ip=self.pg0.remote_ip4),
- pbr_option, self.pbr_vrfid)
+ pbr_option,
+ self.pbr_vrfid,
+ )
self.assertTrue(self.verify_vrf(self.pbr_vrfid))
- r = VppIpRoute(self, self.pg3.local_ip4, 24,
- [VppRoutePath(self.pg3.remote_ip4,
- INVALID_INDEX)],
- table_id=self.pbr_vrfid)
+ r = VppIpRoute(
+ self,
+ self.pg3.local_ip4,
+ 24,
+ [VppRoutePath(self.pg3.remote_ip4, INVALID_INDEX)],
+ table_id=self.pbr_vrfid,
+ )
r.add_vpp_config()
self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
@@ -839,14 +897,17 @@ class TestClassifierPBR(TestClassifier):
self.create_classify_session(
self.acl_tbl_idx.get(key),
self.build_ip_match(src_ip=self.pg0.remote_ip4),
- pbr_option, self.pbr_vrfid, is_add=0)
+ pbr_option,
+ self.pbr_vrfid,
+ is_add=0,
+ )
# and the table should be gone.
self.assertFalse(self.verify_vrf(self.pbr_vrfid))
class TestClassifierPunt(TestClassifier):
- """ Classifier punt Test Case """
+ """Classifier punt Test Case"""
@classmethod
def setUpClass(cls):
@@ -857,7 +918,7 @@ class TestClassifierPunt(TestClassifier):
super(TestClassifierPunt, cls).tearDownClass()
def test_punt_udp(self):
- """ IPv4/UDP protocol punt ACL test
+ """IPv4/UDP protocol punt ACL test
Test scenario for basic punt ACL with UDP protocol
- Create IPv4 stream for pg0 -> pg1 interface.
@@ -868,13 +929,10 @@ class TestClassifierPunt(TestClassifier):
sport = 6754
dport = 17923
- key = 'ip4_udp_punt'
+ key = "ip4_udp_punt"
self.create_classify_table(
- key,
- self.build_ip_mask(
- src_ip='ffffffff',
- proto='ff',
- src_port='ffff'))
+ key, self.build_ip_mask(src_ip="ffffffff", proto="ff", src_port="ffff")
+ )
table_index = self.acl_tbl_idx.get(key)
self.vapi.punt_acl_add_del(ip4_table_index=table_index)
self.acl_active_table = key
@@ -883,52 +941,67 @@ class TestClassifierPunt(TestClassifier):
self.vapi.set_punt(
is_add=1,
punt={
- 'type': VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4,
- 'punt': {
- 'l4': {
- 'af': VppEnum.vl_api_address_family_t.ADDRESS_IP4,
- 'protocol': VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP,
- 'port': dport,
- }}})
- self.vapi.ip_punt_redirect(punt={
- 'rx_sw_if_index': self.pg0.sw_if_index,
- 'tx_sw_if_index': self.pg1.sw_if_index,
- 'nh': self.pg1.remote_ip4,
- })
-
- pkts = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
- UDP(sport=sport, dport=dport) /
- Raw('\x17' * 100))] * 2
+ "type": VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4,
+ "punt": {
+ "l4": {
+ "af": VppEnum.vl_api_address_family_t.ADDRESS_IP4,
+ "protocol": VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP,
+ "port": dport,
+ }
+ },
+ },
+ )
+ self.vapi.ip_punt_redirect(
+ punt={
+ "rx_sw_if_index": self.pg0.sw_if_index,
+ "tx_sw_if_index": self.pg1.sw_if_index,
+ "nh": self.pg1.remote_ip4,
+ }
+ )
+
+ pkts = [
+ (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
+ / UDP(sport=sport, dport=dport)
+ / Raw("\x17" * 100)
+ )
+ ] * 2
# allow a session but not matching the stream: expect to drop
self.create_classify_session(
table_index,
- self.build_ip_match(src_ip=self.pg0.remote_ip4,
- proto=socket.IPPROTO_UDP, src_port=sport + 10))
+ self.build_ip_match(
+ src_ip=self.pg0.remote_ip4,
+ proto=socket.IPPROTO_UDP,
+ src_port=sport + 10,
+ ),
+ )
self.send_and_assert_no_replies(self.pg0, pkts)
# allow a session matching the stream: expect to pass
self.create_classify_session(
table_index,
- self.build_ip_match(src_ip=self.pg0.remote_ip4,
- proto=socket.IPPROTO_UDP, src_port=sport))
+ self.build_ip_match(
+ src_ip=self.pg0.remote_ip4, proto=socket.IPPROTO_UDP, src_port=sport
+ ),
+ )
self.send_and_expect_only(self.pg0, pkts, self.pg1)
# test dump api: ip4 is set, ip6 is not
r = self.vapi.punt_acl_get()
self.assertEqual(r.ip4_table_index, table_index)
- self.assertEqual(r.ip6_table_index, 0xffffffff)
+ self.assertEqual(r.ip6_table_index, 0xFFFFFFFF)
# cleanup
- self.acl_active_table = ''
+ self.acl_active_table = ""
self.vapi.punt_acl_add_del(ip4_table_index=table_index, is_add=0)
# test dump api: nothing set
r = self.vapi.punt_acl_get()
- self.assertEqual(r.ip4_table_index, 0xffffffff)
- self.assertEqual(r.ip6_table_index, 0xffffffff)
+ self.assertEqual(r.ip4_table_index, 0xFFFFFFFF)
+ self.assertEqual(r.ip6_table_index, 0xFFFFFFFF)
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)