summaryrefslogtreecommitdiffstats
path: root/scripts/external_libs/scapy-2.3.1/scapy/layers/l2.py
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2016-03-21 16:03:47 +0200
committerimarom <imarom@cisco.com>2016-03-21 16:03:47 +0200
commitb89efa188810bf95a9d245e69e2961b5721c3b0f (patch)
tree454273ac6c4ae972ebb8a2c86b893296970b4fa9 /scripts/external_libs/scapy-2.3.1/scapy/layers/l2.py
parentf72c6df9d2e9998ae1f3529d729ab7930b35785a (diff)
scapy python 2/3
Diffstat (limited to 'scripts/external_libs/scapy-2.3.1/scapy/layers/l2.py')
-rw-r--r--scripts/external_libs/scapy-2.3.1/scapy/layers/l2.py542
1 files changed, 0 insertions, 542 deletions
diff --git a/scripts/external_libs/scapy-2.3.1/scapy/layers/l2.py b/scripts/external_libs/scapy-2.3.1/scapy/layers/l2.py
deleted file mode 100644
index 3f80ed7d..00000000
--- a/scripts/external_libs/scapy-2.3.1/scapy/layers/l2.py
+++ /dev/null
@@ -1,542 +0,0 @@
-## This file is part of Scapy
-## See http://www.secdev.org/projects/scapy for more informations
-## Copyright (C) Philippe Biondi <phil@secdev.org>
-## This program is published under a GPLv2 license
-
-"""
-Classes and functions for layer 2 protocols.
-"""
-
-import os,struct,time
-from scapy.base_classes import Net
-from scapy.config import conf
-from scapy.packet import *
-from scapy.ansmachine import *
-from scapy.plist import SndRcvList
-from scapy.fields import *
-from scapy.sendrecv import srp,srp1
-from scapy.arch import get_if_hwaddr
-
-
-
-
-#################
-## Tools ##
-#################
-
-
-class Neighbor:
- def __init__(self):
- self.resolvers = {}
-
- def register_l3(self, l2, l3, resolve_method):
- self.resolvers[l2,l3]=resolve_method
-
- def resolve(self, l2inst, l3inst):
- k = l2inst.__class__,l3inst.__class__
- if k in self.resolvers:
- return self.resolvers[k](l2inst,l3inst)
-
- def __repr__(self):
- return "\n".join("%-15s -> %-15s" % (l2.__name__, l3.__name__) for l2,l3 in self.resolvers)
-
-conf.neighbor = Neighbor()
-
-conf.netcache.new_cache("arp_cache", 120) # cache entries expire after 120s
-
-
-@conf.commands.register
-def getmacbyip(ip, chainCC=0):
- """Return MAC address corresponding to a given IP address"""
- if isinstance(ip,Net):
- ip = iter(ip).next()
- ip = inet_ntoa(inet_aton(ip))
- tmp = map(ord, inet_aton(ip))
- if (tmp[0] & 0xf0) == 0xe0: # mcast @
- return "01:00:5e:%.2x:%.2x:%.2x" % (tmp[1]&0x7f,tmp[2],tmp[3])
- iff,a,gw = conf.route.route(ip)
- if ( (iff == "lo") or (ip == conf.route.get_if_bcast(iff)) ):
- return "ff:ff:ff:ff:ff:ff"
- if gw != "0.0.0.0":
- ip = gw
-
- mac = conf.netcache.arp_cache.get(ip)
- if mac:
- return mac
-
- res = srp1(Ether(dst=ETHER_BROADCAST)/ARP(op="who-has", pdst=ip),
- type=ETH_P_ARP,
- iface = iff,
- timeout=2,
- verbose=0,
- chainCC=chainCC,
- nofilter=1)
- if res is not None:
- mac = res.payload.hwsrc
- conf.netcache.arp_cache[ip] = mac
- return mac
- return None
-
-
-
-### Fields
-
-class DestMACField(MACField):
- def __init__(self, name):
- MACField.__init__(self, name, None)
- def i2h(self, pkt, x):
- if x is None:
- x = conf.neighbor.resolve(pkt,pkt.payload)
- if x is None:
- x = "ff:ff:ff:ff:ff:ff"
- warning("Mac address to reach destination not found. Using broadcast.")
- return MACField.i2h(self, pkt, x)
- def i2m(self, pkt, x):
- return MACField.i2m(self, pkt, self.i2h(pkt, x))
-
-class SourceMACField(MACField):
- def __init__(self, name):
- MACField.__init__(self, name, None)
- def i2h(self, pkt, x):
- if x is None:
- iff,a,gw = pkt.payload.route()
- if iff:
- try:
- x = get_if_hwaddr(iff)
- except:
- pass
- if x is None:
- x = "00:00:00:00:00:00"
- return MACField.i2h(self, pkt, x)
- def i2m(self, pkt, x):
- return MACField.i2m(self, pkt, self.i2h(pkt, x))
-
-class ARPSourceMACField(MACField):
- def __init__(self, name):
- MACField.__init__(self, name, None)
- def i2h(self, pkt, x):
- if x is None:
- iff,a,gw = pkt.route()
- if iff:
- try:
- x = get_if_hwaddr(iff)
- except:
- pass
- if x is None:
- x = "00:00:00:00:00:00"
- return MACField.i2h(self, pkt, x)
- def i2m(self, pkt, x):
- return MACField.i2m(self, pkt, self.i2h(pkt, x))
-
-
-
-### Layers
-
-
-class Ether(Packet):
- name = "Ethernet"
- fields_desc = [ MACField("dst","00:00:00:01:00:00"),
- MACField("src","00:00:00:02:00:00"),
- XShortEnumField("type", 0x9000, ETHER_TYPES) ]
- def hashret(self):
- return struct.pack("H",self.type)+self.payload.hashret()
- def answers(self, other):
- if isinstance(other,Ether):
- if self.type == other.type:
- return self.payload.answers(other.payload)
- return 0
- def mysummary(self):
- return self.sprintf("%src% > %dst% (%type%)")
- @classmethod
- def dispatch_hook(cls, _pkt=None, *args, **kargs):
- if _pkt and len(_pkt) >= 14:
- if struct.unpack("!H", _pkt[12:14])[0] <= 1500:
- return Dot3
- return cls
-
-
-class Dot3(Packet):
- name = "802.3"
- fields_desc = [ DestMACField("dst"),
- MACField("src", ETHER_ANY),
- LenField("len", None, "H") ]
- def extract_padding(self,s):
- l = self.len
- return s[:l],s[l:]
- def answers(self, other):
- if isinstance(other,Dot3):
- return self.payload.answers(other.payload)
- return 0
- def mysummary(self):
- return "802.3 %s > %s" % (self.src, self.dst)
- @classmethod
- def dispatch_hook(cls, _pkt=None, *args, **kargs):
- if _pkt and len(_pkt) >= 14:
- if struct.unpack("!H", _pkt[12:14])[0] > 1500:
- return Ether
- return cls
-
-
-class LLC(Packet):
- name = "LLC"
- fields_desc = [ XByteField("dsap", 0x00),
- XByteField("ssap", 0x00),
- ByteField("ctrl", 0) ]
-
-conf.neighbor.register_l3(Ether, LLC, lambda l2,l3: conf.neighbor.resolve(l2,l3.payload))
-conf.neighbor.register_l3(Dot3, LLC, lambda l2,l3: conf.neighbor.resolve(l2,l3.payload))
-
-
-class CookedLinux(Packet):
- name = "cooked linux"
- fields_desc = [ ShortEnumField("pkttype",0, {0: "unicast",
- 4:"sent-by-us"}), #XXX incomplete
- XShortField("lladdrtype",512),
- ShortField("lladdrlen",0),
- StrFixedLenField("src","",8),
- XShortEnumField("proto",0x800,ETHER_TYPES) ]
-
-
-
-class SNAP(Packet):
- name = "SNAP"
- fields_desc = [ X3BytesField("OUI",0x000000),
- XShortEnumField("code", 0x000, ETHER_TYPES) ]
-
-conf.neighbor.register_l3(Dot3, SNAP, lambda l2,l3: conf.neighbor.resolve(l2,l3.payload))
-
-
-class Dot1Q(Packet):
- name = "802.1Q"
- aliastypes = [ Ether ]
- fields_desc = [ BitField("prio", 0, 3),
- BitField("id", 0, 1),
- BitField("vlan", 1, 12),
- XShortEnumField("type", 0x0000, ETHER_TYPES) ]
- def answers(self, other):
- if isinstance(other,Dot1Q):
- if ( (self.type == other.type) and
- (self.vlan == other.vlan) ):
- return self.payload.answers(other.payload)
- else:
- return self.payload.answers(other)
- return 0
- def default_payload_class(self, pay):
- if self.type <= 1500:
- return LLC
- return conf.raw_layer
- def extract_padding(self,s):
- if self.type <= 1500:
- return s[:self.type],s[self.type:]
- return s,None
- def mysummary(self):
- if isinstance(self.underlayer, Ether):
- return self.underlayer.sprintf("802.1q %Ether.src% > %Ether.dst% (%Dot1Q.type%) vlan %Dot1Q.vlan%")
- else:
- return self.sprintf("802.1q (%Dot1Q.type%) vlan %Dot1Q.vlan%")
-
-
-conf.neighbor.register_l3(Ether, Dot1Q, lambda l2,l3: conf.neighbor.resolve(l2,l3.payload))
-
-class STP(Packet):
- name = "Spanning Tree Protocol"
- fields_desc = [ ShortField("proto", 0),
- ByteField("version", 0),
- ByteField("bpdutype", 0),
- ByteField("bpduflags", 0),
- ShortField("rootid", 0),
- MACField("rootmac", ETHER_ANY),
- IntField("pathcost", 0),
- ShortField("bridgeid", 0),
- MACField("bridgemac", ETHER_ANY),
- ShortField("portid", 0),
- BCDFloatField("age", 1),
- BCDFloatField("maxage", 20),
- BCDFloatField("hellotime", 2),
- BCDFloatField("fwddelay", 15) ]
-
-
-class EAPOL(Packet):
- name = "EAPOL"
- fields_desc = [ ByteField("version", 1),
- ByteEnumField("type", 0, ["EAP_PACKET", "START", "LOGOFF", "KEY", "ASF"]),
- LenField("len", None, "H") ]
-
- EAP_PACKET= 0
- START = 1
- LOGOFF = 2
- KEY = 3
- ASF = 4
- def extract_padding(self, s):
- l = self.len
- return s[:l],s[l:]
- def hashret(self):
- return chr(self.type)+self.payload.hashret()
- def answers(self, other):
- if isinstance(other,EAPOL):
- if ( (self.type == self.EAP_PACKET) and
- (other.type == self.EAP_PACKET) ):
- return self.payload.answers(other.payload)
- return 0
- def mysummary(self):
- return self.sprintf("EAPOL %EAPOL.type%")
-
-
-class EAP(Packet):
- name = "EAP"
- fields_desc = [ ByteEnumField("code", 4, {1:"REQUEST",2:"RESPONSE",3:"SUCCESS",4:"FAILURE"}),
- ByteField("id", 0),
- ShortField("len",None),
- ConditionalField(ByteEnumField("type",0, {1:"ID",4:"MD5"}), lambda pkt:pkt.code not in [EAP.SUCCESS, EAP.FAILURE])
-
- ]
-
- REQUEST = 1
- RESPONSE = 2
- SUCCESS = 3
- FAILURE = 4
- TYPE_ID = 1
- TYPE_MD5 = 4
- def answers(self, other):
- if isinstance(other,EAP):
- if self.code == self.REQUEST:
- return 0
- elif self.code == self.RESPONSE:
- if ( (other.code == self.REQUEST) and
- (other.type == self.type) ):
- return 1
- elif other.code == self.RESPONSE:
- return 1
- return 0
-
- def post_build(self, p, pay):
- if self.len is None:
- l = len(p)+len(pay)
- p = p[:2]+chr((l>>8)&0xff)+chr(l&0xff)+p[4:]
- return p+pay
-
-
-class ARP(Packet):
- name = "ARP"
- fields_desc = [ XShortField("hwtype", 0x0001),
- XShortEnumField("ptype", 0x0800, ETHER_TYPES),
- ByteField("hwlen", 6),
- ByteField("plen", 4),
- ShortEnumField("op", 1, {"who-has":1, "is-at":2, "RARP-req":3, "RARP-rep":4, "Dyn-RARP-req":5, "Dyn-RAR-rep":6, "Dyn-RARP-err":7, "InARP-req":8, "InARP-rep":9}),
- ARPSourceMACField("hwsrc"),
- SourceIPField("psrc","pdst"),
- MACField("hwdst", ETHER_ANY),
- IPField("pdst", "0.0.0.0") ]
- who_has = 1
- is_at = 2
- def answers(self, other):
- if isinstance(other,ARP):
- if ( (self.op == self.is_at) and
- (other.op == self.who_has) and
- (self.psrc == other.pdst) ):
- return 1
- return 0
- def route(self):
- dst = self.pdst
- if isinstance(dst,Gen):
- dst = iter(dst).next()
- return conf.route.route(dst)
- def extract_padding(self, s):
- return "",s
- def mysummary(self):
- if self.op == self.is_at:
- return self.sprintf("ARP is at %hwsrc% says %psrc%")
- elif self.op == self.who_has:
- return self.sprintf("ARP who has %pdst% says %psrc%")
- else:
- return self.sprintf("ARP %op% %psrc% > %pdst%")
-
-conf.neighbor.register_l3(Ether, ARP, lambda l2,l3: getmacbyip(l3.pdst))
-
-class GRErouting(Packet):
- name = "GRE routing informations"
- fields_desc = [ ShortField("address_family",0),
- ByteField("SRE_offset", 0),
- FieldLenField("SRE_len", None, "routing_info", "B"),
- StrLenField("routing_info", "", "SRE_len"),
- ]
-
-
-class GRE(Packet):
- name = "GRE"
- fields_desc = [ BitField("chksum_present",0,1),
- BitField("routing_present",0,1),
- BitField("key_present",0,1),
- BitField("seqnum_present",0,1),
- BitField("strict_route_source",0,1),
- BitField("recursion_control",0,3),
- BitField("flags",0,5),
- BitField("version",0,3),
- XShortEnumField("proto", 0x0000, ETHER_TYPES),
- ConditionalField(XShortField("chksum",None), lambda pkt:pkt.chksum_present==1 or pkt.routing_present==1),
- ConditionalField(XShortField("offset",None), lambda pkt:pkt.chksum_present==1 or pkt.routing_present==1),
- ConditionalField(XIntField("key",None), lambda pkt:pkt.key_present==1),
- ConditionalField(XIntField("seqence_number",None), lambda pkt:pkt.seqnum_present==1),
- ]
- def post_build(self, p, pay):
- p += pay
- if self.chksum_present and self.chksum is None:
- c = checksum(p)
- p = p[:4]+chr((c>>8)&0xff)+chr(c&0xff)+p[6:]
- return p
-
-
-
-
-bind_layers( Dot3, LLC, )
-bind_layers( Ether, LLC, type=122)
-bind_layers( Ether, Dot1Q, type=33024)
-bind_layers( Ether, Ether, type=1)
-bind_layers( Ether, ARP, type=2054)
-bind_layers( Ether, EAPOL, type=34958)
-bind_layers( Ether, EAPOL, dst='01:80:c2:00:00:03', type=34958)
-bind_layers( CookedLinux, LLC, proto=122)
-bind_layers( CookedLinux, Dot1Q, proto=33024)
-bind_layers( CookedLinux, Ether, proto=1)
-bind_layers( CookedLinux, ARP, proto=2054)
-bind_layers( CookedLinux, EAPOL, proto=34958)
-bind_layers( GRE, LLC, proto=122)
-bind_layers( GRE, Dot1Q, proto=33024)
-bind_layers( GRE, Ether, proto=1)
-bind_layers( GRE, ARP, proto=2054)
-bind_layers( GRE, EAPOL, proto=34958)
-bind_layers( GRE, GRErouting, { "routing_present" : 1 } )
-bind_layers( GRErouting, conf.raw_layer,{ "address_family" : 0, "SRE_len" : 0 })
-bind_layers( GRErouting, GRErouting, { } )
-bind_layers( EAPOL, EAP, type=0)
-bind_layers( LLC, STP, dsap=66, ssap=66, ctrl=3)
-bind_layers( LLC, SNAP, dsap=170, ssap=170, ctrl=3)
-bind_layers( SNAP, Dot1Q, code=33024)
-bind_layers( SNAP, Ether, code=1)
-bind_layers( SNAP, ARP, code=2054)
-bind_layers( SNAP, EAPOL, code=34958)
-bind_layers( SNAP, STP, code=267)
-
-conf.l2types.register(ARPHDR_ETHER, Ether)
-conf.l2types.register_num2layer(ARPHDR_METRICOM, Ether)
-conf.l2types.register_num2layer(ARPHDR_LOOPBACK, Ether)
-conf.l2types.register_layer2num(ARPHDR_ETHER, Dot3)
-conf.l2types.register(144, CookedLinux) # called LINUX_IRDA, similar to CookedLinux
-conf.l2types.register(113, CookedLinux)
-
-conf.l3types.register(ETH_P_ARP, ARP)
-
-
-
-
-### Technics
-
-
-
-@conf.commands.register
-def arpcachepoison(target, victim, interval=60):
- """Poison target's cache with (your MAC,victim's IP) couple
-arpcachepoison(target, victim, [interval=60]) -> None
-"""
- tmac = getmacbyip(target)
- p = Ether(dst=tmac)/ARP(op="who-has", psrc=victim, pdst=target)
- try:
- while 1:
- sendp(p, iface_hint=target)
- if conf.verb > 1:
- os.write(1,".")
- time.sleep(interval)
- except KeyboardInterrupt:
- pass
-
-
-class ARPingResult(SndRcvList):
- def __init__(self, res=None, name="ARPing", stats=None):
- SndRcvList.__init__(self, res, name, stats)
-
- def show(self):
- for s,r in self.res:
- print r.sprintf("%19s,Ether.src% %ARP.psrc%")
-
-
-
-@conf.commands.register
-def arping(net, timeout=2, cache=0, verbose=None, **kargs):
- """Send ARP who-has requests to determine which hosts are up
-arping(net, [cache=0,] [iface=conf.iface,] [verbose=conf.verb]) -> None
-Set cache=True if you want arping to modify internal ARP-Cache"""
- if verbose is None:
- verbose = conf.verb
- ans,unans = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=net), verbose=verbose,
- filter="arp and arp[7] = 2", timeout=timeout, iface_hint=net, **kargs)
- ans = ARPingResult(ans.res)
-
- if cache and ans is not None:
- for pair in ans:
- conf.netcache.arp_cache[pair[1].psrc] = (pair[1].hwsrc, time.time())
- if verbose:
- ans.show()
- return ans,unans
-
-@conf.commands.register
-def is_promisc(ip, fake_bcast="ff:ff:00:00:00:00",**kargs):
- """Try to guess if target is in Promisc mode. The target is provided by its ip."""
-
- responses = srp1(Ether(dst=fake_bcast) / ARP(op="who-has", pdst=ip),type=ETH_P_ARP, iface_hint=ip, timeout=1, verbose=0,**kargs)
-
- return responses is not None
-
-@conf.commands.register
-def promiscping(net, timeout=2, fake_bcast="ff:ff:ff:ff:ff:fe", **kargs):
- """Send ARP who-has requests to determine which hosts are in promiscuous mode
- promiscping(net, iface=conf.iface)"""
- ans,unans = srp(Ether(dst=fake_bcast)/ARP(pdst=net),
- filter="arp and arp[7] = 2", timeout=timeout, iface_hint=net, **kargs)
- ans = ARPingResult(ans.res, name="PROMISCPing")
-
- ans.display()
- return ans,unans
-
-
-class ARP_am(AnsweringMachine):
- function_name="farpd"
- filter = "arp"
- send_function = staticmethod(sendp)
-
- def parse_options(self, IP_addr=None, iface=None, ARP_addr=None):
- self.IP_addr=IP_addr
- self.iface=iface
- self.ARP_addr=ARP_addr
-
- def is_request(self, req):
- return (req.haslayer(ARP) and
- req.getlayer(ARP).op == 1 and
- (self.IP_addr == None or self.IP_addr == req.getlayer(ARP).pdst))
-
- def make_reply(self, req):
- ether = req.getlayer(Ether)
- arp = req.getlayer(ARP)
- iff,a,gw = conf.route.route(arp.psrc)
- if self.iface != None:
- iff = iface
- ARP_addr = self.ARP_addr
- IP_addr = arp.pdst
- resp = Ether(dst=ether.src,
- src=ARP_addr)/ARP(op="is-at",
- hwsrc=ARP_addr,
- psrc=IP_addr,
- hwdst=arp.hwsrc,
- pdst=arp.pdst)
- return resp
-
- def sniff(self):
- sniff(iface=self.iface, **self.optsniff)
-
-@conf.commands.register
-def etherleak(target, **kargs):
- """Exploit Etherleak flaw"""
- return srpflood(Ether()/ARP(pdst=target),
- prn=lambda (s,r): conf.padding_layer in r and hexstr(r[conf.padding_layer].load),
- filter="arp", **kargs)
-
-