diff options
Diffstat (limited to 'test/test_ip6.py')
-rw-r--r-- | test/test_ip6.py | 124 |
1 files changed, 101 insertions, 23 deletions
diff --git a/test/test_ip6.py b/test/test_ip6.py index 21d8025d583..2f19bcf1547 100644 --- a/test/test_ip6.py +++ b/test/test_ip6.py @@ -18,7 +18,7 @@ from scapy.layers.inet6 import IPv6, UDP, TCP, ICMPv6ND_NS, ICMPv6ND_RS, \ ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \ ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \ ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \ - ICMPv6TimeExceeded + ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \ in6_mactoifaceid, in6_ismaddr from scapy.utils import inet_pton, inet_ntop @@ -172,7 +172,6 @@ class TestIPv6(TestIPv6ND): :ivar list interfaces: pg interfaces and subinterfaces. :ivar dict flows: IPv4 packet flows in test. - :ivar list pg_if_packet_sizes: packet sizes in test. *TODO:* Create AD sub interface """ @@ -195,8 +194,7 @@ class TestIPv6(TestIPv6ND): self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if] # packet sizes - self.pg_if_packet_sizes = [64, 512, 1518, 9018] - self.sub_if_packet_sizes = [64, 512, 1518 + 4, 9018 + 4] + self.pg_if_packet_sizes = [64, 1500, 9020] self.interfaces = list(self.pg_interfaces) self.interfaces.extend(self.sub_interfaces) @@ -250,27 +248,45 @@ class TestIPv6(TestIPv6ND): (count, percent)) percent += 1 - def create_stream(self, src_if, packet_sizes): + def modify_packet(self, src_if, packet_size, pkt): + """Add load, set destination IP and extend packet to required packet + size for defined interface. + + :param VppInterface src_if: Interface to create packet for. + :param int packet_size: Required packet size. + :param Scapy pkt: Packet to be modified. + """ + dst_if_idx = packet_size / 10 % 2 + dst_if = self.flows[src_if][dst_if_idx] + info = self.create_packet_info(src_if, dst_if) + payload = self.info_to_payload(info) + p = pkt/Raw(payload) + p[IPv6].dst = dst_if.remote_ip6 + info.data = p.copy() + if isinstance(src_if, VppSubInterface): + p = src_if.add_dot1_layer(p) + self.extend_packet(p, packet_size) + + return p + + def create_stream(self, src_if): """Create input packet stream for defined interface. :param VppInterface src_if: Interface to create packet stream for. - :param list packet_sizes: Required packet sizes. """ - pkts = [] - for i in range(0, 257): - dst_if = self.flows[src_if][i % 2] - info = self.create_packet_info(src_if, dst_if) - payload = self.info_to_payload(info) - p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / - IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) / - UDP(sport=1234, dport=1234) / - Raw(payload)) - info.data = p.copy() - if isinstance(src_if, VppSubInterface): - p = src_if.add_dot1_layer(p) - size = packet_sizes[(i // 2) % len(packet_sizes)] - self.extend_packet(p, size) - pkts.append(p) + hdr_ext = 4 if isinstance(src_if, VppSubInterface) else 0 + pkt_tmpl = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / + IPv6(src=src_if.remote_ip6) / + UDP(sport=1234, dport=1234)) + + pkts = [self.modify_packet(src_if, i, pkt_tmpl) + for i in xrange(self.pg_if_packet_sizes[0], + self.pg_if_packet_sizes[1], 10)] + pkts_b = [self.modify_packet(src_if, i, pkt_tmpl) + for i in xrange(self.pg_if_packet_sizes[1] + hdr_ext, + self.pg_if_packet_sizes[2] + hdr_ext, 50)] + pkts.extend(pkts_b) + return pkts def verify_capture(self, dst_if, capture): @@ -333,11 +349,11 @@ class TestIPv6(TestIPv6ND): - Send and verify received packets on each interface. """ - pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes) + pkts = self.create_stream(self.pg0) self.pg0.add_stream(pkts) for i in self.sub_interfaces: - pkts = self.create_stream(i, self.sub_if_packet_sizes) + pkts = self.create_stream(i) i.parent.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) @@ -898,6 +914,68 @@ class TestIPv6(TestIPv6ND): self.pg0.ip6_ra_config(no=1, suppress=1, send_unicast=0) +class TestICMPv6Echo(VppTestCase): + """ ICMPv6 Echo Test Case """ + + def setUp(self): + super(TestICMPv6Echo, self).setUp() + + # create 1 pg interface + self.create_pg_interfaces(range(1)) + + for i in self.pg_interfaces: + i.admin_up() + i.config_ip6() + i.resolve_ndp() + + def tearDown(self): + super(TestICMPv6Echo, self).tearDown() + for i in self.pg_interfaces: + i.unconfig_ip6() + i.ip6_disable() + i.admin_down() + + def test_icmpv6_echo(self): + """ VPP replies to ICMPv6 Echo Request + + Test scenario: + + - Receive ICMPv6 Echo Request message on pg0 interface. + - Check outgoing ICMPv6 Echo Reply message on pg0 interface. + """ + + icmpv6_id = 0xb + icmpv6_seq = 5 + icmpv6_data = '\x0a' * 18 + p_echo_request = (Ether(src=self.pg0.remote_mac, + dst=self.pg0.local_mac) / + IPv6(src=self.pg0.remote_ip6, + dst=self.pg0.local_ip6) / + ICMPv6EchoRequest(id=icmpv6_id, seq=icmpv6_seq, + data=icmpv6_data)) + + self.pg0.add_stream(p_echo_request) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + rx = self.pg0.get_capture(1) + rx = rx[0] + ether = rx[Ether] + ipv6 = rx[IPv6] + icmpv6 = rx[ICMPv6EchoReply] + + self.assertEqual(ether.src, self.pg0.local_mac) + self.assertEqual(ether.dst, self.pg0.remote_mac) + + self.assertEqual(ipv6.src, self.pg0.local_ip6) + self.assertEqual(ipv6.dst, self.pg0.remote_ip6) + + self.assertEqual(icmp6types[icmpv6.type], "Echo Reply") + self.assertEqual(icmpv6.id, icmpv6_id) + self.assertEqual(icmpv6.seq, icmpv6_seq) + self.assertEqual(icmpv6.data, icmpv6_data) + + class TestIPv6RD(TestIPv6ND): """ IPv6 Router Discovery Test Case """ |