From 31b1a6ce1d427d08b3ae1eac75d702860c255043 Mon Sep 17 00:00:00 2001 From: Alexander Chernavin Date: Fri, 17 Jan 2020 08:31:04 -0500 Subject: tests: add map-t fragmentation verifications Type: test Change-Id: I5522e88ee178d0563c246895393e835d125f1b81 Signed-off-by: Alexander Chernavin --- src/plugins/map/test/test_map.py | 46 +++++++++++++++++++++++++++++++--------- 1 file changed, 36 insertions(+), 10 deletions(-) (limited to 'src/plugins') diff --git a/src/plugins/map/test/test_map.py b/src/plugins/map/test/test_map.py index 592e49e499a..03913ce1466 100644 --- a/src/plugins/map/test/test_map.py +++ b/src/plugins/map/test/test_map.py @@ -11,8 +11,8 @@ from util import fragment_rfc791, fragment_rfc8200 import scapy.compat from scapy.layers.l2 import Ether from scapy.packet import Raw -from scapy.layers.inet import IP, UDP, ICMP, TCP, fragment -from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded +from scapy.layers.inet import IP, UDP, ICMP, TCP +from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, IPv6ExtHdrFragment class TestMAP(VppTestCase): @@ -435,6 +435,26 @@ class TestMAP(VppTestCase): def validate(self, rx, expected): self.assertEqual(rx, expected.__class__(scapy.compat.raw(expected))) + def validate_frag(self, p6_frag, p_ip6_expected): + self.assertFalse(p6_frag.haslayer(IP)) + self.assertTrue(p6_frag.haslayer(IPv6)) + self.assertTrue(p6_frag.haslayer(IPv6ExtHdrFragment)) + self.assertEqual(p6_frag[IPv6].src, p_ip6_expected.src) + self.assertEqual(p6_frag[IPv6].dst, p_ip6_expected.dst) + + def validate_frag_payload_len(self, rx, proto, payload_len_expected): + payload_total = 0 + for p in rx: + payload_total += p[IPv6].plen + + # First fragment has proto + payload_total -= len(proto()) + + # Every fragment has IPv6 fragment header + payload_total -= len(IPv6ExtHdrFragment()) * len(rx) + + self.assertEqual(payload_total, payload_len_expected) + def payload(self, len): return 'x' * len @@ -579,28 +599,34 @@ class TestMAP(VppTestCase): self.send_and_assert_no_replies(self.pg1, p6*1) # Packet fragmentation - payload = UDP(sport=40000, dport=4000) / self.payload(1453) + payload_len = 1453 + payload = UDP(sport=40000, dport=4000) / self.payload(payload_len) p4 = (p_ether / p_ip4 / payload) self.pg_enable_capture() self.pg0.add_stream(p4) self.pg_start() rx = self.pg1.get_capture(2) + + p_ip6_translated = IPv6(src='1234:5678:90ab:cdef:ac:1001:200:0', + dst='2001:db8:1e0::c0a8:1:e') for p in rx: - pass - # TODO: Manual validation - # self.validate(p[1], icmp4_reply) + self.validate_frag(p, p_ip6_translated) + + self.validate_frag_payload_len(rx, UDP, payload_len) # Packet fragmentation send fragments - payload = UDP(sport=40000, dport=4000) / self.payload(1453) + payload = UDP(sport=40000, dport=4000) / self.payload(payload_len) p4 = (p_ether / p_ip4 / payload) - frags = fragment(p4, fragsize=1000) + frags = fragment_rfc791(p4, fragsize=1000) self.pg_enable_capture() self.pg0.add_stream(frags) self.pg_start() rx = self.pg1.get_capture(2) + for p in rx: - pass - # p.show2() + self.validate_frag(p, p_ip6_translated) + + self.validate_frag_payload_len(rx, UDP, payload_len) # reass_pkt = reassemble(rx) # p4_reply.ttl -= 1 -- cgit 1.2.3-korg