diff options
Diffstat (limited to 'test/test_classify_l2_acl.py')
-rw-r--r-- | test/test_classify_l2_acl.py | 317 |
1 files changed, 180 insertions, 137 deletions
diff --git a/test/test_classify_l2_acl.py b/test/test_classify_l2_acl.py index b1309881e58..d94af3f718e 100644 --- a/test/test_classify_l2_acl.py +++ b/test/test_classify_l2_acl.py @@ -20,7 +20,7 @@ from template_classifier import TestClassifier class TestClassifyAcl(TestClassifier): - """ Classifier-based L2 input and output ACL Test Case """ + """Classifier-based L2 input and output ACL Test Case""" # traffic types IP = 0 @@ -37,7 +37,7 @@ class TestClassifyAcl(TestClassifier): # supported protocols proto = [[6, 17], [1, 58]] - proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'} + proto_map = {1: "ICMP", 58: "ICMPv6EchoRequest", 6: "TCP", 17: "UDP"} ICMPv4 = 0 ICMPv6 = 1 TCP = 0 @@ -104,11 +104,11 @@ class TestClassifyAcl(TestClassifier): # Create BD with MAC learning and unknown unicast flooding disabled # and put interfaces to this BD - cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1, - learn=1) + cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1, learn=1) for pg_if in cls.pg_interfaces: cls.vapi.sw_interface_set_l2_bridge( - rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id) + rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id + ) # Set up all interfaces for i in cls.pg_interfaces: @@ -128,7 +128,7 @@ class TestClassifyAcl(TestClassifier): # self.warmup_test() # Holder of the active classify table key - cls.acl_active_table = '' + cls.acl_active_table = "" except Exception: super(TestClassifyAcl, cls).tearDownClass() @@ -147,25 +147,30 @@ class TestClassifyAcl(TestClassifier): Show various debug prints after each test. """ if not self.vpp_dead: - if self.acl_active_table == 'mac_inout': + if self.acl_active_table == "mac_inout": self.output_acl_set_interface( - self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0) + self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0 + ) self.input_acl_set_interface( - self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0) - self.acl_active_table = '' - elif self.acl_active_table == 'mac_out': + self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0 + ) + self.acl_active_table = "" + elif self.acl_active_table == "mac_out": self.output_acl_set_interface( - self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0) - self.acl_active_table = '' - elif self.acl_active_table == 'mac_in': + self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0 + ) + self.acl_active_table = "" + elif self.acl_active_table == "mac_in": self.input_acl_set_interface( - self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0) - self.acl_active_table = '' + self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0 + ) + self.acl_active_table = "" super(TestClassifyAcl, self).tearDown() - def create_classify_session(self, intf, table_index, match, - hit_next_index=0xffffffff, is_add=1): + def create_classify_session( + self, intf, table_index, match, hit_next_index=0xFFFFFFFF, is_add=1 + ): """Create Classify Session :param VppInterface intf: Interface to apply classify session. @@ -180,8 +185,9 @@ class TestClassifyAcl(TestClassifier): table_index=table_index, match=mask_match, match_len=mask_match_len, - hit_next_index=hit_next_index) - self.assertIsNotNone(r, 'No response msg for add_del_session') + hit_next_index=hit_next_index, + ) + self.assertIsNotNone(r, "No response msg for add_del_session") def create_hosts(self, count, start=0): """ @@ -197,39 +203,50 @@ class TestClassifyAcl(TestClassifier): for pg_if in self.pg_interfaces: i += 1 start_nr = macs_per_if * i + start - end_nr = count + start if i == (n_int - 1) \ - else macs_per_if * (i + 1) + start + end_nr = ( + count + start if i == (n_int - 1) else macs_per_if * (i + 1) + start + ) hosts = self.hosts_by_pg_idx[pg_if.sw_if_index] for j in range(start_nr, end_nr): host = Host( "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j), "172.17.1%02x.%u" % (pg_if.sw_if_index, j), - "2017:dead:%02x::%u" % (pg_if.sw_if_index, j)) + "2017:dead:%02x::%u" % (pg_if.sw_if_index, j), + ) hosts.append(host) def create_upper_layer(self, packet_index, proto, ports=0): p = self.proto_map[proto] - if p == 'UDP': + if p == "UDP": if ports == 0: - return UDP(sport=random.randint(self.udp_sport_from, - self.udp_sport_to), - dport=random.randint(self.udp_dport_from, - self.udp_dport_to)) + return UDP( + sport=random.randint(self.udp_sport_from, self.udp_sport_to), + dport=random.randint(self.udp_dport_from, self.udp_dport_to), + ) else: return UDP(sport=ports, dport=ports) - elif p == 'TCP': + elif p == "TCP": if ports == 0: - return TCP(sport=random.randint(self.tcp_sport_from, - self.tcp_sport_to), - dport=random.randint(self.tcp_dport_from, - self.tcp_dport_to)) + return TCP( + sport=random.randint(self.tcp_sport_from, self.tcp_sport_to), + dport=random.randint(self.tcp_dport_from, self.tcp_dport_to), + ) else: return TCP(sport=ports, dport=ports) - return '' - - def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0, - proto=-1, ports=0, fragments=False, - pkt_raw=True, etype=-1): + return "" + + def create_stream( + self, + src_if, + packet_sizes, + traffic_type=0, + ipv6=0, + proto=-1, + ports=0, + fragments=False, + pkt_raw=True, + etype=-1, + ): """ Create input packet stream for defined interface using hosts or deleted_hosts list. @@ -262,26 +279,25 @@ class TestClassifyAcl(TestClassifier): payload = self.info_to_payload(pkt_info) p = Ether(dst=dst_host.mac, src=src_host.mac) if etype > 0: - p = Ether(dst=dst_host.mac, - src=src_host.mac, - type=etype) + p = Ether(dst=dst_host.mac, src=src_host.mac, type=etype) if pkt_info.ip: p /= IPv6(dst=dst_host.ip6, src=src_host.ip6) if fragments: p /= IPv6ExtHdrFragment(offset=64, m=1) else: if fragments: - p /= IP(src=src_host.ip4, dst=dst_host.ip4, - flags=1, frag=64) + p /= IP( + src=src_host.ip4, dst=dst_host.ip4, flags=1, frag=64 + ) else: p /= IP(src=src_host.ip4, dst=dst_host.ip4) if traffic_type == self.ICMP: if pkt_info.ip: - p /= ICMPv6EchoRequest(type=self.icmp6_type, - code=self.icmp6_code) + p /= ICMPv6EchoRequest( + type=self.icmp6_type, code=self.icmp6_code + ) else: - p /= ICMP(type=self.icmp4_type, - code=self.icmp4_code) + p /= ICMP(type=self.icmp4_type, code=self.icmp4_code) else: p /= self.create_upper_layer(i, pkt_info.proto, ports) if pkt_raw: @@ -293,8 +309,7 @@ class TestClassifyAcl(TestClassifier): pkts.append(p) return pkts - def verify_capture(self, pg_if, capture, - traffic_type=0, ip_type=0, etype=-1): + def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0, etype=-1): """ Verify captured input packet stream for defined interface. @@ -309,22 +324,21 @@ class TestClassifyAcl(TestClassifier): for packet in capture: if etype > 0: if packet[Ether].type != etype: - self.logger.error(ppp("Unexpected ethertype in packet:", - packet)) + self.logger.error(ppp("Unexpected ethertype in packet:", packet)) else: continue try: # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data if traffic_type == self.ICMP and ip_type == self.IPV6: - payload_info = self.payload_to_info( - packet[ICMPv6EchoRequest].data) + payload_info = self.payload_to_info(packet[ICMPv6EchoRequest].data) payload = packet[ICMPv6EchoRequest] else: payload_info = self.payload_to_info(packet[Raw]) payload = packet[self.proto_map[payload_info.proto]] except: - self.logger.error(ppp("Unexpected or invalid packet " - "(outside network):", packet)) + self.logger.error( + ppp("Unexpected or invalid packet (outside network):", packet) + ) raise if ip_type != 0: @@ -338,8 +352,9 @@ class TestClassifyAcl(TestClassifier): self.assertEqual(payload.type, self.icmp6_type) self.assertEqual(payload.code, self.icmp6_code) except: - self.logger.error(ppp("Unexpected or invalid packet " - "(outside network):", packet)) + self.logger.error( + ppp("Unexpected or invalid packet (outside network):", packet) + ) raise else: try: @@ -349,12 +364,13 @@ class TestClassifyAcl(TestClassifier): packet_index = payload_info.index self.assertEqual(payload_info.dst, dst_sw_if_index) - self.logger.debug("Got packet on port %s: src=%u (id=%u)" % - (pg_if.name, payload_info.src, - packet_index)) + self.logger.debug( + "Got packet on port %s: src=%u (id=%u)" + % (pg_if.name, payload_info.src, packet_index) + ) next_info = self.get_next_packet_info_for_interface2( - payload_info.src, dst_sw_if_index, - last_info[payload_info.src]) + payload_info.src, dst_sw_if_index, last_info[payload_info.src] + ) last_info[payload_info.src] = next_info self.assertTrue(next_info is not None) self.assertEqual(packet_index, next_info.index) @@ -363,29 +379,26 @@ class TestClassifyAcl(TestClassifier): self.assertEqual(ip.src, saved_packet[ip_version].src) self.assertEqual(ip.dst, saved_packet[ip_version].dst) p = self.proto_map[payload_info.proto] - if p == 'TCP': + if p == "TCP": tcp = packet[TCP] - self.assertEqual(tcp.sport, saved_packet[ - TCP].sport) - self.assertEqual(tcp.dport, saved_packet[ - TCP].dport) - elif p == 'UDP': + self.assertEqual(tcp.sport, saved_packet[TCP].sport) + self.assertEqual(tcp.dport, saved_packet[TCP].dport) + elif p == "UDP": udp = packet[UDP] - self.assertEqual(udp.sport, saved_packet[ - UDP].sport) - self.assertEqual(udp.dport, saved_packet[ - UDP].dport) + self.assertEqual(udp.sport, saved_packet[UDP].sport) + self.assertEqual(udp.dport, saved_packet[UDP].dport) except: - self.logger.error(ppp("Unexpected or invalid packet:", - packet)) + self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise for i in self.pg_interfaces: remaining_packet = self.get_next_packet_info_for_interface2( - i, dst_sw_if_index, last_info[i.sw_if_index]) + i, dst_sw_if_index, last_info[i.sw_if_index] + ) self.assertTrue( remaining_packet is None, - "Port %u: Packet expected from source %u didn't arrive" % - (dst_sw_if_index, i.sw_if_index)) + "Port %u: Packet expected from source %u didn't arrive" + % (dst_sw_if_index, i.sw_if_index), + ) def run_traffic_no_check(self): # Test @@ -400,16 +413,32 @@ class TestClassifyAcl(TestClassifier): self.pg_enable_capture(self.pg_interfaces) self.pg_start() - def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0, - frags=False, pkt_raw=True, etype=-1): + def run_verify_test( + self, + traffic_type=0, + ip_type=0, + proto=-1, + ports=0, + frags=False, + pkt_raw=True, + etype=-1, + ): # Test # Create incoming packet streams for packet-generator interfaces pkts_cnt = 0 for i in self.pg_interfaces: if self.flows.__contains__(i): - pkts = self.create_stream(i, self.pg_if_packet_sizes, - traffic_type, ip_type, proto, ports, - frags, pkt_raw, etype) + pkts = self.create_stream( + i, + self.pg_if_packet_sizes, + traffic_type, + ip_type, + proto, + ports, + frags, + pkt_raw, + etype, + ) if len(pkts) > 0: i.add_stream(pkts) pkts_cnt += len(pkts) @@ -424,20 +453,27 @@ class TestClassifyAcl(TestClassifier): if self.flows.__contains__(src_if): for dst_if in self.flows[src_if]: capture = dst_if.get_capture(pkts_cnt) - self.logger.info("Verifying capture on interface %s" % - dst_if.name) - self.verify_capture(dst_if, capture, - traffic_type, ip_type, etype) + self.logger.info("Verifying capture on interface %s" % dst_if.name) + self.verify_capture(dst_if, capture, traffic_type, ip_type, etype) - def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1, - ports=0, frags=False, etype=-1): + def run_verify_negat_test( + self, traffic_type=0, ip_type=0, proto=-1, ports=0, frags=False, etype=-1 + ): # Test self.reset_packet_infos() for i in self.pg_interfaces: if self.flows.__contains__(i): - pkts = self.create_stream(i, self.pg_if_packet_sizes, - traffic_type, ip_type, proto, ports, - frags, True, etype) + pkts = self.create_stream( + i, + self.pg_if_packet_sizes, + traffic_type, + ip_type, + proto, + ports, + frags, + True, + etype, + ) if len(pkts) > 0: i.add_stream(pkts) @@ -450,101 +486,110 @@ class TestClassifyAcl(TestClassifier): for src_if in self.pg_interfaces: if self.flows.__contains__(src_if): for dst_if in self.flows[src_if]: - self.logger.info("Verifying capture on interface %s" % - dst_if.name) + self.logger.info("Verifying capture on interface %s" % dst_if.name) capture = dst_if.get_capture(0) self.assertEqual(len(capture), 0) - def build_classify_table(self, src_mac='', dst_mac='', ether_type='', - etype='', key='mac', hit_next_index=0xffffffff): + def build_classify_table( + self, + src_mac="", + dst_mac="", + ether_type="", + etype="", + key="mac", + hit_next_index=0xFFFFFFFF, + ): # Basic ACL testing - a_mask = self.build_mac_mask(src_mac=src_mac, dst_mac=dst_mac, - ether_type=ether_type) + a_mask = self.build_mac_mask( + src_mac=src_mac, dst_mac=dst_mac, ether_type=ether_type + ) self.create_classify_table(key, a_mask) for host in self.hosts_by_pg_idx[self.pg0.sw_if_index]: - s_mac = host.mac if src_mac else '' + s_mac = host.mac if src_mac else "" if dst_mac: for dst_if in self.flows[self.pg0]: for dst_host in self.hosts_by_pg_idx[dst_if.sw_if_index]: self.create_classify_session( - self.pg0, self.acl_tbl_idx.get(key), - self.build_mac_match(src_mac=s_mac, - dst_mac=dst_host.mac, - ether_type=etype), - hit_next_index=hit_next_index) + self.pg0, + self.acl_tbl_idx.get(key), + self.build_mac_match( + src_mac=s_mac, dst_mac=dst_host.mac, ether_type=etype + ), + hit_next_index=hit_next_index, + ) else: self.create_classify_session( - self.pg0, self.acl_tbl_idx.get(key), - self.build_mac_match(src_mac=s_mac, dst_mac='', - ether_type=etype), - hit_next_index=hit_next_index) + self.pg0, + self.acl_tbl_idx.get(key), + self.build_mac_match(src_mac=s_mac, dst_mac="", ether_type=etype), + hit_next_index=hit_next_index, + ) def test_0000_warmup_test(self): - """ Learn the MAC addresses - """ + """Learn the MAC addresses""" self.create_hosts(2) self.run_traffic_no_check() def test_0010_inacl_permit_src_mac(self): - """ Input L2 ACL test - permit source MAC + """Input L2 ACL test - permit source MAC Test scenario for basic IP ACL with source IP - Create IPv4 stream for pg0 -> pg1 interface. - Create ACL with source MAC address. - Send and verify received packets on pg1 interface. """ - key = 'mac_in' - self.build_classify_table(src_mac='ffffffffffff', key=key) + key = "mac_in" + self.build_classify_table(src_mac="ffffffffffff", key=key) self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.run_verify_test(self.IP, self.IPV4, -1) def test_0011_inacl_permit_dst_mac(self): - """ Input L2 ACL test - permit destination MAC + """Input L2 ACL test - permit destination MAC Test scenario for basic IP ACL with source IP - Create IPv4 stream for pg0 -> pg1 interface. - Create ACL with destination MAC address. - Send and verify received packets on pg1 interface. """ - key = 'mac_in' - self.build_classify_table(dst_mac='ffffffffffff', key=key) + key = "mac_in" + self.build_classify_table(dst_mac="ffffffffffff", key=key) self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.run_verify_test(self.IP, self.IPV4, -1) def test_0012_inacl_permit_src_dst_mac(self): - """ Input L2 ACL test - permit source and destination MAC + """Input L2 ACL test - permit source and destination MAC Test scenario for basic IP ACL with source IP - Create IPv4 stream for pg0 -> pg1 interface. - Create ACL with source and destination MAC addresses. - Send and verify received packets on pg1 interface. """ - key = 'mac_in' + key = "mac_in" self.build_classify_table( - src_mac='ffffffffffff', dst_mac='ffffffffffff', key=key) + src_mac="ffffffffffff", dst_mac="ffffffffffff", key=key + ) self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.run_verify_test(self.IP, self.IPV4, -1) def test_0013_inacl_permit_ether_type(self): - """ Input L2 ACL test - permit ether_type + """Input L2 ACL test - permit ether_type Test scenario for basic IP ACL with source IP - Create IPv4 stream for pg0 -> pg1 interface. - Create ACL with destination MAC address. - Send and verify received packets on pg1 interface. """ - key = 'mac_in' - self.build_classify_table( - ether_type='ffff', etype=hex(ETH_P_IP)[2:], key=key) + key = "mac_in" + self.build_classify_table(ether_type="ffff", etype=hex(ETH_P_IP)[2:], key=key) self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.run_verify_test(self.IP, self.IPV4, -1) def test_0015_inacl_deny(self): - """ Input L2 ACL test - deny + """Input L2 ACL test - deny Test scenario for basic IP ACL with source IP - Create IPv4 stream for pg0 -> pg1 interface. @@ -552,57 +597,55 @@ class TestClassifyAcl(TestClassifier): - Create ACL with source MAC address. - Send and verify no received packets on pg1 interface. """ - key = 'mac_in' - self.build_classify_table( - src_mac='ffffffffffff', hit_next_index=0, key=key) + key = "mac_in" + self.build_classify_table(src_mac="ffffffffffff", hit_next_index=0, key=key) self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.run_verify_negat_test(self.IP, self.IPV4, -1) def test_0020_outacl_permit(self): - """ Output L2 ACL test - permit + """Output L2 ACL test - permit Test scenario for basic IP ACL with source IP - Create IPv4 stream for pg0 -> pg1 interface. - Create ACL with source MAC address. - Send and verify received packets on pg1 interface. """ - key = 'mac_out' - self.build_classify_table(src_mac='ffffffffffff', key=key) + key = "mac_out" + self.build_classify_table(src_mac="ffffffffffff", key=key) self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.run_verify_test(self.IP, self.IPV4, -1) def test_0025_outacl_deny(self): - """ Output L2 ACL test - deny + """Output L2 ACL test - deny Test scenario for basic IP ACL with source IP - Create IPv4 stream for pg0 -> pg1 interface. - Create ACL with source MAC address. - Send and verify no received packets on pg1 interface. """ - key = 'mac_out' - self.build_classify_table( - src_mac='ffffffffffff', hit_next_index=0, key=key) + key = "mac_out" + self.build_classify_table(src_mac="ffffffffffff", hit_next_index=0, key=key) self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.run_verify_negat_test(self.IP, self.IPV4, -1) def test_0030_inoutacl_permit(self): - """ Input+Output L2 ACL test - permit + """Input+Output L2 ACL test - permit Test scenario for basic IP ACL with source IP - Create IPv4 stream for pg0 -> pg1 interface. - Create ACLs with source MAC address. - Send and verify received packets on pg1 interface. """ - key = 'mac_inout' - self.build_classify_table(src_mac='ffffffffffff', key=key) + key = "mac_inout" + self.build_classify_table(src_mac="ffffffffffff", key=key) self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key)) self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.run_verify_test(self.IP, self.IPV4, -1) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) |