blob: 2fdc5b7fbfbd8ed4b8834f6c66793692161177d6 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
|
#!/usr/bin/env python
# Copyright (c) 2016 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Traffic script that receives an DHCP packet on given interface and check if
it is a correct DHCP DISCOVER message.
"""
import sys
from scapy.layers.inet import UDP_SERVICES
from resources.libraries.python.PacketVerifier import RxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
def _dhcp_option_values(dhcp_options, name):
    """Return the values of all DHCP options named ``name``.

    :param dhcp_options: Option list taken from the 'DHCP options' layer.
    :param name: Option name compared with the first element of each entry.
    :returns: Second element of every matching entry, in list order; an
        empty list when no entry matches.
    :rtype: list
    """
    # Plain-string entries such as 'end' never match a multi-character name:
    # indexing them with [0] yields only their first character.
    return [opt[1] for opt in dhcp_options if opt[0] == name]


def main():
    """Receive one packet and verify that it is a DHCP DISCOVER message.

    Script arguments read: 'rx_if' (interface to listen on), 'rx_src_mac'
    (expected client MAC address) and 'hostname' (expected value of DHCP
    option 12). NOTE(review): 'hostname' is presumably optional and falsy
    when not supplied - confirm against TrafficScriptArg.

    The Ethernet, IP, UDP, BOOTP and DHCP option fields are checked in turn;
    a confirmation line is printed after every successful check.

    :raises RuntimeError: If no packet is received or any check fails. This
        includes a missing option 53 or option 55, which is reported with the
        respective option error message.
    """
    args = TrafficScriptArg(['rx_src_mac'], ['hostname'])

    rx_if = args.get_arg('rx_if')
    rx_src_mac = args.get_arg('rx_src_mac')
    hostname = args.get_arg('hostname')

    # Expected header values of the received packet.
    rx_dst_mac = 'ff:ff:ff:ff:ff:ff'
    rx_src_ip = '0.0.0.0'
    rx_dst_ip = '255.255.255.255'
    boot_request = 1
    # Bytes 0x63 0x82 0x53 0x63.
    dhcp_magic = 'c\x82Sc'

    rxq = RxQueue(rx_if)

    # NOTE(review): 10 is presumably the receive timeout in seconds and None
    # the "nothing received" result - confirm against RxQueue.recv.
    ether = rxq.recv(10)
    if ether is None:
        raise RuntimeError("DHCP DISCOVER Rx timeout.")

    # Ethernet header
    if ether.dst != rx_dst_mac:
        raise RuntimeError("Destination MAC address error.")
    print("Destination MAC address: OK.")

    if ether.src != rx_src_mac:
        raise RuntimeError("Source MAC address error.")
    print("Source MAC address: OK.")

    # IP header
    if ether['IP'].dst != rx_dst_ip:
        raise RuntimeError("Destination IP address error.")
    print("Destination IP address: OK.")

    if ether['IP'].src != rx_src_ip:
        raise RuntimeError("Source IP address error.")
    print("Source IP address: OK.")

    # UDP header
    if ether['IP']['UDP'].dport != UDP_SERVICES.bootps:
        raise RuntimeError("UDP destination port error.")
    print("UDP destination port: OK.")

    if ether['IP']['UDP'].sport != UDP_SERVICES.bootpc:
        raise RuntimeError("UDP source port error.")
    print("UDP source port: OK.")

    # BOOTP header
    bootp = ether['BOOTP']

    if bootp.op != boot_request:
        raise RuntimeError("BOOTP message type error.")
    print("BOOTP message type: OK")

    if bootp.ciaddr != '0.0.0.0':
        raise RuntimeError("BOOTP client IP address error.")
    print("BOOTP client IP address: OK")

    if bootp.yiaddr != '0.0.0.0':
        raise RuntimeError("BOOTP 'your' (client) IP address error.")
    print("BOOTP 'your' (client) IP address: OK")

    if bootp.siaddr != '0.0.0.0':
        raise RuntimeError("BOOTP next server IP address error.")
    print("BOOTP next server IP address: OK")

    if bootp.giaddr != '0.0.0.0':
        raise RuntimeError("BOOTP relay agent IP address error.")
    print("BOOTP relay agent IP address: OK")

    # Only the first hlen bytes of chaddr are compared; they are hex-encoded
    # so they can be matched against the colon-less form of the MAC string.
    chaddr = bootp.chaddr[:bootp.hlen].encode('hex')
    if chaddr != rx_src_mac.replace(':', ''):
        raise RuntimeError("BOOTP client hardware address error.")
    print("BOOTP client hardware address: OK")

    # Check server name (must be 64 zero bytes)
    if bootp.sname != 64*'\x00':
        raise RuntimeError("BOOTP server name error.")
    print("BOOTP server name: OK")

    # Check boot file (must be 128 zero bytes)
    if bootp.file != 128*'\x00':
        raise RuntimeError("BOOTP boot file name error.")
    print("BOOTP boot file name: OK")

    # Check bootp magic
    if bootp.options != dhcp_magic:
        raise RuntimeError("DHCP magic error.")
    print("DHCP magic: OK")

    # Check options
    dhcp_options = ether['DHCP options'].options

    # Option 12: must match when a hostname is expected, must be absent
    # otherwise.
    hostnames = _dhcp_option_values(dhcp_options, 'hostname')
    if hostname:
        if not hostnames:
            raise RuntimeError("Option list doesn't contain hostname option.")
        if hostnames[0] != hostname:
            raise RuntimeError("Client's hostname doesn't match.")
    elif hostnames:
        raise RuntimeError("Option list contains hostname option.")
    print("Option 12 hostname: OK")

    # Option 53: a missing option is reported like a wrong value instead of
    # escaping as a bare IndexError.
    message_types = _dhcp_option_values(dhcp_options, 'message-type')
    if not message_types or message_types[0] != 1:
        raise RuntimeError("Option 53 message-type error.")
    print("Option 53 message-type: OK")

    # Option 55: the expected bytes are, in decimal,
    # 1, 28, 2, 3, 15, 6, 119, 12, 44, 47, 26, 121, 42.
    param_req_lists = _dhcp_option_values(dhcp_options, 'param_req_list')
    if (not param_req_lists or
            param_req_lists[0] != '\x01\x1c\x02\x03\x0f\x06w\x0c,/\x1ay*'):
        raise RuntimeError("Option 55 param_req_list error.")
    print("Option 55 param_req_list: OK")

    # Option 255
    if 'end' not in dhcp_options:
        raise RuntimeError("end option error.")
    print("end option: OK")

    sys.exit(0)
# Run the verification only when the file is executed as a script.
if __name__ == "__main__":
    main()
|