#!/usr/bin/env python3 import unittest import socket import struct import six from framework import VppTestCase, VppTestRunner, running_extended_tests from framework import tag_run_solo from vpp_neighbor import VppNeighbor from vpp_ip_route import find_route, VppIpTable from util import mk_ll_addr import scapy.compat from scapy.layers.l2 import Ether, getmacbyip, ARP, Dot1Q from scapy.layers.inet import IP, UDP, ICMP from scapy.layers.inet6 import IPv6, in6_getnsmac from scapy.utils6 import in6_mactoifaceid from scapy.layers.dhcp import DHCP, BOOTP, DHCPTypes from scapy.layers.dhcp6 import DHCP6, DHCP6_Solicit, DHCP6_RelayForward, \ DHCP6_RelayReply, DHCP6_Advertise, DHCP6OptRelayMsg, DHCP6OptIfaceId, \ DHCP6OptStatusCode, DHCP6OptVSS, DHCP6OptClientLinkLayerAddr, DHCP6_Request from socket import AF_INET, AF_INET6, inet_pton, inet_ntop from scapy.utils6 import in6_ptop from vpp_papi import mac_pton, VppEnum from vpp_sub_interface import VppDot1QSubint from vpp_qos import VppQosEgressMap, VppQosMark from vpp_dhcp import VppDHCPClient, VppDHCPProxy DHCP4_CLIENT_PORT = 68 DHCP4_SERVER_PORT = 67 DHCP6_CLIENT_PORT = 547 DHCP6_SERVER_PORT = 546 @tag_run_solo class TestDHCP(VppTestCase): """ DHCP Test Case """ @classmethod def setUpClass(cls): super(TestDHCP, cls).setUpClass() @classmethod def tearDownClass(cls): super(TestDHCP, cls).tearDownClass() def setUp(self): super(TestDHCP, self).setUp() # create 6 pg interfaces for pg0 to pg5 self.create_pg_interfaces(range(6)) self.tables = [] # pg0 to 2 are IP configured in VRF 0, 1 and 2. # pg3 to 5 are non IP-configured in VRF 0, 1 and 2. 
table_id = 0 for table_id in range(1, 4): tbl4 = VppIpTable(self, table_id) tbl4.add_vpp_config() self.tables.append(tbl4) tbl6 = VppIpTable(self, table_id, is_ip6=1) tbl6.add_vpp_config() self.tables.append(tbl6) table_id = 0 for i in self.pg_interfaces[:3]: i.admin_up() i.set_table_ip4(table_id) i.set_table_ip6(table_id) i.config_ip4() i.resolve_arp() i.config_ip6() i.resolve_ndp() table_id += 1 table_id = 0 for i in self.pg_interfaces[3:]: i.admin_up() i.set_table_ip4(table_id) i.set_table_ip6(table_id) table_id += 1 def tearDown(self): for i in self.pg_interfaces[:3]: i.unconfig_ip4() i.unconfig_ip6() for i in self.pg_interfaces: i.set_table_ip4(0) i.set_table_ip6(0) i.admin_down() super(TestDHCP, self).tearDown() def verify_dhcp_has_option(self, pkt, option, value): dhcp = pkt[DHCP] found = False for i in dhcp.options: if isinstance(i, tuple): if i[0] == option: self.assertEqual(i[1], value) found = True self.assertTrue(found) def validate_relay_options(self, pkt, intf, ip_addr, vpn_id, fib_id, oui): dhcp = pkt[DHCP] found = 0 data = [] id_len = len(vpn_id) for i in dhcp.options: if isinstance(i, tuple): if i[0] == "relay_agent_Information": # # There are two sb-options present - each of length 6. # data = i[1] if oui != 0: self.assertEqual(len(data), 24) elif len(vpn_id) > 0: self.assertEqual(len(data), len(vpn_id) + 17) else: self.assertEqual(len(data), 12) # # First sub-option is ID 1, len 4, then encoded # sw_if_index. This test uses low valued indicies # so [2:4] are 0. # The ID space is VPP internal - so no matching value # scapy # self.assertEqual(six.byte2int(data[0:1]), 1) self.assertEqual(six.byte2int(data[1:2]), 4) self.assertEqual(six.byte2int(data[2:3]), 0) self.assertEqual(six.byte2int(data[3:4]), 0) self.assertEqual(six.byte2int(data[4:5]), 0) self.assertEqual(six.byte2int(data[5:6]), intf._sw_if_index) # # next sub-option is the IP address of the client side # interface. 
# sub-option ID=5, length (of a v4 address)=4 # claddr = socket.inet_pton(AF_INET, ip_addr) self.assertEqual(six.byte2int(data[6:7]), 5) self.assertEqual(six.byte2int(data[7:8]), 4) self.assertEqual(data[8], claddr[0]) self.assertEqual(data[9], claddr[1]) self.assertEqual(data[10], claddr[2]) self.assertEqual(data[11], claddr[3]) if oui != 0: # sub-option 151 encodes vss_type 1, # the 3 byte oui and the 4 byte fib_id self.assertEqual(id_len, 0) self.assertEqual(six.byte2int(data[12:13]), 151) self.assertEqual(six.byte2int(data[13:14]), 8) self.assertEqual(six.byte2int(data[14:15]), 1) self.assertEqual(six.byte2int(data[15:16]), 0) self.assertEqual(six.byte2int(data[16:17]), 0) self.assertEqual(six.byte2int(data[17:18]), oui) self.assertEqual(six.byte2int(data[18:19]), 0) self.assertEqual(six.byte2int(data[19:20]), 0) self.assertEqual(six.byte2int(data[20:21]), 0) self.assertEqual(six.byte2int(data[21:22]), fib_id) # VSS control sub-option self.assertEqual(six.byte2int(data[22:23]), 152) self.assertEqual(six.byte2int(data[23:24]), 0) if id_len > 0: # sub-option 151 encode vss_type of 0 # followerd by vpn_id in ascii self.assertEqual(oui, 0) self.assertEqual(six.byte2int(data[12:13]), 151) self.assertEqual(six.byte2int(data[13:14]), id_len + 1) self.assertEqual(six.byte2int(data[14:15]), 0) self.assertEqual(data[15:15 + id_len].decode('ascii'), vpn_id) # VSS control sub-option self.assertEqual(six.byte2int(data[15 + len(vpn_id): 16 + len(vpn_id)]), 152) self.assertEqual(six.byte2int(data[16 + len(vpn_id): 17 + len(vpn_id)]), 0) found = 1 self.assertTrue(found) return data def verify_dhcp_msg_type(self, pkt, name): dhcp = pkt[DHCP] found = False for o in dhcp.options: if isinstance(o, tuple): if o[0] == "message-type" \ and DHCPTypes[o[1]] == name: found = True self.assertTrue(found) def verify_dhcp_offer(self, pkt, intf, vpn_id="", fib_id=0, oui=0): ether = pkt[Ether] self.assertEqual(ether.dst, "ff:ff:ff:ff:ff:ff") self.assertEqual(ether.src, intf.local_mac) ip = 
pkt[IP] self.assertEqual(ip.dst, "255.255.255.255") self.assertEqual(ip
/*
* Copyright (c) 2016 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <jni.h>
#include "vppjni_env.h"
// Head of the class registration list. Entries are appended by
// vppjni_init_register_class() and resolved by vppjni_env_init().
static vppjni_class_t *class_head;
// Head of the field registration list. Entries are appended by
// vppjni_register_field() and resolved by vppjni_env_init().
static vppjni_field_t *field_head;
// Append `ptr` to the end of the class registration list.
//
// The list is singly linked through the `next` member; the new entry is
// stored into the first NULL link found when walking from class_head.
void vppjni_init_register_class(vppjni_class_t *ptr)
{
    vppjni_class_t **link;

    for (link = &class_head; *link != NULL; link = &(*link)->next) {
        // keep walking until the terminating NULL link is reached
    }
    *link = ptr;
}
// Append `ptr` to the end of the field registration list.
//
// The list is singly linked through the `next` member; the new entry is
// stored into the first NULL link found when walking from field_head.
void vppjni_register_field(vppjni_field_t *ptr)
{
    vppjni_field_t **link;

    for (link = &field_head; *link != NULL; link = &(*link)->next) {
        // keep walking until the terminating NULL link is reached
    }
    *link = ptr;
}
// Instantiate the Java class described by `ptr`, passing the constructor
// arguments in `ap` to the cached constructor (ptr->jinit).
//
// Returns the new object, or NULL if a Java exception is pending after the
// constructor call; in that case the exception is described first.
jobject vppjni_init_new_object(JNIEnv *env, const vppjni_class_t *ptr, va_list ap)
{
    jobject created = (*env)->NewObjectV(env, ptr->jclass, ptr->jinit, ap);

    if (!(*env)->ExceptionCheck(env)) {
        return created;
    }

    (*env)->ExceptionDescribe(env);
    return NULL;
}
// Resolve every registered class and field against the JVM behind `env`.
//
// For each entry on the class list: look the class up by its `fqcn` name,
// resolve the constructor whose signature is `init_sig`, and cache a global
// reference to the class together with the constructor ID. Then, for each
// entry on the field list, resolve and cache the field ID against the
// owning class (`clsref`).
//
// Returns 0 on success. On any failure the pending Java exception (if any)
// is described, everything cached so far is torn down, and JNI_ERR is
// returned.
int vppjni_env_init(JNIEnv *env)
{
    vppjni_class_t *cwlk;
    vppjni_field_t *fwlk;

    for (cwlk = class_head; cwlk != NULL; cwlk = cwlk->next) {
        jclass cls;
        jmethodID method;

        cls = (*env)->FindClass(env, cwlk->fqcn);
        if ((*env)->ExceptionCheck(env)) {
            // Per the JNI specification ExceptionDescribe also clears the
            // pending exception, so later JNI calls are safe.
            (*env)->ExceptionDescribe(env);
            // NOTE(review): this file defines vppjni_env_uninit();
            // vppjni_uninit() is presumably declared in vppjni_env.h --
            // confirm it is the intended cleanup routine.
            vppjni_uninit(env);
            return JNI_ERR;
        }

        method = (*env)->GetMethodID(env, cls, "<init>", cwlk->init_sig);
        if ((*env)->ExceptionCheck(env)) {
            (*env)->ExceptionDescribe(env);
            // Do not leave the class' local reference behind on failure.
            (*env)->DeleteLocalRef(env, cls);
            vppjni_uninit(env);
            return JNI_ERR;
        }

        cwlk->jclass = (*env)->NewGlobalRef(env, cls);
        // The local reference is no longer needed whether or not the global
        // one was created; releasing it here keeps the number of live local
        // references constant instead of growing by one per registered
        // class.
        (*env)->DeleteLocalRef(env, cls);
        if (cwlk->jclass == NULL) {
            vppjni_uninit(env);
            return JNI_ERR;
        }
        cwlk->jinit = method;
    }

    for (fwlk = field_head; fwlk != NULL; fwlk = fwlk->next) {
        fwlk->jfield = (*env)->GetFieldID(env, fwlk->clsref->jclass,
                                          fwlk->name, fwlk->type);
        if ((*env)->ExceptionCheck(env)) {
            (*env)->ExceptionDescribe(env);
            vppjni_uninit(env);
            return JNI_ERR;
        }
    }

    return 0;
}
// Drop everything cached on the registration lists.
//
// Every registered field has its field ID reset. Every registered class has
// its global reference deleted (when one is held) and its class reference
// and constructor ID reset to NULL.
void vppjni_env_uninit(JNIEnv *env)
{
    vppjni_field_t *field = field_head;
    vppjni_class_t *klass = class_head;

    while (field != NULL) {
        field->jfield = NULL;
        field = field->next;
    }

    while (klass != NULL) {
        if (klass->jclass != NULL) {
            (*env)->DeleteGlobalRef(env, klass->jclass);
            klass->jclass = NULL;
        }
        klass->jinit = NULL;
        klass = klass->next;
    }
}