From 3f844d0bc900e5db40ba74724e2b61e7943682d3 Mon Sep 17 00:00:00 2001 From: Neale Ranns Date: Sat, 18 Feb 2017 00:03:54 -0800 Subject: Proxy ND (RFC4389 - or a sub-set thereof). This allows the 'emulation' of bridging. That is hosts in one sub-net reachable via differenet interfaces. Introducate a new API command: ip6 nd proxy this indicates 2 things; 1) that host is reachable out of interface . VPP will thus install that route. 2) NS requests sent to will be responeded to (i.e. proxied). Change-Id: I863f967fdb5097ab3b574769c70afdbfc8d5478a Signed-off-by: Neale Ranns --- test/test_ip6.py | 293 +++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 276 insertions(+), 17 deletions(-) (limited to 'test/test_ip6.py') diff --git a/test/test_ip6.py b/test/test_ip6.py index b8278329..070e2d7d 100644 --- a/test/test_ip6.py +++ b/test/test_ip6.py @@ -1,17 +1,20 @@ #!/usr/bin/env python import unittest -import socket +from socket import AF_INET6 from framework import VppTestCase, VppTestRunner from vpp_sub_interface import VppSubInterface, VppDot1QSubint from vpp_pg_interface import is_ipv6_misc +from vpp_neighbor import find_nbr from scapy.packet import Raw from scapy.layers.l2 import Ether, Dot1Q from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \ ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \ - ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo + ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \ + ICMPv6ND_NA, ICMPv6NDOptDstLLAddr + from util import ppp from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \ in6_mactoifaceid, in6_ismaddr @@ -24,7 +27,71 @@ def mk_ll_addr(mac): return addr -class TestIPv6(VppTestCase): +class TestIPv6ND(VppTestCase): + def validate_ra(self, intf, rx, dst_ip=None): + if not dst_ip: + dst_ip = intf.remote_ip6 + + # unicasted packets must come to the unicast mac + self.assertEqual(rx[Ether].dst, intf.remote_mac) + + # and from the router's MAC + self.assertEqual(rx[Ether].src, intf.local_mac) + + # the rx'd RA should be addressed to the sender's source + self.assertTrue(rx.haslayer(ICMPv6ND_RA)) + self.assertEqual(in6_ptop(rx[IPv6].dst), + in6_ptop(dst_ip)) + + # and come from the router's link local + self.assertTrue(in6_islladdr(rx[IPv6].src)) + self.assertEqual(in6_ptop(rx[IPv6].src), + in6_ptop(mk_ll_addr(intf.local_mac))) + + def validate_na(self, intf, rx, dst_ip=None, tgt_ip=None): + if not dst_ip: + dst_ip = intf.remote_ip6 + if not tgt_ip: + dst_ip = intf.local_ip6 + + # unicasted packets must come to the unicast mac + self.assertEqual(rx[Ether].dst, intf.remote_mac) + + # and from the router's MAC + self.assertEqual(rx[Ether].src, intf.local_mac) + + # the rx'd NA should be addressed to the sender's source + self.assertTrue(rx.haslayer(ICMPv6ND_NA)) + self.assertEqual(in6_ptop(rx[IPv6].dst), + in6_ptop(dst_ip)) + + # and come from the target address + self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(tgt_ip)) + + # Dest link-layer options should have the router's MAC + dll = rx[ICMPv6NDOptDstLLAddr] + self.assertEqual(dll.lladdr, intf.local_mac) + + def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None, + filter_out_fn=is_ipv6_misc): + intf.add_stream(pkts) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + rx = intf.get_capture(1, filter_out_fn=filter_out_fn) + + self.assertEqual(len(rx), 1) + rx = rx[0] + self.validate_ra(intf, rx, dst_ip) + + def send_and_assert_no_replies(self, intf, pkts, remark): + intf.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + intf.assert_nothing_captured(remark=remark) + + +class TestIPv6(TestIPv6ND): """ IPv6 Test Case """ @classmethod @@ -112,7 +179,7 @@ class TestIPv6(VppTestCase): n_int = len(self.interfaces) percent = 0 counter = 0.0 - dest_addr = socket.inet_pton(socket.AF_INET6, "fd02::1") + dest_addr = inet_pton(AF_INET6, "fd02::1") dest_addr_len = 128 for i in self.interfaces: next_hop_address = i.local_ip6n @@ -225,12 +292,6 @@ class TestIPv6(VppTestCase): pkts = i.parent.get_capture() self.verify_capture(i, pkts) - def send_and_assert_no_replies(self, intf, pkts, remark): - intf.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - intf.assert_nothing_captured(remark=remark) - def test_ns(self): """ IPv6 Neighbour Solicitation Exceptions @@ -243,8 +304,8 @@ class TestIPv6(VppTestCase): # # An NS from a non link source address # - nsma = in6_getnsma(inet_pton(socket.AF_INET6, self.pg0.local_ip6)) - d = inet_ntop(socket.AF_INET6, nsma) + nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6)) + d = inet_ntop(AF_INET6, nsma) p = (Ether(dst=in6_getnsmac(nsma)) / IPv6(dst=d, src="2002::2") / @@ -261,8 +322,8 @@ class TestIPv6(VppTestCase): # not a member of FAILS # if 0: - nsma = in6_getnsma(inet_pton(socket.AF_INET6, "fd::ffff")) - d = inet_ntop(socket.AF_INET6, nsma) + nsma = in6_getnsma(inet_pton(AF_INET6, "fd::ffff")) + d = inet_ntop(AF_INET6, nsma) p = (Ether(dst=in6_getnsmac(nsma)) / IPv6(dst=d, src=self.pg0.remote_ip6) / @@ -277,8 +338,8 @@ class TestIPv6(VppTestCase): # # An NS whose target address is one the router does not own # - nsma = in6_getnsma(inet_pton(socket.AF_INET6, self.pg0.local_ip6)) - d = inet_ntop(socket.AF_INET6, nsma) + nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6)) + d = inet_ntop(AF_INET6, nsma) p = (Ether(dst=in6_getnsmac(nsma)) / IPv6(dst=d, src=self.pg0.remote_ip6) / @@ -418,7 +479,7 @@ class TestIPv6(VppTestCase): # Send the RS multicast # self.pg0.ip6_ra_config(send_unicast=1) - dmac = in6_getnsmac(inet_pton(socket.AF_INET6, "ff02::2")) + dmac = in6_getnsmac(inet_pton(AF_INET6, "ff02::2")) ll = mk_ll_addr(self.pg0.remote_mac) p = (Ether(dst=dmac, src=self.pg0.remote_mac) / IPv6(dst="ff02::2", src=ll) / @@ -629,5 +690,203 @@ class TestIPv6(VppTestCase): # self.pg0.ip6_ra_config(no=1, suppress=1, send_unicast=0) + +class IPv6NDProxyTest(TestIPv6ND): + """ IPv6 ND ProxyTest Case """ + + def setUp(self): + super(IPv6NDProxyTest, self).setUp() + + # create 3 pg interfaces + self.create_pg_interfaces(range(3)) + + # pg0 is the master interface, with the configured subnet + self.pg0.admin_up() + self.pg0.config_ip6() + self.pg0.resolve_ndp() + + self.pg1.ip6_enable() + self.pg2.ip6_enable() + + def tearDown(self): + super(IPv6NDProxyTest, self).tearDown() + + def test_nd_proxy(self): + """ IPv6 Proxy ND """ + + # + # Generate some hosts in the subnet that we are proxying + # + self.pg0.generate_remote_hosts(8) + + nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6)) + d = inet_ntop(AF_INET6, nsma) + + # + # Send an NS for one of those remote hosts on one of the proxy links + # expect no response since it's from an address that is not + # on the link that has the prefix configured + # + ns_pg1 = (Ether(dst=in6_getnsmac(nsma), src=self.pg1.remote_mac) / + IPv6(dst=d, src=self.pg0._remote_hosts[2].ip6) / + ICMPv6ND_NS(tgt=self.pg0.local_ip6) / + ICMPv6NDOptSrcLLAddr(lladdr=self.pg0._remote_hosts[2].mac)) + + self.send_and_assert_no_replies(self.pg1, ns_pg1, "Off link NS") + + # + # Add proxy support for the host + # + self.vapi.ip6_nd_proxy( + inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6), + self.pg1.sw_if_index) + + # + # try that NS again. this time we expect an NA back + # + self.pg1.add_stream(ns_pg1) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + rx = self.pg1.get_capture(1) + + self.validate_na(self.pg1, rx[0], + dst_ip=self.pg0._remote_hosts[2].ip6, + tgt_ip=self.pg0.local_ip6) + + # + # ... and that we have an entry in the ND cache + # + self.assertTrue(find_nbr(self, + self.pg1.sw_if_index, + self.pg0._remote_hosts[2].ip6, + inet=AF_INET6)) + + # + # ... and we can route traffic to it + # + t = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IPv6(dst=self.pg0._remote_hosts[2].ip6, + src=self.pg0.remote_ip6) / + UDP(sport=10000, dport=20000) / + Raw('\xa5' * 100)) + + self.pg0.add_stream(t) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + rx = self.pg1.get_capture(1) + rx = rx[0] + + self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac) + self.assertEqual(rx[Ether].src, self.pg1.local_mac) + + self.assertEqual(rx[IPv6].src, t[IPv6].src) + self.assertEqual(rx[IPv6].dst, t[IPv6].dst) + + # + # Test we proxy for the host on the main interface + # + ns_pg0 = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) / + IPv6(dst=d, src=self.pg0.remote_ip6) / + ICMPv6ND_NS(tgt=self.pg0._remote_hosts[2].ip6) / + ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)) + + self.pg0.add_stream(ns_pg0) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + rx = self.pg0.get_capture(1) + + self.validate_na(self.pg0, rx[0], + tgt_ip=self.pg0._remote_hosts[2].ip6, + dst_ip=self.pg0.remote_ip6) + + # + # Setup and resolve proxy for another host on another interface + # + ns_pg2 = (Ether(dst=in6_getnsmac(nsma), src=self.pg2.remote_mac) / + IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6) / + ICMPv6ND_NS(tgt=self.pg0.local_ip6) / + ICMPv6NDOptSrcLLAddr(lladdr=self.pg0._remote_hosts[2].mac)) + + self.vapi.ip6_nd_proxy( + inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6), + self.pg2.sw_if_index) + + self.pg2.add_stream(ns_pg2) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + rx = self.pg2.get_capture(1) + + self.validate_na(self.pg2, rx[0], + dst_ip=self.pg0._remote_hosts[3].ip6, + tgt_ip=self.pg0.local_ip6) + + self.assertTrue(find_nbr(self, + self.pg2.sw_if_index, + self.pg0._remote_hosts[3].ip6, + inet=AF_INET6)) + + # + # hosts can communicate. pg2->pg1 + # + t2 = (Ether(dst=self.pg2.local_mac, + src=self.pg0.remote_hosts[3].mac) / + IPv6(dst=self.pg0._remote_hosts[2].ip6, + src=self.pg0._remote_hosts[3].ip6) / + UDP(sport=10000, dport=20000) / + Raw('\xa5' * 100)) + + self.pg2.add_stream(t2) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + rx = self.pg1.get_capture(1) + rx = rx[0] + + self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac) + self.assertEqual(rx[Ether].src, self.pg1.local_mac) + + self.assertEqual(rx[IPv6].src, t2[IPv6].src) + self.assertEqual(rx[IPv6].dst, t2[IPv6].dst) + + # + # remove the proxy configs + # + self.vapi.ip6_nd_proxy( + inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6), + self.pg1.sw_if_index, + is_del=1) + self.vapi.ip6_nd_proxy( + inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6), + self.pg2.sw_if_index, + is_del=1) + + self.assertFalse(find_nbr(self, + self.pg2.sw_if_index, + self.pg0._remote_hosts[3].ip6, + inet=AF_INET6)) + self.assertFalse(find_nbr(self, + self.pg1.sw_if_index, + self.pg0._remote_hosts[2].ip6, + inet=AF_INET6)) + + # + # no longer proxy-ing... + # + self.send_and_assert_no_replies(self.pg0, ns_pg0, "Proxy unconfigured") + self.send_and_assert_no_replies(self.pg1, ns_pg1, "Proxy unconfigured") + self.send_and_assert_no_replies(self.pg2, ns_pg2, "Proxy unconfigured") + + # + # no longer forwarding. traffic generates NS out of the glean/main + # interface + # + self.pg2.add_stream(t2) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + rx = self.pg0.get_capture(1) + + self.assertTrue(rx[0].haslayer(ICMPv6ND_NS)) + + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner) -- cgit 1.2.3-korg