summaryrefslogtreecommitdiffstats
path: root/src/plugins/dhcp/test/test_dhcp6.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugins/dhcp/test/test_dhcp6.py')
-rw-r--r--src/plugins/dhcp/test/test_dhcp6.py805
1 files changed, 0 insertions, 805 deletions
diff --git a/src/plugins/dhcp/test/test_dhcp6.py b/src/plugins/dhcp/test/test_dhcp6.py
deleted file mode 100644
index 57eb113fb13..00000000000
--- a/src/plugins/dhcp/test/test_dhcp6.py
+++ /dev/null
@@ -1,805 +0,0 @@
-from socket import AF_INET6, inet_ntop, inet_pton
-
-from scapy.layers.dhcp6 import DHCP6_Advertise, DHCP6OptClientId, \
- DHCP6OptStatusCode, DHCP6OptPref, DHCP6OptIA_PD, DHCP6OptIAPrefix, \
- DHCP6OptServerId, DHCP6_Solicit, DHCP6_Reply, DHCP6_Request, DHCP6_Renew, \
- DHCP6_Rebind, DUID_LL, DHCP6_Release, DHCP6OptElapsedTime, DHCP6OptIA_NA, \
- DHCP6OptIAAddress
-from scapy.layers.inet6 import IPv6, Ether, UDP
-from scapy.utils6 import in6_mactoifaceid
-
-from framework import tag_fixme_vpp_workers
-from framework import VppTestCase
-from framework import tag_run_solo
-from vpp_papi import VppEnum
-import util
-import os
-
-
-def ip6_normalize(ip6):
- return inet_ntop(AF_INET6, inet_pton(AF_INET6, ip6))
-
-
-class TestDHCPv6DataPlane(VppTestCase):
- """ DHCPv6 Data Plane Test Case """
-
- @classmethod
- def setUpClass(cls):
- super(TestDHCPv6DataPlane, cls).setUpClass()
-
- @classmethod
- def tearDownClass(cls):
- super(TestDHCPv6DataPlane, cls).tearDownClass()
-
- def setUp(self):
- super(TestDHCPv6DataPlane, self).setUp()
-
- self.create_pg_interfaces(range(1))
- self.interfaces = list(self.pg_interfaces)
- for i in self.interfaces:
- i.admin_up()
- i.config_ip6()
-
- self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
-
- def tearDown(self):
- for i in self.interfaces:
- i.unconfig_ip6()
- i.admin_down()
- super(TestDHCPv6DataPlane, self).tearDown()
-
- def test_dhcp_ia_na_send_solicit_receive_advertise(self):
- """ Verify DHCPv6 IA NA Solicit packet and Advertise event """
-
- self.vapi.dhcp6_clients_enable_disable(enable=1)
-
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- address = {'address': '1:2:3::5',
- 'preferred_time': 60,
- 'valid_time': 120}
- self.vapi.dhcp6_send_client_message(
- server_index=0xffffffff,
- mrc=1,
- msg_type=VppEnum.vl_api_dhcpv6_msg_type_t.DHCPV6_MSG_API_SOLICIT,
- sw_if_index=self.pg0.sw_if_index,
- T1=20,
- T2=40,
- addresses=[address],
- n_addresses=len(
- [address]))
- rx_list = self.pg0.get_capture(1)
- self.assertEqual(len(rx_list), 1)
- packet = rx_list[0]
-
- self.assertEqual(packet.haslayer(IPv6), 1)
- self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1)
-
- client_duid = packet[DHCP6OptClientId].duid
- trid = packet[DHCP6_Solicit].trid
-
- dst = ip6_normalize(packet[IPv6].dst)
- dst2 = ip6_normalize("ff02::1:2")
- self.assert_equal(dst, dst2)
- src = ip6_normalize(packet[IPv6].src)
- src2 = ip6_normalize(self.pg0.local_ip6_ll)
- self.assert_equal(src, src2)
- ia_na = packet[DHCP6OptIA_NA]
- self.assert_equal(ia_na.T1, 20)
- self.assert_equal(ia_na.T2, 40)
- self.assert_equal(len(ia_na.ianaopts), 1)
- address = ia_na.ianaopts[0]
- self.assert_equal(address.addr, '1:2:3::5')
- self.assert_equal(address.preflft, 60)
- self.assert_equal(address.validlft, 120)
-
- self.vapi.want_dhcp6_reply_events(enable_disable=1,
- pid=os.getpid())
-
- try:
- ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=60,
- validlft=120)
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
- dst=self.pg0.local_ip6_ll) /
- UDP(sport=547, dport=546) /
- DHCP6_Advertise(trid=trid) /
- DHCP6OptServerId(duid=self.server_duid) /
- DHCP6OptClientId(duid=client_duid) /
- DHCP6OptPref(prefval=7) /
- DHCP6OptStatusCode(statuscode=1) /
- DHCP6OptIA_NA(iaid=1, T1=20, T2=40, ianaopts=ia_na_opts)
- )
- self.pg0.add_stream([p])
- self.pg_start()
-
- ev = self.vapi.wait_for_event(1, "dhcp6_reply_event")
-
- self.assert_equal(ev.preference, 7)
- self.assert_equal(ev.status_code, 1)
- self.assert_equal(ev.T1, 20)
- self.assert_equal(ev.T2, 40)
-
- reported_address = ev.addresses[0]
- address = ia_na_opts.getfieldval("addr")
- self.assert_equal(str(reported_address.address), address)
- self.assert_equal(reported_address.preferred_time,
- ia_na_opts.getfieldval("preflft"))
- self.assert_equal(reported_address.valid_time,
- ia_na_opts.getfieldval("validlft"))
-
- finally:
- self.vapi.want_dhcp6_reply_events(enable_disable=0)
- self.vapi.dhcp6_clients_enable_disable(enable=0)
-
- def test_dhcp_pd_send_solicit_receive_advertise(self):
- """ Verify DHCPv6 PD Solicit packet and Advertise event """
-
- self.vapi.dhcp6_clients_enable_disable(enable=1)
-
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
-
- prefix = {'prefix': {'address': '1:2:3::', 'len': 50},
- 'preferred_time': 60,
- 'valid_time': 120}
- prefixes = [prefix]
- self.vapi.dhcp6_pd_send_client_message(
- server_index=0xffffffff,
- mrc=1,
- msg_type=VppEnum.vl_api_dhcpv6_msg_type_t.DHCPV6_MSG_API_SOLICIT,
- sw_if_index=self.pg0.sw_if_index,
- T1=20,
- T2=40,
- prefixes=prefixes,
- n_prefixes=len(prefixes))
- rx_list = self.pg0.get_capture(1)
- self.assertEqual(len(rx_list), 1)
- packet = rx_list[0]
-
- self.assertEqual(packet.haslayer(IPv6), 1)
- self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1)
-
- client_duid = packet[DHCP6OptClientId].duid
- trid = packet[DHCP6_Solicit].trid
-
- dst = ip6_normalize(packet[IPv6].dst)
- dst2 = ip6_normalize("ff02::1:2")
- self.assert_equal(dst, dst2)
- src = ip6_normalize(packet[IPv6].src)
- src2 = ip6_normalize(self.pg0.local_ip6_ll)
- self.assert_equal(src, src2)
- ia_pd = packet[DHCP6OptIA_PD]
- self.assert_equal(ia_pd.T1, 20)
- self.assert_equal(ia_pd.T2, 40)
- self.assert_equal(len(ia_pd.iapdopt), 1)
- prefix = ia_pd.iapdopt[0]
- self.assert_equal(prefix.prefix, '1:2:3::')
- self.assert_equal(prefix.plen, 50)
- self.assert_equal(prefix.preflft, 60)
- self.assert_equal(prefix.validlft, 120)
-
- self.vapi.want_dhcp6_pd_reply_events(enable_disable=1,
- pid=os.getpid())
-
- try:
- ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=60,
- validlft=120)
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
- dst=self.pg0.local_ip6_ll) /
- UDP(sport=547, dport=546) /
- DHCP6_Advertise(trid=trid) /
- DHCP6OptServerId(duid=self.server_duid) /
- DHCP6OptClientId(duid=client_duid) /
- DHCP6OptPref(prefval=7) /
- DHCP6OptStatusCode(statuscode=1) /
- DHCP6OptIA_PD(iaid=1, T1=20, T2=40, iapdopt=ia_pd_opts)
- )
- self.pg0.add_stream([p])
- self.pg_start()
-
- ev = self.vapi.wait_for_event(1, "dhcp6_pd_reply_event")
-
- self.assert_equal(ev.preference, 7)
- self.assert_equal(ev.status_code, 1)
- self.assert_equal(ev.T1, 20)
- self.assert_equal(ev.T2, 40)
-
- reported_prefix = ev.prefixes[0]
- prefix = ia_pd_opts.getfieldval("prefix")
- self.assert_equal(
- str(reported_prefix.prefix).split('/')[0], prefix)
- self.assert_equal(int(str(reported_prefix.prefix).split('/')[1]),
- ia_pd_opts.getfieldval("plen"))
- self.assert_equal(reported_prefix.preferred_time,
- ia_pd_opts.getfieldval("preflft"))
- self.assert_equal(reported_prefix.valid_time,
- ia_pd_opts.getfieldval("validlft"))
-
- finally:
- self.vapi.want_dhcp6_pd_reply_events(enable_disable=0)
- self.vapi.dhcp6_clients_enable_disable(enable=0)
-
-
-@tag_run_solo
-class TestDHCPv6IANAControlPlane(VppTestCase):
- """ DHCPv6 IA NA Control Plane Test Case """
-
- @classmethod
- def setUpClass(cls):
- super(TestDHCPv6IANAControlPlane, cls).setUpClass()
-
- @classmethod
- def tearDownClass(cls):
- super(TestDHCPv6IANAControlPlane, cls).tearDownClass()
-
- def setUp(self):
- super(TestDHCPv6IANAControlPlane, self).setUp()
-
- self.create_pg_interfaces(range(1))
- self.interfaces = list(self.pg_interfaces)
- for i in self.interfaces:
- i.admin_up()
-
- self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
- self.client_duid = None
- self.T1 = 1
- self.T2 = 2
-
- fib = self.vapi.ip_route_dump(0, True)
- self.initial_addresses = set(self.get_interface_addresses(fib,
- self.pg0))
-
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
-
- self.vapi.dhcp6_client_enable_disable(sw_if_index=self.pg0.sw_if_index,
- enable=1)
-
- def tearDown(self):
- self.vapi.dhcp6_client_enable_disable(sw_if_index=self.pg0.sw_if_index,
- enable=0)
-
- for i in self.interfaces:
- i.admin_down()
-
- super(TestDHCPv6IANAControlPlane, self).tearDown()
-
- @staticmethod
- def get_interface_addresses(fib, pg):
- lst = []
- for entry in fib:
- if entry.route.prefix.prefixlen == 128:
- path = entry.route.paths[0]
- if path.sw_if_index == pg.sw_if_index:
- lst.append(str(entry.route.prefix.network_address))
- return lst
-
- def get_addresses(self):
- fib = self.vapi.ip_route_dump(0, True)
- addresses = set(self.get_interface_addresses(fib, self.pg0))
- return addresses.difference(self.initial_addresses)
-
- def validate_duid_ll(self, duid):
- DUID_LL(duid)
-
- def validate_packet(self, packet, msg_type, is_resend=False):
- try:
- self.assertEqual(packet.haslayer(msg_type), 1)
- client_duid = packet[DHCP6OptClientId].duid
- if self.client_duid is None:
- self.client_duid = client_duid
- self.validate_duid_ll(client_duid)
- else:
- self.assertEqual(self.client_duid, client_duid)
- if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind:
- server_duid = packet[DHCP6OptServerId].duid
- self.assertEqual(server_duid, self.server_duid)
- if is_resend:
- self.assertEqual(self.trid, packet[msg_type].trid)
- else:
- self.trid = packet[msg_type].trid
- ip = packet[IPv6]
- udp = packet[UDP]
- self.assertEqual(ip.dst, 'ff02::1:2')
- self.assertEqual(udp.sport, 546)
- self.assertEqual(udp.dport, 547)
- dhcpv6 = packet[msg_type]
- elapsed_time = dhcpv6[DHCP6OptElapsedTime]
- if (is_resend):
- self.assertNotEqual(elapsed_time.elapsedtime, 0)
- else:
- self.assertEqual(elapsed_time.elapsedtime, 0)
- except BaseException:
- packet.show()
- raise
-
- def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
- if timeout is None:
- timeout = 3
- rx_list = self.pg0.get_capture(1, timeout=timeout)
- packet = rx_list[0]
- self.validate_packet(packet, msg_type, is_resend=is_resend)
-
- def wait_for_solicit(self, timeout=None, is_resend=False):
- self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)
-
- def wait_for_request(self, timeout=None, is_resend=False):
- self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)
-
- def wait_for_renew(self, timeout=None, is_resend=False):
- self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)
-
- def wait_for_rebind(self, timeout=None, is_resend=False):
- self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)
-
- def wait_for_release(self, timeout=None, is_resend=False):
- self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)
-
- def send_packet(self, msg_type, t1=None, t2=None, ianaopts=None):
- if t1 is None:
- t1 = self.T1
- if t2 is None:
- t2 = self.T2
- if ianaopts is None:
- opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2)
- else:
- opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2, ianaopts=ianaopts)
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
- dst=self.pg0.local_ip6_ll) /
- UDP(sport=547, dport=546) /
- msg_type(trid=self.trid) /
- DHCP6OptServerId(duid=self.server_duid) /
- DHCP6OptClientId(duid=self.client_duid) /
- opt_ia_na
- )
- self.pg0.add_stream([p])
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
-
- def send_advertise(self, t1=None, t2=None, ianaopts=None):
- self.send_packet(DHCP6_Advertise, t1, t2, ianaopts)
-
- def send_reply(self, t1=None, t2=None, ianaopts=None):
- self.send_packet(DHCP6_Reply, t1, t2, ianaopts)
-
- def test_T1_and_T2_timeouts(self):
- """ Test T1 and T2 timeouts """
-
- self.wait_for_solicit()
- self.send_advertise()
- self.wait_for_request()
- self.send_reply()
-
- self.sleep(1)
-
- self.wait_for_renew()
-
- self.pg_enable_capture(self.pg_interfaces)
-
- self.sleep(1)
-
- self.wait_for_rebind()
-
- def test_addresses(self):
- """ Test handling of addresses """
-
- ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=1,
- validlft=2)
-
- self.wait_for_solicit()
- self.send_advertise(t1=20, t2=40, ianaopts=ia_na_opts)
- self.wait_for_request()
- self.send_reply(t1=20, t2=40, ianaopts=ia_na_opts)
- self.sleep(0.1)
-
- # check FIB for new address
- new_addresses = self.get_addresses()
- self.assertEqual(len(new_addresses), 1)
- addr = list(new_addresses)[0]
- self.assertEqual(addr, '7:8::2')
-
- self.sleep(2)
-
- # check that the address is deleted
- fib = self.vapi.ip_route_dump(0, True)
- addresses = set(self.get_interface_addresses(fib, self.pg0))
- new_addresses = addresses.difference(self.initial_addresses)
- self.assertEqual(len(new_addresses), 0)
-
- def test_sending_client_messages_solicit(self):
- """ VPP receives messages from DHCPv6 client """
-
- self.wait_for_solicit()
- self.send_packet(DHCP6_Solicit)
- self.send_packet(DHCP6_Request)
- self.send_packet(DHCP6_Renew)
- self.send_packet(DHCP6_Rebind)
- self.sleep(1)
- self.wait_for_solicit(is_resend=True)
-
- def test_sending_inappropriate_packets(self):
- """ Server sends messages with inappropriate message types """
-
- self.wait_for_solicit()
- self.send_reply()
- self.wait_for_solicit(is_resend=True)
- self.send_advertise()
- self.wait_for_request()
- self.send_advertise()
- self.wait_for_request(is_resend=True)
- self.send_reply()
- self.wait_for_renew()
-
- def test_no_address_available_in_advertise(self):
- """ Advertise message contains NoAddrsAvail status code """
-
- self.wait_for_solicit()
- noavail = DHCP6OptStatusCode(statuscode=2) # NoAddrsAvail
- self.send_advertise(ianaopts=noavail)
- self.wait_for_solicit(is_resend=True)
-
- def test_preferred_greater_than_valid_lifetime(self):
- """ Preferred lifetime is greater than valid lifetime """
-
- self.wait_for_solicit()
- self.send_advertise()
- self.wait_for_request()
- ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=3)
- self.send_reply(ianaopts=ia_na_opts)
-
- self.sleep(0.5)
-
- # check FIB contains no addresses
- fib = self.vapi.ip_route_dump(0, True)
- addresses = set(self.get_interface_addresses(fib, self.pg0))
- new_addresses = addresses.difference(self.initial_addresses)
- self.assertEqual(len(new_addresses), 0)
-
- def test_T1_greater_than_T2(self):
- """ T1 is greater than T2 """
-
- self.wait_for_solicit()
- self.send_advertise()
- self.wait_for_request()
- ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=8)
- self.send_reply(t1=80, t2=40, ianaopts=ia_na_opts)
-
- self.sleep(0.5)
-
- # check FIB contains no addresses
- fib = self.vapi.ip_route_dump(0, True)
- addresses = set(self.get_interface_addresses(fib, self.pg0))
- new_addresses = addresses.difference(self.initial_addresses)
- self.assertEqual(len(new_addresses), 0)
-
-
-@tag_fixme_vpp_workers
-class TestDHCPv6PDControlPlane(VppTestCase):
- """ DHCPv6 PD Control Plane Test Case """
-
- @classmethod
- def setUpClass(cls):
- super(TestDHCPv6PDControlPlane, cls).setUpClass()
-
- @classmethod
- def tearDownClass(cls):
- super(TestDHCPv6PDControlPlane, cls).tearDownClass()
-
- def setUp(self):
- super(TestDHCPv6PDControlPlane, self).setUp()
-
- self.create_pg_interfaces(range(2))
- self.interfaces = list(self.pg_interfaces)
- for i in self.interfaces:
- i.admin_up()
-
- self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
- self.client_duid = None
- self.T1 = 1
- self.T2 = 2
-
- fib = self.vapi.ip_route_dump(0, True)
- self.initial_addresses = set(self.get_interface_addresses(fib,
- self.pg1))
-
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
-
- self.prefix_group = 'my-pd-prefix-group'
-
- self.vapi.dhcp6_pd_client_enable_disable(
- enable=1,
- sw_if_index=self.pg0.sw_if_index,
- prefix_group=self.prefix_group)
-
- def tearDown(self):
- self.vapi.dhcp6_pd_client_enable_disable(self.pg0.sw_if_index,
- enable=0)
-
- for i in self.interfaces:
- i.admin_down()
-
- super(TestDHCPv6PDControlPlane, self).tearDown()
-
- @staticmethod
- def get_interface_addresses(fib, pg):
- lst = []
- for entry in fib:
- if entry.route.prefix.prefixlen == 128:
- path = entry.route.paths[0]
- if path.sw_if_index == pg.sw_if_index:
- lst.append(str(entry.route.prefix.network_address))
- return lst
-
- def get_addresses(self):
- fib = self.vapi.ip_route_dump(0, True)
- addresses = set(self.get_interface_addresses(fib, self.pg1))
- return addresses.difference(self.initial_addresses)
-
- def validate_duid_ll(self, duid):
- DUID_LL(duid)
-
- def validate_packet(self, packet, msg_type, is_resend=False):
- try:
- self.assertEqual(packet.haslayer(msg_type), 1)
- client_duid = packet[DHCP6OptClientId].duid
- if self.client_duid is None:
- self.client_duid = client_duid
- self.validate_duid_ll(client_duid)
- else:
- self.assertEqual(self.client_duid, client_duid)
- if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind:
- server_duid = packet[DHCP6OptServerId].duid
- self.assertEqual(server_duid, self.server_duid)
- if is_resend:
- self.assertEqual(self.trid, packet[msg_type].trid)
- else:
- self.trid = packet[msg_type].trid
- ip = packet[IPv6]
- udp = packet[UDP]
- self.assertEqual(ip.dst, 'ff02::1:2')
- self.assertEqual(udp.sport, 546)
- self.assertEqual(udp.dport, 547)
- dhcpv6 = packet[msg_type]
- elapsed_time = dhcpv6[DHCP6OptElapsedTime]
- if (is_resend):
- self.assertNotEqual(elapsed_time.elapsedtime, 0)
- else:
- self.assertEqual(elapsed_time.elapsedtime, 0)
- except BaseException:
- packet.show()
- raise
-
- def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
- if timeout is None:
- timeout = 3
- rx_list = self.pg0.get_capture(1, timeout=timeout)
- packet = rx_list[0]
- self.validate_packet(packet, msg_type, is_resend=is_resend)
-
- def wait_for_solicit(self, timeout=None, is_resend=False):
- self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)
-
- def wait_for_request(self, timeout=None, is_resend=False):
- self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)
-
- def wait_for_renew(self, timeout=None, is_resend=False):
- self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)
-
- def wait_for_rebind(self, timeout=None, is_resend=False):
- self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)
-
- def wait_for_release(self, timeout=None, is_resend=False):
- self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)
-
- def send_packet(self, msg_type, t1=None, t2=None, iapdopt=None):
- if t1 is None:
- t1 = self.T1
- if t2 is None:
- t2 = self.T2
- if iapdopt is None:
- opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2)
- else:
- opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2, iapdopt=iapdopt)
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
- dst=self.pg0.local_ip6_ll) /
- UDP(sport=547, dport=546) /
- msg_type(trid=self.trid) /
- DHCP6OptServerId(duid=self.server_duid) /
- DHCP6OptClientId(duid=self.client_duid) /
- opt_ia_pd
- )
- self.pg0.add_stream([p])
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
-
- def send_advertise(self, t1=None, t2=None, iapdopt=None):
- self.send_packet(DHCP6_Advertise, t1, t2, iapdopt)
-
- def send_reply(self, t1=None, t2=None, iapdopt=None):
- self.send_packet(DHCP6_Reply, t1, t2, iapdopt)
-
- def test_T1_and_T2_timeouts(self):
- """ Test T1 and T2 timeouts """
-
- self.wait_for_solicit()
- self.send_advertise()
- self.wait_for_request()
- self.send_reply()
-
- self.sleep(1)
-
- self.wait_for_renew()
-
- self.pg_enable_capture(self.pg_interfaces)
-
- self.sleep(1)
-
- self.wait_for_rebind()
-
- def test_prefixes(self):
- """ Test handling of prefixes """
-
- address1 = '::2:0:0:0:405/60'
- address2 = '::76:0:0:0:406/62'
- try:
- self.vapi.ip6_add_del_address_using_prefix(
- sw_if_index=self.pg1.sw_if_index,
- address_with_prefix=address1,
- prefix_group=self.prefix_group)
-
- ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=2,
- validlft=3)
-
- self.wait_for_solicit()
- self.send_advertise(t1=20, t2=40, iapdopt=ia_pd_opts)
- self.wait_for_request()
- self.send_reply(t1=20, t2=40, iapdopt=ia_pd_opts)
- self.sleep(0.1)
-
- # check FIB for new address
- new_addresses = self.get_addresses()
- self.assertEqual(len(new_addresses), 1)
- addr = list(new_addresses)[0]
- self.assertEqual(addr, '7:8:0:2::405')
-
- self.sleep(1)
-
- self.vapi.ip6_add_del_address_using_prefix(
- sw_if_index=self.pg1.sw_if_index,
- address_with_prefix=address2,
- prefix_group=self.prefix_group)
-
- self.sleep(1)
-
- # check FIB contains 2 addresses
- fib = self.vapi.ip_route_dump(0, True)
- addresses = set(self.get_interface_addresses(fib, self.pg1))
- new_addresses = addresses.difference(self.initial_addresses)
- self.assertEqual(len(new_addresses), 2)
- addr1 = list(new_addresses)[0]
- addr2 = list(new_addresses)[1]
- if addr1 == '7:8:0:76::406':
- addr1, addr2 = addr2, addr1
- self.assertEqual(addr1, '7:8:0:2::405')
- self.assertEqual(addr2, '7:8:0:76::406')
-
- self.sleep(1)
-
- # check that the addresses are deleted
- fib = self.vapi.ip_route_dump(0, True)
- addresses = set(self.get_interface_addresses(fib, self.pg1))
- new_addresses = addresses.difference(self.initial_addresses)
- self.assertEqual(len(new_addresses), 0)
-
- finally:
- if address1 is not None:
- self.vapi.ip6_add_del_address_using_prefix(
- sw_if_index=self.pg1.sw_if_index,
- address_with_prefix=address1,
- prefix_group=self.prefix_group, is_add=0)
- if address2 is not None:
- self.vapi.ip6_add_del_address_using_prefix(
- sw_if_index=self.pg1.sw_if_index,
- address_with_prefix=address2,
- prefix_group=self.prefix_group, is_add=0)
-
- def test_sending_client_messages_solicit(self):
- """ VPP receives messages from DHCPv6 client """
-
- self.wait_for_solicit()
- self.send_packet(DHCP6_Solicit)
- self.send_packet(DHCP6_Request)
- self.send_packet(DHCP6_Renew)
- self.send_packet(DHCP6_Rebind)
- self.sleep(1)
- self.wait_for_solicit(is_resend=True)
-
- def test_sending_inappropriate_packets(self):
- """ Server sends messages with inappropriate message types """
-
- self.wait_for_solicit()
- self.send_reply()
- self.wait_for_solicit(is_resend=True)
- self.send_advertise()
- self.wait_for_request()
- self.send_advertise()
- self.wait_for_request(is_resend=True)
- self.send_reply()
- self.wait_for_renew()
-
- def test_no_prefix_available_in_advertise(self):
- """ Advertise message contains NoPrefixAvail status code """
-
- self.wait_for_solicit()
- noavail = DHCP6OptStatusCode(statuscode=6) # NoPrefixAvail
- self.send_advertise(iapdopt=noavail)
- self.wait_for_solicit(is_resend=True)
-
- def test_preferred_greater_than_valid_lifetime(self):
- """ Preferred lifetime is greater than valid lifetime """
-
- address1 = '::2:0:0:0:405/60'
- try:
- self.vapi.ip6_add_del_address_using_prefix(
- sw_if_index=self.pg1.sw_if_index,
- address_with_prefix=address1,
- prefix_group=self.prefix_group)
-
- self.wait_for_solicit()
- self.send_advertise()
- self.wait_for_request()
- ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
- validlft=3)
- self.send_reply(iapdopt=ia_pd_opts)
-
- self.sleep(0.5)
-
- # check FIB contains no addresses
- fib = self.vapi.ip_route_dump(0, True)
- addresses = set(self.get_interface_addresses(fib, self.pg1))
- new_addresses = addresses.difference(self.initial_addresses)
- self.assertEqual(len(new_addresses), 0)
-
- finally:
- self.vapi.ip6_add_del_address_using_prefix(
- sw_if_index=self.pg1.sw_if_index,
- address_with_prefix=address1,
- prefix_group=self.prefix_group,
- is_add=0)
-
- def test_T1_greater_than_T2(self):
- """ T1 is greater than T2 """
-
- address1 = '::2:0:0:0:405/60'
- try:
- self.vapi.ip6_add_del_address_using_prefix(
- sw_if_index=self.pg1.sw_if_index,
- address_with_prefix=address1,
- prefix_group=self.prefix_group)
-
- self.wait_for_solicit()
- self.send_advertise()
- self.wait_for_request()
- ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
- validlft=8)
- self.send_reply(t1=80, t2=40, iapdopt=ia_pd_opts)
-
- self.sleep(0.5)
-
- # check FIB contains no addresses
- fib = self.vapi.ip_route_dump(0, True)
- addresses = set(self.get_interface_addresses(fib, self.pg1))
- new_addresses = addresses.difference(self.initial_addresses)
- self.assertEqual(len(new_addresses), 0)
-
- finally:
- self.vapi.ip6_add_del_address_using_prefix(
- sw_if_index=self.pg1.sw_if_index,
- prefix_group=self.prefix_group,
- address_with_prefix=address1,
- is_add=False)