aboutsummaryrefslogtreecommitdiffstats
path: root/test/template_classifier.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/template_classifier.py')
-rw-r--r--test/template_classifier.py256
1 files changed, 159 insertions, 97 deletions
diff --git a/test/template_classifier.py b/test/template_classifier.py
index b8f79b4f99f..88aa1465830 100644
--- a/test/template_classifier.py
+++ b/test/template_classifier.py
@@ -3,8 +3,8 @@
import binascii
import socket
from socket import AF_INET, AF_INET6
-import unittest
import sys
+from dataclasses import dataclass
from framework import VppTestCase
@@ -15,14 +15,25 @@ from scapy.layers.inet6 import IPv6
from util import ppp
-class TestClassifier(VppTestCase):
+@dataclass
+class VarMask:
+ offset: int
+ spec: str
+
+
+@dataclass
+class VarMatch:
+ offset: int
+ value: int
+ length: int
+
+class TestClassifier(VppTestCase):
@staticmethod
def _resolve_mask_match(mask_match):
mask_match = binascii.unhexlify(mask_match)
mask_match_len = ((len(mask_match) - 1) // 16 + 1) * 16
- mask_match = mask_match + b'\0' * \
- (mask_match_len - len(mask_match))
+ mask_match = mask_match + b"\0" * (mask_match_len - len(mask_match))
return mask_match, mask_match_len
@classmethod
@@ -33,7 +44,7 @@ class TestClassifier(VppTestCase):
variables and configure VPP.
"""
super(TestClassifier, cls).setUpClass()
- cls.acl_active_table = ''
+ cls.acl_active_table = ""
cls.af = AF_INET
def setUp(self):
@@ -97,19 +108,17 @@ class TestClassifier(VppTestCase):
self.logger.info(self.vapi.cli("show classify table verbose"))
self.logger.info(self.vapi.cli("show ip fib"))
+ self.logger.info(self.vapi.cli("show error"))
- acl_active_table = 'ip_out'
- if self.af == AF_INET6:
- acl_active_table = 'ip6_out'
-
- if self.acl_active_table == acl_active_table:
+ if self.acl_active_table.endswith("out"):
self.output_acl_set_interface(
- self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
- self.acl_active_table = ''
- elif self.acl_active_table != '':
+ self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0
+ )
+ elif self.acl_active_table != "":
self.input_acl_set_interface(
- self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
- self.acl_active_table = ''
+ self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0
+ )
+ self.acl_active_table = ""
for intf in self.interfaces:
if self.af == AF_INET:
@@ -121,7 +130,7 @@ class TestClassifier(VppTestCase):
super(TestClassifier, self).tearDown()
@staticmethod
- def build_mac_match(dst_mac='', src_mac='', ether_type=''):
+ def build_mac_match(dst_mac="", src_mac="", ether_type=""):
"""Build MAC ACL match data with hexstring format.
:param str dst_mac: source MAC address <x:x:x:x:x:x>
@@ -129,15 +138,16 @@ class TestClassifier(VppTestCase):
:param str ether_type: ethernet type <0-ffff>
"""
if dst_mac:
- dst_mac = dst_mac.replace(':', '')
+ dst_mac = dst_mac.replace(":", "")
if src_mac:
- src_mac = src_mac.replace(':', '')
+ src_mac = src_mac.replace(":", "")
- return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
- dst_mac, src_mac, ether_type)).rstrip('0')
+ return (
+ "{!s:0>12}{!s:0>12}{!s:0>4}".format(dst_mac, src_mac, ether_type)
+ ).rstrip()
@staticmethod
- def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
+ def build_mac_mask(dst_mac="", src_mac="", ether_type=""):
"""Build MAC ACL mask data with hexstring format.
:param str dst_mac: source MAC address <0-ffffffffffff>
@@ -145,12 +155,12 @@ class TestClassifier(VppTestCase):
:param str ether_type: ethernet type <0-ffff>
"""
- return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
- dst_mac, src_mac, ether_type)).rstrip('0')
+ return (
+ "{!s:0>12}{!s:0>12}{!s:0>4}".format(dst_mac, src_mac, ether_type)
+ ).rstrip()
@staticmethod
- def build_ip_mask(proto='', src_ip='', dst_ip='',
- src_port='', dst_port=''):
+ def build_ip_mask(proto="", src_ip="", dst_ip="", src_port="", dst_port=""):
"""Build IP ACL mask data with hexstring format.
:param str proto: protocol number <0-ff>
@@ -160,12 +170,14 @@ class TestClassifier(VppTestCase):
:param str dst_port: destination port number <0-ffff>
"""
- return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
- proto, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
+ return (
+ "{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}".format(
+ proto, src_ip, dst_ip, src_port, dst_port
+ )
+ ).rstrip("0")
@staticmethod
- def build_ip6_mask(nh='', src_ip='', dst_ip='',
- src_port='', dst_port=''):
+ def build_ip6_mask(nh="", src_ip="", dst_ip="", src_port="", dst_port=""):
"""Build IPv6 ACL mask data with hexstring format.
:param str nh: next header number <0-ff>
@@ -175,12 +187,26 @@ class TestClassifier(VppTestCase):
:param str dst_port: destination port number <0-ffff>
"""
- return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
- nh, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
+ return (
+ "{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}".format(
+ nh, src_ip, dst_ip, src_port, dst_port
+ )
+ ).rstrip("0")
@staticmethod
- def build_ip_match(proto=0, src_ip='', dst_ip='',
- src_port=0, dst_port=0):
+ def build_payload_mask(masks):
+ payload_mask = ""
+
+ for mask in masks:
+ # offset is specified in bytes, convert to hex format.
+ length = (mask.offset * 2) + len(mask.spec)
+ format_spec = "{!s:0>" + str(length) + "}"
+ payload_mask += format_spec.format(mask.spec)
+
+ return payload_mask.rstrip("0")
+
+ @staticmethod
+ def build_ip_match(proto=0, src_ip="", dst_ip="", src_port=0, dst_port=0):
"""Build IP ACL match data with hexstring format.
:param int proto: protocol number with valid option "x"
@@ -190,17 +216,18 @@ class TestClassifier(VppTestCase):
:param int dst_port: destination port number "x"
"""
if src_ip:
- src_ip = binascii.hexlify(socket.inet_aton(src_ip)).decode('ascii')
+ src_ip = binascii.hexlify(socket.inet_aton(src_ip)).decode("ascii")
if dst_ip:
- dst_ip = binascii.hexlify(socket.inet_aton(dst_ip)).decode('ascii')
+ dst_ip = binascii.hexlify(socket.inet_aton(dst_ip)).decode("ascii")
- return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
- hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:],
- hex(dst_port)[2:])).rstrip('0')
+ return (
+ "{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}".format(
+ hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:], hex(dst_port)[2:]
+ )
+ ).rstrip("0")
@staticmethod
- def build_ip6_match(nh=0, src_ip='', dst_ip='',
- src_port=0, dst_port=0):
+ def build_ip6_match(nh=0, src_ip="", dst_ip="", src_port=0, dst_port=0):
"""Build IPv6 ACL match data with hexstring format.
:param int nh: next header number with valid option "x"
@@ -212,24 +239,44 @@ class TestClassifier(VppTestCase):
"""
if src_ip:
if sys.version_info[0] == 2:
- src_ip = binascii.hexlify(socket.inet_pton(
- socket.AF_INET6, src_ip))
+ src_ip = binascii.hexlify(socket.inet_pton(socket.AF_INET6, src_ip))
else:
src_ip = socket.inet_pton(socket.AF_INET6, src_ip).hex()
if dst_ip:
if sys.version_info[0] == 2:
- dst_ip = binascii.hexlify(socket.inet_pton(
- socket.AF_INET6, dst_ip))
+ dst_ip = binascii.hexlify(socket.inet_pton(socket.AF_INET6, dst_ip))
else:
dst_ip = socket.inet_pton(socket.AF_INET6, dst_ip).hex()
- return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
- hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:],
- hex(dst_port)[2:])).rstrip('0')
+ return (
+ "{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}".format(
+ hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:], hex(dst_port)[2:]
+ )
+ ).rstrip("0")
- def create_stream(self, src_if, dst_if, packet_sizes,
- proto_l=UDP(sport=1234, dport=5678)):
+ @staticmethod
+ def build_payload_match(matches):
+ payload_match = ""
+
+ for match in matches:
+ sval = str(hex(match.value)[2:])
+ # offset is specified in bytes, convert to hex format.
+ length = (match.offset + match.length) * 2
+
+ format_spec = "{!s:0>" + str(length) + "}"
+ payload_match += format_spec.format(sval)
+
+ return payload_match.rstrip("0")
+
+ def create_stream(
+ self,
+ src_if,
+ dst_if,
+ packet_sizes,
+ proto_l=UDP(sport=1234, dport=5678),
+ payload_ex=None,
+ ):
"""Create input packet stream for defined interfaces.
:param VppInterface src_if: Source Interface for packet stream.
@@ -242,16 +289,25 @@ class TestClassifier(VppTestCase):
for size in packet_sizes:
info = self.create_packet_info(src_if, dst_if)
payload = self.info_to_payload(info)
+
+ # append any additional payload after info
+ if payload_ex is not None:
+ payload += payload_ex
+
if self.af == AF_INET:
- p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
- IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
- proto_l /
- Raw(payload))
+ p = (
+ Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+ / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+ / proto_l
+ / Raw(payload)
+ )
elif self.af == AF_INET6:
- p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
- IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
- proto_l /
- Raw(payload))
+ p = (
+ Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+ / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
+ / proto_l
+ / Raw(payload)
+ )
info.data = p.copy()
self.extend_packet(p, size)
pkts.append(p)
@@ -280,11 +336,12 @@ class TestClassifier(VppTestCase):
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
self.logger.debug(
- "Got packet on port %s: src=%u (id=%u)" %
- (dst_if.name, payload_info.src, packet_index))
+ "Got packet on port %s: src=%u (id=%u)"
+ % (dst_if.name, payload_info.src, packet_index)
+ )
next_info = self.get_next_packet_info_for_interface2(
- payload_info.src, dst_sw_if_index,
- last_info[payload_info.src])
+ payload_info.src, dst_sw_if_index, last_info[payload_info.src]
+ )
last_info[payload_info.src] = next_info
self.assertTrue(next_info is not None)
self.assertEqual(packet_index, next_info.index)
@@ -301,17 +358,21 @@ class TestClassifier(VppTestCase):
raise
for i in self.interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
- i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
- self.assertTrue(remaining_packet is None,
- "Interface %s: Packet expected from interface %s "
- "didn't arrive" % (dst_if.name, i.name))
-
- def create_classify_table(self, key, mask, data_offset=0):
+ i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]
+ )
+ self.assertTrue(
+ remaining_packet is None,
+ "Interface %s: Packet expected from interface %s "
+ "didn't arrive" % (dst_if.name, i.name),
+ )
+
+ def create_classify_table(self, key, mask, data_offset=0, next_table_index=None):
"""Create Classify Table
:param str key: key for classify table (ex, ACL name).
:param str mask: mask value for interested traffic.
:param int data_offset:
+ :param str next_table_index
"""
mask_match, mask_match_len = self._resolve_mask_match(mask)
r = self.vapi.classify_add_del_table(
@@ -321,12 +382,15 @@ class TestClassifier(VppTestCase):
match_n_vectors=(len(mask) - 1) // 32 + 1,
miss_next_index=0,
current_data_flag=1,
- current_data_offset=data_offset)
- self.assertIsNotNone(r, 'No response msg for add_del_table')
+ current_data_offset=data_offset,
+ next_table_index=next_table_index,
+ )
+ self.assertIsNotNone(r, "No response msg for add_del_table")
self.acl_tbl_idx[key] = r.new_table_index
- def create_classify_session(self, table_index, match, pbr_option=0,
- vrfid=0, is_add=1):
+ def create_classify_session(
+ self, table_index, match, pbr_option=0, vrfid=0, is_add=1
+ ):
"""Create Classify Session
:param int table_index: table index to identify classify table.
@@ -344,8 +408,9 @@ class TestClassifier(VppTestCase):
match_len=mask_match_len,
opaque_index=0,
action=pbr_option,
- metadata=vrfid)
- self.assertIsNotNone(r, 'No response msg for add_del_session')
+ metadata=vrfid,
+ )
+ self.assertIsNotNone(r, "No response msg for add_del_session")
def input_acl_set_interface(self, intf, table_index, is_add=1):
"""Configure Input ACL interface
@@ -358,20 +423,17 @@ class TestClassifier(VppTestCase):
r = None
if self.af == AF_INET:
r = self.vapi.input_acl_set_interface(
- is_add,
- intf.sw_if_index,
- ip4_table_index=table_index)
+ is_add, intf.sw_if_index, ip4_table_index=table_index
+ )
elif self.af == AF_INET6:
r = self.vapi.input_acl_set_interface(
- is_add,
- intf.sw_if_index,
- ip6_table_index=table_index)
+ is_add, intf.sw_if_index, ip6_table_index=table_index
+ )
else:
r = self.vapi.input_acl_set_interface(
- is_add,
- intf.sw_if_index,
- l2_table_index=table_index)
- self.assertIsNotNone(r, 'No response msg for acl_set_interface')
+ is_add, intf.sw_if_index, l2_table_index=table_index
+ )
+ self.assertIsNotNone(r, "No response msg for acl_set_interface")
def output_acl_set_interface(self, intf, table_index, is_add=1):
"""Configure Output ACL interface
@@ -384,20 +446,17 @@ class TestClassifier(VppTestCase):
r = None
if self.af == AF_INET:
r = self.vapi.output_acl_set_interface(
- is_add,
- intf.sw_if_index,
- ip4_table_index=table_index)
+ is_add, intf.sw_if_index, ip4_table_index=table_index
+ )
elif self.af == AF_INET6:
r = self.vapi.output_acl_set_interface(
- is_add,
- intf.sw_if_index,
- ip6_table_index=table_index)
+ is_add, intf.sw_if_index, ip6_table_index=table_index
+ )
else:
r = self.vapi.output_acl_set_interface(
- is_add,
- intf.sw_if_index,
- l2_table_index=table_index)
- self.assertIsNotNone(r, 'No response msg for acl_set_interface')
+ is_add, intf.sw_if_index, l2_table_index=table_index
+ )
+ self.assertIsNotNone(r, "No response msg for acl_set_interface")
def config_pbr_fib_entry(self, intf, is_add=1):
"""Configure fib entry to route traffic toward PBR VRF table
@@ -406,10 +465,13 @@ class TestClassifier(VppTestCase):
"""
addr_len = 24
- self.vapi.ip_add_del_route(dst_address=intf.local_ip4,
- dst_address_length=addr_len,
- next_hop_address=intf.remote_ip4,
- table_id=self.pbr_vrfid, is_add=is_add)
+ self.vapi.ip_add_del_route(
+ dst_address=intf.local_ip4,
+ dst_address_length=addr_len,
+ next_hop_address=intf.remote_ip4,
+ table_id=self.pbr_vrfid,
+ is_add=is_add,
+ )
def verify_vrf(self, vrf_id):
"""