diff options
Diffstat (limited to 'src/plugins')
-rw-r--r-- | src/plugins/map/test/test_map.py | 46 |
1 files changed, 36 insertions, 10 deletions
diff --git a/src/plugins/map/test/test_map.py b/src/plugins/map/test/test_map.py index 592e49e499a..03913ce1466 100644 --- a/src/plugins/map/test/test_map.py +++ b/src/plugins/map/test/test_map.py @@ -11,8 +11,8 @@ from util import fragment_rfc791, fragment_rfc8200 import scapy.compat from scapy.layers.l2 import Ether from scapy.packet import Raw -from scapy.layers.inet import IP, UDP, ICMP, TCP, fragment -from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded +from scapy.layers.inet import IP, UDP, ICMP, TCP +from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, IPv6ExtHdrFragment class TestMAP(VppTestCase): @@ -435,6 +435,26 @@ class TestMAP(VppTestCase): def validate(self, rx, expected): self.assertEqual(rx, expected.__class__(scapy.compat.raw(expected))) + def validate_frag(self, p6_frag, p_ip6_expected): + self.assertFalse(p6_frag.haslayer(IP)) + self.assertTrue(p6_frag.haslayer(IPv6)) + self.assertTrue(p6_frag.haslayer(IPv6ExtHdrFragment)) + self.assertEqual(p6_frag[IPv6].src, p_ip6_expected.src) + self.assertEqual(p6_frag[IPv6].dst, p_ip6_expected.dst) + + def validate_frag_payload_len(self, rx, proto, payload_len_expected): + payload_total = 0 + for p in rx: + payload_total += p[IPv6].plen + + # First fragment has proto + payload_total -= len(proto()) + + # Every fragment has IPv6 fragment header + payload_total -= len(IPv6ExtHdrFragment()) * len(rx) + + self.assertEqual(payload_total, payload_len_expected) + def payload(self, len): return 'x' * len @@ -579,28 +599,34 @@ class TestMAP(VppTestCase): self.send_and_assert_no_replies(self.pg1, p6*1) # Packet fragmentation - payload = UDP(sport=40000, dport=4000) / self.payload(1453) + payload_len = 1453 + payload = UDP(sport=40000, dport=4000) / self.payload(payload_len) p4 = (p_ether / p_ip4 / payload) self.pg_enable_capture() self.pg0.add_stream(p4) self.pg_start() rx = self.pg1.get_capture(2) + + p_ip6_translated = IPv6(src='1234:5678:90ab:cdef:ac:1001:200:0', + dst='2001:db8:1e0::c0a8:1:e') for p in rx: - pass - # TODO: Manual validation - # self.validate(p[1], icmp4_reply) + self.validate_frag(p, p_ip6_translated) + + self.validate_frag_payload_len(rx, UDP, payload_len) # Packet fragmentation send fragments - payload = UDP(sport=40000, dport=4000) / self.payload(1453) + payload = UDP(sport=40000, dport=4000) / self.payload(payload_len) p4 = (p_ether / p_ip4 / payload) - frags = fragment(p4, fragsize=1000) + frags = fragment_rfc791(p4, fragsize=1000) self.pg_enable_capture() self.pg0.add_stream(frags) self.pg_start() rx = self.pg1.get_capture(2) + for p in rx: - pass - # p.show2() + self.validate_frag(p, p_ip6_translated) + + self.validate_frag_payload_len(rx, UDP, payload_len) # reass_pkt = reassemble(rx) # p4_reply.ttl -= 1 |