summaryrefslogtreecommitdiffstats
path: root/scripts/external_libs/dpkt-1.8.6/dpkt/ip6.py
blob: 38002fa6aa54699a31e84a1442370d7a6236ca81 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
# $Id: ip6.py 87 2013-03-05 19:41:04Z andrewflnr@gmail.com $

"""Internet Protocol, version 6."""

import dpkt

class IP6(dpkt.Packet):
    __hdr__ = (
        ('v_fc_flow', 'I', 0x60000000L),
        ('plen', 'H', 0),	# payload length (not including header)
        ('nxt', 'B', 0),	# next header protocol
        ('hlim', 'B', 0),	# hop limit
        ('src', '16s', ''),
        ('dst', '16s', '')
        )

    # XXX - to be shared with IP.  We cannot refer to the ip module
    # right now because ip.__load_protos() expects the IP6 class to be
    # defined.
    _protosw = None
    
    def _get_v(self):
        return self.v_fc_flow >> 28
    def _set_v(self, v):
        self.v_fc_flow = (self.v_fc_flow & ~0xf0000000L) | (v << 28)
    v = property(_get_v, _set_v)

    def _get_fc(self):
        return (self.v_fc_flow >> 20) & 0xff
    def _set_fc(self, v):
        self.v_fc_flow = (self.v_fc_flow & ~0xff00000L) | (v << 20)
    fc = property(_get_fc, _set_fc)

    def _get_flow(self):
        return self.v_fc_flow & 0xfffff
    def _set_flow(self, v):
        self.v_fc_flow = (self.v_fc_flow & ~0xfffff) | (v & 0xfffff)
    flow = property(_get_flow, _set_flow)

    def unpack(self, buf):
        dpkt.Packet.unpack(self, buf)
        self.extension_hdrs = dict(((i, None) for i in ext_hdrs))
        
        if self.plen:
            buf = self.data[:self.plen]
        else:  # due to jumbo payload or TSO
            buf = self.data
        
        next = self.nxt
        
        while (next in ext_hdrs):
            ext = ext_hdrs_cls[next](buf)
            self.extension_hdrs[next] = ext
            buf = buf[ext.length:]
            next = ext.nxt
        
        # set the payload protocol id
        setattr(self, 'p', next)
        
        try:
            self.data = self._protosw[next](buf)
            setattr(self, self.data.__class__.__name__.lower(), self.data)
        except (KeyError, dpkt.UnpackError):
            self.data = buf
    
    def headers_str(self):
        """
        Output extension headers in order defined in RFC1883 (except dest opts)
        """
        
        header_str = ""
        
        for hdr in ext_hdrs:
            if not self.extension_hdrs[hdr] is None:
                header_str += str(self.extension_hdrs[hdr])
        return header_str
        

    def __str__(self):
        if (self.nxt == 6 or self.nxt == 17 or self.nxt == 58) and \
               not self.data.sum:
            # XXX - set TCP, UDP, and ICMPv6 checksums
            p = str(self.data)
            s = dpkt.struct.pack('>16s16sxBH', self.src, self.dst, self.nxt, len(p))
            s = dpkt.in_cksum_add(0, s)
            s = dpkt.in_cksum_add(s, p)
            try:
                self.data.sum = dpkt.in_cksum_done(s)
            except AttributeError:
                pass
        return self.pack_hdr() + self.headers_str() + str(self.data)

    def set_proto(cls, p, pktclass):
        cls._protosw[p] = pktclass
    set_proto = classmethod(set_proto)

    def get_proto(cls, p):
        return cls._protosw[p]
    get_proto = classmethod(get_proto)

import ip
# We are most likely still in the middle of ip.__load_protos() which
# implicitly loads this module through __import__(), so the content of
# ip.IP._protosw is still incomplete at the moment.  By sharing the
# same dictionary by reference as opposed to making a copy, when
# ip.__load_protos() finishes, we will also automatically get the most
# up-to-date dictionary.
IP6._protosw = ip.IP._protosw

class IP6ExtensionHeader(dpkt.Packet): 
    """
    An extension header is very similar to a 'sub-packet'.
    We just want to re-use all the hdr unpacking etc.
    """
    pass
    
class IP6OptsHeader(IP6ExtensionHeader):
    __hdr__ = (
        ('nxt', 'B', 0),           # next extension header protocol
        ('len', 'B', 0)            # option data length in 8 octect units (ignoring first 8 octets) so, len 0 == 64bit header
        )
        
    def unpack(self, buf):
        dpkt.Packet.unpack(self, buf)       
        setattr(self, 'length', (self.len + 1) * 8)
        options = []
        
        index = 0
        
        while (index < self.length - 2):
            opt_type = ord(self.data[index])
            
            # PAD1 option
            if opt_type == 0:                    
                index += 1
                continue;
            
            opt_length = ord(self.data[index + 1])
            
            if opt_type == 1: # PADN option
                # PADN uses opt_length bytes in total
                index += opt_length + 2
                continue
            
            options.append({'type': opt_type, 'opt_length': opt_length, 'data': self.data[index + 2:index + 2 + opt_length]})
            
            # add the two chars and the option_length, to move to the next option
            index += opt_length + 2            
        
        setattr(self, 'options', options)

class IP6HopOptsHeader(IP6OptsHeader): pass
    
class IP6DstOptsHeader(IP6OptsHeader): pass    
    
class IP6RoutingHeader(IP6ExtensionHeader):
    __hdr__ = (
        ('nxt', 'B', 0),            # next extension header protocol
        ('len', 'B', 0),            # extension data length in 8 octect units (ignoring first 8 octets) (<= 46 for type 0)
        ('type', 'B', 0),           # routing type (currently, only 0 is used)
        ('segs_left', 'B', 0),      # remaining segments in route, until destination (<= 23)
        ('rsvd_sl_bits', 'I', 0),   # reserved (1 byte), strict/loose bitmap for addresses
        )

    def _get_sl_bits(self):
        return self.rsvd_sl_bits & 0xffffff
    def _set_sl_bits(self, v):
        self.rsvd_sl_bits = (self.rsvd_sl_bits & ~0xfffff) | (v & 0xfffff)
    sl_bits = property(_get_sl_bits, _set_sl_bits)
    
    def unpack(self, buf):
        hdr_size = 8
        addr_size = 16
        
        dpkt.Packet.unpack(self, buf)
        
        addresses = []
        num_addresses = self.len / 2
        buf = buf[hdr_size:hdr_size + num_addresses * addr_size]
        
        for i in range(num_addresses):
            addresses.append(buf[i * addr_size: i * addr_size + addr_size])
        
        self.data = buf
        setattr(self, 'addresses', addresses)
        setattr(self, 'length', self.len * 8 + 8)

class IP6FragmentHeader(IP6ExtensionHeader):
    __hdr__ = (
        ('nxt', 'B', 0),             # next extension header protocol
        ('resv', 'B', 0),            # reserved, set to 0
        ('frag_off_resv_m', 'H', 0), # frag offset (13 bits), reserved zero (2 bits), More frags flag
        ('id', 'I', 0)               # fragments id
        )
        
    def unpack(self, buf):
        dpkt.Packet.unpack(self, buf)
        setattr(self, 'length', self.__hdr_len__)
        
    def _get_frag_off(self):
        return self.frag_off_resv_m >> 3
    def _set_frag_off(self, v):
        self.frag_off_resv_m = (self.frag_off_resv_m & ~0xfff8) | (v << 3)
    frag_off = property(_get_frag_off, _set_frag_off)
    
    def _get_m_flag(self):
        return self.frag_off_resv_m & 1
    def _set_m_flag(self, v):
        self.frag_off_resv_m = (self.frag_off_resv_m & ~0xfffe) | v
    m_flag = property(_get_m_flag, _set_m_flag)

class IP6AHHeader(IP6ExtensionHeader):
    __hdr__ = (
        ('nxt', 'B', 0),             # next extension header protocol
        ('len', 'B', 0),             # length of header in 4 octet units (ignoring first 2 units)
        ('resv', 'H', 0),            # reserved, 2 bytes of 0
        ('spi', 'I', 0),             # SPI security parameter index
        ('seq', 'I', 0)              # sequence no.
        )
        
    def unpack(self, buf):
        dpkt.Packet.unpack(self, buf)
        setattr(self, 'length', (self.len + 2) * 4)
        setattr(self, 'auth_data', self.data[:(self.len - 1) * 4])
    
    
class IP6ESPHeader(IP6ExtensionHeader):
    def unpack(self, buf):
        raise NotImplementedError("ESP extension headers are not supported.")


ext_hdrs = [ip.IP_PROTO_HOPOPTS, ip.IP_PROTO_ROUTING, ip.IP_PROTO_FRAGMENT, ip.IP_PROTO_AH, ip.IP_PROTO_ESP, ip.IP_PROTO_DSTOPTS]
ext_hdrs_cls = {ip.IP_PROTO_HOPOPTS: IP6HopOptsHeader, 
                ip.IP_PROTO_ROUTING: IP6RoutingHeader,
                ip.IP_PROTO_FRAGMENT: IP6FragmentHeader, 
                ip.IP_PROTO_ESP: IP6ESPHeader, 
                ip.IP_PROTO_AH: IP6AHHeader, 
                ip.IP_PROTO_DSTOPTS: IP6DstOptsHeader}

if __name__ == '__main__':
    import unittest

    class IP6TestCase(unittest.TestCase):
        
        def test_IP6(self):
            s = '`\x00\x00\x00\x00(\x06@\xfe\x80\x00\x00\x00\x00\x00\x00\x02\x11$\xff\xfe\x8c\x11\xde\xfe\x80\x00\x00\x00\x00\x00\x00\x02\xb0\xd0\xff\xfe\xe1\x80r\xcd\xca\x00\x16\x04\x84F\xd5\x00\x00\x00\x00\xa0\x02\xff\xff\xf8\t\x00\x00\x02\x04\x05\xa0\x01\x03\x03\x00\x01\x01\x08\n}\x185?\x00\x00\x00\x00'
            ip = IP6(s)
            #print `ip`
            ip.data.sum = 0
            s2 = str(ip)
            ip2 = IP6(s)
            #print `ip2`
            assert(s == s2)
            
        def test_IP6RoutingHeader(self):
            s = '`\x00\x00\x00\x00<+@ H\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca G\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xca\xfe\x06\x04\x00\x02\x00\x00\x00\x00 \x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91\x7f\x00\x00'
            ip = IP6(s)
            s2 = str(ip)
            # 43 is Routing header id
            assert(len(ip.extension_hdrs[43].addresses) == 2)
            assert(ip.tcp)
            assert(s == s2)
            
            
        def test_IP6FragmentHeader(self):
            s = '\x06\xee\xff\xfb\x00\x00\xff\xff'
            fh = IP6FragmentHeader(s)
            s2 = str(fh)
            assert(fh.nxt == 6)
            assert(fh.id == 65535)
            assert(fh.frag_off == 8191)
            assert(fh.m_flag == 1)
            
        def test_IP6OptionsHeader(self):
            s = ';\x04\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\xc2\x04\x00\x00\x00\x00\x05\x02\x00\x00\x01\x02\x00\x00'
            options = IP6OptsHeader(s).options
            assert(len(options) == 3)
            
        def test_IP6AHHeader(self):
            s = ';\x04\x00\x00\x02\x02\x02\x02\x01\x01\x01\x01\x78\x78\x78\x78\x78\x78\x78\x78'
            ah = IP6AHHeader(s)
            assert(ah.length == 24)
            assert(ah.auth_data == 'xxxxxxxx')
            assert(ah.spi == 0x2020202)
            assert(ah.seq == 0x1010101)
            
        def test_IP6ExtensionHeaders(self):
            p = '`\x00\x00\x00\x00<+@ H\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca G\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xca\xfe\x06\x04\x00\x02\x00\x00\x00\x00 \x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91\x7f\x00\x00'
            ip = IP6(p)
            
            o = ';\x04\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\xc2\x04\x00\x00\x00\x00\x05\x02\x00\x00\x01\x02\x00\x00'
            options = IP6HopOptsHeader(o)

            ip.extension_hdrs[0] = options
            
            fh = '\x06\xee\xff\xfb\x00\x00\xff\xff'
            ip.extension_hdrs[44] = IP6FragmentHeader(fh)
            
            ah = ';\x04\x00\x00\x02\x02\x02\x02\x01\x01\x01\x01\x78\x78\x78\x78\x78\x78\x78\x78'
            ip.extension_hdrs[51] = IP6AHHeader(ah)
            
            do = ';\x02\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
            ip.extension_hdrs[60] = IP6DstOptsHeader(do)
            
            assert(len([k for k in ip.extension_hdrs if (not ip.extension_hdrs[k] is None)]) == 5)
            
    unittest.main()