from socket import AF_INET6 from scapy.layers.dhcp6 import DHCP6_Advertise, DHCP6OptClientId, \ DHCP6OptStatusCode, DHCP6OptPref, DHCP6OptIA_PD, DHCP6OptIAPrefix, \ DHCP6OptServerId, DHCP6_Solicit, DHCP6_Reply, DHCP6_Request, DHCP6_Renew, \ DHCP6_Rebind, DUID_LL, DHCP6_Release, DHCP6OptElapsedTime, DHCP6OptIA_NA, \ DHCP6OptIAAddress from scapy.layers.inet6 import IPv6, Ether, UDP from scapy.utils6 import in6_mactoifaceid from scapy.utils import inet_ntop, inet_pton from framework import VppTestCase import util def ip6_normalize(ip6): return inet_ntop(AF_INET6, inet_pton(AF_INET6, ip6)) class TestDHCPv6DataPlane(VppTestCase): """ DHCPv6 Data Plane Test Case """ @classmethod def setUpClass(cls): super(TestDHCPv6DataPlane, cls).setUpClass() @classmethod def tearDownClass(cls): super(TestDHCPv6DataPlane, cls).tearDownClass() def setUp(self): super(TestDHCPv6DataPlane, self).setUp() self.create_pg_interfaces(range(1)) self.interfaces = list(self.pg_interfaces) for i in self.interfaces: i.admin_up() i.config_ip6() self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac) def tearDown(self): for i in self.interfaces: i.unconfig_ip6() i.admin_down() super(TestDHCPv6DataPlane, self).tearDown() def test_dhcp_ia_na_send_solicit_receive_advertise(self): """ Verify DHCPv6 IA NA Solicit packet and Advertise event """ self.vapi.dhcp6_clients_enable_disable() self.pg_enable_capture(self.pg_interfaces) self.pg_start() address_bin = '\00\01\00\02\00\03' + '\00' * 8 + '\00\05' address = {'address': address_bin, 'preferred_time': 60, 'valid_time': 120} self.vapi.dhcp6_send_client_message(msg_type=1, sw_if_index=self.pg0.sw_if_index, T1=20, T2=40, addresses=[address], n_addresses=len([address])) rx_list = self.pg0.get_capture(1) self.assertEqual(len(rx_list), 1) packet = rx_list[0] self.assertEqual(packet.haslayer(IPv6), 1) self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1) client_duid = packet[DHCP6OptClientId].duid trid = packet[DHCP6_Solicit].trid dst = 
ip6_normalize(packet[IPv6].dst) dst2 = ip6_normalize("ff02::1:2") self.assert_equal(dst, dst2) src = ip6_normalize(packet[IPv6].src) src2 = ip6_normalize(self.pg0.local_ip6_ll) self.assert_equal(src, src2) ia_na = packet[DHCP6OptIA_NA] self.assert_equal(ia_na.T1, 20) self.assert_equal(ia_na.T2, 40) self.assert_equal(len(ia_na.ianaopts), 1) address = ia_na.ianaopts[0] self.assert_equal(address.addr, '1:2:3::5') self.assert_equal(address.preflft, 60) self.assert_equal(address.validlft, 120) self.vapi.want_dhcp6_reply_events() try: ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=60, validlft=120) p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IPv6(src=util.mk_ll_addr(self.pg0.remote_mac), dst=self.pg0.local_ip6_ll) / UDP(sport=547, dport=546) / DHCP6_Advertise(trid=trid) / DHCP6OptServerId(duid=self.server_duid) / DHCP6OptClientId(duid=client_duid) / DHCP6OptPref(prefval=7) / DHCP6OptStatusCode(statuscode=1) / DHCP6OptIA_NA(iaid=1, T1=20, T2=40, ianaopts=ia_na_opts) ) self.pg0.add_stream([p]) self.pg_start() ev = self.vapi.wait_for_event(1, "dhcp6_reply_event") self.assert_equal(ev.preference, 7) self.assert_equal(ev.status_code, 1) self.assert_equal(ev.T1, 20) self.assert_equal(ev.T2, 40) reported_address = ev.addresses[0] address = inet_pton(AF_INET6, ia_na_opts.getfieldval("addr")) self.assert_equal(reported_address.address, address) self.assert_equal(reported_address.preferred_time, ia_na_opts.getfieldval("preflft")) self.assert_equal(reported_address.valid_time, ia_na_opts.getfieldval("validlft")) finally: self.vapi.want_dhcp6_reply_events(enable_disable=0) def test_dhcp_pd_send_solicit_receive_advertise(self): """ Verify DHCPv6 PD Solicit packet and Advertise event """ self.vapi.dhcp6_clients_enable_disable() self.pg_enable_capture(self.pg_interfaces) self.pg_start() prefix_bin = '\00\01\00\02\00\03' + '\00' * 10 prefix = {'prefix': prefix_bin, 'prefix_length': 50, 'preferred_time': 60, 'valid_time': 120} 
self.vapi.dhcp6_pd_send_client_message(1, self.pg0.sw_if_index, T1=20, T2=40, prefixes=[prefix]) rx_list = self.pg0.get_capture(1) self.assertEqual(len(rx_list), 1) packet = rx_list[0] self.assertEqual(packet.haslayer(IPv6), 1) self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1) client_duid = packet[DHCP6OptClientId].duid trid = packet[DHCP6_Solicit].trid dst = ip6_normalize(packet[IPv6].dst) dst2 = ip6_normalize("ff02::1:2") self.assert_equal(dst, dst2) src = ip6_normalize(packet[IPv6].src) src2 = ip6_normalize(self.pg0.local_ip6_ll) self.assert_equal(src, src2) ia_pd = packet[DHCP6OptIA_PD] self.assert_equal(ia_pd.T1, 20) self.assert_equal(ia_pd.T2, 40) self.assert_equal(len(ia_pd.iapdopt), 1) prefix = ia_pd.iapdopt[0] self.assert_equal(prefix.prefix, '1:2:3::') self.assert_equal(prefix.plen, 50) self.assert_equal(prefix.preflft, 60) self.assert_equal(prefix.validlft, 120) self.vapi.want_dhcp6_pd_reply_events() try: ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=60, validlft=120) p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IPv6(src=util.mk_ll_addr(self.pg0.remote_mac), dst=self.pg0.local_ip6_ll) / UDP(sport=547, dport=546) / DHCP6_Advertise(trid=trid) / DHCP6OptServerId(duid=self.server_duid) / DHCP6OptClientId(duid=client_duid) / DHCP6OptPref(prefval=7) / DHCP6OptStatusCode(statuscode=1) / DHCP6OptIA_PD(iaid=1, T1=20, T2=40, iapdopt=ia_pd_opts) ) self.pg0.add_stream([p]) self.pg_start() ev = self.vapi.wait_for_event(1, "dhcp6_pd_reply_event") self.assert_equal(ev.preference, 7) self.assert_equal(ev.status_code, 1) self.assert_equal(ev.T1, 20) self.assert_equal(ev.T2, 40) reported_prefix = ev.prefixes[0] prefix = inet_pton(AF_INET6, ia_pd_opts.getfieldval("prefix")) self.assert_equal(reported_prefix.prefix, prefix) self.assert_equal(reported_prefix.prefix_length, ia_pd_opts.getfieldval("plen")) self.assert_equal(reported_prefix.preferred_time, ia_pd_opts.getfieldval("preflft")) self.assert_equal(reported_prefix.valid_time, 
ia_pd_opts.getfieldval("validlft")) finally: self.vapi.want_dhcp6_pd_reply_events(enable_disable=0) class TestDHCPv6IANAControlPlane(VppTestCase): """ DHCPv6 IA NA Control Plane Test Case """ @classmethod def setUpClass(cls): super(TestDHCPv6IANAControlPlane, cls).setUpClass() @classmethod def tearDownClass(cls): super(TestDHCPv6IANAControlPlane, cls).tearDownClass() def setUp(self): super(TestDHCPv6IANAControlPlane, self).setUp() self.create_pg_interfaces(range(1)) self.interfaces = list(self.pg_interfaces) for i in self.interfaces: i.admin_up() self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac) self.client_duid = None self.T1 = 1 self.T2 = 2 fib = self.vapi.ip6_fib_dump() self.initial_addresses = set(self.get_interface_addresses(fib, self.pg0)) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.vapi.dhcp6_client_enable_disable(self.pg0.sw_if_index) def tearDown(self): self.vapi.dhcp6_client_enable_disable(self.pg0.sw_if_index, enable=0) for i in self.interfaces: i.admin_down() super(TestDHCPv6IANAControlPlane, self).tearDown() @staticmethod def get_interface_addresses(fib, pg): lst = [] for entry in fib: if entry.address_length == 128: path = entry.path[0] if path.sw_if_index == pg.sw_if_index: lst.append(entry.address) return lst def get_addresses(self): fib = self.vapi.ip6_fib_dump() addresses = set(self.get_interface_addresses(fib, self.pg0)) return addresses.difference(self.initial_addresses) def validate_duid_ll(self, duid): DUID_LL(duid) def validate_packet(self, packet, msg_type, is_resend=False): try: self.assertEqual(packet.haslayer(msg_type), 1) client_duid = packet[DHCP6OptClientId].duid if self.client_duid is None: self.client_duid = client_duid self.validate_duid_ll(client_duid) else: self.assertEqual(self.client_duid, client_duid) if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind: server_duid = packet[DHCP6OptServerId].duid self.assertEqual(server_duid, self.server_duid) if is_resend: self.assertEqual(self.trid, 
packet[msg_type].trid) else: self.trid = packet[msg_type].trid ip = packet[IPv6] udp = packet[UDP] self.assertEqual(ip.dst, 'ff02::1:2') self.assertEqual(udp.sport, 546) self.assertEqual(udp.dport, 547) dhcpv6 = packet[msg_type] elapsed_time = dhcpv6[DHCP6OptElapsedTime] if (is_resend): self.assertNotEqual(elapsed_time.elapsedtime, 0) else: self.assertEqual(elapsed_time.elapsedtime, 0) except: packet.show() raise def wait_for_packet(self, msg_type, timeout=None, is_resend=False): if timeout is None: timeout = 3 rx_list = self.pg0.get_capture(1, timeout=timeout) packe
/*
 * Copyright (c) 2016 Cisco and/or its affiliates.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <vnet/mpls/mpls_types.h>
#include <vnet/dpo/drop_dpo.h>

#include <vnet/fib/fib_table.h>
#include <vnet/fib/fib_entry.h>
#include <vnet/fib/fib_entry_src.h>
#include <vnet/fib/mpls_fib.h>

/**
 * Initialise the MPLS source data held in a FIB entry source.
 *
 * Resets every stored per-EOS-bit FIB node index to the invalid index,
 * marks the local label as invalid and clears the source flags.
 *
 * @param src The source whose MPLS state is initialised.
 */
static void
fib_entry_src_mpls_init (fib_entry_src_t *src)
{
    mpls_eos_bit_t eos_bit;

    /* one slot per EOS bit value; none refers to a FIB entry yet */
    FOR_EACH_MPLS_EOS_BIT(eos_bit)
    {
        src->u.mpls.fesm_lfes[eos_bit] = FIB_NODE_INDEX_INVALID;
    }

    /* no local label has been assigned */
    src->u.mpls.fesm_label = MPLS_LABEL_INVALID;
    src->fes_flags = FIB_ENTRY_SRC_FLAG_NONE;
}

/**
 * Source deinitialisation Function
 *
 * Intentionally empty: this function performs no work on the source.
 *
 * @param src The source being deinitialised (not referenced here).
 */
static void
fib_entry_src_mpls_deinit (fib_entry_src_t *src)
{
    /* nothing to do */
}

/**
 * Source removal function.
 *
 * Marks the source's local label as invalid and resets its path-list
 * index to the invalid index. Nothing else is touched here.
 *
 * @param src The source being removed.
 */
static void
fib_entry_src_mpls_remove (fib_entry_src_t *src)
{
    /* forget the label first, then drop the reference to the path-list */
    src->u.mpls.fesm_label = MPLS_LABEL_INVALID;
    src->fes_pl = FIB_NODE_INDEX_INVALID;
}

/**
 * Source add function.
 *
 * Gives the source a special path-list, created with the DROP flag, that
 * uses the drop DPO for the requested protocol.
 *
 * @param src   The source being added; its path-list index is overwritten.
 * @param entry The FIB entry the source belongs to (not referenced here).
 * @param flags Entry flags supplied by the caller (not referenced here).
 * @param proto The DPO protocol used both to look up the drop DPO and to
 *              create the path-list.
 * @param dpo   Caller-supplied DPO (not referenced here; the drop DPO is
 *              used instead).
 */
static void
fib_entry_src_mpls_add (fib_entry_src_t *src,
                        const fib_entry_t *entry,
                        fib_entry_flag_t flags,
                        dpo_proto_t proto,
                        const dpo_id_t *dpo)
{
    /* the drop DPO matching the requested protocol */
    const dpo_id_t *drop = drop_dpo_get(proto);

    src->fes_pl = fib_path_list_create_special(proto,
                                               FIB_PATH_LIST_FLAG_DROP,
                                               drop);
}

static void
fib_entry_src_mpls_set_data (fib_entry_src_t *src,
                             const fib_entry_t *entry,
                             const void *data)
{
    fib_protocol_t payload_proto;
    fib_node_index_t fei;
    mpls_label_t label;
    mpls_eos_bit_t eos;

    /*
     * After the MPLS table is allocated, and the FIB entries are possibly
     * re-allocated as a result, the entry pointer will no longer be valid,
     * so save its index now.
     */
    payload_proto = entry->fe_prefix.fp_proto;
    fei = fib_entry_get_index(entry);
    label = *(mpls_label_t*)data;

    if (MPLS_LABEL_INVALID == label)
    {
        /*
         * removing the local label
         */
        FOR_EACH_MPLS_EOS_BIT(eos)
        {
            fib_table_entry_delete_index(src->u.mpls.fesm_lfes[eos],
                                         FIB_SOURCE_SPECIAL);
        }
        fib_table_unlock(MPLS_FIB_DEFAULT_TABLE_ID,
                         FIB_PROTOCOL_MPLS,
                         FIB_SOURCE_MPLS);
        src->u.mpls.fesm_label = label;
    }
    else
    {
        fib_prefix_t prefix = {
            .fp_proto = FIB_PROTOCOL_MPLS,