diff options
Diffstat (limited to 'test/test_dhcp6.py')
-rw-r--r-- | test/test_dhcp6.py | 805 |
1 files changed, 805 insertions, 0 deletions
diff --git a/test/test_dhcp6.py b/test/test_dhcp6.py new file mode 100644 index 00000000000..57eb113fb13 --- /dev/null +++ b/test/test_dhcp6.py @@ -0,0 +1,805 @@ +from socket import AF_INET6, inet_ntop, inet_pton + +from scapy.layers.dhcp6 import DHCP6_Advertise, DHCP6OptClientId, \ + DHCP6OptStatusCode, DHCP6OptPref, DHCP6OptIA_PD, DHCP6OptIAPrefix, \ + DHCP6OptServerId, DHCP6_Solicit, DHCP6_Reply, DHCP6_Request, DHCP6_Renew, \ + DHCP6_Rebind, DUID_LL, DHCP6_Release, DHCP6OptElapsedTime, DHCP6OptIA_NA, \ + DHCP6OptIAAddress +from scapy.layers.inet6 import IPv6, Ether, UDP +from scapy.utils6 import in6_mactoifaceid + +from framework import tag_fixme_vpp_workers +from framework import VppTestCase +from framework import tag_run_solo +from vpp_papi import VppEnum +import util +import os + + +def ip6_normalize(ip6): + return inet_ntop(AF_INET6, inet_pton(AF_INET6, ip6)) + + +class TestDHCPv6DataPlane(VppTestCase): + """ DHCPv6 Data Plane Test Case """ + + @classmethod + def setUpClass(cls): + super(TestDHCPv6DataPlane, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestDHCPv6DataPlane, cls).tearDownClass() + + def setUp(self): + super(TestDHCPv6DataPlane, self).setUp() + + self.create_pg_interfaces(range(1)) + self.interfaces = list(self.pg_interfaces) + for i in self.interfaces: + i.admin_up() + i.config_ip6() + + self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac) + + def tearDown(self): + for i in self.interfaces: + i.unconfig_ip6() + i.admin_down() + super(TestDHCPv6DataPlane, self).tearDown() + + def test_dhcp_ia_na_send_solicit_receive_advertise(self): + """ Verify DHCPv6 IA NA Solicit packet and Advertise event """ + + self.vapi.dhcp6_clients_enable_disable(enable=1) + + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + address = {'address': '1:2:3::5', + 'preferred_time': 60, + 'valid_time': 120} + self.vapi.dhcp6_send_client_message( + server_index=0xffffffff, + mrc=1, + msg_type=VppEnum.vl_api_dhcpv6_msg_type_t.DHCPV6_MSG_API_SOLICIT, + sw_if_index=self.pg0.sw_if_index, + T1=20, + T2=40, + addresses=[address], + n_addresses=len( + [address])) + rx_list = self.pg0.get_capture(1) + self.assertEqual(len(rx_list), 1) + packet = rx_list[0] + + self.assertEqual(packet.haslayer(IPv6), 1) + self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1) + + client_duid = packet[DHCP6OptClientId].duid + trid = packet[DHCP6_Solicit].trid + + dst = ip6_normalize(packet[IPv6].dst) + dst2 = ip6_normalize("ff02::1:2") + self.assert_equal(dst, dst2) + src = ip6_normalize(packet[IPv6].src) + src2 = ip6_normalize(self.pg0.local_ip6_ll) + self.assert_equal(src, src2) + ia_na = packet[DHCP6OptIA_NA] + self.assert_equal(ia_na.T1, 20) + self.assert_equal(ia_na.T2, 40) + self.assert_equal(len(ia_na.ianaopts), 1) + address = ia_na.ianaopts[0] + self.assert_equal(address.addr, '1:2:3::5') + self.assert_equal(address.preflft, 60) + self.assert_equal(address.validlft, 120) + + self.vapi.want_dhcp6_reply_events(enable_disable=1, + pid=os.getpid()) + + try: + ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=60, + validlft=120) + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IPv6(src=util.mk_ll_addr(self.pg0.remote_mac), + dst=self.pg0.local_ip6_ll) / + UDP(sport=547, dport=546) / + DHCP6_Advertise(trid=trid) / + DHCP6OptServerId(duid=self.server_duid) / + DHCP6OptClientId(duid=client_duid) / + DHCP6OptPref(prefval=7) / + DHCP6OptStatusCode(statuscode=1) / + DHCP6OptIA_NA(iaid=1, T1=20, T2=40, ianaopts=ia_na_opts) + ) + self.pg0.add_stream([p]) + self.pg_start() + + ev = self.vapi.wait_for_event(1, "dhcp6_reply_event") + + self.assert_equal(ev.preference, 7) + self.assert_equal(ev.status_code, 1) + self.assert_equal(ev.T1, 20) + self.assert_equal(ev.T2, 40) + + reported_address = ev.addresses[0] + address = ia_na_opts.getfieldval("addr") + self.assert_equal(str(reported_address.address), address) + self.assert_equal(reported_address.preferred_time, + ia_na_opts.getfieldval("preflft")) + self.assert_equal(reported_address.valid_time, + ia_na_opts.getfieldval("validlft")) + + finally: + self.vapi.want_dhcp6_reply_events(enable_disable=0) + self.vapi.dhcp6_clients_enable_disable(enable=0) + + def test_dhcp_pd_send_solicit_receive_advertise(self): + """ Verify DHCPv6 PD Solicit packet and Advertise event """ + + self.vapi.dhcp6_clients_enable_disable(enable=1) + + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + prefix = {'prefix': {'address': '1:2:3::', 'len': 50}, + 'preferred_time': 60, + 'valid_time': 120} + prefixes = [prefix] + self.vapi.dhcp6_pd_send_client_message( + server_index=0xffffffff, + mrc=1, + msg_type=VppEnum.vl_api_dhcpv6_msg_type_t.DHCPV6_MSG_API_SOLICIT, + sw_if_index=self.pg0.sw_if_index, + T1=20, + T2=40, + prefixes=prefixes, + n_prefixes=len(prefixes)) + rx_list = self.pg0.get_capture(1) + self.assertEqual(len(rx_list), 1) + packet = rx_list[0] + + self.assertEqual(packet.haslayer(IPv6), 1) + self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1) + + client_duid = packet[DHCP6OptClientId].duid + trid = packet[DHCP6_Solicit].trid + + dst = ip6_normalize(packet[IPv6].dst) + dst2 = ip6_normalize("ff02::1:2") + self.assert_equal(dst, dst2) + src = ip6_normalize(packet[IPv6].src) + src2 = ip6_normalize(self.pg0.local_ip6_ll) + self.assert_equal(src, src2) + ia_pd = packet[DHCP6OptIA_PD] + self.assert_equal(ia_pd.T1, 20) + self.assert_equal(ia_pd.T2, 40) + self.assert_equal(len(ia_pd.iapdopt), 1) + prefix = ia_pd.iapdopt[0] + self.assert_equal(prefix.prefix, '1:2:3::') + self.assert_equal(prefix.plen, 50) + self.assert_equal(prefix.preflft, 60) + self.assert_equal(prefix.validlft, 120) + + self.vapi.want_dhcp6_pd_reply_events(enable_disable=1, + pid=os.getpid()) + + try: + ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=60, + validlft=120) + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IPv6(src=util.mk_ll_addr(self.pg0.remote_mac), + dst=self.pg0.local_ip6_ll) / + UDP(sport=547, dport=546) / + DHCP6_Advertise(trid=trid) / + DHCP6OptServerId(duid=self.server_duid) / + DHCP6OptClientId(duid=client_duid) / + DHCP6OptPref(prefval=7) / + DHCP6OptStatusCode(statuscode=1) / + DHCP6OptIA_PD(iaid=1, T1=20, T2=40, iapdopt=ia_pd_opts) + ) + self.pg0.add_stream([p]) + self.pg_start() + + ev = self.vapi.wait_for_event(1, "dhcp6_pd_reply_event") + + self.assert_equal(ev.preference, 7) + self.assert_equal(ev.status_code, 1) + self.assert_equal(ev.T1, 20) + self.assert_equal(ev.T2, 40) + + reported_prefix = ev.prefixes[0] + prefix = ia_pd_opts.getfieldval("prefix") + self.assert_equal( + str(reported_prefix.prefix).split('/')[0], prefix) + self.assert_equal(int(str(reported_prefix.prefix).split('/')[1]), + ia_pd_opts.getfieldval("plen")) + self.assert_equal(reported_prefix.preferred_time, + ia_pd_opts.getfieldval("preflft")) + self.assert_equal(reported_prefix.valid_time, + ia_pd_opts.getfieldval("validlft")) + + finally: + self.vapi.want_dhcp6_pd_reply_events(enable_disable=0) + self.vapi.dhcp6_clients_enable_disable(enable=0) + + +@tag_run_solo +class TestDHCPv6IANAControlPlane(VppTestCase): + """ DHCPv6 IA NA Control Plane Test Case """ + + @classmethod + def setUpClass(cls): + super(TestDHCPv6IANAControlPlane, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestDHCPv6IANAControlPlane, cls).tearDownClass() + + def setUp(self): + super(TestDHCPv6IANAControlPlane, self).setUp() + + self.create_pg_interfaces(range(1)) + self.interfaces = list(self.pg_interfaces) + for i in self.interfaces: + i.admin_up() + + self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac) + self.client_duid = None + self.T1 = 1 + self.T2 = 2 + + fib = self.vapi.ip_route_dump(0, True) + self.initial_addresses = set(self.get_interface_addresses(fib, + self.pg0)) + + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + self.vapi.dhcp6_client_enable_disable(sw_if_index=self.pg0.sw_if_index, + enable=1) + + def tearDown(self): + self.vapi.dhcp6_client_enable_disable(sw_if_index=self.pg0.sw_if_index, + enable=0) + + for i in self.interfaces: + i.admin_down() + + super(TestDHCPv6IANAControlPlane, self).tearDown() + + @staticmethod + def get_interface_addresses(fib, pg): + lst = [] + for entry in fib: + if entry.route.prefix.prefixlen == 128: + path = entry.route.paths[0] + if path.sw_if_index == pg.sw_if_index: + lst.append(str(entry.route.prefix.network_address)) + return lst + + def get_addresses(self): + fib = self.vapi.ip_route_dump(0, True) + addresses = set(self.get_interface_addresses(fib, self.pg0)) + return addresses.difference(self.initial_addresses) + + def validate_duid_ll(self, duid): + DUID_LL(duid) + + def validate_packet(self, packet, msg_type, is_resend=False): + try: + self.assertEqual(packet.haslayer(msg_type), 1) + client_duid = packet[DHCP6OptClientId].duid + if self.client_duid is None: + self.client_duid = client_duid + self.validate_duid_ll(client_duid) + else: + self.assertEqual(self.client_duid, client_duid) + if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind: + server_duid = packet[DHCP6OptServerId].duid + self.assertEqual(server_duid, self.server_duid) + if is_resend: + self.assertEqual(self.trid, packet[msg_type].trid) + else: + self.trid = packet[msg_type].trid + ip = packet[IPv6] + udp = packet[UDP] + self.assertEqual(ip.dst, 'ff02::1:2') + self.assertEqual(udp.sport, 546) + self.assertEqual(udp.dport, 547) + dhcpv6 = packet[msg_type] + elapsed_time = dhcpv6[DHCP6OptElapsedTime] + if (is_resend): + self.assertNotEqual(elapsed_time.elapsedtime, 0) + else: + self.assertEqual(elapsed_time.elapsedtime, 0) + except BaseException: + packet.show() + raise + + def wait_for_packet(self, msg_type, timeout=None, is_resend=False): + if timeout is None: + timeout = 3 + rx_list = self.pg0.get_capture(1, timeout=timeout) + packet = rx_list[0] + self.validate_packet(packet, msg_type, is_resend=is_resend) + + def wait_for_solicit(self, timeout=None, is_resend=False): + self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend) + + def wait_for_request(self, timeout=None, is_resend=False): + self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend) + + def wait_for_renew(self, timeout=None, is_resend=False): + self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend) + + def wait_for_rebind(self, timeout=None, is_resend=False): + self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend) + + def wait_for_release(self, timeout=None, is_resend=False): + self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend) + + def send_packet(self, msg_type, t1=None, t2=None, ianaopts=None): + if t1 is None: + t1 = self.T1 + if t2 is None: + t2 = self.T2 + if ianaopts is None: + opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2) + else: + opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2, ianaopts=ianaopts) + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IPv6(src=util.mk_ll_addr(self.pg0.remote_mac), + dst=self.pg0.local_ip6_ll) / + UDP(sport=547, dport=546) / + msg_type(trid=self.trid) / + DHCP6OptServerId(duid=self.server_duid) / + DHCP6OptClientId(duid=self.client_duid) / + opt_ia_na + ) + self.pg0.add_stream([p]) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + def send_advertise(self, t1=None, t2=None, ianaopts=None): + self.send_packet(DHCP6_Advertise, t1, t2, ianaopts) + + def send_reply(self, t1=None, t2=None, ianaopts=None): + self.send_packet(DHCP6_Reply, t1, t2, ianaopts) + + def test_T1_and_T2_timeouts(self): + """ Test T1 and T2 timeouts """ + + self.wait_for_solicit() + self.send_advertise() + self.wait_for_request() + self.send_reply() + + self.sleep(1) + + self.wait_for_renew() + + self.pg_enable_capture(self.pg_interfaces) + + self.sleep(1) + + self.wait_for_rebind() + + def test_addresses(self): + """ Test handling of addresses """ + + ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=1, + validlft=2) + + self.wait_for_solicit() + self.send_advertise(t1=20, t2=40, ianaopts=ia_na_opts) + self.wait_for_request() + self.send_reply(t1=20, t2=40, ianaopts=ia_na_opts) + self.sleep(0.1) + + # check FIB for new address + new_addresses = self.get_addresses() + self.assertEqual(len(new_addresses), 1) + addr = list(new_addresses)[0] + self.assertEqual(addr, '7:8::2') + + self.sleep(2) + + # check that the address is deleted + fib = self.vapi.ip_route_dump(0, True) + addresses = set(self.get_interface_addresses(fib, self.pg0)) + new_addresses = addresses.difference(self.initial_addresses) + self.assertEqual(len(new_addresses), 0) + + def test_sending_client_messages_solicit(self): + """ VPP receives messages from DHCPv6 client """ + + self.wait_for_solicit() + self.send_packet(DHCP6_Solicit) + self.send_packet(DHCP6_Request) + self.send_packet(DHCP6_Renew) + self.send_packet(DHCP6_Rebind) + self.sleep(1) + self.wait_for_solicit(is_resend=True) + + def test_sending_inappropriate_packets(self): + """ Server sends messages with inappropriate message types """ + + self.wait_for_solicit() + self.send_reply() + self.wait_for_solicit(is_resend=True) + self.send_advertise() + self.wait_for_request() + self.send_advertise() + self.wait_for_request(is_resend=True) + self.send_reply() + self.wait_for_renew() + + def test_no_address_available_in_advertise(self): + """ Advertise message contains NoAddrsAvail status code """ + + self.wait_for_solicit() + noavail = DHCP6OptStatusCode(statuscode=2) # NoAddrsAvail + self.send_advertise(ianaopts=noavail) + self.wait_for_solicit(is_resend=True) + + def test_preferred_greater_than_valid_lifetime(self): + """ Preferred lifetime is greater than valid lifetime """ + + self.wait_for_solicit() + self.send_advertise() + self.wait_for_request() + ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=3) + self.send_reply(ianaopts=ia_na_opts) + + self.sleep(0.5) + + # check FIB contains no addresses + fib = self.vapi.ip_route_dump(0, True) + addresses = set(self.get_interface_addresses(fib, self.pg0)) + new_addresses = addresses.difference(self.initial_addresses) + self.assertEqual(len(new_addresses), 0) + + def test_T1_greater_than_T2(self): + """ T1 is greater than T2 """ + + self.wait_for_solicit() + self.send_advertise() + self.wait_for_request() + ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=8) + self.send_reply(t1=80, t2=40, ianaopts=ia_na_opts) + + self.sleep(0.5) + + # check FIB contains no addresses + fib = self.vapi.ip_route_dump(0, True) + addresses = set(self.get_interface_addresses(fib, self.pg0)) + new_addresses = addresses.difference(self.initial_addresses) + self.assertEqual(len(new_addresses), 0) + + +@tag_fixme_vpp_workers +class TestDHCPv6PDControlPlane(VppTestCase): + """ DHCPv6 PD Control Plane Test Case """ + + @classmethod + def setUpClass(cls): + super(TestDHCPv6PDControlPlane, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestDHCPv6PDControlPlane, cls).tearDownClass() + + def setUp(self): + super(TestDHCPv6PDControlPlane, self).setUp() + + self.create_pg_interfaces(range(2)) + self.interfaces = list(self.pg_interfaces) + for i in self.interfaces: + i.admin_up() + + self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac) + self.client_duid = None + self.T1 = 1 + self.T2 = 2 + + fib = self.vapi.ip_route_dump(0, True) + self.initial_addresses = set(self.get_interface_addresses(fib, + self.pg1)) + + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + self.prefix_group = 'my-pd-prefix-group' + + self.vapi.dhcp6_pd_client_enable_disable( + enable=1, + sw_if_index=self.pg0.sw_if_index, + prefix_group=self.prefix_group) + + def tearDown(self): + self.vapi.dhcp6_pd_client_enable_disable(self.pg0.sw_if_index, + enable=0) + + for i in self.interfaces: + i.admin_down() + + super(TestDHCPv6PDControlPlane, self).tearDown() + + @staticmethod + def get_interface_addresses(fib, pg): + lst = [] + for entry in fib: + if entry.route.prefix.prefixlen == 128: + path = entry.route.paths[0] + if path.sw_if_index == pg.sw_if_index: + lst.append(str(entry.route.prefix.network_address)) + return lst + + def get_addresses(self): + fib = self.vapi.ip_route_dump(0, True) + addresses = set(self.get_interface_addresses(fib, self.pg1)) + return addresses.difference(self.initial_addresses) + + def validate_duid_ll(self, duid): + DUID_LL(duid) + + def validate_packet(self, packet, msg_type, is_resend=False): + try: + self.assertEqual(packet.haslayer(msg_type), 1) + client_duid = packet[DHCP6OptClientId].duid + if self.client_duid is None: + self.client_duid = client_duid + self.validate_duid_ll(client_duid) + else: + self.assertEqual(self.client_duid, client_duid) + if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind: + server_duid = packet[DHCP6OptServerId].duid + self.assertEqual(server_duid, self.server_duid) + if is_resend: + self.assertEqual(self.trid, packet[msg_type].trid) + else: + self.trid = packet[msg_type].trid + ip = packet[IPv6] + udp = packet[UDP] + self.assertEqual(ip.dst, 'ff02::1:2') + self.assertEqual(udp.sport, 546) + self.assertEqual(udp.dport, 547) + dhcpv6 = packet[msg_type] + elapsed_time = dhcpv6[DHCP6OptElapsedTime] + if (is_resend): + self.assertNotEqual(elapsed_time.elapsedtime, 0) + else: + self.assertEqual(elapsed_time.elapsedtime, 0) + except BaseException: + packet.show() + raise + + def wait_for_packet(self, msg_type, timeout=None, is_resend=False): + if timeout is None: + timeout = 3 + rx_list = self.pg0.get_capture(1, timeout=timeout) + packet = rx_list[0] + self.validate_packet(packet, msg_type, is_resend=is_resend) + + def wait_for_solicit(self, timeout=None, is_resend=False): + self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend) + + def wait_for_request(self, timeout=None, is_resend=False): + self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend) + + def wait_for_renew(self, timeout=None, is_resend=False): + self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend) + + def wait_for_rebind(self, timeout=None, is_resend=False): + self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend) + + def wait_for_release(self, timeout=None, is_resend=False): + self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend) + + def send_packet(self, msg_type, t1=None, t2=None, iapdopt=None): + if t1 is None: + t1 = self.T1 + if t2 is None: + t2 = self.T2 + if iapdopt is None: + opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2) + else: + opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2, iapdopt=iapdopt) + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IPv6(src=util.mk_ll_addr(self.pg0.remote_mac), + dst=self.pg0.local_ip6_ll) / + UDP(sport=547, dport=546) / + msg_type(trid=self.trid) / + DHCP6OptServerId(duid=self.server_duid) / + DHCP6OptClientId(duid=self.client_duid) / + opt_ia_pd + ) + self.pg0.add_stream([p]) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + def send_advertise(self, t1=None, t2=None, iapdopt=None): + self.send_packet(DHCP6_Advertise, t1, t2, iapdopt) + + def send_reply(self, t1=None, t2=None, iapdopt=None): + self.send_packet(DHCP6_Reply, t1, t2, iapdopt) + + def test_T1_and_T2_timeouts(self): + """ Test T1 and T2 timeouts """ + + self.wait_for_solicit() + self.send_advertise() + self.wait_for_request() + self.send_reply() + + self.sleep(1) + + self.wait_for_renew() + + self.pg_enable_capture(self.pg_interfaces) + + self.sleep(1) + + self.wait_for_rebind() + + def test_prefixes(self): + """ Test handling of prefixes """ + + address1 = '::2:0:0:0:405/60' + address2 = '::76:0:0:0:406/62' + try: + self.vapi.ip6_add_del_address_using_prefix( + sw_if_index=self.pg1.sw_if_index, + address_with_prefix=address1, + prefix_group=self.prefix_group) + + ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=2, + validlft=3) + + self.wait_for_solicit() + self.send_advertise(t1=20, t2=40, iapdopt=ia_pd_opts) + self.wait_for_request() + self.send_reply(t1=20, t2=40, iapdopt=ia_pd_opts) + self.sleep(0.1) + + # check FIB for new address + new_addresses = self.get_addresses() + self.assertEqual(len(new_addresses), 1) + addr = list(new_addresses)[0] + self.assertEqual(addr, '7:8:0:2::405') + + self.sleep(1) + + self.vapi.ip6_add_del_address_using_prefix( + sw_if_index=self.pg1.sw_if_index, + address_with_prefix=address2, + prefix_group=self.prefix_group) + + self.sleep(1) + + # check FIB contains 2 addresses + fib = self.vapi.ip_route_dump(0, True) + addresses = set(self.get_interface_addresses(fib, self.pg1)) + new_addresses = addresses.difference(self.initial_addresses) + self.assertEqual(len(new_addresses), 2) + addr1 = list(new_addresses)[0] + addr2 = list(new_addresses)[1] + if addr1 == '7:8:0:76::406': + addr1, addr2 = addr2, addr1 + self.assertEqual(addr1, '7:8:0:2::405') + self.assertEqual(addr2, '7:8:0:76::406') + + self.sleep(1) + + # check that the addresses are deleted + fib = self.vapi.ip_route_dump(0, True) + addresses = set(self.get_interface_addresses(fib, self.pg1)) + new_addresses = addresses.difference(self.initial_addresses) + self.assertEqual(len(new_addresses), 0) + + finally: + if address1 is not None: + self.vapi.ip6_add_del_address_using_prefix( + sw_if_index=self.pg1.sw_if_index, + address_with_prefix=address1, + prefix_group=self.prefix_group, is_add=0) + if address2 is not None: + self.vapi.ip6_add_del_address_using_prefix( + sw_if_index=self.pg1.sw_if_index, + address_with_prefix=address2, + prefix_group=self.prefix_group, is_add=0) + + def test_sending_client_messages_solicit(self): + """ VPP receives messages from DHCPv6 client """ + + self.wait_for_solicit() + self.send_packet(DHCP6_Solicit) + self.send_packet(DHCP6_Request) + self.send_packet(DHCP6_Renew) + self.send_packet(DHCP6_Rebind) + self.sleep(1) + self.wait_for_solicit(is_resend=True) + + def test_sending_inappropriate_packets(self): + """ Server sends messages with inappropriate message types """ + + self.wait_for_solicit() + self.send_reply() + self.wait_for_solicit(is_resend=True) + self.send_advertise() + self.wait_for_request() + self.send_advertise() + self.wait_for_request(is_resend=True) + self.send_reply() + self.wait_for_renew() + + def test_no_prefix_available_in_advertise(self): + """ Advertise message contains NoPrefixAvail status code """ + + self.wait_for_solicit() + noavail = DHCP6OptStatusCode(statuscode=6) # NoPrefixAvail + self.send_advertise(iapdopt=noavail) + self.wait_for_solicit(is_resend=True) + + def test_preferred_greater_than_valid_lifetime(self): + """ Preferred lifetime is greater than valid lifetime """ + + address1 = '::2:0:0:0:405/60' + try: + self.vapi.ip6_add_del_address_using_prefix( + sw_if_index=self.pg1.sw_if_index, + address_with_prefix=address1, + prefix_group=self.prefix_group) + + self.wait_for_solicit() + self.send_advertise() + self.wait_for_request() + ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4, + validlft=3) + self.send_reply(iapdopt=ia_pd_opts) + + self.sleep(0.5) + + # check FIB contains no addresses + fib = self.vapi.ip_route_dump(0, True) + addresses = set(self.get_interface_addresses(fib, self.pg1)) + new_addresses = addresses.difference(self.initial_addresses) + self.assertEqual(len(new_addresses), 0) + + finally: + self.vapi.ip6_add_del_address_using_prefix( + sw_if_index=self.pg1.sw_if_index, + address_with_prefix=address1, + prefix_group=self.prefix_group, + is_add=0) + + def test_T1_greater_than_T2(self): + """ T1 is greater than T2 """ + + address1 = '::2:0:0:0:405/60' + try: + self.vapi.ip6_add_del_address_using_prefix( + sw_if_index=self.pg1.sw_if_index, + address_with_prefix=address1, + prefix_group=self.prefix_group) + + self.wait_for_solicit() + self.send_advertise() + self.wait_for_request() + ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4, + validlft=8) + self.send_reply(t1=80, t2=40, iapdopt=ia_pd_opts) + + self.sleep(0.5) + + # check FIB contains no addresses + fib = self.vapi.ip_route_dump(0, True) + addresses = set(self.get_interface_addresses(fib, self.pg1)) + new_addresses = addresses.difference(self.initial_addresses) + self.assertEqual(len(new_addresses), 0) + + finally: + self.vapi.ip6_add_del_address_using_prefix( + sw_if_index=self.pg1.sw_if_index, + prefix_group=self.prefix_group, + address_with_prefix=address1, + is_add=False) |