summaryrefslogtreecommitdiffstats
path: root/scripts/external_libs/scapy-python3-0.18/scapy/arch/windows/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/external_libs/scapy-python3-0.18/scapy/arch/windows/__init__.py')
-rw-r--r--scripts/external_libs/scapy-python3-0.18/scapy/arch/windows/__init__.py501
1 files changed, 501 insertions, 0 deletions
diff --git a/scripts/external_libs/scapy-python3-0.18/scapy/arch/windows/__init__.py b/scripts/external_libs/scapy-python3-0.18/scapy/arch/windows/__init__.py
new file mode 100644
index 00000000..3bd1ade3
--- /dev/null
+++ b/scripts/external_libs/scapy-python3-0.18/scapy/arch/windows/__init__.py
@@ -0,0 +1,501 @@
+## This file is part of Scapy
+## See http://www.secdev.org/projects/scapy for more informations
+## Copyright (C) Philippe Biondi <phil@secdev.org>
+## This program is published under a GPLv2 license
+
+"""
+Customizations needed to support Microsoft Windows.
+"""
+
+import os,re,sys,socket,time, itertools
+import subprocess as sp
+from glob import glob
+from scapy.config import conf,ConfClass
+from scapy.error import Scapy_Exception,log_loading,log_runtime
+from scapy.utils import atol, itom, inet_aton, inet_ntoa, PcapReader
+from scapy.base_classes import Gen, Net, SetGen
+import scapy.plist as plist
+from scapy.sendrecv import debug, srp1
+from scapy.layers.l2 import Ether, ARP
+from scapy.data import MTU, ETHER_BROADCAST, ETH_P_ARP
+
+conf.use_winpcapy = True
+from scapy.arch import pcapdnet
+from scapy.arch.pcapdnet import *
+
+LOOPBACK_NAME="lo0"
+WINDOWS = True
+
+
+def _where(filename, dirs=[], env="PATH"):
+ """Find file in current dir or system path"""
+ if not isinstance(dirs, list):
+ dirs = [dirs]
+ if glob(filename):
+ return filename
+ paths = [os.curdir] + os.environ[env].split(os.path.pathsep) + dirs
+ for path in paths:
+ for match in glob(os.path.join(path, filename)):
+ if match:
+ return os.path.normpath(match)
+ raise IOError("File not found: %s" % filename)
+
+def win_find_exe(filename, installsubdir=None, env="ProgramFiles"):
+ """Find executable in current dir, system path or given ProgramFiles subdir"""
+ for fn in [filename, filename+".exe"]:
+ try:
+ if installsubdir is None:
+ path = _where(fn)
+ else:
+ path = _where(fn, dirs=[os.path.join(os.environ[env], installsubdir)])
+ except IOError:
+ path = filename
+ else:
+ break
+ return path
+
+
+class WinProgPath(ConfClass):
+ _default = "<System default>"
+ # We try some magic to find the appropriate executables
+ pdfreader = win_find_exe("AcroRd32")
+ psreader = win_find_exe("gsview32.exe", "Ghostgum/gsview")
+ dot = win_find_exe("dot", "ATT/Graphviz/bin")
+ tcpdump = win_find_exe("windump")
+ tcpreplay = win_find_exe("tcpreplay")
+ display = _default
+ hexedit = win_find_exe("hexer")
+ wireshark = win_find_exe("wireshark", "wireshark")
+
+conf.prog = WinProgPath()
+
+class PcapNameNotFoundError(Scapy_Exception):
+ pass
+
+def get_windows_if_list():
+ ps = sp.Popen(['powershell', 'Get-NetAdapter', '|', 'select Name, InterfaceIndex, InterfaceDescription, InterfaceGuid, MacAddress', '|', 'fl'], stdout = sp.PIPE)
+ stdout, stdin = ps.communicate(timeout = 10)
+ current_interface = None
+ interface_list = []
+ for i in stdout.split(b'\r\n'):
+ if not i.strip():
+ continue
+ if i.find(b':')<0:
+ continue
+ name, value = [ j.strip() for j in i.split(b':') ]
+ if name == b'Name':
+ if current_interface:
+ interface_list.append(current_interface)
+ current_interface = {}
+ current_interface['name'] = value.decode('ascii')
+ elif name == b'InterfaceIndex':
+ current_interface['win_index'] = int(value)
+ elif name == b'InterfaceDescription':
+ current_interface['description'] = value.decode('ascii')
+ elif name == b'InterfaceGuid':
+ current_interface['guid'] = value.decode('ascii')
+ elif name == b'MacAddress':
+ current_interface['mac'] = ':'.join([ j.decode('ascii') for j in value.split(b'-')])
+ if current_interface:
+ interface_list.append(current_interface)
+ return interface_list
+
+class NetworkInterface(object):
+ """A network interface of your local host"""
+
+ def __init__(self, data=None):
+ self.name = None
+ self.ip = None
+ self.mac = None
+ self.pcap_name = None
+ self.description = None
+ self.data = data
+ if data is not None:
+ self.update(data)
+
+ def update(self, data):
+ """Update info about network interface according to given dnet dictionary"""
+ self.name = data["name"]
+ self.description = data['description']
+ self.win_index = data['win_index']
+ # Other attributes are optional
+ self._update_pcapdata()
+ try:
+ self.ip = socket.inet_ntoa(get_if_raw_addr(data['guid']))
+ except (KeyError, AttributeError, NameError):
+ pass
+ try:
+ self.mac = data['mac']
+ except KeyError:
+ pass
+
+ def _update_pcapdata(self):
+ for i in winpcapy_get_if_list():
+ if i.endswith(self.data['guid']):
+ self.pcap_name = i
+ return
+
+ raise PcapNameNotFoundError
+
+ def __repr__(self):
+ return "<%s: %s %s %s pcap_name=%s description=%s>" % (self.__class__.__name__,
+ self.name, self.ip, self.mac, self.pcap_name, self.description)
+
+from collections import UserDict
+
+class NetworkInterfaceDict(UserDict):
+ """Store information about network interfaces and convert between names"""
+ def load_from_powershell(self):
+ for i in get_windows_if_list():
+ try:
+ interface = NetworkInterface(i)
+ self.data[interface.name] = interface
+ except (KeyError, PcapNameNotFoundError):
+ pass
+ if len(self.data) == 0:
+ log_loading.warning("No match between your pcap and windows network interfaces found. "
+ "You probably won't be able to send packets. "
+ "Deactivating unneeded interfaces and restarting Scapy might help."
+ "Check your winpcap and powershell installation, and access rights.")
+
+ def pcap_name(self, devname):
+ """Return pcap device name for given Windows device name."""
+
+ try:
+ pcap_name = self.data[devname].pcap_name
+ except KeyError:
+ raise ValueError("Unknown network interface %r" % devname)
+ else:
+ return pcap_name
+
+ def devname(self, pcap_name):
+ """Return Windows device name for given pcap device name."""
+
+ for devname, iface in self.items():
+ if iface.pcap_name == pcap_name:
+ return iface.name
+ raise ValueError("Unknown pypcap network interface %r" % pcap_name)
+
+ def devname_from_index(self, if_index):
+ """Return interface name from interface index"""
+ for devname, iface in self.items():
+ if iface.win_index == if_index:
+ return iface.name
+ raise ValueError("Unknown network interface index %r" % if_index)
+
+ def show(self, resolve_mac=True):
+ """Print list of available network interfaces in human readable form"""
+
+ print("%s %s %s %s" % ("INDEX".ljust(5), "IFACE".ljust(35), "IP".ljust(15), "MAC"))
+ for iface_name in sorted(self.data.keys()):
+ dev = self.data[iface_name]
+ mac = dev.mac
+ if resolve_mac:
+ mac = conf.manufdb._resolve_MAC(mac)
+ print("%s %s %s %s" % (str(dev.win_index).ljust(5), str(dev.name).ljust(35), str(dev.ip).ljust(15), mac) )
+
+ifaces = NetworkInterfaceDict()
+ifaces.load_from_powershell()
+
+def pcap_name(devname):
+ """Return pypcap device name for given libdnet/Scapy device name"""
+ try:
+ pcap_name = ifaces.pcap_name(devname)
+ except ValueError:
+ # pcap.pcap() will choose a sensible default for sniffing if iface=None
+ pcap_name = None
+ return pcap_name
+
+def devname(pcap_name):
+ """Return libdnet/Scapy device name for given pypcap device name"""
+ return ifaces.devname(pcap_name)
+
+def devname_from_index(if_index):
+ """Return Windows adapter name for given Windows interface index"""
+ return ifaces.devname_from_index(if_index)
+
+def show_interfaces(resolve_mac=True):
+ """Print list of available network interfaces"""
+ return ifaces.show(resolve_mac)
+
+_orig_open_pcap = pcapdnet.open_pcap
+pcapdnet.open_pcap = lambda iface,*args,**kargs: _orig_open_pcap(pcap_name(iface),*args,**kargs)
+
+_orig_get_if_raw_hwaddr = pcapdnet.get_if_raw_hwaddr
+pcapdnet.get_if_raw_hwaddr = lambda iface,*args,**kargs: [ int(i, 16) for i in ifaces[iface].mac.split(':') ]
+get_if_raw_hwaddr = pcapdnet.get_if_raw_hwaddr
+
+def read_routes():
+ routes = []
+ if_index = '(\d+)'
+ dest = '(\d+\.\d+\.\d+\.\d+)/(\d+)'
+ next_hop = '(\d+\.\d+\.\d+\.\d+)'
+ metric_pattern = "(\d+)"
+ delim = "\s+" # The columns are separated by whitespace
+ netstat_line = delim.join([if_index, dest, next_hop, metric_pattern])
+ pattern = re.compile(netstat_line)
+ ps = sp.Popen(['powershell', 'Get-NetRoute', '-AddressFamily IPV4', '|', 'select ifIndex, DestinationPrefix, NextHop, RouteMetric'], stdout = sp.PIPE)
+ stdout, stdin = ps.communicate(timeout = 10)
+ for l in stdout.split(b'\r\n'):
+ match = re.search(pattern,l.decode('utf-8'))
+ if match:
+ try:
+ iface = devname_from_index(int(match.group(1)))
+ addr = ifaces[iface].ip
+ except:
+ continue
+ dest = atol(match.group(2))
+ mask = itom(int(match.group(3)))
+ gw = match.group(4)
+ # try:
+ # intf = pcapdnet.dnet.intf().get_dst(pcapdnet.dnet.addr(type=2, addrtxt=dest))
+ # except OSError:
+ # log_loading.warning("Building Scapy's routing table: Couldn't get outgoing interface for destination %s" % dest)
+ # continue
+ routes.append((dest, mask, gw, iface, addr))
+ return routes
+
+def read_routes6():
+ return []
+
+try:
+ __IPYTHON__
+except NameError:
+ try:
+ import readline
+ console = readline.GetOutputFile()
+ except (ImportError, AttributeError):
+ log_loading.info("Could not get readline console. Will not interpret ANSI color codes.")
+ else:
+ conf.readfunc = readline.rl.readline
+ orig_stdout = sys.stdout
+ sys.stdout = console
+
+def sndrcv(pks, pkt, timeout = 2, inter = 0, verbose=None, chainCC=0, retry=0, multi=0):
+ if not isinstance(pkt, Gen):
+ pkt = SetGen(pkt)
+
+ if verbose is None:
+ verbose = conf.verb
+ debug.recv = plist.PacketList([],"Unanswered")
+ debug.sent = plist.PacketList([],"Sent")
+ debug.match = plist.SndRcvList([])
+ nbrecv=0
+ ans = []
+ # do it here to fix random fields, so that parent and child have the same
+ all_stimuli = tobesent = [p for p in pkt]
+ notans = len(tobesent)
+
+ hsent={}
+ for i in tobesent:
+ h = i.hashret()
+ if h in hsent:
+ hsent[h].append(i)
+ else:
+ hsent[h] = [i]
+ if retry < 0:
+ retry = -retry
+ autostop=retry
+ else:
+ autostop=0
+
+
+ while retry >= 0:
+ found=0
+
+ if timeout < 0:
+ timeout = None
+
+ pid=1
+ try:
+ if WINDOWS or pid == 0:
+ try:
+ try:
+ i = 0
+ if verbose:
+ print("Begin emission:")
+ for p in tobesent:
+ pks.send(p)
+ i += 1
+ time.sleep(inter)
+ if verbose:
+ print("Finished to send %i packets." % i)
+ except SystemExit:
+ pass
+ except KeyboardInterrupt:
+ pass
+ except:
+ log_runtime.exception("--- Error sending packets")
+ log_runtime.info("--- Error sending packets")
+ finally:
+ try:
+ sent_times = [p.sent_time for p in all_stimuli if p.sent_time]
+ except:
+ pass
+ if WINDOWS or pid > 0:
+ # Timeout starts after last packet is sent (as in Unix version)
+ if timeout:
+ stoptime = time.time()+timeout
+ else:
+ stoptime = 0
+ remaintime = None
+ # inmask = [pks.ins.fd]
+ try:
+ try:
+ while 1:
+ if stoptime:
+ remaintime = stoptime-time.time()
+ if remaintime <= 0:
+ break
+ r = pks.recv(MTU)
+ if r is None:
+ continue
+ ok = 0
+ h = r.hashret()
+ if h in hsent:
+ hlst = hsent[h]
+ for i in range(len(hlst)):
+ if r.answers(hlst[i]):
+ ans.append((hlst[i],r))
+ if verbose > 1:
+ os.write(1, b"*")
+ ok = 1
+ if not multi:
+ del(hlst[i])
+ notans -= 1;
+ else:
+ if not hasattr(hlst[i], '_answered'):
+ notans -= 1;
+ hlst[i]._answered = 1;
+ break
+ if notans == 0 and not multi:
+ break
+ if not ok:
+ if verbose > 1:
+ os.write(1, b".")
+ nbrecv += 1
+ if conf.debug_match:
+ debug.recv.append(r)
+ except KeyboardInterrupt:
+ if chainCC:
+ raise
+ finally:
+ if WINDOWS:
+ for p,t in zip(all_stimuli, sent_times):
+ p.sent_time = t
+ finally:
+ pass
+
+ # remain = reduce(list.__add__, hsent.values(), [])
+ remain = list(itertools.chain(*[ i for i in hsent.values() ]))
+
+ if multi:
+ #remain = filter(lambda p: not hasattr(p, '_answered'), remain);
+ remain = [ p for p in remain if not hasattr(p, '_answered')]
+
+ if autostop and len(remain) > 0 and len(remain) != len(tobesent):
+ retry = autostop
+
+ tobesent = remain
+ if len(tobesent) == 0:
+ break
+ retry -= 1
+
+ if conf.debug_match:
+ debug.sent=plist.PacketList(remain[:],"Sent")
+ debug.match=plist.SndRcvList(ans[:])
+
+ #clean the ans list to delete the field _answered
+ if (multi):
+ for s,r in ans:
+ if hasattr(s, '_answered'):
+ del(s._answered)
+
+ if verbose:
+ print("\nReceived %i packets, got %i answers, remaining %i packets" % (nbrecv+len(ans), len(ans), notans))
+ return plist.SndRcvList(ans),plist.PacketList(remain,"Unanswered")
+
+
+import scapy.sendrecv
+scapy.sendrecv.sndrcv = sndrcv
+
+def sniff(count=0, store=1, offline=None, prn = None, lfilter=None, L2socket=None, timeout=None, *arg, **karg):
+ """Sniff packets
+sniff([count=0,] [prn=None,] [store=1,] [offline=None,] [lfilter=None,] + L2ListenSocket args) -> list of packets
+Select interface to sniff by setting conf.iface. Use show_interfaces() to see interface names.
+ count: number of packets to capture. 0 means infinity
+ store: wether to store sniffed packets or discard them
+ prn: function to apply to each packet. If something is returned,
+ it is displayed. Ex:
+ ex: prn = lambda x: x.summary()
+lfilter: python function applied to each packet to determine
+ if further action may be done
+ ex: lfilter = lambda x: x.haslayer(Padding)
+offline: pcap file to read packets from, instead of sniffing them
+timeout: stop sniffing after a given time (default: None)
+L2socket: use the provided L2socket
+ """
+ c = 0
+
+ if offline is None:
+ log_runtime.info('Sniffing on %s' % conf.iface)
+ if L2socket is None:
+ L2socket = conf.L2listen
+ s = L2socket(type=ETH_P_ALL, *arg, **karg)
+ else:
+ s = PcapReader(offline)
+
+ lst = []
+ if timeout is not None:
+ stoptime = time.time()+timeout
+ remain = None
+ while 1:
+ try:
+ if timeout is not None:
+ remain = stoptime-time.time()
+ if remain <= 0:
+ break
+
+ try:
+ p = s.recv(MTU)
+ except PcapTimeoutElapsed:
+ continue
+ if p is None:
+ break
+ if lfilter and not lfilter(p):
+ continue
+ if store:
+ lst.append(p)
+ c += 1
+ if prn:
+ r = prn(p)
+ if r is not None:
+ print(r)
+ if count > 0 and c >= count:
+ break
+ except KeyboardInterrupt:
+ break
+ s.close()
+ return plist.PacketList(lst,"Sniffed")
+
+import scapy.sendrecv
+scapy.sendrecv.sniff = sniff
+
+# def get_if_list():
+# print('windows if_list')
+# return sorted(ifaces.keys())
+
+def get_working_if():
+ try:
+ if 'Ethernet' in ifaces and ifaces['Ethernet'].ip != '0.0.0.0':
+ return 'Ethernet'
+ elif 'Wi-Fi' in ifaces and ifaces['Wi-Fi'].ip != '0.0.0.0':
+ return 'Wi-Fi'
+ elif len(ifaces) > 0:
+ return ifaces[list(ifaces.keys())[0]]
+ else:
+ return LOOPBACK_NAME
+ except:
+ return LOOPBACK_NAME
+
+conf.iface = get_working_if()