1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
|
# $Id: ip6.py 87 2013-03-05 19:41:04Z andrewflnr@gmail.com $
# -*- coding: utf-8 -*-
"""Internet Protocol, version 6."""
import dpkt
from decorators import deprecated
class IP6(dpkt.Packet):
__hdr__ = (
('v_fc_flow', 'I', 0x60000000L),
('plen', 'H', 0), # payload length (not including header)
('nxt', 'B', 0), # next header protocol
('hlim', 'B', 0), # hop limit
('src', '16s', ''),
('dst', '16s', '')
)
# XXX - to be shared with IP. We cannot refer to the ip module
# right now because ip.__load_protos() expects the IP6 class to be
# defined.
_protosw = None
@property
def v(self):
return self.v_fc_flow >> 28
@v.setter
def v(self, v):
self.v_fc_flow = (self.v_fc_flow & ~0xf0000000L) | (v << 28)
@property
def fc(self):
return (self.v_fc_flow >> 20) & 0xff
@fc.setter
def fc(self, v):
self.v_fc_flow = (self.v_fc_flow & ~0xff00000L) | (v << 20)
@property
def flow(self):
return self.v_fc_flow & 0xfffff
@flow.setter
def flow(self, v):
self.v_fc_flow = (self.v_fc_flow & ~0xfffff) | (v & 0xfffff)
# Deprecated methods, will be removed in the future
# =================================================
@deprecated
def _get_v(self):
return self.v
@deprecated
def _set_v(self, v):
self.v = v
@deprecated
def _get_fc(self):
return self.fc
@deprecated
def _set_fc(self, v):
self.rc = v
@deprecated
def _get_flow(self):
return self.flow
@deprecated
def _set_flow(self, v):
self.flow = v
# =================================================
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
self.extension_hdrs = dict(((i, None) for i in ext_hdrs))
if self.plen:
buf = self.data[:self.plen]
else: # due to jumbo payload or TSO
buf = self.data
next = self.nxt
while next in ext_hdrs:
ext = ext_hdrs_cls[next](buf)
self.extension_hdrs[next] = ext
buf = buf[ext.length:]
next = ext.nxt
# set the payload protocol id
setattr(self, 'p', next)
try:
self.data = self._protosw[next](buf)
setattr(self, self.data.__class__.__name__.lower(), self.data)
except (KeyError, dpkt.UnpackError):
self.data = buf
def headers_str(self):
"""Output extension headers in order defined in RFC1883 (except dest opts)"""
header_str = ""
for hdr in ext_hdrs:
if not self.extension_hdrs[hdr] is None:
header_str += str(self.extension_hdrs[hdr])
return header_str
def __str__(self):
if (self.nxt == 6 or self.nxt == 17 or self.nxt == 58) and not self.data.sum:
# XXX - set TCP, UDP, and ICMPv6 checksums
p = str(self.data)
s = dpkt.struct.pack('>16s16sxBH', self.src, self.dst, self.nxt, len(p))
s = dpkt.in_cksum_add(0, s)
s = dpkt.in_cksum_add(s, p)
try:
self.data.sum = dpkt.in_cksum_done(s)
except AttributeError:
pass
return self.pack_hdr() + self.headers_str() + str(self.data)
@classmethod
def set_proto(cls, p, pktclass):
cls._protosw[p] = pktclass
@classmethod
def get_proto(cls, p):
return cls._protosw[p]
import ip
# We are most likely still in the middle of ip.__load_protos() which
# implicitly loads this module through __import__(), so the content of
# ip.IP._protosw is still incomplete at the moment. By sharing the
# same dictionary by reference as opposed to making a copy, when
# ip.__load_protos() finishes, we will also automatically get the most
# up-to-date dictionary.
IP6._protosw = ip.IP._protosw
class IP6ExtensionHeader(dpkt.Packet):
"""
An extension header is very similar to a 'sub-packet'.
We just want to re-use all the hdr unpacking etc.
"""
pass
class IP6OptsHeader(IP6ExtensionHeader):
__hdr__ = (
('nxt', 'B', 0), # next extension header protocol
('len', 'B', 0) # option data length in 8 octect units (ignoring first 8 octets) so, len 0 == 64bit header
)
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
setattr(self, 'length', (self.len + 1) * 8)
options = []
index = 0
while index < self.length - 2:
opt_type = ord(self.data[index])
# PAD1 option
if opt_type == 0:
index += 1
continue
opt_length = ord(self.data[index + 1])
if opt_type == 1: # PADN option
# PADN uses opt_length bytes in total
index += opt_length + 2
continue
options.append(
{'type': opt_type, 'opt_length': opt_length, 'data': self.data[index + 2:index + 2 + opt_length]})
# add the two chars and the option_length, to move to the next option
index += opt_length + 2
setattr(self, 'options', options)
class IP6HopOptsHeader(IP6OptsHeader): pass
class IP6DstOptsHeader(IP6OptsHeader): pass
class IP6RoutingHeader(IP6ExtensionHeader):
__hdr__ = (
('nxt', 'B', 0), # next extension header protocol
('len', 'B', 0), # extension data length in 8 octect units (ignoring first 8 octets) (<= 46 for type 0)
('type', 'B', 0), # routing type (currently, only 0 is used)
('segs_left', 'B', 0), # remaining segments in route, until destination (<= 23)
('rsvd_sl_bits', 'I', 0), # reserved (1 byte), strict/loose bitmap for addresses
)
@property
def sl_bits(self):
return self.rsvd_sl_bits & 0xffffff
@sl_bits.setter
def sl_bits(self, v):
self.rsvd_sl_bits = (self.rsvd_sl_bits & ~0xfffff) | (v & 0xfffff)
# Deprecated methods, will be removed in the future
# =================================================
def _get_sl_bits(self): return self.sl_bits
def _set_sl_bits(self, v): self.sl_bits = v
# =================================================
def unpack(self, buf):
hdr_size = 8
addr_size = 16
dpkt.Packet.unpack(self, buf)
addresses = []
num_addresses = self.len / 2
buf = buf[hdr_size:hdr_size + num_addresses * addr_size]
for i in range(num_addresses):
addresses.append(buf[i * addr_size: i * addr_size + addr_size])
self.data = buf
setattr(self, 'addresses', addresses)
setattr(self, 'length', self.len * 8 + 8)
class IP6FragmentHeader(IP6ExtensionHeader):
__hdr__ = (
('nxt', 'B', 0), # next extension header protocol
('resv', 'B', 0), # reserved, set to 0
('frag_off_resv_m', 'H', 0), # frag offset (13 bits), reserved zero (2 bits), More frags flag
('id', 'I', 0) # fragments id
)
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
setattr(self, 'length', self.__hdr_len__)
@property
def frag_off(self):
return self.frag_off_resv_m >> 3
@frag_off.setter
def frag_off(self, v):
self.frag_off_resv_m = (self.frag_off_resv_m & ~0xfff8) | (v << 3)
@property
def m_flag(self):
return self.frag_off_resv_m & 1
@m_flag.setter
def m_flag(self, v):
self.frag_off_resv_m = (self.frag_off_resv_m & ~0xfffe) | v
# Deprecated methods, will be removed in the future
# =================================================
@deprecated
def _get_frag_off(self): return self.flag_off
@deprecated
def _set_frag_off(self, v): self.flag_off = v
@deprecated
def _get_m_flag(self): return self.m_flag
@deprecated
def _set_m_flag(self, v): self.m_flag = v
# =================================================
class IP6AHHeader(IP6ExtensionHeader):
__hdr__ = (
('nxt', 'B', 0), # next extension header protocol
('len', 'B', 0), # length of header in 4 octet units (ignoring first 2 units)
('resv', 'H', 0), # reserved, 2 bytes of 0
('spi', 'I', 0), # SPI security parameter index
('seq', 'I', 0) # sequence no.
)
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
setattr(self, 'length', (self.len + 2) * 4)
setattr(self, 'auth_data', self.data[:(self.len - 1) * 4])
class IP6ESPHeader(IP6ExtensionHeader):
def unpack(self, buf):
raise NotImplementedError("ESP extension headers are not supported.")
ext_hdrs = [ip.IP_PROTO_HOPOPTS, ip.IP_PROTO_ROUTING, ip.IP_PROTO_FRAGMENT, ip.IP_PROTO_AH, ip.IP_PROTO_ESP,
ip.IP_PROTO_DSTOPTS]
ext_hdrs_cls = {ip.IP_PROTO_HOPOPTS: IP6HopOptsHeader,
ip.IP_PROTO_ROUTING: IP6RoutingHeader,
ip.IP_PROTO_FRAGMENT: IP6FragmentHeader,
ip.IP_PROTO_ESP: IP6ESPHeader,
ip.IP_PROTO_AH: IP6AHHeader,
ip.IP_PROTO_DSTOPTS: IP6DstOptsHeader}
def test_ipg():
s = '`\x00\x00\x00\x00(\x06@\xfe\x80\x00\x00\x00\x00\x00\x00\x02\x11$\xff\xfe\x8c\x11\xde\xfe\x80\x00\x00\x00\x00\x00\x00\x02\xb0\xd0\xff\xfe\xe1\x80r\xcd\xca\x00\x16\x04\x84F\xd5\x00\x00\x00\x00\xa0\x02\xff\xff\xf8\t\x00\x00\x02\x04\x05\xa0\x01\x03\x03\x00\x01\x01\x08\n}\x185?\x00\x00\x00\x00'
_ip = IP6(s)
# print `ip`
_ip.data.sum = 0
s2 = str(_ip)
IP6(s)
# print `ip2`
assert (s == s2)
def test_ip6_routing_header():
s = '`\x00\x00\x00\x00<+@ H\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca G\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xca\xfe\x06\x04\x00\x02\x00\x00\x00\x00 \x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91\x7f\x00\x00'
ip = IP6(s)
s2 = str(ip)
# 43 is Routing header id
assert (len(ip.extension_hdrs[43].addresses) == 2)
assert (ip.tcp)
assert (s == s2)
def test_ip6_fragment_header():
s = '\x06\xee\xff\xfb\x00\x00\xff\xff'
fh = IP6FragmentHeader(s)
# s2 = str(fh) variable 's2' is not used
str(fh)
assert (fh.nxt == 6)
assert (fh.id == 65535)
assert (fh.frag_off == 8191)
assert (fh.m_flag == 1)
def test_ip6_options_header():
s = ';\x04\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\xc2\x04\x00\x00\x00\x00\x05\x02\x00\x00\x01\x02\x00\x00'
options = IP6OptsHeader(s).options
assert (len(options) == 3)
def test_ip6_ah_header():
s = ';\x04\x00\x00\x02\x02\x02\x02\x01\x01\x01\x01\x78\x78\x78\x78\x78\x78\x78\x78'
ah = IP6AHHeader(s)
assert (ah.length == 24)
assert (ah.auth_data == 'xxxxxxxx')
assert (ah.spi == 0x2020202)
assert (ah.seq == 0x1010101)
def test_ip6_extension_headers():
p = '`\x00\x00\x00\x00<+@ H\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca G\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xca\xfe\x06\x04\x00\x02\x00\x00\x00\x00 \x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91\x7f\x00\x00'
ip = IP6(p)
o = ';\x04\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\xc2\x04\x00\x00\x00\x00\x05\x02\x00\x00\x01\x02\x00\x00'
options = IP6HopOptsHeader(o)
ip.extension_hdrs[0] = options
fh = '\x06\xee\xff\xfb\x00\x00\xff\xff'
ip.extension_hdrs[44] = IP6FragmentHeader(fh)
ah = ';\x04\x00\x00\x02\x02\x02\x02\x01\x01\x01\x01\x78\x78\x78\x78\x78\x78\x78\x78'
ip.extension_hdrs[51] = IP6AHHeader(ah)
do = ';\x02\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
ip.extension_hdrs[60] = IP6DstOptsHeader(do)
assert (len([k for k in ip.extension_hdrs if (not ip.extension_hdrs[k] is None)]) == 5)
if __name__ == '__main__':
test_ipg()
test_ip6_routing_header()
test_ip6_fragment_header()
test_ip6_options_header()
test_ip6_ah_header()
test_ip6_extension_headers()
print 'Tests Successful...'
|