summaryrefslogtreecommitdiffstats
path: root/scripts/external_libs/scapy-python3-0.18/scapy/arch/windows/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/external_libs/scapy-python3-0.18/scapy/arch/windows/__init__.py')
-rw-r--r--scripts/external_libs/scapy-python3-0.18/scapy/arch/windows/__init__.py501
1 files changed, 0 insertions, 501 deletions
diff --git a/scripts/external_libs/scapy-python3-0.18/scapy/arch/windows/__init__.py b/scripts/external_libs/scapy-python3-0.18/scapy/arch/windows/__init__.py
deleted file mode 100644
index 3bd1ade3..00000000
--- a/scripts/external_libs/scapy-python3-0.18/scapy/arch/windows/__init__.py
+++ /dev/null
@@ -1,501 +0,0 @@
-## This file is part of Scapy
-## See http://www.secdev.org/projects/scapy for more informations
-## Copyright (C) Philippe Biondi <phil@secdev.org>
-## This program is published under a GPLv2 license
-
-"""
-Customizations needed to support Microsoft Windows.
-"""
-
-import os,re,sys,socket,time, itertools
-import subprocess as sp
-from glob import glob
-from scapy.config import conf,ConfClass
-from scapy.error import Scapy_Exception,log_loading,log_runtime
-from scapy.utils import atol, itom, inet_aton, inet_ntoa, PcapReader
-from scapy.base_classes import Gen, Net, SetGen
-import scapy.plist as plist
-from scapy.sendrecv import debug, srp1
-from scapy.layers.l2 import Ether, ARP
-from scapy.data import MTU, ETHER_BROADCAST, ETH_P_ARP
-
-conf.use_winpcapy = True
-from scapy.arch import pcapdnet
-from scapy.arch.pcapdnet import *
-
-LOOPBACK_NAME="lo0"
-WINDOWS = True
-
-
-def _where(filename, dirs=[], env="PATH"):
- """Find file in current dir or system path"""
- if not isinstance(dirs, list):
- dirs = [dirs]
- if glob(filename):
- return filename
- paths = [os.curdir] + os.environ[env].split(os.path.pathsep) + dirs
- for path in paths:
- for match in glob(os.path.join(path, filename)):
- if match:
- return os.path.normpath(match)
- raise IOError("File not found: %s" % filename)
-
-def win_find_exe(filename, installsubdir=None, env="ProgramFiles"):
- """Find executable in current dir, system path or given ProgramFiles subdir"""
- for fn in [filename, filename+".exe"]:
- try:
- if installsubdir is None:
- path = _where(fn)
- else:
- path = _where(fn, dirs=[os.path.join(os.environ[env], installsubdir)])
- except IOError:
- path = filename
- else:
- break
- return path
-
-
-class WinProgPath(ConfClass):
- _default = "<System default>"
- # We try some magic to find the appropriate executables
- pdfreader = win_find_exe("AcroRd32")
- psreader = win_find_exe("gsview32.exe", "Ghostgum/gsview")
- dot = win_find_exe("dot", "ATT/Graphviz/bin")
- tcpdump = win_find_exe("windump")
- tcpreplay = win_find_exe("tcpreplay")
- display = _default
- hexedit = win_find_exe("hexer")
- wireshark = win_find_exe("wireshark", "wireshark")
-
-conf.prog = WinProgPath()
-
-class PcapNameNotFoundError(Scapy_Exception):
- pass
-
-def get_windows_if_list():
- ps = sp.Popen(['powershell', 'Get-NetAdapter', '|', 'select Name, InterfaceIndex, InterfaceDescription, InterfaceGuid, MacAddress', '|', 'fl'], stdout = sp.PIPE)
- stdout, stdin = ps.communicate(timeout = 10)
- current_interface = None
- interface_list = []
- for i in stdout.split(b'\r\n'):
- if not i.strip():
- continue
- if i.find(b':')<0:
- continue
- name, value = [ j.strip() for j in i.split(b':') ]
- if name == b'Name':
- if current_interface:
- interface_list.append(current_interface)
- current_interface = {}
- current_interface['name'] = value.decode('ascii')
- elif name == b'InterfaceIndex':
- current_interface['win_index'] = int(value)
- elif name == b'InterfaceDescription':
- current_interface['description'] = value.decode('ascii')
- elif name == b'InterfaceGuid':
- current_interface['guid'] = value.decode('ascii')
- elif name == b'MacAddress':
- current_interface['mac'] = ':'.join([ j.decode('ascii') for j in value.split(b'-')])
- if current_interface:
- interface_list.append(current_interface)
- return interface_list
-
-class NetworkInterface(object):
- """A network interface of your local host"""
-
- def __init__(self, data=None):
- self.name = None
- self.ip = None
- self.mac = None
- self.pcap_name = None
- self.description = None
- self.data = data
- if data is not None:
- self.update(data)
-
- def update(self, data):
- """Update info about network interface according to given dnet dictionary"""
- self.name = data["name"]
- self.description = data['description']
- self.win_index = data['win_index']
- # Other attributes are optional
- self._update_pcapdata()
- try:
- self.ip = socket.inet_ntoa(get_if_raw_addr(data['guid']))
- except (KeyError, AttributeError, NameError):
- pass
- try:
- self.mac = data['mac']
- except KeyError:
- pass
-
- def _update_pcapdata(self):
- for i in winpcapy_get_if_list():
- if i.endswith(self.data['guid']):
- self.pcap_name = i
- return
-
- raise PcapNameNotFoundError
-
- def __repr__(self):
- return "<%s: %s %s %s pcap_name=%s description=%s>" % (self.__class__.__name__,
- self.name, self.ip, self.mac, self.pcap_name, self.description)
-
-from collections import UserDict
-
-class NetworkInterfaceDict(UserDict):
- """Store information about network interfaces and convert between names"""
- def load_from_powershell(self):
- for i in get_windows_if_list():
- try:
- interface = NetworkInterface(i)
- self.data[interface.name] = interface
- except (KeyError, PcapNameNotFoundError):
- pass
- if len(self.data) == 0:
- log_loading.warning("No match between your pcap and windows network interfaces found. "
- "You probably won't be able to send packets. "
- "Deactivating unneeded interfaces and restarting Scapy might help."
- "Check your winpcap and powershell installation, and access rights.")
-
- def pcap_name(self, devname):
- """Return pcap device name for given Windows device name."""
-
- try:
- pcap_name = self.data[devname].pcap_name
- except KeyError:
- raise ValueError("Unknown network interface %r" % devname)
- else:
- return pcap_name
-
- def devname(self, pcap_name):
- """Return Windows device name for given pcap device name."""
-
- for devname, iface in self.items():
- if iface.pcap_name == pcap_name:
- return iface.name
- raise ValueError("Unknown pypcap network interface %r" % pcap_name)
-
- def devname_from_index(self, if_index):
- """Return interface name from interface index"""
- for devname, iface in self.items():
- if iface.win_index == if_index:
- return iface.name
- raise ValueError("Unknown network interface index %r" % if_index)
-
- def show(self, resolve_mac=True):
- """Print list of available network interfaces in human readable form"""
-
- print("%s %s %s %s" % ("INDEX".ljust(5), "IFACE".ljust(35), "IP".ljust(15), "MAC"))
- for iface_name in sorted(self.data.keys()):
- dev = self.data[iface_name]
- mac = dev.mac
- if resolve_mac:
- mac = conf.manufdb._resolve_MAC(mac)
- print("%s %s %s %s" % (str(dev.win_index).ljust(5), str(dev.name).ljust(35), str(dev.ip).ljust(15), mac) )
-
-ifaces = NetworkInterfaceDict()
-ifaces.load_from_powershell()
-
-def pcap_name(devname):
- """Return pypcap device name for given libdnet/Scapy device name"""
- try:
- pcap_name = ifaces.pcap_name(devname)
- except ValueError:
- # pcap.pcap() will choose a sensible default for sniffing if iface=None
- pcap_name = None
- return pcap_name
-
-def devname(pcap_name):
- """Return libdnet/Scapy device name for given pypcap device name"""
- return ifaces.devname(pcap_name)
-
-def devname_from_index(if_index):
- """Return Windows adapter name for given Windows interface index"""
- return ifaces.devname_from_index(if_index)
-
-def show_interfaces(resolve_mac=True):
- """Print list of available network interfaces"""
- return ifaces.show(resolve_mac)
-
-_orig_open_pcap = pcapdnet.open_pcap
-pcapdnet.open_pcap = lambda iface,*args,**kargs: _orig_open_pcap(pcap_name(iface),*args,**kargs)
-
-_orig_get_if_raw_hwaddr = pcapdnet.get_if_raw_hwaddr
-pcapdnet.get_if_raw_hwaddr = lambda iface,*args,**kargs: [ int(i, 16) for i in ifaces[iface].mac.split(':') ]
-get_if_raw_hwaddr = pcapdnet.get_if_raw_hwaddr
-
-def read_routes():
- routes = []
- if_index = '(\d+)'
- dest = '(\d+\.\d+\.\d+\.\d+)/(\d+)'
- next_hop = '(\d+\.\d+\.\d+\.\d+)'
- metric_pattern = "(\d+)"
- delim = "\s+" # The columns are separated by whitespace
- netstat_line = delim.join([if_index, dest, next_hop, metric_pattern])
- pattern = re.compile(netstat_line)
- ps = sp.Popen(['powershell', 'Get-NetRoute', '-AddressFamily IPV4', '|', 'select ifIndex, DestinationPrefix, NextHop, RouteMetric'], stdout = sp.PIPE)
- stdout, stdin = ps.communicate(timeout = 10)
- for l in stdout.split(b'\r\n'):
- match = re.search(pattern,l.decode('utf-8'))
- if match:
- try:
- iface = devname_from_index(int(match.group(1)))
- addr = ifaces[iface].ip
- except:
- continue
- dest = atol(match.group(2))
- mask = itom(int(match.group(3)))
- gw = match.group(4)
- # try:
- # intf = pcapdnet.dnet.intf().get_dst(pcapdnet.dnet.addr(type=2, addrtxt=dest))
- # except OSError:
- # log_loading.warning("Building Scapy's routing table: Couldn't get outgoing interface for destination %s" % dest)
- # continue
- routes.append((dest, mask, gw, iface, addr))
- return routes
-
-def read_routes6():
- return []
-
-try:
- __IPYTHON__
-except NameError:
- try:
- import readline
- console = readline.GetOutputFile()
- except (ImportError, AttributeError):
- log_loading.info("Could not get readline console. Will not interpret ANSI color codes.")
- else:
- conf.readfunc = readline.rl.readline
- orig_stdout = sys.stdout
- sys.stdout = console
-
-def sndrcv(pks, pkt, timeout = 2, inter = 0, verbose=None, chainCC=0, retry=0, multi=0):
- if not isinstance(pkt, Gen):
- pkt = SetGen(pkt)
-
- if verbose is None:
- verbose = conf.verb
- debug.recv = plist.PacketList([],"Unanswered")
- debug.sent = plist.PacketList([],"Sent")
- debug.match = plist.SndRcvList([])
- nbrecv=0
- ans = []
- # do it here to fix random fields, so that parent and child have the same
- all_stimuli = tobesent = [p for p in pkt]
- notans = len(tobesent)
-
- hsent={}
- for i in tobesent:
- h = i.hashret()
- if h in hsent:
- hsent[h].append(i)
- else:
- hsent[h] = [i]
- if retry < 0:
- retry = -retry
- autostop=retry
- else:
- autostop=0
-
-
- while retry >= 0:
- found=0
-
- if timeout < 0:
- timeout = None
-
- pid=1
- try:
- if WINDOWS or pid == 0:
- try:
- try:
- i = 0
- if verbose:
- print("Begin emission:")
- for p in tobesent:
- pks.send(p)
- i += 1
- time.sleep(inter)
- if verbose:
- print("Finished to send %i packets." % i)
- except SystemExit:
- pass
- except KeyboardInterrupt:
- pass
- except:
- log_runtime.exception("--- Error sending packets")
- log_runtime.info("--- Error sending packets")
- finally:
- try:
- sent_times = [p.sent_time for p in all_stimuli if p.sent_time]
- except:
- pass
- if WINDOWS or pid > 0:
- # Timeout starts after last packet is sent (as in Unix version)
- if timeout:
- stoptime = time.time()+timeout
- else:
- stoptime = 0
- remaintime = None
- # inmask = [pks.ins.fd]
- try:
- try:
- while 1:
- if stoptime:
- remaintime = stoptime-time.time()
- if remaintime <= 0:
- break
- r = pks.recv(MTU)
- if r is None:
- continue
- ok = 0
- h = r.hashret()
- if h in hsent:
- hlst = hsent[h]
- for i in range(len(hlst)):
- if r.answers(hlst[i]):
- ans.append((hlst[i],r))
- if verbose > 1:
- os.write(1, b"*")
- ok = 1
- if not multi:
- del(hlst[i])
- notans -= 1;
- else:
- if not hasattr(hlst[i], '_answered'):
- notans -= 1;
- hlst[i]._answered = 1;
- break
- if notans == 0 and not multi:
- break
- if not ok:
- if verbose > 1:
- os.write(1, b".")
- nbrecv += 1
- if conf.debug_match:
- debug.recv.append(r)
- except KeyboardInterrupt:
- if chainCC:
- raise
- finally:
- if WINDOWS:
- for p,t in zip(all_stimuli, sent_times):
- p.sent_time = t
- finally:
- pass
-
- # remain = reduce(list.__add__, hsent.values(), [])
- remain = list(itertools.chain(*[ i for i in hsent.values() ]))
-
- if multi:
- #remain = filter(lambda p: not hasattr(p, '_answered'), remain);
- remain = [ p for p in remain if not hasattr(p, '_answered')]
-
- if autostop and len(remain) > 0 and len(remain) != len(tobesent):
- retry = autostop
-
- tobesent = remain
- if len(tobesent) == 0:
- break
- retry -= 1
-
- if conf.debug_match:
- debug.sent=plist.PacketList(remain[:],"Sent")
- debug.match=plist.SndRcvList(ans[:])
-
- #clean the ans list to delete the field _answered
- if (multi):
- for s,r in ans:
- if hasattr(s, '_answered'):
- del(s._answered)
-
- if verbose:
- print("\nReceived %i packets, got %i answers, remaining %i packets" % (nbrecv+len(ans), len(ans), notans))
- return plist.SndRcvList(ans),plist.PacketList(remain,"Unanswered")
-
-
-import scapy.sendrecv
-scapy.sendrecv.sndrcv = sndrcv
-
-def sniff(count=0, store=1, offline=None, prn = None, lfilter=None, L2socket=None, timeout=None, *arg, **karg):
- """Sniff packets
-sniff([count=0,] [prn=None,] [store=1,] [offline=None,] [lfilter=None,] + L2ListenSocket args) -> list of packets
-Select interface to sniff by setting conf.iface. Use show_interfaces() to see interface names.
- count: number of packets to capture. 0 means infinity
- store: wether to store sniffed packets or discard them
- prn: function to apply to each packet. If something is returned,
- it is displayed. Ex:
- ex: prn = lambda x: x.summary()
-lfilter: python function applied to each packet to determine
- if further action may be done
- ex: lfilter = lambda x: x.haslayer(Padding)
-offline: pcap file to read packets from, instead of sniffing them
-timeout: stop sniffing after a given time (default: None)
-L2socket: use the provided L2socket
- """
- c = 0
-
- if offline is None:
- log_runtime.info('Sniffing on %s' % conf.iface)
- if L2socket is None:
- L2socket = conf.L2listen
- s = L2socket(type=ETH_P_ALL, *arg, **karg)
- else:
- s = PcapReader(offline)
-
- lst = []
- if timeout is not None:
- stoptime = time.time()+timeout
- remain = None
- while 1:
- try:
- if timeout is not None:
- remain = stoptime-time.time()
- if remain <= 0:
- break
-
- try:
- p = s.recv(MTU)
- except PcapTimeoutElapsed:
- continue
- if p is None:
- break
- if lfilter and not lfilter(p):
- continue
- if store:
- lst.append(p)
- c += 1
- if prn:
- r = prn(p)
- if r is not None:
- print(r)
- if count > 0 and c >= count:
- break
- except KeyboardInterrupt:
- break
- s.close()
- return plist.PacketList(lst,"Sniffed")
-
-import scapy.sendrecv
-scapy.sendrecv.sniff = sniff
-
-# def get_if_list():
-# print('windows if_list')
-# return sorted(ifaces.keys())
-
-def get_working_if():
- try:
- if 'Ethernet' in ifaces and ifaces['Ethernet'].ip != '0.0.0.0':
- return 'Ethernet'
- elif 'Wi-Fi' in ifaces and ifaces['Wi-Fi'].ip != '0.0.0.0':
- return 'Wi-Fi'
- elif len(ifaces) > 0:
- return ifaces[list(ifaces.keys())[0]]
- else:
- return LOOPBACK_NAME
- except:
- return LOOPBACK_NAME
-
-conf.iface = get_working_if()