diff options
Diffstat (limited to 'test/test_mtu.py')
-rw-r--r-- | test/test_mtu.py | 212 |
1 files changed, 212 insertions, 0 deletions
diff --git a/test/test_mtu.py b/test/test_mtu.py new file mode 100644 index 00000000000..abf0c94b268 --- /dev/null +++ b/test/test_mtu.py @@ -0,0 +1,212 @@ +#!/usr/bin/env python +"""IP4 and IP6 MTU functional tests""" + +# +# Add tests for: +# - sub interfaces +# - Verify that adjacencies inherit MTU correctly +# - Verify that sub-interfaces inherit MTU correctly +# - Different types of interfaces? +# +import unittest +from scapy.layers.inet6 import IPv6, Ether, IP, UDP, ICMPv6PacketTooBig +from scapy.layers.inet import ICMP +from framework import VppTestCase, VppTestRunner +from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto +from socket import AF_INET, AF_INET6, inet_pton +import StringIO + +""" Test_mtu is a subclass of VPPTestCase classes. + MTU tests. +""" + + +def reassemble(listoffragments): + buffer = StringIO.StringIO() + first = listoffragments[0] + buffer.seek(20) + for pkt in listoffragments: + # pkt.show2() + buffer.seek(pkt[IP].frag*8) + buffer.write(pkt[IP].payload) + first.len = len(buffer.getvalue()) + 20 + first.flags = 0 + del(first.chksum) + header = str(first[IP])[:20] + return first[IP].__class__(header + buffer.getvalue()) + + +class TestMTU(VppTestCase): + """ MTU Test Case """ + + @classmethod + def setUpClass(cls): + super(TestMTU, cls).setUpClass() + cls.create_pg_interfaces(range(2)) + cls.interfaces = list(cls.pg_interfaces) + + def setUp(cls): + super(TestMTU, cls).setUp() + for i in cls.interfaces: + i.admin_up() + i.config_ip4() + i.config_ip6() + i.disable_ipv6_ra() + i.resolve_arp() + i.resolve_ndp() + + def tearDown(self): + super(TestMTU, self).tearDown() + if not self.vpp_dead: + for i in self.pg_interfaces: + i.unconfig_ip4() + i.unconfig_ip6() + i.admin_down() + + def validate(self, rx, expected): + self.assertEqual(rx, expected.__class__(str(expected))) + + def validate_bytes(self, rx, expected): + self.assertEqual(rx, expected) + + def payload(self, len): + return 'x' * len + + def get_mtu(self, sw_if_index): + rv = self.vapi.sw_interface_dump() + for i in rv: + if i.sw_if_index == sw_if_index: + return i.link_mtu + return 0 + + def test_ip4_mtu(self): + """ IP4 MTU test """ + + # + # TODO: Link MTU is 216 bytes 'off'. Fix when L3 MTU patches committed + # + mtu_offset = 216 + p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + p_ip4 = IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, + flags='DF') + + # TODO: Re-enable when MTU fixes are committed + current_mtu = self.get_mtu(self.pg1.sw_if_index) + current_mtu -= mtu_offset + + p_payload = UDP(sport=1234, dport=1234) / self.payload( + current_mtu - 20 - 8) + + p4 = p_ether / p_ip4 / p_payload + p4_reply = p_ip4 / p_payload + p4_reply.ttl -= 1 + rx = self.send_and_expect(self.pg0, p4*11, self.pg1) + for p in rx: + self.validate(p[1], p4_reply) + + # MTU + self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, 576 + mtu_offset) + self.assertEqual(576, self.get_mtu(self.pg1.sw_if_index) - mtu_offset) + + # Should fail. Too large MTU + p_icmp4 = ICMP(type='dest-unreach', code='fragmentation-needed', + nexthopmtu=576, chksum=0x2dbb) + icmp4_reply = (IP(src=self.pg0.local_ip4, + dst=self.pg0.remote_ip4, + ttl=254, len=576, id=0) / + p_icmp4 / p_ip4 / p_payload) + icmp4_reply[1].ttl -= 1 + n = icmp4_reply.__class__(str(icmp4_reply)) + s = str(icmp4_reply) + icmp4_reply = s[0:576] + rx = self.send_and_expect(self.pg0, p4*11, self.pg0) + for p in rx: + # p.show2() + # n.show2() + self.validate_bytes(str(p[1]), icmp4_reply) + + ''' + # Now with DF off. Expect fragments. + # First go with 1500 byte packets. + p_payload = UDP(sport=1234, dport=1234) / self.payload( + 1500 - 20 - 8) + p4 = p_ether / p_ip4 / p_payload + p4.flags = 0 + p4_reply = p_ip4 / p_payload + p4_reply.ttl = 62 # check this + p4_reply.flags = 0 + p4_reply.id = 256 + self.pg_enable_capture() + self.pg0.add_stream(p4*1) + self.pg_start() + rx = self.pg1.get_capture(3) + print('RX', len(rx)) + reass_pkt = reassemble(rx) + self.validate(reass_pkt, p4_reply) + ''' + # Now what happens with a 9K frame + ''' + p_payload = UDP(sport=1234, dport=1234) / self.payload( + current_mtu - 20 - 8) + p4 = p_ether / p_ip4 / p_payload + p4.flags = 0 + p4_reply = p_ip4 / p_payload + p4_reply.ttl = 62 # check this + p4_reply.flags = 0 + p4_reply.id = 512 + + self.pg_enable_capture() + self.pg0.add_stream(p4*1) + self.pg_start() + rx = self.pg1.get_capture(16) + reass_pkt = reassemble(rx) + reass_pkt.show2() + p4_reply.show2() + self.validate(reass_pkt, p4_reply) + ''' + # Reset MTU + self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, current_mtu) + + @unittest.skip("Enable when IPv6 fragmentation is added") + def test_ip6_mtu(self): + """ IP6 MTU test """ + + p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + p_ip6 = IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) + + current_mtu = self.get_mtu(self.pg1.sw_if_index) + p_payload = UDP(sport=1234, dport=1234) / self.payload( + current_mtu - 40 - 8) + + p6 = p_ether / p_ip6 / p_payload + p6_reply = p_ip6 / p_payload + p6_reply.hlim -= 1 + rx = self.send_and_expect(self.pg0, p6*9, self.pg1) + for p in rx: + self.validate(p[1], p6_reply) + + # MTU (only checked on encap) + self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, 1280) + self.assertEqual(1280, self.get_mtu(self.pg1.sw_if_index)) + + # Should fail. Too large MTU + p_icmp6 = ICMPv6PacketTooBig(mtu=1280, cksum=0x4c7a) + icmp6_reply = (IPv6(src=self.pg0.local_ip6, + dst=self.pg0.remote_ip6, + hlim=254, plen=1240) / + p_icmp6 / p_ip6 / p_payload) + icmp6_reply[2].hlim -= 1 + n = icmp6_reply.__class__(str(icmp6_reply)) + s = str(icmp6_reply) + icmp6_reply = s[0:1280] + + rx = self.send_and_expect(self.pg0, p6*9, self.pg0) + for p in rx: + self.validate_bytes(str(p[1]), icmp6_reply) + + # Reset MTU + self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, current_mtu) + + +if __name__ == '__main__': + unittest.main(testRunner=VppTestRunner) |