diff options
Diffstat (limited to 'scripts/external_libs/scapy-python3-0.18/scapy/arch/windows/__init__.py')
-rw-r--r-- | scripts/external_libs/scapy-python3-0.18/scapy/arch/windows/__init__.py | 501 |
1 files changed, 0 insertions, 501 deletions
diff --git a/scripts/external_libs/scapy-python3-0.18/scapy/arch/windows/__init__.py b/scripts/external_libs/scapy-python3-0.18/scapy/arch/windows/__init__.py deleted file mode 100644 index 3bd1ade3..00000000 --- a/scripts/external_libs/scapy-python3-0.18/scapy/arch/windows/__init__.py +++ /dev/null @@ -1,501 +0,0 @@ -## This file is part of Scapy -## See http://www.secdev.org/projects/scapy for more informations -## Copyright (C) Philippe Biondi <phil@secdev.org> -## This program is published under a GPLv2 license - -""" -Customizations needed to support Microsoft Windows. -""" - -import os,re,sys,socket,time, itertools -import subprocess as sp -from glob import glob -from scapy.config import conf,ConfClass -from scapy.error import Scapy_Exception,log_loading,log_runtime -from scapy.utils import atol, itom, inet_aton, inet_ntoa, PcapReader -from scapy.base_classes import Gen, Net, SetGen -import scapy.plist as plist -from scapy.sendrecv import debug, srp1 -from scapy.layers.l2 import Ether, ARP -from scapy.data import MTU, ETHER_BROADCAST, ETH_P_ARP - -conf.use_winpcapy = True -from scapy.arch import pcapdnet -from scapy.arch.pcapdnet import * - -LOOPBACK_NAME="lo0" -WINDOWS = True - - -def _where(filename, dirs=[], env="PATH"): - """Find file in current dir or system path""" - if not isinstance(dirs, list): - dirs = [dirs] - if glob(filename): - return filename - paths = [os.curdir] + os.environ[env].split(os.path.pathsep) + dirs - for path in paths: - for match in glob(os.path.join(path, filename)): - if match: - return os.path.normpath(match) - raise IOError("File not found: %s" % filename) - -def win_find_exe(filename, installsubdir=None, env="ProgramFiles"): - """Find executable in current dir, system path or given ProgramFiles subdir""" - for fn in [filename, filename+".exe"]: - try: - if installsubdir is None: - path = _where(fn) - else: - path = _where(fn, dirs=[os.path.join(os.environ[env], installsubdir)]) - except IOError: - path = filename - else: - break - return path - - -class WinProgPath(ConfClass): - _default = "<System default>" - # We try some magic to find the appropriate executables - pdfreader = win_find_exe("AcroRd32") - psreader = win_find_exe("gsview32.exe", "Ghostgum/gsview") - dot = win_find_exe("dot", "ATT/Graphviz/bin") - tcpdump = win_find_exe("windump") - tcpreplay = win_find_exe("tcpreplay") - display = _default - hexedit = win_find_exe("hexer") - wireshark = win_find_exe("wireshark", "wireshark") - -conf.prog = WinProgPath() - -class PcapNameNotFoundError(Scapy_Exception): - pass - -def get_windows_if_list(): - ps = sp.Popen(['powershell', 'Get-NetAdapter', '|', 'select Name, InterfaceIndex, InterfaceDescription, InterfaceGuid, MacAddress', '|', 'fl'], stdout = sp.PIPE) - stdout, stdin = ps.communicate(timeout = 10) - current_interface = None - interface_list = [] - for i in stdout.split(b'\r\n'): - if not i.strip(): - continue - if i.find(b':')<0: - continue - name, value = [ j.strip() for j in i.split(b':') ] - if name == b'Name': - if current_interface: - interface_list.append(current_interface) - current_interface = {} - current_interface['name'] = value.decode('ascii') - elif name == b'InterfaceIndex': - current_interface['win_index'] = int(value) - elif name == b'InterfaceDescription': - current_interface['description'] = value.decode('ascii') - elif name == b'InterfaceGuid': - current_interface['guid'] = value.decode('ascii') - elif name == b'MacAddress': - current_interface['mac'] = ':'.join([ j.decode('ascii') for j in value.split(b'-')]) - if current_interface: - interface_list.append(current_interface) - return interface_list - -class NetworkInterface(object): - """A network interface of your local host""" - - def __init__(self, data=None): - self.name = None - self.ip = None - self.mac = None - self.pcap_name = None - self.description = None - self.data = data - if data is not None: - self.update(data) - - def update(self, data): - """Update info about network interface according to given dnet dictionary""" - self.name = data["name"] - self.description = data['description'] - self.win_index = data['win_index'] - # Other attributes are optional - self._update_pcapdata() - try: - self.ip = socket.inet_ntoa(get_if_raw_addr(data['guid'])) - except (KeyError, AttributeError, NameError): - pass - try: - self.mac = data['mac'] - except KeyError: - pass - - def _update_pcapdata(self): - for i in winpcapy_get_if_list(): - if i.endswith(self.data['guid']): - self.pcap_name = i - return - - raise PcapNameNotFoundError - - def __repr__(self): - return "<%s: %s %s %s pcap_name=%s description=%s>" % (self.__class__.__name__, - self.name, self.ip, self.mac, self.pcap_name, self.description) - -from collections import UserDict - -class NetworkInterfaceDict(UserDict): - """Store information about network interfaces and convert between names""" - def load_from_powershell(self): - for i in get_windows_if_list(): - try: - interface = NetworkInterface(i) - self.data[interface.name] = interface - except (KeyError, PcapNameNotFoundError): - pass - if len(self.data) == 0: - log_loading.warning("No match between your pcap and windows network interfaces found. " - "You probably won't be able to send packets. " - "Deactivating unneeded interfaces and restarting Scapy might help." - "Check your winpcap and powershell installation, and access rights.") - - def pcap_name(self, devname): - """Return pcap device name for given Windows device name.""" - - try: - pcap_name = self.data[devname].pcap_name - except KeyError: - raise ValueError("Unknown network interface %r" % devname) - else: - return pcap_name - - def devname(self, pcap_name): - """Return Windows device name for given pcap device name.""" - - for devname, iface in self.items(): - if iface.pcap_name == pcap_name: - return iface.name - raise ValueError("Unknown pypcap network interface %r" % pcap_name) - - def devname_from_index(self, if_index): - """Return interface name from interface index""" - for devname, iface in self.items(): - if iface.win_index == if_index: - return iface.name - raise ValueError("Unknown network interface index %r" % if_index) - - def show(self, resolve_mac=True): - """Print list of available network interfaces in human readable form""" - - print("%s %s %s %s" % ("INDEX".ljust(5), "IFACE".ljust(35), "IP".ljust(15), "MAC")) - for iface_name in sorted(self.data.keys()): - dev = self.data[iface_name] - mac = dev.mac - if resolve_mac: - mac = conf.manufdb._resolve_MAC(mac) - print("%s %s %s %s" % (str(dev.win_index).ljust(5), str(dev.name).ljust(35), str(dev.ip).ljust(15), mac) ) - -ifaces = NetworkInterfaceDict() -ifaces.load_from_powershell() - -def pcap_name(devname): - """Return pypcap device name for given libdnet/Scapy device name""" - try: - pcap_name = ifaces.pcap_name(devname) - except ValueError: - # pcap.pcap() will choose a sensible default for sniffing if iface=None - pcap_name = None - return pcap_name - -def devname(pcap_name): - """Return libdnet/Scapy device name for given pypcap device name""" - return ifaces.devname(pcap_name) - -def devname_from_index(if_index): - """Return Windows adapter name for given Windows interface index""" - return ifaces.devname_from_index(if_index) - -def show_interfaces(resolve_mac=True): - """Print list of available network interfaces""" - return ifaces.show(resolve_mac) - -_orig_open_pcap = pcapdnet.open_pcap -pcapdnet.open_pcap = lambda iface,*args,**kargs: _orig_open_pcap(pcap_name(iface),*args,**kargs) - -_orig_get_if_raw_hwaddr = pcapdnet.get_if_raw_hwaddr -pcapdnet.get_if_raw_hwaddr = lambda iface,*args,**kargs: [ int(i, 16) for i in ifaces[iface].mac.split(':') ] -get_if_raw_hwaddr = pcapdnet.get_if_raw_hwaddr - -def read_routes(): - routes = [] - if_index = '(\d+)' - dest = '(\d+\.\d+\.\d+\.\d+)/(\d+)' - next_hop = '(\d+\.\d+\.\d+\.\d+)' - metric_pattern = "(\d+)" - delim = "\s+" # The columns are separated by whitespace - netstat_line = delim.join([if_index, dest, next_hop, metric_pattern]) - pattern = re.compile(netstat_line) - ps = sp.Popen(['powershell', 'Get-NetRoute', '-AddressFamily IPV4', '|', 'select ifIndex, DestinationPrefix, NextHop, RouteMetric'], stdout = sp.PIPE) - stdout, stdin = ps.communicate(timeout = 10) - for l in stdout.split(b'\r\n'): - match = re.search(pattern,l.decode('utf-8')) - if match: - try: - iface = devname_from_index(int(match.group(1))) - addr = ifaces[iface].ip - except: - continue - dest = atol(match.group(2)) - mask = itom(int(match.group(3))) - gw = match.group(4) - # try: - # intf = pcapdnet.dnet.intf().get_dst(pcapdnet.dnet.addr(type=2, addrtxt=dest)) - # except OSError: - # log_loading.warning("Building Scapy's routing table: Couldn't get outgoing interface for destination %s" % dest) - # continue - routes.append((dest, mask, gw, iface, addr)) - return routes - -def read_routes6(): - return [] - -try: - __IPYTHON__ -except NameError: - try: - import readline - console = readline.GetOutputFile() - except (ImportError, AttributeError): - log_loading.info("Could not get readline console. Will not interpret ANSI color codes.") - else: - conf.readfunc = readline.rl.readline - orig_stdout = sys.stdout - sys.stdout = console - -def sndrcv(pks, pkt, timeout = 2, inter = 0, verbose=None, chainCC=0, retry=0, multi=0): - if not isinstance(pkt, Gen): - pkt = SetGen(pkt) - - if verbose is None: - verbose = conf.verb - debug.recv = plist.PacketList([],"Unanswered") - debug.sent = plist.PacketList([],"Sent") - debug.match = plist.SndRcvList([]) - nbrecv=0 - ans = [] - # do it here to fix random fields, so that parent and child have the same - all_stimuli = tobesent = [p for p in pkt] - notans = len(tobesent) - - hsent={} - for i in tobesent: - h = i.hashret() - if h in hsent: - hsent[h].append(i) - else: - hsent[h] = [i] - if retry < 0: - retry = -retry - autostop=retry - else: - autostop=0 - - - while retry >= 0: - found=0 - - if timeout < 0: - timeout = None - - pid=1 - try: - if WINDOWS or pid == 0: - try: - try: - i = 0 - if verbose: - print("Begin emission:") - for p in tobesent: - pks.send(p) - i += 1 - time.sleep(inter) - if verbose: - print("Finished to send %i packets." % i) - except SystemExit: - pass - except KeyboardInterrupt: - pass - except: - log_runtime.exception("--- Error sending packets") - log_runtime.info("--- Error sending packets") - finally: - try: - sent_times = [p.sent_time for p in all_stimuli if p.sent_time] - except: - pass - if WINDOWS or pid > 0: - # Timeout starts after last packet is sent (as in Unix version) - if timeout: - stoptime = time.time()+timeout - else: - stoptime = 0 - remaintime = None - # inmask = [pks.ins.fd] - try: - try: - while 1: - if stoptime: - remaintime = stoptime-time.time() - if remaintime <= 0: - break - r = pks.recv(MTU) - if r is None: - continue - ok = 0 - h = r.hashret() - if h in hsent: - hlst = hsent[h] - for i in range(len(hlst)): - if r.answers(hlst[i]): - ans.append((hlst[i],r)) - if verbose > 1: - os.write(1, b"*") - ok = 1 - if not multi: - del(hlst[i]) - notans -= 1; - else: - if not hasattr(hlst[i], '_answered'): - notans -= 1; - hlst[i]._answered = 1; - break - if notans == 0 and not multi: - break - if not ok: - if verbose > 1: - os.write(1, b".") - nbrecv += 1 - if conf.debug_match: - debug.recv.append(r) - except KeyboardInterrupt: - if chainCC: - raise - finally: - if WINDOWS: - for p,t in zip(all_stimuli, sent_times): - p.sent_time = t - finally: - pass - - # remain = reduce(list.__add__, hsent.values(), []) - remain = list(itertools.chain(*[ i for i in hsent.values() ])) - - if multi: - #remain = filter(lambda p: not hasattr(p, '_answered'), remain); - remain = [ p for p in remain if not hasattr(p, '_answered')] - - if autostop and len(remain) > 0 and len(remain) != len(tobesent): - retry = autostop - - tobesent = remain - if len(tobesent) == 0: - break - retry -= 1 - - if conf.debug_match: - debug.sent=plist.PacketList(remain[:],"Sent") - debug.match=plist.SndRcvList(ans[:]) - - #clean the ans list to delete the field _answered - if (multi): - for s,r in ans: - if hasattr(s, '_answered'): - del(s._answered) - - if verbose: - print("\nReceived %i packets, got %i answers, remaining %i packets" % (nbrecv+len(ans), len(ans), notans)) - return plist.SndRcvList(ans),plist.PacketList(remain,"Unanswered") - - -import scapy.sendrecv -scapy.sendrecv.sndrcv = sndrcv - -def sniff(count=0, store=1, offline=None, prn = None, lfilter=None, L2socket=None, timeout=None, *arg, **karg): - """Sniff packets -sniff([count=0,] [prn=None,] [store=1,] [offline=None,] [lfilter=None,] + L2ListenSocket args) -> list of packets -Select interface to sniff by setting conf.iface. Use show_interfaces() to see interface names. - count: number of packets to capture. 0 means infinity - store: wether to store sniffed packets or discard them - prn: function to apply to each packet. If something is returned, - it is displayed. Ex: - ex: prn = lambda x: x.summary() -lfilter: python function applied to each packet to determine - if further action may be done - ex: lfilter = lambda x: x.haslayer(Padding) -offline: pcap file to read packets from, instead of sniffing them -timeout: stop sniffing after a given time (default: None) -L2socket: use the provided L2socket - """ - c = 0 - - if offline is None: - log_runtime.info('Sniffing on %s' % conf.iface) - if L2socket is None: - L2socket = conf.L2listen - s = L2socket(type=ETH_P_ALL, *arg, **karg) - else: - s = PcapReader(offline) - - lst = [] - if timeout is not None: - stoptime = time.time()+timeout - remain = None - while 1: - try: - if timeout is not None: - remain = stoptime-time.time() - if remain <= 0: - break - - try: - p = s.recv(MTU) - except PcapTimeoutElapsed: - continue - if p is None: - break - if lfilter and not lfilter(p): - continue - if store: - lst.append(p) - c += 1 - if prn: - r = prn(p) - if r is not None: - print(r) - if count > 0 and c >= count: - break - except KeyboardInterrupt: - break - s.close() - return plist.PacketList(lst,"Sniffed") - -import scapy.sendrecv -scapy.sendrecv.sniff = sniff - -# def get_if_list(): -# print('windows if_list') -# return sorted(ifaces.keys()) - -def get_working_if(): - try: - if 'Ethernet' in ifaces and ifaces['Ethernet'].ip != '0.0.0.0': - return 'Ethernet' - elif 'Wi-Fi' in ifaces and ifaces['Wi-Fi'].ip != '0.0.0.0': - return 'Wi-Fi' - elif len(ifaces) > 0: - return ifaces[list(ifaces.keys())[0]] - else: - return LOOPBACK_NAME - except: - return LOOPBACK_NAME - -conf.iface = get_working_if() |