aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_dhcp.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_dhcp.py')
-rw-r--r--test/test_dhcp.py72
1 files changed, 62 insertions, 10 deletions
diff --git a/test/test_dhcp.py b/test/test_dhcp.py
index fce41a9b423..16b0f470b0a 100644
--- a/test/test_dhcp.py
+++ b/test/test_dhcp.py
@@ -9,7 +9,7 @@ from vpp_neighbor import VppNeighbor
from vpp_ip_route import find_route, VppIpTable
from util import mk_ll_addr
import scapy.compat
-from scapy.layers.l2 import Ether, getmacbyip, ARP
+from scapy.layers.l2 import Ether, getmacbyip, ARP, Dot1Q
from scapy.layers.inet import IP, UDP, ICMP
from scapy.layers.inet6 import IPv6, in6_getnsmac
from scapy.utils6 import in6_mactoifaceid
@@ -20,7 +20,10 @@ from scapy.layers.dhcp6 import DHCP6, DHCP6_Solicit, DHCP6_RelayForward, \
from socket import AF_INET, AF_INET6
from scapy.utils import inet_pton, inet_ntop
from scapy.utils6 import in6_ptop
-from vpp_papi import mac_pton
+from vpp_papi import mac_pton, VppEnum
+from vpp_sub_interface import VppDot1QSubint
+from vpp_qos import VppQosEgressMap, VppQosMark
+
DHCP4_CLIENT_PORT = 68
DHCP4_SERVER_PORT = 67
@@ -210,7 +213,7 @@ class TestDHCP(VppTestCase):
data = self.validate_relay_options(pkt, intf, intf.local_ip4,
vpn_id, fib_id, oui)
- def verify_orig_dhcp_pkt(self, pkt, intf, l2_bc=True):
+ def verify_orig_dhcp_pkt(self, pkt, intf, dscp, l2_bc=True):
ether = pkt[Ether]
if l2_bc:
self.assertEqual(ether.dst, "ff:ff:ff:ff:ff:ff")
@@ -226,14 +229,15 @@ class TestDHCP(VppTestCase):
else:
self.assertEqual(ip.dst, intf.remote_ip4)
self.assertEqual(ip.src, intf.local_ip4)
+ self.assertEqual(ip.tos, dscp)
udp = pkt[UDP]
self.assertEqual(udp.dport, DHCP4_SERVER_PORT)
self.assertEqual(udp.sport, DHCP4_CLIENT_PORT)
def verify_orig_dhcp_discover(self, pkt, intf, hostname, client_id=None,
- broadcast=True):
- self.verify_orig_dhcp_pkt(pkt, intf)
+ broadcast=True, dscp=0):
+ self.verify_orig_dhcp_pkt(pkt, intf, dscp)
self.verify_dhcp_msg_type(pkt, "discover")
self.verify_dhcp_has_option(pkt, "hostname", hostname)
@@ -249,8 +253,9 @@ class TestDHCP(VppTestCase):
def verify_orig_dhcp_request(self, pkt, intf, hostname, ip,
broadcast=True,
- l2_bc=True):
- self.verify_orig_dhcp_pkt(pkt, intf, l2_bc=l2_bc)
+ l2_bc=True,
+ dscp=0):
+ self.verify_orig_dhcp_pkt(pkt, intf, dscp, l2_bc=l2_bc)
self.verify_dhcp_msg_type(pkt, "request")
self.verify_dhcp_has_option(pkt, "hostname", hostname)
@@ -1229,6 +1234,7 @@ class TestDHCP(VppTestCase):
def test_dhcp_client(self):
""" DHCP Client"""
+ vdscp = VppEnum.vl_api_ip_dscp_t
hostname = 'universal-dp'
self.pg_enable_capture(self.pg_interfaces)
@@ -1316,17 +1322,20 @@ class TestDHCP(VppTestCase):
#
# Start the procedure again. this time have VPP send the client-ID
+ # and set the DSCP value
#
self.pg3.admin_down()
self.sleep(1)
self.pg3.admin_up()
self.vapi.dhcp_client_config(self.pg3.sw_if_index, hostname,
- client_id=self.pg3.local_mac)
+ client_id=self.pg3.local_mac,
+ dscp=vdscp.IP_API_DSCP_EF)
rx = self.pg3.get_capture(1)
self.verify_orig_dhcp_discover(rx[0], self.pg3, hostname,
- self.pg3.local_mac)
+ self.pg3.local_mac,
+ dscp=vdscp.IP_API_DSCP_EF)
# TODO: VPP DHCP client should not accept DHCP OFFER message with
# the XID (Transaction ID) not matching the XID of the most recent
@@ -1339,7 +1348,8 @@ class TestDHCP(VppTestCase):
rx = self.pg3.get_capture(1)
self.verify_orig_dhcp_request(rx[0], self.pg3, hostname,
- self.pg3.local_ip4)
+ self.pg3.local_ip4,
+ dscp=vdscp.IP_API_DSCP_EF)
#
# unicast the ack to the offered address
@@ -1610,6 +1620,48 @@ class TestDHCP(VppTestCase):
#
self.vapi.dhcp_client_config(self.pg3.sw_if_index, hostname, is_add=0)
+ def test_dhcp_client_vlan(self):
+ """ DHCP Client w/ VLAN"""
+
+ vdscp = VppEnum.vl_api_ip_dscp_t
+ vqos = VppEnum.vl_api_qos_source_t
+ hostname = 'universal-dp'
+
+ self.pg_enable_capture(self.pg_interfaces)
+
+ vlan_100 = VppDot1QSubint(self, self.pg3, 100)
+ vlan_100.admin_up()
+
+ output = [scapy.compat.chb(4)] * 256
+ os = b''.join(output)
+ rows = [{'outputs': os},
+ {'outputs': os},
+ {'outputs': os},
+ {'outputs': os}]
+
+ qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
+ qm1 = VppQosMark(self, vlan_100, qem1,
+ vqos.QOS_API_SOURCE_VLAN).add_vpp_config()
+
+ #
+ # Configure DHCP client on PG3 and capture the discover sent
+ #
+ self.vapi.dhcp_client_config(vlan_100.sw_if_index,
+ hostname,
+ dscp=vdscp.IP_API_DSCP_EF)
+
+ rx = self.pg3.get_capture(1)
+
+ self.assertEqual(rx[0][Dot1Q].vlan, 100)
+ self.assertEqual(rx[0][Dot1Q].prio, 4)
+
+ self.verify_orig_dhcp_discover(rx[0], self.pg3, hostname,
+ dscp=vdscp.IP_API_DSCP_EF)
+
+ self.vapi.dhcp_client_config(vlan_100.sw_if_index,
+ hostname,
+ is_add=0)
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)