summaryrefslogtreecommitdiffstats
path: root/scripts/external_libs/scapy-python3-0.18/scapy/layers/inet6.py
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2016-03-21 16:03:47 +0200
committerimarom <imarom@cisco.com>2016-03-21 16:03:47 +0200
commitb89efa188810bf95a9d245e69e2961b5721c3b0f (patch)
tree454273ac6c4ae972ebb8a2c86b893296970b4fa9 /scripts/external_libs/scapy-python3-0.18/scapy/layers/inet6.py
parentf72c6df9d2e9998ae1f3529d729ab7930b35785a (diff)
scapy python 2/3
Diffstat (limited to 'scripts/external_libs/scapy-python3-0.18/scapy/layers/inet6.py')
-rw-r--r--scripts/external_libs/scapy-python3-0.18/scapy/layers/inet6.py3047
1 files changed, 0 insertions, 3047 deletions
diff --git a/scripts/external_libs/scapy-python3-0.18/scapy/layers/inet6.py b/scripts/external_libs/scapy-python3-0.18/scapy/layers/inet6.py
deleted file mode 100644
index c2e4a037..00000000
--- a/scripts/external_libs/scapy-python3-0.18/scapy/layers/inet6.py
+++ /dev/null
@@ -1,3047 +0,0 @@
-#! /usr/bin/env python
-#############################################################################
-## ##
-## inet6.py --- IPv6 support for Scapy ##
-## see http://natisbad.org/IPv6/ ##
-## for more informations ##
-## ##
-## Copyright (C) 2005 Guillaume Valadon <guedou@hongo.wide.ad.jp> ##
-## Arnaud Ebalard <arnaud.ebalard@eads.net> ##
-## ##
-## This program is free software; you can redistribute it and/or modify it ##
-## under the terms of the GNU General Public License version 2 as ##
-## published by the Free Software Foundation. ##
-## ##
-## This program is distributed in the hope that it will be useful, but ##
-## WITHOUT ANY WARRANTY; without even the implied warranty of ##
-## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ##
-## General Public License for more details. ##
-## ##
-#############################################################################
-
-"""
-IPv6 (Internet Protocol v6).
-"""
-
-
-import socket
-if not socket.has_ipv6:
- raise socket.error("can't use AF_INET6, IPv6 is disabled")
-if not hasattr(socket, "IPPROTO_IPV6"):
- # Workaround for http://bugs.python.org/issue6926
- socket.IPPROTO_IPV6 = 41
-
-if not hasattr(socket, "IPPROTO_IPIP"):
- socket.IPPROTO_IPIP = 4
-
-if not ('IPPROTO_IPIP ' in globals()):
- IPPROTO_IPIP=4
-
-
-
-from scapy.config import conf
-from scapy.layers.l2 import *
-from scapy.layers.inet import *
-from scapy.fields import *
-from scapy.packet import *
-from scapy.volatile import *
-from scapy.sendrecv import sr,sr1,srp1
-from scapy.as_resolvers import AS_resolver_riswhois
-from scapy.supersocket import SuperSocket,L3RawSocket
-from scapy.arch import *
-from scapy.utils6 import *
-
-
-#############################################################################
-# Helpers ##
-#############################################################################
-
-def get_cls(name, fallback_cls):
- return globals().get(name, fallback_cls)
-
-
-##########################
-## Neighbor cache stuff ##
-##########################
-
-conf.netcache.new_cache("in6_neighbor", 120)
-
-def neighsol(addr, src, iface, timeout=1, chainCC=0):
- """
- Sends an ICMPv6 Neighbor Solicitation message to get the MAC address
- of the neighbor with specified IPv6 address addr. 'src' address is
- used as source of the message. Message is sent on iface. By default,
- timeout waiting for an answer is 1 second.
-
- If no answer is gathered, None is returned. Else, the answer is
- returned (ethernet frame).
- """
-
- nsma = in6_getnsma(inet_pton(socket.AF_INET6, addr))
- d = inet_ntop(socket.AF_INET6, nsma)
- dm = in6_getnsmac(nsma)
- p = Ether(dst=dm)/IPv6(dst=d, src=src, hlim=255)
- p /= ICMPv6ND_NS(tgt=addr)
- p /= ICMPv6NDOptSrcLLAddr(lladdr=get_if_hwaddr(iface))
- res = srp1(p,type=ETH_P_IPV6, iface=iface, timeout=1, verbose=0,
- chainCC=chainCC)
-
- return res
-
-def getmacbyip6(ip6, chainCC=0):
- """
- Returns the mac address to be used for provided 'ip6' peer.
- neighborCache.get() method is used on instantiated neighbor cache.
- Resolution mechanism is described in associated doc string.
-
- (chainCC parameter value ends up being passed to sending function
- used to perform the resolution, if needed)
- """
-
- if in6_ismaddr(ip6): # Multicast
- mac = in6_getnsmac(inet_pton(socket.AF_INET6, ip6))
- return mac
-
- iff,a,nh = conf.route6.route(ip6, dev=conf.iface6)
-
- if iff == LOOPBACK_NAME:
- return "ff:ff:ff:ff:ff:ff"
-
- if nh != '::':
- ip6 = nh # Found next hop
-
- mac = conf.netcache.in6_neighbor.get(ip6)
- if mac:
- return mac
-
- res = neighsol(ip6, a, iff, chainCC=chainCC)
-
- if res is not None:
- if ICMPv6NDOptDstLLAddr in res:
- mac = res[ICMPv6NDOptDstLLAddr].lladdr
- else:
- mac = res.src
- conf.netcache.in6_neighbor[ip6] = mac
- return mac
-
- return None
-
-
-#############################################################################
-#############################################################################
-### IPv6 addresses manipulation routines ###
-#############################################################################
-#############################################################################
-
-class Net6(Gen): # syntax ex. fec0::/126
- """Generate a list of IPv6s from a network address or a name"""
- name = "ipv6"
- ipaddress = re.compile(r"^([a-fA-F0-9:]+)(/[1]?[0-3]?[0-9])?$")
-
- def __init__(self, net):
- self.repr = net
-
- tmp = net.split('/')+["128"]
- if not self.ipaddress.match(net):
- tmp[0]=socket.getaddrinfo(tmp[0], None, socket.AF_INET6)[0][-1][0]
-
- netmask = int(tmp[1])
- self.net = inet_pton(socket.AF_INET6, tmp[0])
- self.mask = in6_cidr2mask(netmask)
- self.plen = netmask
-
- def __iter__(self):
- def m8(i):
- if i % 8 == 0:
- return i
- #tuple = filter(lambda x: m8(x), range(8, 129))
- tuple = [ x for x in range(8, 129) if m8(x) ]
-
- a = in6_and(self.net, self.mask)
- tmp = map(lambda x: x, struct.unpack('16B', a))
-
- def parse_digit(a, netmask):
- netmask = min(8,max(netmask,0))
- a = (int(a) & (0xff<<netmask),(int(a) | (0xff>>(8-netmask)))+1)
- return a
- self.parsed = map(lambda x,y: parse_digit(x,y), tmp, map(lambda x,nm=self.plen: x-nm, tuple))
-
- def rec(n, l):
- if n and n % 2 == 0:
- sep = ':'
- else:
- sep = ''
- if n == 16:
- return l
- else:
- ll = []
- for i in range(*self.parsed[n]):
- for y in l:
- ll += [y+sep+'%.2x'%i]
- return rec(n+1, ll)
-
- return iter(rec(0, ['']))
-
- def __repr__(self):
- return "<Net6 %s>" % self.repr
-
-
-
-
-
-
-#############################################################################
-#############################################################################
-### IPv6 Class ###
-#############################################################################
-#############################################################################
-
-class IP6Field(Field):
- def __init__(self, name, default):
- Field.__init__(self, name, default, "16s")
- def h2i(self, pkt, x):
- if type(x) is str:
- try:
- x = in6_ptop(x)
- except socket.error:
- x = Net6(x)
- elif type(x) is list:
- x = map(Net6, x)
- return x
- def i2m(self, pkt, x):
- return inet_pton(socket.AF_INET6, x)
- def m2i(self, pkt, x):
- return inet_ntop(socket.AF_INET6, x)
- def any2i(self, pkt, x):
- return self.h2i(pkt,x)
- def i2repr(self, pkt, x):
- if x is None:
- return self.i2h(pkt,x)
- elif not isinstance(x, Net6) and not type(x) is list:
- if in6_isaddrTeredo(x): # print Teredo info
- server, flag, maddr, mport = teredoAddrExtractInfo(x)
- return "%s [Teredo srv: %s cli: %s:%s]" % (self.i2h(pkt, x), server, maddr,mport)
- elif in6_isaddr6to4(x): # print encapsulated address
- vaddr = in6_6to4ExtractAddr(x)
- return "%s [6to4 GW: %s]" % (self.i2h(pkt, x), vaddr)
- return self.i2h(pkt, x) # No specific information to return
- def randval(self):
- return RandIP6()
-
-class SourceIP6Field(IP6Field):
- def __init__(self, name, dstname):
- IP6Field.__init__(self, name, None)
- self.dstname = dstname
- def i2m(self, pkt, x):
- if x is None:
- dst=getattr(pkt,self.dstname)
- iff,x,nh = conf.route6.route(dst)
- return IP6Field.i2m(self, pkt, x)
- def i2h(self, pkt, x):
- if x is None:
- dst=getattr(pkt,self.dstname)
- if isinstance(dst,Gen):
- r = map(conf.route6.route, dst)
- r.sort()
- if r[0] == r[-1]:
- x=r[0][1]
- else:
- warning("More than one possible route for %s"%repr(dst))
- return None
- else:
- iff,x,nh = conf.route6.route(dst)
- return IP6Field.i2h(self, pkt, x)
-
-ipv6nh = { 0:"Hop-by-Hop Option Header",
- 4:"IP",
- 6:"TCP",
- 17:"UDP",
- 41:"IPv6",
- 43:"Routing Header",
- 44:"Fragment Header",
- 47:"GRE",
- 50:"ESP Header",
- 51:"AH Header",
- 58:"ICMPv6",
- 59:"No Next Header",
- 60:"Destination Option Header",
- 135:"Mobility Header"}
-
-ipv6nhcls = { 0: "IPv6ExtHdrHopByHop",
- 4: "IP",
- 6: "TCP",
- 17: "UDP",
- 43: "IPv6ExtHdrRouting",
- 44: "IPv6ExtHdrFragment",
- #50: "IPv6ExtHrESP",
- #51: "IPv6ExtHdrAH",
- 58: "ICMPv6Unknown",
- 59: "Raw",
- 60: "IPv6ExtHdrDestOpt" }
-
-class IP6ListField(StrField):
- islist = 1
- def __init__(self, name, default, count_from=None, length_from=None):
- if default is None:
- default = []
- StrField.__init__(self, name, default)
- self.count_from = count_from
- self.length_from = length_from
-
- def i2len(self, pkt, i):
- return 16*len(i)
-
- def i2count(self, pkt, i):
- if type(i) is list:
- return len(i)
- return 0
-
- def getfield(self, pkt, s):
- c = l = None
- if self.length_from is not None:
- l = self.length_from(pkt)
- elif self.count_from is not None:
- c = self.count_from(pkt)
-
- lst = []
- ret = b""
- remain = s
- if l is not None:
- remain,ret = s[:l],s[l:]
- while remain:
- if c is not None:
- if c <= 0:
- break
- c -= 1
- addr = inet_ntop(socket.AF_INET6, remain[:16])
- lst.append(addr)
- remain = remain[16:]
- return remain+ret,lst
-
- def i2m(self, pkt, x):
- s = b''
- for y in x:
- try:
- y = inet_pton(socket.AF_INET6, y)
- except:
- y = socket.getaddrinfo(y, None, socket.AF_INET6)[0][-1][0]
- y = inet_pton(socket.AF_INET6, y)
- s += y
- return s
-
- def i2repr(self,pkt,x):
- s = []
- if x == None:
- return "[]"
- for y in x:
- s.append('%s' % y)
- return "[ %s ]" % (", ".join(s))
-
-class _IPv6GuessPayload:
- name = "Dummy class that implements guess_payload_class() for IPv6"
- def default_payload_class(self,p):
- if self.nh == 58: # ICMPv6
- #t = ord(p[0])
- t = p[0]
- if len(p) > 2 and t == 139 or t == 140: # Node Info Query
- return _niquery_guesser(p)
- if len(p) >= icmp6typesminhdrlen.get(t, sys.maxsize): # Other ICMPv6 messages
- return get_cls(icmp6typescls.get(t,"Raw"), "Raw")
- return Raw
- elif self.nh == 135 and len(p) > 3: # Mobile IPv6
- #return _mip6_mhtype2cls.get(ord(p[2]), MIP6MH_Generic)
- return _mip6_mhtype2cls.get(p[2], MIP6MH_Generic)
- else:
- return get_cls(ipv6nhcls.get(self.nh,"Raw"), "Raw")
-
-class IPv6(_IPv6GuessPayload, Packet, IPTools):
- name = "IPv6"
- fields_desc = [ BitField("version" , 6 , 4),
- BitField("tc", 0, 8), #TODO: IPv6, ByteField ?
- BitField("fl", 0, 20),
- ShortField("plen", None),
- ByteEnumField("nh", 59, ipv6nh),
- ByteField("hlim", 64),
- IP6Field("src", "::2"),
- #SourceIP6Field("src", "dst"), # dst is for src @ selection
- IP6Field("dst", "::1") ]
-
- def route(self):
- dst = self.dst
- if isinstance(dst,Gen):
- dst = next(iter(dst))
- return conf.route6.route(dst)
-
- def mysummary(self):
- return "%s > %s (%i)" % (self.src,self.dst, self.nh)
-
- def post_build(self, p, pay):
- p += pay
- if self.plen is None:
- l = len(p) - 40
- p = p[:4]+struct.pack("!H", l)+p[6:]
- return p
-
- def extract_padding(self, s):
- l = self.plen
- return s[:l], s[l:]
-
- def hashret(self):
- if self.nh == 58 and isinstance(self.payload, _ICMPv6):
- if self.payload.type < 128:
- return self.payload.payload.hashret()
- elif (self.payload.type in [133,134,135,136,144,145]):
- return struct.pack("B", self.nh)+self.payload.hashret()
-
- nh = self.nh
- sd = self.dst
- ss = self.src
- if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrRouting):
- # With routing header, the destination is the last
- # address of the IPv6 list if segleft > 0
- nh = self.payload.nh
- try:
- sd = self.addresses[-1]
- except IndexError:
- sd = '::1'
- # TODO: big bug with ICMPv6 error messages as the destination of IPerror6
- # could be anything from the original list ...
- if 1:
- sd = inet_pton(socket.AF_INET6, sd)
- for a in self.addresses:
- a = inet_pton(socket.AF_INET6, a)
- sd = strxor(sd, a)
- sd = inet_ntop(socket.AF_INET6, sd)
-
- if self.nh == 44 and isinstance(self.payload, IPv6ExtHdrFragment):
- nh = self.payload.nh
-
- if self.nh == 0 and isinstance(self.payload, IPv6ExtHdrHopByHop):
- nh = self.payload.nh
-
- if self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt):
- foundhao = None
- for o in self.payload.options:
- if isinstance(o, HAO):
- foundhao = o
- if foundhao:
- nh = self.payload.nh # XXX what if another extension follows ?
- ss = foundhao.hoa
-
- if conf.checkIPsrc and conf.checkIPaddr:
- sd = inet_pton(socket.AF_INET6, sd)
- ss = inet_pton(socket.AF_INET6, self.src)
- return struct.pack("B",nh)+self.payload.hashret()
- else:
- return struct.pack("B", nh)+self.payload.hashret()
-
- def answers(self, other):
- if not isinstance(other, IPv6): # self is reply, other is request
- return False
- if conf.checkIPaddr:
- ss = inet_pton(socket.AF_INET6, self.src)
- sd = inet_pton(socket.AF_INET6, self.dst)
- os = inet_pton(socket.AF_INET6, other.src)
- od = inet_pton(socket.AF_INET6, other.dst)
- # request was sent to a multicast address (other.dst)
- # Check reply destination addr matches request source addr (i.e
- # sd == os) except when reply is multicasted too
- # XXX test mcast scope matching ?
- if in6_ismaddr(other.dst):
- if in6_ismaddr(self.dst):
- if ((od == sd) or
- (in6_isaddrllallnodes(self.dst) and in6_isaddrllallservers(other.dst))):
- return self.payload.answers(other.payload)
- return False
- if (os == sd):
- return self.payload.answers(other.payload)
- return False
- elif (sd != os): # or ss != od): <- removed for ICMP errors
- return False
- if self.nh == 58 and isinstance(self.payload, _ICMPv6) and self.payload.type < 128:
- # ICMPv6 Error message -> generated by IPv6 packet
- # Note : at the moment, we jump the ICMPv6 specific class
- # to call answers() method of erroneous packet (over
- # initial packet). There can be cases where an ICMPv6 error
- # class could implement a specific answers method that perform
- # a specific task. Currently, don't see any use ...
- return self.payload.payload.answers(other)
- elif other.nh == 0 and isinstance(other.payload, IPv6ExtHdrHopByHop):
- return self.payload.answers(other.payload.payload)
- elif other.nh == 44 and isinstance(other.payload, IPv6ExtHdrFragment):
- return self.payload.answers(other.payload.payload)
- elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrRouting):
- return self.payload.answers(other.payload.payload) # Buggy if self.payload is a IPv6ExtHdrRouting
- elif other.nh == 60 and isinstance(other.payload, IPv6ExtHdrDestOpt):
- return self.payload.payload.answers(other.payload.payload)
- elif self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt): # BU in reply to BRR, for instance
- return self.payload.payload.answers(other.payload)
- else:
- if (self.nh != other.nh):
- return False
- return self.payload.answers(other.payload)
-
-
-conf.neighbor.register_l3(Ether, IPv6, lambda l2,l3: getmacbyip6(l3.dst))
-
-
-class IPerror6(IPv6):
- name = "IPv6 in ICMPv6"
- def answers(self, other):
- if not isinstance(other, IPv6):
- return False
- sd = inet_pton(socket.AF_INET6, self.dst)
- ss = inet_pton(socket.AF_INET6, self.src)
- od = inet_pton(socket.AF_INET6, other.dst)
- os = inet_pton(socket.AF_INET6, other.src)
-
- # Make sure that the ICMPv6 error is related to the packet scapy sent
- if isinstance(self.underlayer, _ICMPv6) and self.underlayer.type < 128:
-
- # find upper layer for self (possible citation)
- selfup = self.payload
- while selfup is not None and isinstance(selfup, _IPv6ExtHdr):
- selfup = selfup.payload
-
- # find upper layer for other (initial packet). Also look for RH
- otherup = other.payload
- request_has_rh = False
- while otherup is not None and isinstance(otherup, _IPv6ExtHdr):
- if isinstance(otherup, IPv6ExtHdrRouting):
- request_has_rh = True
- otherup = otherup.payload
-
- if ((ss == os and sd == od) or # <- Basic case
- (ss == os and request_has_rh)): # <- Request has a RH :
- # don't check dst address
-
- # Let's deal with possible MSS Clamping
- if (isinstance(selfup, TCP) and
- isinstance(otherup, TCP) and
- selfup.options != otherup.options): # seems clamped
-
- # Save fields modified by MSS clamping
- old_otherup_opts = otherup.options
- old_otherup_cksum = otherup.chksum
- old_otherup_dataofs = otherup.dataofs
- old_selfup_opts = selfup.options
- old_selfup_cksum = selfup.chksum
- old_selfup_dataofs = selfup.dataofs
-
- # Nullify them
- otherup.options = []
- otherup.chksum = 0
- otherup.dataofs = 0
- selfup.options = []
- selfup.chksum = 0
- selfup.dataofs = 0
-
- # Test it and save result
- s1 = bytes(selfup)
- s2 = bytes(otherup)
- l = min(len(s1), len(s2))
- res = s1[:l] == s2[:l]
-
- # recall saved values
- otherup.options = old_otherup_opts
- otherup.chksum = old_otherup_cksum
- otherup.dataofs = old_otherup_dataofs
- selfup.options = old_selfup_opts
- selfup.chksum = old_selfup_cksum
- selfup.dataofs = old_selfup_dataofs
-
- return res
-
- s1 = bytes(selfup)
- s2 = bytes(otherup)
- l = min(len(s1), len(s2))
- return s1[:l] == s2[:l]
-
- return False
-
- def mysummary(self):
- return Packet.mysummary(self)
-
-
-#############################################################################
-#############################################################################
-### Upper Layer Checksum computation ###
-#############################################################################
-#############################################################################
-
-class PseudoIPv6(Packet): # IPv6 Pseudo-header for checksum computation
- name = "Pseudo IPv6 Header"
- fields_desc = [ IP6Field("src", "::"),
- IP6Field("dst", "::"),
- ShortField("uplen", None),
- BitField("zero", 0, 24),
- ByteField("nh", 0) ]
-
-def in6_chksum(nh, u, p):
- """
- Performs IPv6 Upper Layer checksum computation. Provided parameters are:
-
- - 'nh' : value of upper layer protocol
- - 'u' : upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be
- provided with all under layers (IPv6 and all extension headers,
- for example)
- - 'p' : the payload of the upper layer provided as a string
-
- Functions operate by filling a pseudo header class instance (PseudoIPv6)
- with
- - Next Header value
- - the address of _final_ destination (if some Routing Header with non
- segleft field is present in underlayer classes, last address is used.)
- - the address of _real_ source (basically the source address of an
- IPv6 class instance available in the underlayer or the source address
- in HAO option if some Destination Option header found in underlayer
- includes this option).
- - the length is the length of provided payload string ('p')
- """
-
- ph6 = PseudoIPv6()
- ph6.nh = nh
- rthdr = 0
- hahdr = 0
- final_dest_addr_found = 0
- while u != None and not isinstance(u, IPv6):
- if (isinstance(u, IPv6ExtHdrRouting) and
- u.segleft != 0 and len(u.addresses) != 0 and
- final_dest_addr_found == 0):
- rthdr = u.addresses[-1]
- final_dest_addr_found = 1
- elif (isinstance(u, IPv6ExtHdrDestOpt) and (len(u.options) == 1) and
- isinstance(u.options[0], HAO)):
- hahdr = u.options[0].hoa
- u = u.underlayer
- if u is None:
- warning("No IPv6 underlayer to compute checksum. Leaving null.")
- return 0
- if hahdr:
- ph6.src = hahdr
- else:
- ph6.src = u.src
- if rthdr:
- ph6.dst = rthdr
- else:
- ph6.dst = u.dst
- ph6.uplen = len(p)
- ph6s = bytes(ph6)
- return checksum(ph6s+p)
-
-
-#############################################################################
-#############################################################################
-### Extension Headers ###
-#############################################################################
-#############################################################################
-
-
-# Inherited by all extension header classes
-class _IPv6ExtHdr(_IPv6GuessPayload, Packet):
- name = 'Abstract IPV6 Option Header'
- aliastypes = [IPv6, IPerror6] # TODO ...
-
-
-#################### IPv6 options for Extension Headers #####################
-
-_hbhopts = { 0x00: "Pad1",
- 0x01: "PadN",
- 0x04: "Tunnel Encapsulation Limit",
- 0x05: "Router Alert",
- 0x06: "Quick-Start",
- 0xc2: "Jumbo Payload",
- 0xc9: "Home Address Option" }
-
-class _OTypeField(ByteEnumField):
- """
- Modified BytEnumField that displays information regarding the IPv6 option
- based on its option type value (What should be done by nodes that process
- the option if they do not understand it ...)
-
- It is used by Jumbo, Pad1, PadN, RouterAlert, HAO options
- """
- pol = {0x00: "00: skip",
- 0x40: "01: discard",
- 0x80: "10: discard+ICMP",
- 0xC0: "11: discard+ICMP not mcast"}
-
- enroutechange = {0x00: "0: Don't change en-route",
- 0x20: "1: May change en-route" }
-
- def i2repr(self, pkt, x):
- s = self.i2s.get(x, repr(x))
- polstr = self.pol[(x & 0xC0)]
- enroutechangestr = self.enroutechange[(x & 0x20)]
- return "%s [%s, %s]" % (s, polstr, enroutechangestr)
-
-class HBHOptUnknown(Packet): # IPv6 Hop-By-Hop Option
- name = "Scapy6 Unknown Option"
- fields_desc = [_OTypeField("otype", 0x01, _hbhopts),
- FieldLenField("optlen", None, length_of="optdata", fmt="B"),
- StrLenField("optdata", "",
- length_from = lambda pkt: pkt.optlen) ]
- def alignment_delta(self, curpos): # By default, no alignment requirement
- """
- As specified in section 4.2 of RFC 2460, every options has
- an alignment requirement ususally expressed xn+y, meaning
- the Option Type must appear at an integer multiple of x octest
- from the start of the header, plus y octet.
-
- That function is provided the current position from the
- start of the header and returns required padding length.
- """
- return 0
-
-class Pad1(Packet): # IPv6 Hop-By-Hop Option
- name = "Pad1"
- fields_desc = [ _OTypeField("otype", 0x00, _hbhopts) ]
- def alignment_delta(self, curpos): # No alignment requirement
- return 0
-
-class PadN(Packet): # IPv6 Hop-By-Hop Option
- name = "PadN"
- fields_desc = [_OTypeField("otype", 0x01, _hbhopts),
- FieldLenField("optlen", None, length_of="optdata", fmt="B"),
- StrLenField("optdata", "",
- length_from = lambda pkt: pkt.optlen)]
- def alignment_delta(self, curpos): # No alignment requirement
- return 0
-
-class RouterAlert(Packet): # RFC 2711 - IPv6 Hop-By-Hop Option
- name = "Router Alert"
- fields_desc = [_OTypeField("otype", 0x05, _hbhopts),
- ByteField("optlen", 2),
- ShortEnumField("value", None,
- { 0: "Datagram contains a MLD message",
- 1: "Datagram contains RSVP message",
- 2: "Datagram contains an Active Network message" }) ]
- # TODO : Check IANA has not defined new values for value field of RouterAlertOption
- # TODO : now that we have that option, we should do something in MLD class that need it
- def alignment_delta(self, curpos): # alignment requirement : 2n+0
- x = 2 ; y = 0
- delta = x*((curpos - y + x - 1)//x) + y - curpos
- return delta
-
-class Jumbo(Packet): # IPv6 Hop-By-Hop Option
- name = "Jumbo Payload"
- fields_desc = [_OTypeField("otype", 0xC2, _hbhopts),
- ByteField("optlen", 4),
- IntField("jumboplen", None) ]
- def alignment_delta(self, curpos): # alignment requirement : 4n+2
- x = 4 ; y = 2
- delta = x*((curpos - y + x - 1)//x) + y - curpos
- return delta
-
-class HAO(Packet): # IPv6 Destination Options Header Option
- name = "Home Address Option"
- fields_desc = [_OTypeField("otype", 0xC9, _hbhopts),
- ByteField("optlen", 16),
- IP6Field("hoa", "::") ]
- def alignment_delta(self, curpos): # alignment requirement : 8n+6
- x = 8 ; y = 6
- delta = x*((curpos - y + x - 1)//x) + y - curpos
- return delta
-
-_hbhoptcls = { 0x00: Pad1,
- 0x01: PadN,
- 0x05: RouterAlert,
- 0xC2: Jumbo,
- 0xC9: HAO }
-
-
-######################## Hop-by-Hop Extension Header ########################
-
-class _HopByHopOptionsField(PacketListField):
- islist = 1
- holds_packet = 1
- def __init__(self, name, default, cls, curpos, count_from=None, length_from=None):
- self.curpos = curpos
- PacketListField.__init__(self, name, default, cls, count_from=count_from, length_from=length_from)
-
- def i2len(self, pkt, i):
- l = len(self.i2m(pkt, i))
- return l
-
- def i2count(self, pkt, i):
- if type(i) is list:
- return len(i)
- return 0
-
- def getfield(self, pkt, s):
- c = l = None
- if self.length_from is not None:
- l = self.length_from(pkt)
- elif self.count_from is not None:
- c = self.count_from(pkt)
-
- opt = []
- ret = b""
- x = s
- if l is not None:
- x,ret = s[:l],s[l:]
- while x:
- if c is not None:
- if c <= 0:
- break
- c -= 1
- #o = ord(x[0]) # Option type
- o = x[0] # Option type
- cls = self.cls
- if o in _hbhoptcls:
- cls = _hbhoptcls[o]
- try:
- op = cls(x)
- except:
- op = self.cls(x)
- opt.append(op)
- if isinstance(op.payload, conf.raw_layer):
- x = op.payload.load
- del(op.payload)
- else:
- x = b""
- return x+ret,opt
-
- def i2m(self, pkt, x):
- autopad = None
- try:
- autopad = getattr(pkt, "autopad") # Hack : 'autopad' phantom field
- except:
- autopad = 1
-
- if not autopad:
- return b"".join(map(bytes, x))
-
- curpos = self.curpos
- s = b""
- for p in x:
- d = p.alignment_delta(curpos)
- curpos += d
- if d == 1:
- s += bytes(Pad1())
- elif d != 0:
- s += bytes(PadN(optdata=b'\x00'*(d-2)))
- pstr = bytes(p)
- curpos += len(pstr)
- s += pstr
-
- # Let's make the class including our option field
- # a multiple of 8 octets long
- d = curpos % 8
- if d == 0:
- return s
- d = 8 - d
- if d == 1:
- s += bytes(Pad1())
- elif d != 0:
- s += bytes(PadN(optdata=b'\x00'*(d-2)))
-
- return s
-
- def addfield(self, pkt, s, val):
- return s+self.i2m(pkt, val)
-
-class _PhantomAutoPadField(ByteField):
- def addfield(self, pkt, s, val):
- return s
-
- def getfield(self, pkt, s):
- return s, 1
-
- def i2repr(self, pkt, x):
- if x:
- return "On"
- return "Off"
-
-
-class IPv6ExtHdrHopByHop(_IPv6ExtHdr):
- name = "IPv6 Extension Header - Hop-by-Hop Options Header"
- fields_desc = [ ByteEnumField("nh", 59, ipv6nh),
- FieldLenField("len", None, length_of="options", fmt="B",
- adjust = lambda pkt,x: (x+2+7)//8 - 1),
- _PhantomAutoPadField("autopad", 1), # autopad activated by default
- _HopByHopOptionsField("options", [], HBHOptUnknown, 2,
- length_from = lambda pkt: (8*(pkt.len+1))-2) ]
- overload_fields = {IPv6: { "nh": 0 }}
-
-
-######################## Destination Option Header ##########################
-
-class IPv6ExtHdrDestOpt(_IPv6ExtHdr):
- name = "IPv6 Extension Header - Destination Options Header"
- fields_desc = [ ByteEnumField("nh", 59, ipv6nh),
- FieldLenField("len", None, length_of="options", fmt="B",
- adjust = lambda pkt,x: (x+2+7)//8 - 1),
- _PhantomAutoPadField("autopad", 1), # autopad activated by default
- _HopByHopOptionsField("options", [], HBHOptUnknown, 2,
- length_from = lambda pkt: (8*(pkt.len+1))-2) ]
- overload_fields = {IPv6: { "nh": 60 }}
-
-
-############################# Routing Header ################################
-
-class IPv6ExtHdrRouting(_IPv6ExtHdr):
- name = "IPv6 Option Header Routing"
- fields_desc = [ ByteEnumField("nh", 59, ipv6nh),
- FieldLenField("len", None, count_of="addresses", fmt="B",
- adjust = lambda pkt,x:2*x), # in 8 bytes blocks
- ByteField("type", 0),
- ByteField("segleft", None),
- BitField("reserved", 0, 32), # There is meaning in this field ...
- IP6ListField("addresses", [],
- length_from = lambda pkt: 8*pkt.len)]
- overload_fields = {IPv6: { "nh": 43 }}
-
- def post_build(self, pkt, pay):
- if self.segleft is None:
- pkt = pkt[:3]+struct.pack("B", len(self.addresses))+pkt[4:]
- return _IPv6ExtHdr.post_build(self, pkt, pay)
-
-########################### Fragmentation Header ############################
-
-class IPv6ExtHdrFragment(_IPv6ExtHdr):
- name = "IPv6 Extension Header - Fragmentation header"
- fields_desc = [ ByteEnumField("nh", 59, ipv6nh),
- BitField("res1", 0, 8),
- BitField("offset", 0, 13),
- BitField("res2", 0, 2),
- BitField("m", 0, 1),
- IntField("id", None) ]
- overload_fields = {IPv6: { "nh": 44 }}
-
-
-def defragment6(pktlist):
- """
- Performs defragmentation of a list of IPv6 packets. Packets are reordered.
- Crap is dropped. What lacks is completed by 'X' characters.
- """
-
- l = [ x for x in pktlist if IPv6ExtHdrFragment in x ] # remove non fragments
- if not l:
- return []
-
- id = l[0][IPv6ExtHdrFragment].id
-
- llen = len(l)
- l = [ x for x in l if x[IPv6ExtHdrFragment].id == id ]
- if len(l) != llen:
- warning("defragment6: some fragmented packets have been removed from list")
- llen = len(l)
-
- # reorder fragments
- i = 0
- res = []
- while l:
- min_pos = 0
- min_offset = l[0][IPv6ExtHdrFragment].offset
- for p in l:
- cur_offset = p[IPv6ExtHdrFragment].offset
- if cur_offset < min_offset:
- min_pos = 0
- min_offset = cur_offset
- res.append(l[min_pos])
- del(l[min_pos])
-
- # regenerate the fragmentable part
- fragmentable = b""
- for p in res:
- q=p[IPv6ExtHdrFragment]
- offset = 8*q.offset
- if offset != len(fragmentable):
- warning("Expected an offset of %d. Found %d. Padding with XXXX" % (len(fragmentable), offset))
- fragmentable += b"X"*(offset - len(fragmentable))
- fragmentable += bytes(q.payload)
-
- # Regenerate the unfragmentable part.
- q = res[0]
- nh = q[IPv6ExtHdrFragment].nh
- q[IPv6ExtHdrFragment].underlayer.nh = nh
- q[IPv6ExtHdrFragment].underlayer.payload = None
- q /= conf.raw_layer(load=fragmentable)
-
- return IPv6(bytes(q))
-
-
-def fragment6(pkt, fragSize):
- """
- Performs fragmentation of an IPv6 packet. Provided packet ('pkt') must already
- contain an IPv6ExtHdrFragment() class. 'fragSize' argument is the expected
- maximum size of fragments (MTU). The list of packets is returned.
-
- If packet does not contain an IPv6ExtHdrFragment class, it is returned in
- result list.
- """
-
- pkt = pkt.copy()
-
- if not IPv6ExtHdrFragment in pkt:
- # TODO : automatically add a fragment before upper Layer
- # at the moment, we do nothing and return initial packet
- # as single element of a list
- return [pkt]
-
- # If the payload is bigger than 65535, a Jumbo payload must be used, as
- # an IPv6 packet can't be bigger than 65535 bytes.
- if len(bytes(pkt[IPv6ExtHdrFragment])) > 65535:
- warning("An IPv6 packet can'be bigger than 65535, please use a Jumbo payload.")
- return []
-
- s = bytes(pkt) # for instantiation to get upper layer checksum right
-
- if len(s) <= fragSize:
- return [pkt]
-
- # Fragmentable part : fake IPv6 for Fragmentable part length computation
- fragPart = pkt[IPv6ExtHdrFragment].payload
- tmp = bytes(IPv6(src="::1", dst="::1")/fragPart)
- fragPartLen = len(tmp) - 40 # basic IPv6 header length
- fragPartStr = s[-fragPartLen:]
-
- # Grab Next Header for use in Fragment Header
- nh = IPv6(tmp[:40]).nh
-
- # Keep fragment header
- fragHeader = pkt[IPv6ExtHdrFragment]
- fragHeader.payload = None # detach payload
-
- # Unfragmentable Part
- unfragPartLen = len(s) - fragPartLen - 8
- unfragPart = pkt
- pkt[IPv6ExtHdrFragment].underlayer.payload = None # detach payload
-
- # Cut the fragmentable part to fit fragSize. Inner fragments have
- # a length that is an integer multiple of 8 octets. last Frag MTU
- # can be anything below MTU
- lastFragSize = fragSize - unfragPartLen - 8
- innerFragSize = lastFragSize - (lastFragSize % 8)
-
- if lastFragSize <= 0 or innerFragSize == 0:
- warning("Provided fragment size value is too low. " +
- "Should be more than %d" % (unfragPartLen + 8))
- return [unfragPart/fragHeader/fragPart]
-
- remain = fragPartStr
- res = []
- fragOffset = 0 # offset, incremeted during creation
- fragId = random.randint(0,0xffffffff) # random id ...
- if fragHeader.id is not None: # ... except id provided by user
- fragId = fragHeader.id
- fragHeader.m = 1
- fragHeader.id = fragId
- fragHeader.nh = nh
-
- # Main loop : cut, fit to FRAGSIZEs, fragOffset, Id ...
- while True:
- if (len(remain) > lastFragSize):
- tmp = remain[:innerFragSize]
- remain = remain[innerFragSize:]
- fragHeader.offset = fragOffset # update offset
- fragOffset += (innerFragSize // 8) # compute new one
- if IPv6 in unfragPart:
- unfragPart[IPv6].plen = None
- tempo = unfragPart/fragHeader/conf.raw_layer(load=tmp)
- res.append(tempo)
- else:
- fragHeader.offset = fragOffset # update offSet
- fragHeader.m = 0
- if IPv6 in unfragPart:
- unfragPart[IPv6].plen = None
- tempo = unfragPart/fragHeader/conf.raw_layer(load=remain)
- res.append(tempo)
- break
- return res
-
-
-############################### AH Header ###################################
-
-# class _AHFieldLenField(FieldLenField):
-# def getfield(self, pkt, s):
-# l = getattr(pkt, self.fld)
-# l = (l*8)-self.shift
-# i = self.m2i(pkt, s[:l])
-# return s[l:],i
-
-# class _AHICVStrLenField(StrLenField):
-# def i2len(self, pkt, x):
-
-
-
-# class IPv6ExtHdrAH(_IPv6ExtHdr):
-# name = "IPv6 Extension Header - AH"
-# fields_desc = [ ByteEnumField("nh", 59, ipv6nh),
-# _AHFieldLenField("len", None, "icv"),
-# ShortField("res", 0),
-# IntField("spi", 0),
-# IntField("sn", 0),
-# _AHICVStrLenField("icv", None, "len", shift=2) ]
-# overload_fields = {IPv6: { "nh": 51 }}
-
-# def post_build(self, pkt, pay):
-# if self.len is None:
-# pkt = pkt[0]+struct.pack("!B", 2*len(self.addresses))+pkt[2:]
-# if self.segleft is None:
-# pkt = pkt[:3]+struct.pack("!B", len(self.addresses))+pkt[4:]
-# return _IPv6ExtHdr.post_build(self, pkt, pay)
-
-
-############################### ESP Header ##################################
-
-# class IPv6ExtHdrESP(_IPv6extHdr):
-# name = "IPv6 Extension Header - ESP"
-# fields_desc = [ IntField("spi", 0),
-# IntField("sn", 0),
-# # there is things to extract from IKE work
-# ]
-# overloads_fields = {IPv6: { "nh": 50 }}
-
-
-
-#############################################################################
-#############################################################################
-### ICMPv6* Classes ###
-#############################################################################
-#############################################################################
-
-icmp6typescls = { 1: "ICMPv6DestUnreach",
- 2: "ICMPv6PacketTooBig",
- 3: "ICMPv6TimeExceeded",
- 4: "ICMPv6ParamProblem",
- 128: "ICMPv6EchoRequest",
- 129: "ICMPv6EchoReply",
- 130: "ICMPv6MLQuery",
- 131: "ICMPv6MLReport",
- 132: "ICMPv6MLDone",
- 133: "ICMPv6ND_RS",
- 134: "ICMPv6ND_RA",
- 135: "ICMPv6ND_NS",
- 136: "ICMPv6ND_NA",
- 137: "ICMPv6ND_Redirect",
- #138: Do Me - RFC 2894 - Seems painful
- 139: "ICMPv6NIQuery",
- 140: "ICMPv6NIReply",
- 141: "ICMPv6ND_INDSol",
- 142: "ICMPv6ND_INDAdv",
- #143: Do Me - RFC 3810
- 144: "ICMPv6HAADRequest",
- 145: "ICMPv6HAADReply",
- 146: "ICMPv6MPSol",
- 147: "ICMPv6MPAdv",
- #148: Do Me - SEND related - RFC 3971
- #149: Do Me - SEND related - RFC 3971
- 151: "ICMPv6MRD_Advertisement",
- 152: "ICMPv6MRD_Solicitation",
- 153: "ICMPv6MRD_Termination",
- }
-
-icmp6typesminhdrlen = { 1: 8,
- 2: 8,
- 3: 8,
- 4: 8,
- 128: 8,
- 129: 8,
- 130: 24,
- 131: 24,
- 132: 24,
- 133: 8,
- 134: 16,
- 135: 24,
- 136: 24,
- 137: 40,
- #139:
- #140
- 141: 8,
- 142: 8,
- 144: 8,
- 145: 8,
- 146: 8,
- 147: 8,
- 151: 8,
- 152: 4,
- 153: 4
- }
-
-icmp6types = { 1 : "Destination unreachable",
- 2 : "Packet too big",
- 3 : "Time exceeded",
- 4 : "Parameter problem",
- 100 : "Private Experimentation",
- 101 : "Private Experimentation",
- 128 : "Echo Request",
- 129 : "Echo Reply",
- 130 : "MLD Query",
- 131 : "MLD Report",
- 132 : "MLD Done",
- 133 : "Router Solicitation",
- 134 : "Router Advertisement",
- 135 : "Neighbor Solicitation",
- 136 : "Neighbor Advertisement",
- 137 : "Redirect Message",
- 138 : "Router Renumbering",
- 139 : "ICMP Node Information Query",
- 140 : "ICMP Node Information Response",
- 141 : "Inverse Neighbor Discovery Solicitation Message",
- 142 : "Inverse Neighbor Discovery Advertisement Message",
- 143 : "Version 2 Multicast Listener Report",
- 144 : "Home Agent Address Discovery Request Message",
- 145 : "Home Agent Address Discovery Reply Message",
- 146 : "Mobile Prefix Solicitation",
- 147 : "Mobile Prefix Advertisement",
- 148 : "Certification Path Solicitation",
- 149 : "Certification Path Advertisement",
- 151 : "Multicast Router Advertisement",
- 152 : "Multicast Router Solicitation",
- 153 : "Multicast Router Termination",
- 200 : "Private Experimentation",
- 201 : "Private Experimentation" }
-
-
-class _ICMPv6(Packet):
- name = "ICMPv6 dummy class"
- overload_fields = {IPv6: {"nh": 58}}
- def post_build(self, p, pay):
- p += pay
- if self.cksum == None:
- chksum = in6_chksum(58, self.underlayer, p)
- p = p[:2]+struct.pack("!H", chksum)+p[4:]
- return p
-
- def hashret(self):
- return self.payload.hashret()
-
- def answers(self, other):
- # isinstance(self.underlayer, _IPv6ExtHdr) may introduce a bug ...
- if (isinstance(self.underlayer, IPerror6) or
- isinstance(self.underlayer, _IPv6ExtHdr) and
- isinstance(other, _ICMPv6)):
- if not ((self.type == other.type) and
- (self.code == other.code)):
- return 0
- return 1
- return 0
-
-
-class _ICMPv6Error(_ICMPv6):
- name = "ICMPv6 errors dummy class"
- def guess_payload_class(self,p):
- return IPerror6
-
-class ICMPv6Unknown(_ICMPv6):
- name = "Scapy6 ICMPv6 fallback class"
- fields_desc = [ ByteEnumField("type",1, icmp6types),
- ByteField("code",0),
- XShortField("cksum", None),
- StrField("msgbody", "")]
-
-
-################################## RFC 2460 #################################
-
-class ICMPv6DestUnreach(_ICMPv6Error):
- name = "ICMPv6 Destination Unreachable"
- fields_desc = [ ByteEnumField("type",1, icmp6types),
- ByteEnumField("code",0, { 0: "No route to destination",
- 1: "Communication with destination administratively prohibited",
- 2: "Beyond scope of source address",
- 3: "Address unreachable",
- 4: "Port unreachable" }),
- XShortField("cksum", None),
- XIntField("unused",0x00000000)]
-
-class ICMPv6PacketTooBig(_ICMPv6Error):
- name = "ICMPv6 Packet Too Big"
- fields_desc = [ ByteEnumField("type",2, icmp6types),
- ByteField("code",0),
- XShortField("cksum", None),
- IntField("mtu",1280)]
-
-class ICMPv6TimeExceeded(_ICMPv6Error):
- name = "ICMPv6 Time Exceeded"
- fields_desc = [ ByteEnumField("type",3, icmp6types),
- ByteEnumField("code",0, { 0: "hop limit exceeded in transit",
- 1: "fragment reassembly time exceeded"}),
- XShortField("cksum", None),
- XIntField("unused",0x00000000)]
-
-# The default pointer value is set to the next header field of
-# the encapsulated IPv6 packet
-class ICMPv6ParamProblem(_ICMPv6Error):
- name = "ICMPv6 Parameter Problem"
- fields_desc = [ ByteEnumField("type",4, icmp6types),
- ByteEnumField("code",0, {0: "erroneous header field encountered",
- 1: "unrecognized Next Header type encountered",
- 2: "unrecognized IPv6 option encountered"}),
- XShortField("cksum", None),
- IntField("ptr",6)]
-
-class ICMPv6EchoRequest(_ICMPv6):
- name = "ICMPv6 Echo Request"
- fields_desc = [ ByteEnumField("type", 128, icmp6types),
- ByteField("code", 0),
- XShortField("cksum", None),
- XShortField("id",0),
- XShortField("seq",0),
- StrField("data", "")]
- def mysummary(self):
- return self.sprintf("%name% (id: %id% seq: %seq%)")
- def hashret(self):
- return struct.pack("HH",self.id,self.seq)+self.payload.hashret()
-
-
-class ICMPv6EchoReply(ICMPv6EchoRequest):
- name = "ICMPv6 Echo Reply"
- type = 129
- def answers(self, other):
- # We could match data content between request and reply.
- return (isinstance(other, ICMPv6EchoRequest) and
- self.id == other.id and self.seq == other.seq and
- self.data == other.data)
-
-
-############ ICMPv6 Multicast Listener Discovery (RFC3810) ##################
-
-# tous les messages MLD sont emis avec une adresse source lien-locale
-# -> Y veiller dans le post_build si aucune n'est specifiee
-# La valeur de Hop-Limit doit etre de 1
-# "and an IPv6 Router Alert option in a Hop-by-Hop Options
-# header. (The router alert option is necessary to cause routers to
-# examine MLD messages sent to multicast addresses in which the router
-# itself has no interest"
-class _ICMPv6ML(_ICMPv6):
- fields_desc = [ ByteEnumField("type", 130, icmp6types),
- ByteField("code", 0),
- XShortField("cksum", None),
- ShortField("mrd", 0),
- ShortField("reserved", 0),
- IP6Field("mladdr","::")]
-
-# general queries are sent to the link-scope all-nodes multicast
-# address ff02::1, with a multicast address field of 0 and a MRD of
-# [Query Response Interval]
-# Default value for mladdr is set to 0 for a General Query, and
-# overloaded by the user for a Multicast Address specific query
-# TODO : See what we can do to automatically include a Router Alert
-# Option in a Destination Option Header.
-class ICMPv6MLQuery(_ICMPv6ML): # RFC 2710
- name = "MLD - Multicast Listener Query"
- type = 130
- mrd = 10000
- mladdr = "::" # 10s for mrd
- overload_fields = {IPv6: { "dst": "ff02::1", "hlim": 1, "nh": 58 }}
- def hashret(self):
- if self.mladdr != "::":
- return struct.pack("HH",self.mladdr)+self.payload.hashret()
- else:
- return self.payload.hashret()
-
-
-# TODO : See what we can do to automatically include a Router Alert
-# Option in a Destination Option Header.
-class ICMPv6MLReport(_ICMPv6ML): # RFC 2710
- name = "MLD - Multicast Listener Report"
- type = 131
- overload_fields = {IPv6: {"hlim": 1, "nh": 58}}
- # implementer le hashret et le answers
-
-# When a node ceases to listen to a multicast address on an interface,
-# it SHOULD send a single Done message to the link-scope all-routers
-# multicast address (FF02::2), carrying in its multicast address field
-# the address to which it is ceasing to listen
-# TODO : See what we can do to automatically include a Router Alert
-# Option in a Destination Option Header.
-class ICMPv6MLDone(_ICMPv6ML): # RFC 2710
- name = "MLD - Multicast Listener Done"
- type = 132
- overload_fields = {IPv6: { "dst": "ff02::2", "hlim": 1, "nh": 58}}
-
-
-########## ICMPv6 MRD - Multicast Router Discovery (RFC 4286) ###############
-
-# TODO:
-# - 04/09/06 troglocan : find a way to automatically add a router alert
-# option for all MRD packets. This could be done in a specific
-# way when IPv6 is the under layer with some specific keyword
-# like 'exthdr'. This would allow to keep compatibility with
-# providing IPv6 fields to be overloaded in fields_desc.
-#
-# At the moment, if user inserts an IPv6 Router alert option
-# none of the IPv6 default values of IPv6 layer will be set.
-
-class ICMPv6MRD_Advertisement(_ICMPv6):
- name = "ICMPv6 Multicast Router Discovery Advertisement"
- fields_desc = [ByteEnumField("type", 151, icmp6types),
- ByteField("advinter", 20),
- XShortField("cksum", None),
- ShortField("queryint", 0),
- ShortField("robustness", 0)]
- overload_fields = {IPv6: { "nh": 58, "hlim": 1, "dst": "ff02::2"}}
- # IPv6 Router Alert requires manual inclusion
- def extract_padding(self, s):
- return s[:8], s[8:]
-
-class ICMPv6MRD_Solicitation(_ICMPv6):
- name = "ICMPv6 Multicast Router Discovery Solicitation"
- fields_desc = [ByteEnumField("type", 152, icmp6types),
- ByteField("res", 0),
- XShortField("cksum", None) ]
- overload_fields = {IPv6: { "nh": 58, "hlim": 1, "dst": "ff02::2"}}
- # IPv6 Router Alert requires manual inclusion
- def extract_padding(self, s):
- return s[:4], s[4:]
-
-class ICMPv6MRD_Termination(_ICMPv6):
- name = "ICMPv6 Multicast Router Discovery Termination"
- fields_desc = [ByteEnumField("type", 153, icmp6types),
- ByteField("res", 0),
- XShortField("cksum", None) ]
- overload_fields = {IPv6: { "nh": 58, "hlim": 1, "dst": "ff02::6A"}}
- # IPv6 Router Alert requires manual inclusion
- def extract_padding(self, s):
- return s[:4], s[4:]
-
-
-################### ICMPv6 Neighbor Discovery (RFC 2461) ####################
-
-icmp6ndopts = { 1: "Source Link-Layer Address",
- 2: "Target Link-Layer Address",
- 3: "Prefix Information",
- 4: "Redirected Header",
- 5: "MTU",
- 6: "NBMA Shortcut Limit Option", # RFC2491
- 7: "Advertisement Interval Option",
- 8: "Home Agent Information Option",
- 9: "Source Address List",
- 10: "Target Address List",
- 11: "CGA Option", # RFC 3971
- 12: "RSA Signature Option", # RFC 3971
- 13: "Timestamp Option", # RFC 3971
- 14: "Nonce option", # RFC 3971
- 15: "Trust Anchor Option", # RFC 3971
- 16: "Certificate Option", # RFC 3971
- 17: "IP Address Option", # RFC 4068
- 18: "New Router Prefix Information Option", # RFC 4068
- 19: "Link-layer Address Option", # RFC 4068
- 20: "Neighbor Advertisement Acknowledgement Option",
- 21: "CARD Request Option", # RFC 4065/4066/4067
- 22: "CARD Reply Option", # RFC 4065/4066/4067
- 23: "MAP Option", # RFC 4140
- 24: "Route Information Option", # RFC 4191
- 25: "Recusive DNS Server Option",
- 26: "IPv6 Router Advertisement Flags Option"
- }
-
-icmp6ndoptscls = { 1: "ICMPv6NDOptSrcLLAddr",
- 2: "ICMPv6NDOptDstLLAddr",
- 3: "ICMPv6NDOptPrefixInfo",
- 4: "ICMPv6NDOptRedirectedHdr",
- 5: "ICMPv6NDOptMTU",
- 6: "ICMPv6NDOptShortcutLimit",
- 7: "ICMPv6NDOptAdvInterval",
- 8: "ICMPv6NDOptHAInfo",
- 9: "ICMPv6NDOptSrcAddrList",
- 10: "ICMPv6NDOptTgtAddrList",
- #11: Do Me,
- #12: Do Me,
- #13: Do Me,
- #14: Do Me,
- #15: Do Me,
- #16: Do Me,
- 17: "ICMPv6NDOptIPAddr",
- 18: "ICMPv6NDOptNewRtrPrefix",
- 19: "ICMPv6NDOptLLA",
- #18: Do Me,
- #19: Do Me,
- #20: Do Me,
- #21: Do Me,
- #22: Do Me,
- 23: "ICMPv6NDOptMAP",
- 24: "ICMPv6NDOptRouteInfo",
- 25: "ICMPv6NDOptRDNSS",
- 26: "ICMPv6NDOptEFA"
- }
-
-class _ICMPv6NDGuessPayload:
- name = "Dummy ND class that implements guess_payload_class()"
- def guess_payload_class(self,p):
- if len(p) > 1:
- #return get_cls(icmp6ndoptscls.get(ord(p[0]),"Raw"), "Raw") # s/Raw/ICMPv6NDOptUnknown/g ?
- return get_cls(icmp6ndoptscls.get(p[0],"Raw"), "Raw") # s/Raw/ICMPv6NDOptUnknown/g ?
-
-
-# Beginning of ICMPv6 Neighbor Discovery Options.
-
-class ICMPv6NDOptUnknown(_ICMPv6NDGuessPayload, Packet):
- name = "ICMPv6 Neighbor Discovery Option - Scapy Unimplemented"
- fields_desc = [ ByteField("type",None),
- FieldLenField("len",None,length_of="data",fmt="B",
- adjust = lambda pkt,x: x+2),
- StrLenField("data","",
- length_from = lambda pkt: pkt.len-2) ]
-
-# NOTE: len includes type and len field. Expressed in unit of 8 bytes
-# TODO: Revoir le coup du ETHER_ANY
-class ICMPv6NDOptSrcLLAddr(_ICMPv6NDGuessPayload, Packet):
- name = "ICMPv6 Neighbor Discovery Option - Source Link-Layer Address"
- fields_desc = [ ByteField("type", 1),
- ByteField("len", 1),
- MACField("lladdr", ETHER_ANY) ]
- def mysummary(self):
- return self.sprintf("%name% %lladdr%")
-
-class ICMPv6NDOptDstLLAddr(ICMPv6NDOptSrcLLAddr):
- name = "ICMPv6 Neighbor Discovery Option - Destination Link-Layer Address"
- type = 2
-
-class ICMPv6NDOptPrefixInfo(_ICMPv6NDGuessPayload, Packet):
- name = "ICMPv6 Neighbor Discovery Option - Prefix Information"
- fields_desc = [ ByteField("type",3),
- ByteField("len",4),
- ByteField("prefixlen",None),
- BitField("L",1,1),
- BitField("A",1,1),
- BitField("R",0,1),
- BitField("res1",0,5),
- XIntField("validlifetime",0xffffffff),
- XIntField("preferredlifetime",0xffffffff),
- XIntField("res2",0x00000000),
- IP6Field("prefix","::") ]
- def mysummary(self):
- return self.sprintf("%name% %prefix%")
-
-# TODO: We should also limit the size of included packet to something
-# like (initiallen - 40 - 2)
-class TruncPktLenField(PacketLenField):
-
- def __init__(self, name, default, cls, cur_shift, length_from=None, shift=0):
- PacketLenField.__init__(self, name, default, cls, length_from=length_from)
- self.cur_shift = cur_shift
-
- def getfield(self, pkt, s):
- l = self.length_from(pkt)
- i = self.m2i(pkt, s[:l])
- return s[l:],i
-
- def m2i(self, pkt, m):
- s = None
- try: # It can happen we have sth shorter than 40 bytes
- s = self.cls(m)
- except:
- return conf.raw_layer(m)
- return s
-
- def i2m(self, pkt, x):
- s = bytes(x)
- l = len(s)
- r = (l + self.cur_shift) % 8
- l = l - r
- return s[:l]
-
- def i2len(self, pkt, i):
- return len(self.i2m(pkt, i))
-
-
-# Faire un post_build pour le recalcul de la taille (en multiple de 8 octets)
-class ICMPv6NDOptRedirectedHdr(_ICMPv6NDGuessPayload, Packet):
- name = "ICMPv6 Neighbor Discovery Option - Redirected Header"
- fields_desc = [ ByteField("type",4),
- FieldLenField("len", None, length_of="pkt", fmt="B",
- adjust = lambda pkt,x:(x+8)//8),
- StrFixedLenField("res", b"\x00"*6, 6),
- TruncPktLenField("pkt", b"", IPv6, 8,
- length_from = lambda pkt: 8*pkt.len-8) ]
-
-# See which value should be used for default MTU instead of 1280
-class ICMPv6NDOptMTU(_ICMPv6NDGuessPayload, Packet):
- name = "ICMPv6 Neighbor Discovery Option - MTU"
- fields_desc = [ ByteField("type",5),
- ByteField("len",1),
- XShortField("res",0),
- IntField("mtu",1280)]
-
-class ICMPv6NDOptShortcutLimit(_ICMPv6NDGuessPayload, Packet): # RFC 2491
- name = "ICMPv6 Neighbor Discovery Option - NBMA Shortcut Limit"
- fields_desc = [ ByteField("type", 6),
- ByteField("len", 1),
- ByteField("shortcutlim", 40), # XXX
- ByteField("res1", 0),
- IntField("res2", 0) ]
-
-class ICMPv6NDOptAdvInterval(_ICMPv6NDGuessPayload, Packet):
- name = "ICMPv6 Neighbor Discovery - Interval Advertisement"
- fields_desc = [ ByteField("type",7),
- ByteField("len",1),
- ShortField("res", 0),
- IntField("advint", 0) ]
- def mysummary(self):
- return self.sprintf("%name% %advint% milliseconds")
-
-class ICMPv6NDOptHAInfo(_ICMPv6NDGuessPayload, Packet):
- name = "ICMPv6 Neighbor Discovery - Home Agent Information"
- fields_desc = [ ByteField("type",8),
- ByteField("len",1),
- ShortField("res", 0),
- ShortField("pref", 0),
- ShortField("lifetime", 1)]
- def mysummary(self):
- return self.sprintf("%name% %pref% %lifetime% seconds")
-
-# type 9 : See ICMPv6NDOptSrcAddrList class below in IND (RFC 3122) support
-
-# type 10 : See ICMPv6NDOptTgtAddrList class below in IND (RFC 3122) support
-
-class ICMPv6NDOptIPAddr(_ICMPv6NDGuessPayload, Packet): # RFC 4068
- name = "ICMPv6 Neighbor Discovery - IP Address Option (FH for MIPv6)"
- fields_desc = [ ByteField("type",17),
- ByteField("len", 3),
- ByteEnumField("optcode", 1, {1: "Old Care-Of Address",
- 2: "New Care-Of Address",
- 3: "NAR's IP address" }),
- ByteField("plen", 64),
- IntField("res", 0),
- IP6Field("addr", "::") ]
-
-class ICMPv6NDOptNewRtrPrefix(_ICMPv6NDGuessPayload, Packet): # RFC 4068
- name = "ICMPv6 Neighbor Discovery - New Router Prefix Information Option (FH for MIPv6)"
- fields_desc = [ ByteField("type",18),
- ByteField("len", 3),
- ByteField("optcode", 0),
- ByteField("plen", 64),
- IntField("res", 0),
- IP6Field("prefix", "::") ]
-
-_rfc4068_lla_optcode = {0: "Wildcard requesting resolution for all nearby AP",
- 1: "LLA for the new AP",
- 2: "LLA of the MN",
- 3: "LLA of the NAR",
- 4: "LLA of the src of TrSolPr or PrRtAdv msg",
- 5: "AP identified by LLA belongs to current iface of router",
- 6: "No preifx info available for AP identified by the LLA",
- 7: "No fast handovers support for AP identified by the LLA" }
-
-class ICMPv6NDOptLLA(_ICMPv6NDGuessPayload, Packet): # RFC 4068
- name = "ICMPv6 Neighbor Discovery - Link-Layer Address (LLA) Option (FH for MIPv6)"
- fields_desc = [ ByteField("type", 19),
- ByteField("len", 1),
- ByteEnumField("optcode", 0, _rfc4068_lla_optcode),
- MACField("lla", ETHER_ANY) ] # We only support ethernet
-
-class ICMPv6NDOptMAP(_ICMPv6NDGuessPayload, Packet): # RFC 4140
- name = "ICMPv6 Neighbor Discovery - MAP Option"
- fields_desc = [ ByteField("type", 23),
- ByteField("len", 3),
- BitField("dist", 1, 4),
- BitField("pref", 15, 4), # highest availability
- BitField("R", 1, 1),
- BitField("res", 0, 7),
- IntField("validlifetime", 0xffffffff),
- IP6Field("addr", "::") ]
-
-
-class IP6PrefixField(IP6Field):
- def __init__(self, name, default):
- IP6Field.__init__(self, name, default)
- self.length_from = lambda pkt: 8*(pkt.len - 1)
-
- def addfield(self, pkt, s, val):
- return s + self.i2m(pkt, val)
-
- def getfield(self, pkt, s):
- l = self.length_from(pkt)
- p = s[:l]
- if l < 16:
- p += b'\x00'*(16-l)
- return s[l:], self.m2i(pkt,p)
-
- def i2len(self, pkt, x):
- return len(self.i2m(pkt, x))
-
- def i2m(self, pkt, x):
- l = pkt.len
-
- if x is None:
- x = "::"
- if l is None:
- l = 1
- x = inet_pton(socket.AF_INET6, x)
-
- if l is None:
- return x
- if l in [0, 1]:
- return b""
- if l in [2, 3]:
- return x[:8*(l-1)]
-
- return x + b'\x00'*8*(l-3)
-
-class ICMPv6NDOptRouteInfo(_ICMPv6NDGuessPayload, Packet): # RFC 4191
- name = "ICMPv6 Neighbor Discovery Option - Route Information Option"
- fields_desc = [ ByteField("type",24),
- FieldLenField("len", None, length_of="prefix", fmt="B",
- adjust = lambda pkt,x: x//8 + 1),
- ByteField("plen", None),
- BitField("res1",0,3),
- BitField("prf",0,2),
- BitField("res2",0,3),
- IntField("rtlifetime", 0xffffffff),
- IP6PrefixField("prefix", None) ]
-
-class ICMPv6NDOptRDNSS(_ICMPv6NDGuessPayload, Packet): # RFC 5006
- name = "ICMPv6 Neighbor Discovery Option - Recursive DNS Server Option"
- fields_desc = [ ByteField("type", 25),
- FieldLenField("len", None, count_of="dns", fmt="B",
- adjust = lambda pkt,x: 2*x+1),
- ShortField("res", None),
- IntField("lifetime", 0xffffffff),
- IP6ListField("dns", [],
- length_from = lambda pkt: 8*(pkt.len-1)) ]
-
-class ICMPv6NDOptEFA(_ICMPv6NDGuessPayload, Packet): # RFC 5175 (prev. 5075)
- name = "ICMPv6 Neighbor Discovery Option - Expanded Flags Option"
- fields_desc = [ ByteField("type", 26),
- ByteField("len", 1),
- BitField("res", 0, 48) ]
-
-# End of ICMPv6 Neighbor Discovery Options.
-
-class ICMPv6ND_RS(_ICMPv6NDGuessPayload, _ICMPv6):
- name = "ICMPv6 Neighbor Discovery - Router Solicitation"
- fields_desc = [ ByteEnumField("type", 133, icmp6types),
- ByteField("code",0),
- XShortField("cksum", None),
- IntField("res",0) ]
- overload_fields = {IPv6: { "nh": 58, "dst": "ff02::2", "hlim": 255 }}
-
-class ICMPv6ND_RA(_ICMPv6NDGuessPayload, _ICMPv6):
- name = "ICMPv6 Neighbor Discovery - Router Advertisement"
- fields_desc = [ ByteEnumField("type", 134, icmp6types),
- ByteField("code",0),
- XShortField("cksum", None),
- ByteField("chlim",0),
- BitField("M",0,1),
- BitField("O",0,1),
- BitField("H",0,1),
- BitEnumField("prf",1,2, { 0: "Medium (default)",
- 1: "High",
- 2: "Reserved",
- 3: "Low" } ), # RFC 4191
- BitField("P",0,1),
- BitField("res",0,2),
- ShortField("routerlifetime",1800),
- IntField("reachabletime",0),
- IntField("retranstimer",0) ]
- overload_fields = {IPv6: { "nh": 58, "dst": "ff02::1", "hlim": 255 }}
-
- def answers(self, other):
- return isinstance(other, ICMPv6ND_RS)
-
-class ICMPv6ND_NS(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
- name = "ICMPv6 Neighbor Discovery - Neighbor Solicitation"
- fields_desc = [ ByteEnumField("type",135, icmp6types),
- ByteField("code",0),
- XShortField("cksum", None),
- IntField("res", 0),
- IP6Field("tgt","::") ]
- overload_fields = {IPv6: { "nh": 58, "dst": "ff02::1", "hlim": 255 }}
-
- def mysummary(self):
- return self.sprintf("%name% (tgt: %tgt%)")
-
- def hashret(self):
- return self.getbyteval("tgt")+self.payload.hashret()
-
-class ICMPv6ND_NA(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
- name = "ICMPv6 Neighbor Discovery - Neighbor Advertisement"
- fields_desc = [ ByteEnumField("type",136, icmp6types),
- ByteField("code",0),
- XShortField("cksum", None),
- BitField("R",1,1),
- BitField("S",0,1),
- BitField("O",1,1),
- XBitField("res",0,29),
- IP6Field("tgt","::") ]
- overload_fields = {IPv6: { "nh": 58, "dst": "ff02::1", "hlim": 255 }}
-
- def mysummary(self):
- return self.sprintf("%name% (tgt: %tgt%)")
-
- def hashret(self):
- return self.getbyteval("tgt")+self.payload.hashret()
-
- def answers(self, other):
- return isinstance(other, ICMPv6ND_NS) and self.tgt == other.tgt
-
-# associated possible options : target link-layer option, Redirected header
-class ICMPv6ND_Redirect(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
- name = "ICMPv6 Neighbor Discovery - Redirect"
- fields_desc = [ ByteEnumField("type",137, icmp6types),
- ByteField("code",0),
- XShortField("cksum", None),
- XIntField("res",0),
- IP6Field("tgt","::"),
- IP6Field("dst","::") ]
- overload_fields = {IPv6: { "nh": 58, "dst": "ff02::1", "hlim": 255 }}
-
-
-
-################ ICMPv6 Inverse Neighbor Discovery (RFC 3122) ###############
-
-class ICMPv6NDOptSrcAddrList(_ICMPv6NDGuessPayload, Packet):
- name = "ICMPv6 Inverse Neighbor Discovery Option - Source Address List"
- fields_desc = [ ByteField("type",9),
- FieldLenField("len", None, count_of="addrlist", fmt="B",
- adjust = lambda pkt,x: 2*x+1),
- StrFixedLenField("res", "\x00"*6, 6),
- IP6ListField("addrlist", [],
- length_from = lambda pkt: 8*(pkt.len-1)) ]
-
-class ICMPv6NDOptTgtAddrList(ICMPv6NDOptSrcAddrList):
- name = "ICMPv6 Inverse Neighbor Discovery Option - Target Address List"
- type = 10
-
-
-# RFC3122
-# Options requises : source lladdr et target lladdr
-# Autres options valides : source address list, MTU
-# - Comme precise dans le document, il serait bien de prendre l'adresse L2
-# demandee dans l'option requise target lladdr et l'utiliser au niveau
-# de l'adresse destination ethernet si aucune adresse n'est precisee
-# - ca semble pas forcement pratique si l'utilisateur doit preciser toutes
-# les options.
-# Ether() must use the target lladdr as destination
-class ICMPv6ND_INDSol(_ICMPv6NDGuessPayload, _ICMPv6):
- name = "ICMPv6 Inverse Neighbor Discovery Solicitation"
- fields_desc = [ ByteEnumField("type",141, icmp6types),
- ByteField("code",0),
- XShortField("cksum",None),
- XIntField("reserved",0) ]
- overload_fields = {IPv6: { "nh": 58, "dst": "ff02::1", "hlim": 255 }}
-
-# Options requises : target lladdr, target address list
-# Autres options valides : MTU
-class ICMPv6ND_INDAdv(_ICMPv6NDGuessPayload, _ICMPv6):
- name = "ICMPv6 Inverse Neighbor Discovery Advertisement"
- fields_desc = [ ByteEnumField("type",142, icmp6types),
- ByteField("code",0),
- XShortField("cksum",None),
- XIntField("reserved",0) ]
- overload_fields = {IPv6: { "nh": 58, "dst": "ff02::1", "hlim": 255 }}
-
-
-###############################################################################
-# ICMPv6 Node Information Queries (RFC 4620)
-###############################################################################
-
-# [ ] Add automatic destination address computation using computeNIGroupAddr
-# in IPv6 class (Scapy6 modification when integrated) if :
-# - it is not provided
-# - upper layer is ICMPv6NIQueryName() with a valid value
-# [ ] Try to be liberal in what we accept as internal values for _explicit_
-# DNS elements provided by users. Any string should be considered
-# valid and kept like it has been provided. At the moment, i2repr() will
-# crash on many inputs
-# [ ] Do the documentation
-# [ ] Add regression tests
-# [ ] Perform test against real machines (NOOP reply is proof of implementation).
-# [ ] Check if there are differences between different stacks. Among *BSD,
-# with others.
-# [ ] Deal with flags in a consistent way.
-# [ ] Implement compression in names2dnsrepr() and decompresiion in
-# dnsrepr2names(). Should be deactivable.
-
-icmp6_niqtypes = { 0: "NOOP",
- 2: "Node Name",
- 3: "IPv6 Address",
- 4: "IPv4 Address" }
-
-
-class _ICMPv6NIHashret:
- def hashret(self):
- return self.nonce
-
-class _ICMPv6NIAnswers:
- def answers(self, other):
- return self.nonce == other.nonce
-
-# Buggy; always returns the same value during a session
-class NonceField(StrFixedLenField):
- def __init__(self, name, default=None):
- StrFixedLenField.__init__(self, name, default, 8)
- if default is None:
- self.default = self.randval()
-
-# Compute the NI group Address. Can take a FQDN as input parameter
-def computeNIGroupAddr(name):
- import md5
- name = name.lower().split(".")[0]
- record = chr(len(name))+name
- h = md5.new(record)
- h = h.digest()
- addr = "ff02::2:%2x%2x:%2x%2x" % struct.unpack("BBBB", h[:4])
- return addr
-
-
-# Here is the deal. First, that protocol is a piece of shit. Then, we
-# provide 4 classes for the different kinds of Requests (one for every
-# valid qtype: NOOP, Node Name, IPv6@, IPv4@). They all share the same
-# data field class that is made to be smart by guessing the specifc
-# type of value provided :
-#
-# - IPv6 if acceptable for inet_pton(AF_INET6, ): code is set to 0,
-# if not overriden by user
-# - IPv4 if acceptable for inet_pton(AF_INET, ): code is set to 2,
-# if not overriden
-# - Name in the other cases: code is set to 0, if not overriden by user
-#
-# Internal storage, is not only the value, but the a pair providing
-# the type and the value (1 is IPv6@, 1 is Name or string, 2 is IPv4@)
-#
-# Note : I merged getfield() and m2i(). m2i() should not be called
-# directly anyway. Same remark for addfield() and i2m()
-#
-# -- arno
-
-# "The type of information present in the Data field of a query is
-# declared by the ICMP Code, whereas the type of information in a
-# Reply is determined by the Qtype"
-
-def names2dnsrepr(x):
- """
- Take as input a list of DNS names or a single DNS name
- and encode it in DNS format (with possible compression)
- If a string that is already a DNS name in DNS format
- is passed, it is returned unmodified. Result is a string.
- !!! At the moment, compression is not implemented !!!
- """
-
- if type(x) is str:
- if x and x[-1] == '\x00': # stupid heuristic
- return x.encode('ascii')
- x = [x.encode('ascii')]
- elif type(x) is bytes:
- if x and x[-1] == 0:
- return x
- x = [x]
-
- res = []
- for n in x:
- if type(n) is str:
- n = n.encode('ascii')
- termin = b"\x00"
- if n.count(b'.') == 0: # single-component gets one more
- termin += bytes([0])
- n = b"".join(map(lambda y: chr(len(y)).encode('ascii')+y, n.split(b"."))) + termin
- res.append(n)
- return b"".join(res)
-
-
-def dnsrepr2names(x):
- """
- Take as input a DNS encoded string (possibly compressed)
- and returns a list of DNS names contained in it.
- If provided string is already in printable format
- (does not end with a null character, a one element list
- is returned). Result is a list.
- """
- res = []
- cur = b""
- if type(x) is str:
- x = x.encode('ascii')
- while x:
- #l = ord(x[0])
- l = x[0]
- x = x[1:]
- if l == 0:
- if cur and cur[-1] == ord('.'):
- cur = cur[:-1]
- res.append(cur)
- cur = b""
- #if x and ord(x[0]) == 0: # single component
- if x and x[0] == 0: # single component
- x = x[1:]
- continue
- if l & 0xc0: # XXX TODO : work on that -- arno
- raise Exception("DNS message can't be compressed at this point!")
- else:
- cur += x[:l]+b"."
- x = x[l:]
- return res
-
-
-class NIQueryDataField(StrField):
- def __init__(self, name, default):
- StrField.__init__(self, name, default)
-
- def i2h(self, pkt, x):
- if x is None:
- return x
- t,val = x
- if t == 1:
- val = dnsrepr2names(val)[0]
- return val
-
- def h2i(self, pkt, x):
- if x is tuple and type(x[0]) is int:
- return x
-
- val = None
- try: # Try IPv6
- inet_pton(socket.AF_INET6, x)
- val = (0, x)
- except:
- try: # Try IPv4
- inet_pton(socket.AF_INET, x)
- val = (2, x)
- except: # Try DNS
- if x is None:
- x = b""
- x = names2dnsrepr(x)
- val = (1, x)
- return val
-
- def i2repr(self, pkt, x):
- t,val = x
- if t == 1: # DNS Name
- # we don't use dnsrepr2names() to deal with
- # possible weird data extracted info
- res = []
- weird = None
- while val:
- #l = ord(val[0])
- l = val[0]
- val = val[1:]
- if l == 0:
- if (len(res) > 1 and val): # fqdn with data behind
- weird = val
- elif len(val) > 1: # single label with data behind
- weird = val[1:]
- break
- res.append(val[:l]+".")
- val = val[l:]
- tmp = "".join(res)
- if tmp and tmp[-1] == '.':
- tmp = tmp[:-1]
- return tmp
- return repr(val)
-
- def getfield(self, pkt, s):
- qtype = getattr(pkt, "qtype")
- if qtype == 0: # NOOP
- return s, (0, b"")
- else:
- code = getattr(pkt, "code")
- if code == 0: # IPv6 Addr
- return s[16:], (0, inet_ntop(socket.AF_INET6, s[:16]))
- elif code == 2: # IPv4 Addr
- return s[4:], (2, inet_ntop(socket.AF_INET, s[:4]))
- else: # Name or Unknown
- return b"", (1, s)
-
- def addfield(self, pkt, s, val):
- if ((type(val) is tuple and val[1] is None) or
- val is None):
- val = (1, b"")
- t = val[0]
- if t == 1:
- if type(val[1]) is str:
- tmp = val[1].encode('ascii')
- else:
- tmp = val[1]
- return s + tmp
- elif t == 0:
- return s + inet_pton(socket.AF_INET6, val[1])
- else:
- return s + inet_pton(socket.AF_INET, val[1])
-
-class NIQueryCodeField(ByteEnumField):
- def i2m(self, pkt, x):
- if x is None:
- d = pkt.getfieldval("data")
- if d is None:
- return 1
- elif d[0] == 0: # IPv6 address
- return 0
- elif d[0] == 1: # Name
- return 1
- elif d[0] == 2: # IPv4 address
- return 2
- else:
- return 1
- return x
-
-
-_niquery_code = {0: "IPv6 Query", 1: "Name Query", 2: "IPv4 Query"}
-
-#_niquery_flags = { 2: "All unicast addresses", 4: "IPv4 addresses",
-# 8: "Link-local addresses", 16: "Site-local addresses",
-# 32: "Global addresses" }
-
-# "This NI type has no defined flags and never has a Data Field". Used
-# to know if the destination is up and implements NI protocol.
-class ICMPv6NIQueryNOOP(_ICMPv6NIHashret, _ICMPv6):
- name = "ICMPv6 Node Information Query - NOOP Query"
- fields_desc = [ ByteEnumField("type", 139, icmp6types),
- NIQueryCodeField("code", None, _niquery_code),
- XShortField("cksum", None),
- ShortEnumField("qtype", 0, icmp6_niqtypes),
- BitField("unused", 0, 10),
- FlagsField("flags", 0, 6, "TACLSG"),
- NonceField("nonce", None),
- NIQueryDataField("data", None) ]
-
-class ICMPv6NIQueryName(ICMPv6NIQueryNOOP):
- name = "ICMPv6 Node Information Query - IPv6 Name Query"
- qtype = 2
-
-# We ask for the IPv6 address of the peer
-class ICMPv6NIQueryIPv6(ICMPv6NIQueryNOOP):
- name = "ICMPv6 Node Information Query - IPv6 Address Query"
- qtype = 3
- flags = 0x3E
-
-class ICMPv6NIQueryIPv4(ICMPv6NIQueryNOOP):
- name = "ICMPv6 Node Information Query - IPv4 Address Query"
- qtype = 4
-
-_nireply_code = { 0: "Successful Reply",
- 1: "Response Refusal",
- 3: "Unknown query type" }
-
-_nireply_flags = { 1: "Reply set incomplete",
- 2: "All unicast addresses",
- 4: "IPv4 addresses",
- 8: "Link-local addresses",
- 16: "Site-local addresses",
- 32: "Global addresses" }
-
-# Internal repr is one of those :
-# (0, "some string") : unknow qtype value are mapped to that one
-# (3, [ (ttl, ip6), ... ])
-# (4, [ (ttl, ip4), ... ])
-# (2, [ttl, dns_names]) : dns_names is one string that contains
-# all the DNS names. Internally it is kept ready to be sent
-# (undissected). i2repr() decode it for user. This is to
-# make build after dissection bijective.
-#
-# I also merged getfield() and m2i(), and addfield() and i2m().
-class NIReplyDataField(StrField):
-
- def i2h(self, pkt, x):
- if x is None:
- return x
- t,val = x
- if t == 2:
- ttl, dnsnames = val
- val = [ttl] + dnsrepr2names(dnsnames)
- return val
-
- def h2i(self, pkt, x):
- qtype = 0 # We will decode it as string if not
- # overridden through 'qtype' in pkt
-
- # No user hint, let's use 'qtype' value for that purpose
- if type(x) is not tuple:
- if pkt is not None:
- qtype = getattr(pkt, "qtype")
- else:
- qtype = x[0]
- x = x[1]
-
- # From that point on, x is the value (second element of the tuple)
-
- if qtype == 2: # DNS name
- if type(x) is str: # listify the string
- x = x.encode('ascii')
- x = [x]
- elif type(x) is bytes:
- x = [x]
- if type(x) is list and x and type(x[0]) is not int: # ttl was omitted : use 0
- x = [0] + x
- ttl = x[0]
- names = x[1:]
- return (2, [ttl, names2dnsrepr(names)])
-
- elif qtype in [3, 4]: # IPv4 or IPv6 addr
- if type(x) is str or type(x) is bytes:
- x = [x] # User directly provided an IP, instead of list
-
- # List elements are not tuples, user probably
- # omitted ttl value : we will use 0 instead
- def addttl(x):
- if type(x) is str or type(x) is bytes:
- return (0, x)
- return x
-
- return (qtype, list(map(addttl, x)))
-
- return (qtype, x)
-
-
- def addfield(self, pkt, s, val):
- t,tmp = val
- if tmp is None:
- tmp = b""
- if t == 2:
- ttl,dnsstr = tmp
- return s+ struct.pack("!I", ttl) + dnsstr
- elif t == 3:
- #return s + "".join(map(lambda (x,y): struct.pack("!I", x)+inet_pton(socket.AF_INET6, y), tmp))
- return s + b"".join(map(lambda a: struct.pack("!I", a[0])+inet_pton(socket.AF_INET6, a[1]), tmp))
- elif t == 4:
- #return s + "".join(map(lambda (x,y): struct.pack("!I", x)+inet_pton(socket.AF_INET, y), tmp))
- return s + b"".join(map(lambda a: struct.pack("!I", a[0])+inet_pton(socket.AF_INET, a[1]), tmp))
- else:
- return s + tmp
-
- def getfield(self, pkt, s):
- code = getattr(pkt, "code")
- if code != 0:
- return s, (0, b"")
-
- qtype = getattr(pkt, "qtype")
- if qtype == 0: # NOOP
- return s, (0, b"")
-
- elif qtype == 2:
- if len(s) < 4:
- return s, (0, b"")
- ttl = struct.unpack("!I", s[:4])[0]
- return b"", (2, [ttl, s[4:]])
-
- elif qtype == 3: # IPv6 addresses with TTLs
- # XXX TODO : get the real length
- res = []
- while len(s) >= 20: # 4 + 16
- ttl = struct.unpack("!I", s[:4])[0]
- ip = inet_ntop(socket.AF_INET6, s[4:20])
- res.append((ttl, ip))
- s = s[20:]
- return s, (3, res)
-
- elif qtype == 4: # IPv4 addresses with TTLs
- # XXX TODO : get the real length
- res = []
- while len(s) >= 8: # 4 + 4
- ttl = struct.unpack("!I", s[:4])[0]
- ip = inet_ntop(socket.AF_INET, s[4:8])
- res.append((ttl, ip))
- s = s[8:]
- return s, (4, res)
- else:
- # XXX TODO : implement me and deal with real length
- return b"", (0, s)
-
- def i2repr(self, pkt, x):
- if x is None:
- return "[]"
-
- if type(x) is tuple and len(x) == 2:
- t, val = x
- if t == 2: # DNS names
- ttl,l = val
- l = dnsrepr2names(l)
- return "ttl:%d %s" % (ttl, ", ".join(l))
- elif t == 3 or t == 4:
- #return "[ %s ]" % (", ".join(map(lambda (x,y): "(%d, %s)" % (x, y), val)))
- return "[ %s ]" % (", ".join(map(lambda a: "(%d, %s)" % a, val)))
- return repr(val)
- return repr(x) # XXX should not happen
-
-# By default, sent responses have code set to 0 (successful)
-class ICMPv6NIReplyNOOP(_ICMPv6NIAnswers, _ICMPv6NIHashret, _ICMPv6):
- name = "ICMPv6 Node Information Reply - NOOP Reply"
- fields_desc = [ ByteEnumField("type", 140, icmp6types),
- ByteEnumField("code", 0, _nireply_code),
- XShortField("cksum", None),
- ShortEnumField("qtype", 0, icmp6_niqtypes),
- BitField("unused", 0, 10),
- FlagsField("flags", 0, 6, "TACLSG"),
- NonceField("nonce", None),
- NIReplyDataField("data", None)]
-
-class ICMPv6NIReplyName(ICMPv6NIReplyNOOP):
- name = "ICMPv6 Node Information Reply - Node Names"
- qtype = 2
-
-class ICMPv6NIReplyIPv6(ICMPv6NIReplyNOOP):
- name = "ICMPv6 Node Information Reply - IPv6 addresses"
- qtype = 3
-
-class ICMPv6NIReplyIPv4(ICMPv6NIReplyNOOP):
- name = "ICMPv6 Node Information Reply - IPv4 addresses"
- qtype = 4
-
-class ICMPv6NIReplyRefuse(ICMPv6NIReplyNOOP):
- name = "ICMPv6 Node Information Reply - Responder refuses to supply answer"
- code = 1
-
-class ICMPv6NIReplyUnknown(ICMPv6NIReplyNOOP):
- name = "ICMPv6 Node Information Reply - Qtype unknown to the responder"
- code = 2
-
-
-def _niquery_guesser(p):
- cls = conf.raw_layer
- #type = ord(p[0])
- type = p[0]
- if type == 139: # Node Info Query specific stuff
- if len(p) > 6:
- qtype, = struct.unpack("!H", p[4:6])
- cls = { 0: ICMPv6NIQueryNOOP,
- 2: ICMPv6NIQueryName,
- 3: ICMPv6NIQueryIPv6,
- 4: ICMPv6NIQueryIPv4 }.get(qtype, conf.raw_layer)
- elif type == 140: # Node Info Reply specific stuff
- #code = ord(p[1])
- code = p[1]
- if code == 0:
- if len(p) > 6:
- qtype, = struct.unpack("!H", p[4:6])
- cls = { 2: ICMPv6NIReplyName,
- 3: ICMPv6NIReplyIPv6,
- 4: ICMPv6NIReplyIPv4 }.get(qtype, ICMPv6NIReplyNOOP)
- elif code == 1:
- cls = ICMPv6NIReplyRefuse
- elif code == 2:
- cls = ICMPv6NIReplyUnknown
- return cls
-
-
-#############################################################################
-#############################################################################
-### Mobile IPv6 (RFC 3775) and Nemo (RFC 3963) ###
-#############################################################################
-#############################################################################
-
-# Mobile IPv6 ICMPv6 related classes
-
-class ICMPv6HAADRequest(_ICMPv6):
- name = 'ICMPv6 Home Agent Address Discovery Request'
- fields_desc = [ ByteEnumField("type", 144, icmp6types),
- ByteField("code", 0),
- XShortField("cksum", None),
- XShortField("id", None),
- BitEnumField("R", 1, 1, {1: 'MR'}),
- XBitField("res", 0, 15) ]
- def hashret(self):
- return struct.pack("!H",self.id)+self.payload.hashret()
-
-class ICMPv6HAADReply(_ICMPv6):
- name = 'ICMPv6 Home Agent Address Discovery Reply'
- fields_desc = [ ByteEnumField("type", 145, icmp6types),
- ByteField("code", 0),
- XShortField("cksum", None),
- XShortField("id", None),
- BitEnumField("R", 1, 1, {1: 'MR'}),
- XBitField("res", 0, 15),
- IP6ListField('addresses', None) ]
- def hashret(self):
- return struct.pack("!H",self.id)+self.payload.hashret()
-
- def answers(self, other):
- if not isinstance(other, ICMPv6HAADRequest):
- return 0
- return self.id == other.id
-
-class ICMPv6MPSol(_ICMPv6):
- name = 'ICMPv6 Mobile Prefix Solicitation'
- fields_desc = [ ByteEnumField("type", 146, icmp6types),
- ByteField("code", 0),
- XShortField("cksum", None),
- XShortField("id", None),
- XShortField("res", 0) ]
- def _hashret(self):
- return struct.pack("!H",self.id)
-
-class ICMPv6MPAdv(_ICMPv6NDGuessPayload, _ICMPv6):
- name = 'ICMPv6 Mobile Prefix Advertisement'
- fields_desc = [ ByteEnumField("type", 147, icmp6types),
- ByteField("code", 0),
- XShortField("cksum", None),
- XShortField("id", None),
- BitEnumField("flags", 2, 2, {2: 'M', 1:'O'}),
- XBitField("res", 0, 14) ]
- def hashret(self):
- return struct.pack("!H",self.id)
-
- def answers(self, other):
- return isinstance(other, ICMPv6MPSol)
-
-# Mobile IPv6 Options classes
-
-
-_mobopttypes = { 2: "Binding Refresh Advice",
- 3: "Alternate Care-of Address",
- 4: "Nonce Indices",
- 5: "Binding Authorization Data",
- 6: "Mobile Network Prefix (RFC3963)",
- 7: "Link-Layer Address (RFC4068)",
- 8: "Mobile Node Identifier (RFC4283)",
- 9: "Mobility Message Authentication (RFC4285)",
- 10: "Replay Protection (RFC4285)",
- 11: "CGA Parameters Request (RFC4866)",
- 12: "CGA Parameters (RFC4866)",
- 13: "Signature (RFC4866)",
- 14: "Home Keygen Token (RFC4866)",
- 15: "Care-of Test Init (RFC4866)",
- 16: "Care-of Test (RFC4866)" }
-
-
-class _MIP6OptAlign:
- """ Mobile IPv6 options have alignment requirements of the form x*n+y.
- This class is inherited by all MIPv6 options to help in computing the
- required Padding for that option, i.e. the need for a Pad1 or PadN
- option before it. They only need to provide x and y as class
- parameters. (x=0 and y=0 are used when no alignment is required)"""
- def alignment_delta(self, curpos):
- x = self.x ; y = self.y
- if x == 0 and y ==0:
- return 0
- delta = x*((curpos - y + x - 1)//x) + y - curpos
- return delta
-
-
-class MIP6OptBRAdvice(_MIP6OptAlign, Packet):
- name = 'Mobile IPv6 Option - Binding Refresh Advice'
- fields_desc = [ ByteEnumField('otype', 2, _mobopttypes),
- ByteField('olen', 2),
- ShortField('rinter', 0) ]
- x = 2 ; y = 0# alignment requirement: 2n
-
-class MIP6OptAltCoA(_MIP6OptAlign, Packet):
- name = 'MIPv6 Option - Alternate Care-of Address'
- fields_desc = [ ByteEnumField('otype', 3, _mobopttypes),
- ByteField('olen', 16),
- IP6Field("acoa", "::") ]
- x = 8 ; y = 6 # alignment requirement: 8n+6
-
-class MIP6OptNonceIndices(_MIP6OptAlign, Packet):
- name = 'MIPv6 Option - Nonce Indices'
- fields_desc = [ ByteEnumField('otype', 4, _mobopttypes),
- ByteField('olen', 16),
- ShortField('hni', 0),
- ShortField('coni', 0) ]
- x = 2 ; y = 0 # alignment requirement: 2n
-
-class MIP6OptBindingAuthData(_MIP6OptAlign, Packet):
- name = 'MIPv6 Option - Binding Authorization Data'
- fields_desc = [ ByteEnumField('otype', 5, _mobopttypes),
- ByteField('olen', 16),
- BitField('authenticator', 0, 96) ]
- x = 8 ; y = 2 # alignment requirement: 8n+2
-
-class MIP6OptMobNetPrefix(_MIP6OptAlign, Packet): # NEMO - RFC 3963
- name = 'NEMO Option - Mobile Network Prefix'
- fields_desc = [ ByteEnumField("otype", 6, _mobopttypes),
- ByteField("olen", 18),
- ByteField("reserved", 0),
- ByteField("plen", 64),
- IP6Field("prefix", "::") ]
- x = 8 ; y = 4 # alignment requirement: 8n+4
-
-class MIP6OptLLAddr(_MIP6OptAlign, Packet): # Sect 6.4.4 of RFC 4068
- name = "MIPv6 Option - Link-Layer Address (MH-LLA)"
- fields_desc = [ ByteEnumField("otype", 7, _mobopttypes),
- ByteField("olen", 7),
- ByteEnumField("ocode", 2, _rfc4068_lla_optcode),
- ByteField("pad", 0),
- MACField("lla", ETHER_ANY) ] # Only support ethernet
- x = 0 ; y = 0 # alignment requirement: none
-
-class MIP6OptMNID(_MIP6OptAlign, Packet): # RFC 4283
- name = "MIPv6 Option - Mobile Node Identifier"
- fields_desc = [ ByteEnumField("otype", 8, _mobopttypes),
- FieldLenField("olen", None, length_of="id", fmt="B",
- adjust = lambda pkt,x: x+1),
- ByteEnumField("subtype", 1, {1: "NAI"}),
- StrLenField("id", "",
- length_from = lambda pkt: pkt.olen-1) ]
- x = 0 ; y = 0 # alignment requirement: none
-
-# We only support decoding and basic build. Automatic HMAC computation is
-# too much work for our current needs. It is left to the user (I mean ...
-# you). --arno
-class MIP6OptMsgAuth(_MIP6OptAlign, Packet): # RFC 4285 (Sect. 5)
- name = "MIPv6 Option - Mobility Message Authentication"
- fields_desc = [ ByteEnumField("otype", 9, _mobopttypes),
- FieldLenField("olen", None, length_of="authdata", fmt="B",
- adjust = lambda pkt,x: x+5),
- ByteEnumField("subtype", 1, {1: "MN-HA authentication mobility option",
- 2: "MN-AAA authentication mobility option"}),
- IntField("mspi", None),
- StrLenField("authdata", "A"*12,
- length_from = lambda pkt: pkt.olen-5) ]
- x = 4 ; y = 1 # alignment requirement: 4n+1
-
-# Extracted from RFC 1305 (NTP) :
-# NTP timestamps are represented as a 64-bit unsigned fixed-point number,
-# in seconds relative to 0h on 1 January 1900. The integer part is in the
-# first 32 bits and the fraction part in the last 32 bits.
-class NTPTimestampField(LongField):
- epoch = (1900, 1, 1, 0, 0, 0, 5, 1, 0)
- def i2repr(self, pkt, x):
- if x < ((50*31536000)<<32):
- return "Some date a few decades ago (%d)" % x
-
- # delta from epoch (= (1900, 1, 1, 0, 0, 0, 5, 1, 0)) to
- # January 1st 1970 :
- delta = -2209075761
- i = int(x >> 32)
- j = float(x & 0xffffffff) * 2.0**-32
- res = i + j + delta
- from time import strftime
- t = time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.gmtime(res))
-
- return "%s (%d)" % (t, x)
-
-class MIP6OptReplayProtection(_MIP6OptAlign, Packet): # RFC 4285 (Sect. 6)
- name = "MIPv6 option - Replay Protection"
- fields_desc = [ ByteEnumField("otype", 10, _mobopttypes),
- ByteField("olen", 8),
- NTPTimestampField("timestamp", 0) ]
- x = 8 ; y = 2 # alignment requirement: 8n+2
-
-class MIP6OptCGAParamsReq(_MIP6OptAlign, Packet): # RFC 4866 (Sect. 5.6)
- name = "MIPv6 option - CGA Parameters Request"
- fields_desc = [ ByteEnumField("otype", 11, _mobopttypes),
- ByteField("olen", 0) ]
- x = 0 ; y = 0 # alignment requirement: none
-
-# XXX TODO: deal with CGA param fragmentation and build of defragmented
-# XXX version. Passing of a big CGAParam structure should be
-# XXX simplified. Make it hold packets, by the way --arno
-class MIP6OptCGAParams(_MIP6OptAlign, Packet): # RFC 4866 (Sect. 5.1)
- name = "MIPv6 option - CGA Parameters"
- fields_desc = [ ByteEnumField("otype", 12, _mobopttypes),
- FieldLenField("olen", None, length_of="cgaparams", fmt="B"),
- StrLenField("cgaparams", "",
- length_from = lambda pkt: pkt.olen) ]
- x = 0 ; y = 0 # alignment requirement: none
-
-class MIP6OptSignature(_MIP6OptAlign, Packet): # RFC 4866 (Sect. 5.2)
- name = "MIPv6 option - Signature"
- fields_desc = [ ByteEnumField("otype", 13, _mobopttypes),
- FieldLenField("olen", None, length_of="sig", fmt="B"),
- StrLenField("sig", "",
- length_from = lambda pkt: pkt.olen) ]
- x = 0 ; y = 0 # alignment requirement: none
-
-class MIP6OptHomeKeygenToken(_MIP6OptAlign, Packet): # RFC 4866 (Sect. 5.3)
- name = "MIPv6 option - Home Keygen Token"
- fields_desc = [ ByteEnumField("otype", 14, _mobopttypes),
- FieldLenField("olen", None, length_of="hkt", fmt="B"),
- StrLenField("hkt", "",
- length_from = lambda pkt: pkt.olen) ]
- x = 0 ; y = 0 # alignment requirement: none
-
-class MIP6OptCareOfTestInit(_MIP6OptAlign, Packet): # RFC 4866 (Sect. 5.4)
- name = "MIPv6 option - Care-of Test Init"
- fields_desc = [ ByteEnumField("otype", 15, _mobopttypes),
- ByteField("olen", 0) ]
- x = 0 ; y = 0 # alignment requirement: none
-
-class MIP6OptCareOfTest(_MIP6OptAlign, Packet): # RFC 4866 (Sect. 5.5)
- name = "MIPv6 option - Care-of Test"
- fields_desc = [ ByteEnumField("otype", 16, _mobopttypes),
- FieldLenField("olen", None, length_of="cokt", fmt="B"),
- StrLenField("cokt", '\x00'*8,
- length_from = lambda pkt: pkt.olen) ]
- x = 0 ; y = 0 # alignment requirement: none
-
-class MIP6OptUnknown(_MIP6OptAlign, Packet):
- name = 'Scapy6 - Unknown Mobility Option'
- fields_desc = [ ByteEnumField("otype", 6, _mobopttypes),
- FieldLenField("olen", None, length_of="odata", fmt="B"),
- StrLenField("odata", "",
- length_from = lambda pkt: pkt.olen) ]
- x = 0 ; y = 0 # alignment requirement: none
-
-moboptcls = { 0: Pad1,
- 1: PadN,
- 2: MIP6OptBRAdvice,
- 3: MIP6OptAltCoA,
- 4: MIP6OptNonceIndices,
- 5: MIP6OptBindingAuthData,
- 6: MIP6OptMobNetPrefix,
- 7: MIP6OptLLAddr,
- 8: MIP6OptMNID,
- 9: MIP6OptMsgAuth,
- 10: MIP6OptReplayProtection,
- 11: MIP6OptCGAParamsReq,
- 12: MIP6OptCGAParams,
- 13: MIP6OptSignature,
- 14: MIP6OptHomeKeygenToken,
- 15: MIP6OptCareOfTestInit,
- 16: MIP6OptCareOfTest }
-
-
-# Main Mobile IPv6 Classes
-
-mhtypes = { 0: 'BRR',
- 1: 'HoTI',
- 2: 'CoTI',
- 3: 'HoT',
- 4: 'CoT',
- 5: 'BU',
- 6: 'BA',
- 7: 'BE',
- 8: 'Fast BU',
- 9: 'Fast BA',
- 10: 'Fast NA' }
-
-# From http://www.iana.org/assignments/mobility-parameters
-bastatus = { 0: 'Binding Update accepted',
- 1: 'Accepted but prefix discovery necessary',
- 128: 'Reason unspecified',
- 129: 'Administratively prohibited',
- 130: 'Insufficient resources',
- 131: 'Home registration not supported',
- 132: 'Not home subnet',
- 133: 'Not home agent for this mobile node',
- 134: 'Duplicate Address Detection failed',
- 135: 'Sequence number out of window',
- 136: 'Expired home nonce index',
- 137: 'Expired care-of nonce index',
- 138: 'Expired nonces',
- 139: 'Registration type change disallowed',
- 140: 'Mobile Router Operation not permitted',
- 141: 'Invalid Prefix',
- 142: 'Not Authorized for Prefix',
- 143: 'Forwarding Setup failed (prefixes missing)',
- 144: 'MIPV6-ID-MISMATCH',
- 145: 'MIPV6-MESG-ID-REQD',
- 146: 'MIPV6-AUTH-FAIL',
- 147: 'Permanent home keygen token unavailable',
- 148: 'CGA and signature verification failed',
- 149: 'Permanent home keygen token exists',
- 150: 'Non-null home nonce index expected' }
-
-
-class _MobilityHeader(Packet):
- name = 'Dummy IPv6 Mobility Header'
- overload_fields = { IPv6: { "nh": 135 }}
-
- def post_build(self, p, pay):
- p += pay
- l = self.len
- if self.len is None:
- l = (len(p)-8)//8
- p = bytes([p[0]]) + struct.pack("B", l) + p[2:]
- if self.cksum is None:
- cksum = in6_chksum(135, self.underlayer, p)
- else:
- cksum = self.cksum
- p = p[:4]+struct.pack("!H", cksum)+p[6:]
- return p
-
-
-class MIP6MH_Generic(_MobilityHeader): # Mainly for decoding of unknown msg
- name = "IPv6 Mobility Header - Generic Message"
- fields_desc = [ ByteEnumField("nh", 59, ipv6nh),
- ByteField("len", None),
- ByteEnumField("mhtype", None, mhtypes),
- ByteField("res", None),
- XShortField("cksum", None),
- StrLenField("msg", b"\x00"*2,
- length_from = lambda pkt: 8*pkt.len-6) ]
-
-
-
-# TODO: make a generic _OptionsField
-class _MobilityOptionsField(PacketListField):
- islist = 1
- holds_packet = 1
-
- def __init__(self, name, default, cls, curpos, count_from=None, length_from=None):
- self.curpos = curpos
- PacketListField.__init__(self, name, default, cls, count_from=count_from, length_from=length_from)
-
- def getfield(self, pkt, s):
- l = self.length_from(pkt)
- return s[l:],self.m2i(pkt, s[:l])
-
- def i2len(self, pkt, i):
- return len(self.i2m(pkt, i))
-
- def m2i(self, pkt, x):
- opt = []
- while x:
- #o = ord(x[0]) # Option type
- o = x[0] # Option type
- cls = self.cls
- if o in moboptcls:
- cls = moboptcls[o]
- try:
- op = cls(x)
- except:
- op = self.cls(x)
- opt.append(op)
- if isinstance(op.payload, conf.raw_layer):
- x = op.payload.load
- del(op.payload)
- else:
- x = b""
- return opt
-
- def i2m(self, pkt, x):
- autopad = None
- try:
- autopad = getattr(pkt, "autopad") # Hack : 'autopad' phantom field
- except:
- autopad = 1
-
- if not autopad:
- return b"".join(map(str, x))
-
- curpos = self.curpos
- s = b""
- for p in x:
- d = p.alignment_delta(curpos)
- curpos += d
- if d == 1:
- s += bytes(Pad1())
- elif d != 0:
- s += bytes(PadN(optdata=b'\x00'*(d-2)))
- pstr = bytes(p)
- curpos += len(pstr)
- s += pstr
-
- # Let's make the class including our option field
- # a multiple of 8 octets long
- d = curpos % 8
- if d == 0:
- return s
- d = 8 - d
- if d == 1:
- s +=bytes(Pad1())
- elif d != 0:
- s += bytes(PadN(optdata=b'\x00'*(d-2)))
-
- return s
-
- def addfield(self, pkt, s, val):
- return s+self.i2m(pkt, val)
-
-class MIP6MH_BRR(_MobilityHeader):
- name = "IPv6 Mobility Header - Binding Refresh Request"
- fields_desc = [ ByteEnumField("nh", 59, ipv6nh),
- ByteField("len", None),
- ByteEnumField("mhtype", 0, mhtypes),
- ByteField("res", None),
- XShortField("cksum", None),
- ShortField("res2", None),
- _PhantomAutoPadField("autopad", 1), # autopad activated by default
- _MobilityOptionsField("options", [], MIP6OptUnknown, 8,
- length_from = lambda pkt: 8*pkt.len) ]
- overload_fields = { IPv6: { "nh": 135 } }
- def hashret(self):
- # Hack: BRR, BU and BA have the same hashret that returns the same
- # value "\x00\x08\x09" (concatenation of mhtypes). This is
- # because we need match BA with BU and BU with BRR. --arno
- return b"\x00\x08\x09"
-
-class MIP6MH_HoTI(_MobilityHeader):
- name = "IPv6 Mobility Header - Home Test Init"
- fields_desc = [ ByteEnumField("nh", 59, ipv6nh),
- ByteField("len", None),
- ByteEnumField("mhtype", 1, mhtypes),
- ByteField("res", None),
- XShortField("cksum", None),
- StrFixedLenField("reserved", "\x00"*2, 2),
- StrFixedLenField("cookie", "\x00"*8, 8),
- _PhantomAutoPadField("autopad", 1), # autopad activated by default
- _MobilityOptionsField("options", [], MIP6OptUnknown, 16,
- length_from = lambda pkt: 8*(pkt.len-1)) ]
- overload_fields = { IPv6: { "nh": 135 } }
- def hashret(self):
- return self.cookie
-
-class MIP6MH_CoTI(MIP6MH_HoTI):
- name = "IPv6 Mobility Header - Care-of Test Init"
- mhtype = 2
- def hashret(self):
- return self.cookie
-
-class MIP6MH_HoT(_MobilityHeader):
- name = "IPv6 Mobility Header - Home Test"
- fields_desc = [ ByteEnumField("nh", 59, ipv6nh),
- ByteField("len", None),
- ByteEnumField("mhtype", 3, mhtypes),
- ByteField("res", None),
- XShortField("cksum", None),
- ShortField("index", None),
- StrFixedLenField("cookie", "\x00"*8, 8),
- StrFixedLenField("token", "\x00"*8, 8),
- _PhantomAutoPadField("autopad", 1), # autopad activated by default
- _MobilityOptionsField("options", [], MIP6OptUnknown, 24,
- length_from = lambda pkt: 8*(pkt.len-2)) ]
- overload_fields = { IPv6: { "nh": 135 } }
- def hashret(self):
- return self.cookie
- def answers(self):
- if (isinstance(other, MIP6MH_HoTI) and
- self.cookie == other.cookie):
- return 1
- return 0
-
-class MIP6MH_CoT(MIP6MH_HoT):
- name = "IPv6 Mobility Header - Care-of Test"
- mhtype = 4
- def hashret(self):
- return self.cookie
-
- def answers(self):
- if (isinstance(other, MIP6MH_CoTI) and
- self.cookie == other.cookie):
- return 1
- return 0
-
-class LifetimeField(ShortField):
- def i2repr(self, pkt, x):
- return "%d sec" % (4*x)
-
-class MIP6MH_BU(_MobilityHeader):
- name = "IPv6 Mobility Header - Binding Update"
- fields_desc = [ ByteEnumField("nh", 59, ipv6nh),
- ByteField("len", None), # unit == 8 bytes (excluding the first 8 bytes)
- ByteEnumField("mhtype", 5, mhtypes),
- ByteField("res", None),
- XShortField("cksum", None),
- XShortField("seq", None), # TODO: ShortNonceField
- FlagsField("flags", "KHA", 7, "PRMKLHA"),
- XBitField("reserved", 0, 9),
- LifetimeField("mhtime", 3), # unit == 4 seconds
- _PhantomAutoPadField("autopad", 1), # autopad activated by default
- _MobilityOptionsField("options", [], MIP6OptUnknown, 12,
- length_from = lambda pkt: 8*pkt.len - 4) ]
- overload_fields = { IPv6: { "nh": 135 } }
-
- def hashret(self): # Hack: see comment in MIP6MH_BRR.hashret()
- return "\x00\x08\x09"
-
- def answers(self, other):
- if isinstance(other, MIP6MH_BRR):
- return 1
- return 0
-
-class MIP6MH_BA(_MobilityHeader):
- name = "IPv6 Mobility Header - Binding ACK"
- fields_desc = [ ByteEnumField("nh", 59, ipv6nh),
- ByteField("len", None), # unit == 8 bytes (excluding the first 8 bytes)
- ByteEnumField("mhtype", 6, mhtypes),
- ByteField("res", None),
- XShortField("cksum", None),
- ByteEnumField("status", 0, bastatus),
- FlagsField("flags", "K", 3, "PRK"),
- XBitField("res2", None, 5),
- XShortField("seq", None), # TODO: ShortNonceField
- XShortField("mhtime", 0), # unit == 4 seconds
- _PhantomAutoPadField("autopad", 1), # autopad activated by default
- _MobilityOptionsField("options", [], MIP6OptUnknown, 12,
- length_from = lambda pkt: 8*pkt.len-4) ]
- overload_fields = { IPv6: { "nh": 135 }}
-
- def hashret(self): # Hack: see comment in MIP6MH_BRR.hashret()
- return "\x00\x08\x09"
-
- def answers(self, other):
- if (isinstance(other, MIP6MH_BU) and
- other.mhtype == 5 and
- self.mhtype == 6 and
- other.flags & 0x1 and # Ack request flags is set
- self.seq == other.seq):
- return 1
- return 0
-
-_bestatus = { 1: 'Unknown binding for Home Address destination option',
- 2: 'Unrecognized MH Type value' }
-
-# TODO: match Binding Error to its stimulus
-class MIP6MH_BE(_MobilityHeader):
- name = "IPv6 Mobility Header - Binding Error"
- fields_desc = [ ByteEnumField("nh", 59, ipv6nh),
- ByteField("len", None), # unit == 8 bytes (excluding the first 8 bytes)
- ByteEnumField("mhtype", 7, mhtypes),
- ByteField("res", 0),
- XShortField("cksum", None),
- ByteEnumField("status", 0, _bestatus),
- ByteField("reserved", 0),
- IP6Field("ha", "::"),
- _MobilityOptionsField("options", [], MIP6OptUnknown, 24,
- length_from = lambda pkt: 8*(pkt.len-2)) ]
- overload_fields = { IPv6: { "nh": 135 }}
-
-_mip6_mhtype2cls = { 0: MIP6MH_BRR,
- 1: MIP6MH_HoTI,
- 2: MIP6MH_CoTI,
- 3: MIP6MH_HoT,
- 4: MIP6MH_CoT,
- 5: MIP6MH_BU,
- 6: MIP6MH_BA,
- 7: MIP6MH_BE }
-
-
-#############################################################################
-#############################################################################
-### Traceroute6 ###
-#############################################################################
-#############################################################################
-
-class AS_resolver6(AS_resolver_riswhois):
- def _resolve_one(self, ip):
- """
- overloaded version to provide a Whois resolution on the
- embedded IPv4 address if the address is 6to4 or Teredo.
- Otherwise, the native IPv6 address is passed.
- """
-
- if in6_isaddr6to4(ip): # for 6to4, use embedded @
- tmp = inet_pton(socket.AF_INET6, ip)
- addr = inet_ntop(socket.AF_INET, tmp[2:6])
- elif in6_isaddrTeredo(ip): # for Teredo, use mapped address
- addr = teredoAddrExtractInfo(ip)[2]
- else:
- addr = ip
-
- _, asn, desc = AS_resolver_riswhois._resolve_one(self, addr)
-
- return ip,asn,desc
-
-class TracerouteResult6(TracerouteResult):
- def show(self):
- #return self.make_table(lambda (s,r): (s.sprintf("%-42s,IPv6.dst%:{TCP:tcp%TCP.dport%}{UDP:udp%UDP.dport%}{ICMPv6EchoRequest:IER}"), # TODO: ICMPv6 !
- return self.make_table(lambda s,r: (s.sprintf("%-42s,IPv6.dst%:{TCP:tcp%TCP.dport%}{UDP:udp%UDP.dport%}{ICMPv6EchoRequest:IER}"), # TODO: ICMPv6 !
- s.hlim,
- r.sprintf("%-42s,IPv6.src% {TCP:%TCP.flags%}"+
- "{ICMPv6DestUnreach:%ir,type%}{ICMPv6PacketTooBig:%ir,type%}"+
- "{ICMPv6TimeExceeded:%ir,type%}{ICMPv6ParamProblem:%ir,type%}"+
- "{ICMPv6EchoReply:%ir,type%}")))
-
- def get_trace(self):
- trace = {}
-
- for s,r in self.res:
- if IPv6 not in s:
- continue
- d = s[IPv6].dst
- if d not in trace:
- trace[d] = {}
-
- t = not (ICMPv6TimeExceeded in r or
- ICMPv6DestUnreach in r or
- ICMPv6PacketTooBig in r or
- ICMPv6ParamProblem in r)
-
- trace[d][s[IPv6].hlim] = r[IPv6].src, t
-
- for k in trace.values():
- #m = filter(lambda x: k[x][1], k.keys())
- m = [ x for x in k.keys() if k[x][1] ]
- if not m:
- continue
- m = min(m)
- for l in k.keys():
- if l > m:
- del(k[l])
-
- return trace
-
- def graph(self, ASres=AS_resolver6(), **kargs):
- TracerouteResult.graph(self, ASres=ASres, **kargs)
-
-def traceroute6(target, dport=80, minttl=1, maxttl=30, sport=RandShort(),
- l4 = None, timeout=2, verbose=None, **kargs):
- """
- Instant TCP traceroute using IPv6 :
- traceroute6(target, [maxttl=30], [dport=80], [sport=80]) -> None
- """
- if verbose is None:
- verbose = conf.verb
-
- if l4 is None:
- a,b = sr(IPv6(dst=target, hlim=(minttl,maxttl))/TCP(seq=RandInt(),sport=sport, dport=dport),
- timeout=timeout, filter="icmp6 or tcp", verbose=verbose, **kargs)
- else:
- a,b = sr(IPv6(dst=target, hlim=(minttl,maxttl))/l4,
- timeout=timeout, verbose=verbose, **kargs)
-
- a = TracerouteResult6(a.res)
-
- if verbose:
- a.display()
-
- return a,b
-
-#############################################################################
-#############################################################################
-### Sockets ###
-#############################################################################
-#############################################################################
-
-class L3RawSocket6(L3RawSocket):
- def __init__(self, type = ETH_P_IPV6, filter=None, iface=None, promisc=None, nofilter=0):
- L3RawSocket.__init__(self, type, filter, iface, promisc)
- # NOTE: if fragmentation is needed, it will be done by the kernel (RFC 2292)
- self.outs = socket.socket(socket.AF_INET6, socket.SOCK_RAW, socket.IPPROTO_RAW)
- self.ins = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(type))
-
-def IPv6inIP(dst='203.178.135.36', src=None):
- _IPv6inIP.dst = dst
- _IPv6inIP.src = src
- if not conf.L3socket == _IPv6inIP:
- _IPv6inIP.cls = conf.L3socket
- else:
- del(conf.L3socket)
- return _IPv6inIP
-
-class _IPv6inIP(SuperSocket):
- dst = '127.0.0.1'
- src = None
- cls = None
-
- def __init__(self, family=socket.AF_INET6, type=socket.SOCK_STREAM, proto=0, **args):
- SuperSocket.__init__(self, family, type, proto)
- self.worker = self.cls(**args)
-
- def set(self, dst, src=None):
- _IPv6inIP.src = src
- _IPv6inIP.dst = dst
-
- def nonblock_recv(self):
- p = self.worker.nonblock_recv()
- return self._recv(p)
-
- def recv(self, x):
- p = self.worker.recv(x)
- return self._recv(p, x)
-
- def _recv(self, p, x=MTU):
- if p is None:
- return p
- elif isinstance(p, IP):
- # TODO: verify checksum
- if p.src == self.dst and p.proto == socket.IPPROTO_IPV6:
- if isinstance(p.payload, IPv6):
- return p.payload
- return p
-
- def send(self, x):
- return self.worker.send(IP(dst=self.dst, src=self.src, proto=socket.IPPROTO_IPV6)/x)
-
-
-#############################################################################
-#############################################################################
-### Layers binding ###
-#############################################################################
-#############################################################################
-
-conf.l3types.register(ETH_P_IPV6, IPv6)
-conf.l2types.register(31, IPv6)
-
-bind_layers(Ether, IPv6, type = 0x86dd )
-bind_layers(CookedLinux, IPv6, proto = 0x86dd )
-bind_layers(IPerror6, TCPerror, nh = socket.IPPROTO_TCP )
-bind_layers(IPerror6, UDPerror, nh = socket.IPPROTO_UDP )
-bind_layers(IPv6, TCP, nh = socket.IPPROTO_TCP )
-bind_layers(IPv6, UDP, nh = socket.IPPROTO_UDP )
-bind_layers(IP, IPv6, proto = socket.IPPROTO_IPV6 )
-bind_layers(IPv6, IPv6, nh = socket.IPPROTO_IPV6 )
-
-bind_layers(IPv6, IP, nh = IPPROTO_IPIP )