summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--test/test_trace_filter.py120
1 files changed, 110 insertions, 10 deletions
diff --git a/test/test_trace_filter.py b/test/test_trace_filter.py
index bde732d289f..feb764c8a9e 100644
--- a/test/test_trace_filter.py
+++ b/test/test_trace_filter.py
@@ -5,6 +5,13 @@ import unittest
from framework import VppTestCase, VppTestRunner, running_extended_tests
from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
+from scapy.packet import Raw
+from scapy.layers.l2 import Ether
+from scapy.layers.inet import IP, UDP
+from scapy.layers.vxlan import VXLAN
+from scapy.layers.geneve import GENEVE
+from scapy.compat import raw
+
class TestTracefilter(VppTestCase):
""" Packet Tracer Filter Test """
@@ -19,9 +26,32 @@ class TestTracefilter(VppTestCase):
def setUp(self):
super(TestTracefilter, self).setUp()
+ self.create_pg_interfaces(range(1))
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip4()
def tearDown(self):
super(TestTracefilter, self).tearDown()
+ for i in self.pg_interfaces:
+ i.unconfig()
+ i.admin_down()
+
+ def cli(self, cmd):
+ r = self.vapi.cli_return_response(cmd)
+ if r.retval != 0:
+ if hasattr(r, 'reply'):
+ self.logger.info(cmd + " FAIL reply " + r.reply)
+ else:
+ self.logger.info(cmd + " FAIL retval " + str(r.retval))
+ return r
+
+ # check number of hits for classifier
+ def assert_hits(self, n):
+ r = self.cli("show classify table verbose 2")
+ self.assertTrue(r.retval == 0)
+ self.assertTrue(hasattr(r, 'reply'))
+ self.assertTrue(r.reply.find("hits %i" % n) != -1)
def test_mactime_unitTest(self):
""" Packet Tracer Filter Test """
@@ -44,21 +74,91 @@ class TestTracefilter(VppTestCase):
"classify filter trace mask l3 ip4 src\n"
" match l3 ip4 src 192.168.1.15",
"trace add pg-input 100 filter",
- "pa en"]
+ "pa en classifyme"]
for cmd in cmds:
- r = self.vapi.cli_return_response(cmd)
- if r.retval != 0:
- if hasattr(r, 'reply'):
- self.logger.info(cmd + " FAIL reply " + r.reply)
- else:
- self.logger.info(cmd + " FAIL retval " + str(r.retval))
+ self.cli(cmd)
# Check for 9 classifier hits, which is the right answer
- r = self.vapi.cli_return_response("show classify table verbose 2")
+ self.assert_hits(9)
+
+ # cleanup
+ self.cli("pa de classifyme")
+ self.cli("classify filter trace del mask l3 ip4 src "
+ "match l3 ip4 src 192.168.1.15")
+
+ # install a classify rule, inject traffic and check for hits
+ def assert_classify(self, mask, match, packets, n=None):
+ r = self.cli(
+ "classify filter trace mask hex %s match hex %s" %
+ (mask, match))
self.assertTrue(r.retval == 0)
- self.assertTrue(hasattr(r, 'reply'))
- self.assertTrue(r.reply.find("hits 9") != -1)
+ r = self.cli("trace add pg-input %i filter" % len(packets))
+ self.assertTrue(r.retval == 0)
+ self.pg0.add_stream(packets)
+ self.cli("pa en")
+ self.assert_hits(n if n is not None else len(packets))
+ self.cli("clear trace")
+ self.cli(
+ "classify filter trace del mask hex %s match hex %s" %
+ (mask, match))
+
+ def test_encap(self):
+ """ Packet Tracer Filter Test with encap """
+
+ # the packet we are trying to match
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
+ UDP() /
+ VXLAN() /
+ Ether() /
+ IP() /
+ UDP() /
+ GENEVE(vni=1234) /
+ Ether() /
+ IP(src='192.168.4.167') /
+ UDP() /
+ Raw('\xa5' * 100))
+
+ #
+ # compute filter mask & value
+ # we compute it by XOR'ing a template packet with a modified packet
+ # we need to set checksums to 0 to make sure scapy will not recompute
+ # them
+ #
+ tmpl = (Ether() /
+ IP(chksum=0) /
+ UDP(chksum=0) /
+ VXLAN() /
+ Ether() /
+ IP(chksum=0) /
+ UDP(chksum=0) /
+ GENEVE(vni=0) /
+ Ether() /
+ IP(src='0.0.0.0', chksum=0))
+ ori = raw(tmpl)
+
+ # the mask
+ tmpl[GENEVE].vni = 0xffffff
+ user = tmpl[GENEVE].payload
+ user[IP].src = '255.255.255.255'
+ new = raw(tmpl)
+ mask = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
+
+ # this does not match (wrong vni)
+ tmpl[GENEVE].vni = 1
+ user = tmpl[GENEVE].payload
+ user[IP].src = '192.168.4.167'
+ new = raw(tmpl)
+ match = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
+ self.assert_classify(mask, match, [p] * 11, 0)
+
+ # this must match
+ tmpl[GENEVE].vni = 1234
+ new = raw(tmpl)
+ match = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
+ self.assert_classify(mask, match, [p] * 17)
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)