diff options
Diffstat (limited to 'scripts/external_libs/scapy-python3-0.18/scapy/arch/windows/__init__.py')
-rw-r--r-- | scripts/external_libs/scapy-python3-0.18/scapy/arch/windows/__init__.py | 501 |
1 files changed, 501 insertions, 0 deletions
diff --git a/scripts/external_libs/scapy-python3-0.18/scapy/arch/windows/__init__.py b/scripts/external_libs/scapy-python3-0.18/scapy/arch/windows/__init__.py new file mode 100644 index 00000000..3bd1ade3 --- /dev/null +++ b/scripts/external_libs/scapy-python3-0.18/scapy/arch/windows/__init__.py @@ -0,0 +1,501 @@ +## This file is part of Scapy +## See http://www.secdev.org/projects/scapy for more informations +## Copyright (C) Philippe Biondi <phil@secdev.org> +## This program is published under a GPLv2 license + +""" +Customizations needed to support Microsoft Windows. +""" + +import os,re,sys,socket,time, itertools +import subprocess as sp +from glob import glob +from scapy.config import conf,ConfClass +from scapy.error import Scapy_Exception,log_loading,log_runtime +from scapy.utils import atol, itom, inet_aton, inet_ntoa, PcapReader +from scapy.base_classes import Gen, Net, SetGen +import scapy.plist as plist +from scapy.sendrecv import debug, srp1 +from scapy.layers.l2 import Ether, ARP +from scapy.data import MTU, ETHER_BROADCAST, ETH_P_ARP + +conf.use_winpcapy = True +from scapy.arch import pcapdnet +from scapy.arch.pcapdnet import * + +LOOPBACK_NAME="lo0" +WINDOWS = True + + +def _where(filename, dirs=[], env="PATH"): + """Find file in current dir or system path""" + if not isinstance(dirs, list): + dirs = [dirs] + if glob(filename): + return filename + paths = [os.curdir] + os.environ[env].split(os.path.pathsep) + dirs + for path in paths: + for match in glob(os.path.join(path, filename)): + if match: + return os.path.normpath(match) + raise IOError("File not found: %s" % filename) + +def win_find_exe(filename, installsubdir=None, env="ProgramFiles"): + """Find executable in current dir, system path or given ProgramFiles subdir""" + for fn in [filename, filename+".exe"]: + try: + if installsubdir is None: + path = _where(fn) + else: + path = _where(fn, dirs=[os.path.join(os.environ[env], installsubdir)]) + except IOError: + path = filename + else: + break + return path + + +class WinProgPath(ConfClass): + _default = "<System default>" + # We try some magic to find the appropriate executables + pdfreader = win_find_exe("AcroRd32") + psreader = win_find_exe("gsview32.exe", "Ghostgum/gsview") + dot = win_find_exe("dot", "ATT/Graphviz/bin") + tcpdump = win_find_exe("windump") + tcpreplay = win_find_exe("tcpreplay") + display = _default + hexedit = win_find_exe("hexer") + wireshark = win_find_exe("wireshark", "wireshark") + +conf.prog = WinProgPath() + +class PcapNameNotFoundError(Scapy_Exception): + pass + +def get_windows_if_list(): + ps = sp.Popen(['powershell', 'Get-NetAdapter', '|', 'select Name, InterfaceIndex, InterfaceDescription, InterfaceGuid, MacAddress', '|', 'fl'], stdout = sp.PIPE) + stdout, stdin = ps.communicate(timeout = 10) + current_interface = None + interface_list = [] + for i in stdout.split(b'\r\n'): + if not i.strip(): + continue + if i.find(b':')<0: + continue + name, value = [ j.strip() for j in i.split(b':') ] + if name == b'Name': + if current_interface: + interface_list.append(current_interface) + current_interface = {} + current_interface['name'] = value.decode('ascii') + elif name == b'InterfaceIndex': + current_interface['win_index'] = int(value) + elif name == b'InterfaceDescription': + current_interface['description'] = value.decode('ascii') + elif name == b'InterfaceGuid': + current_interface['guid'] = value.decode('ascii') + elif name == b'MacAddress': + current_interface['mac'] = ':'.join([ j.decode('ascii') for j in value.split(b'-')]) + if current_interface: + interface_list.append(current_interface) + return interface_list + +class NetworkInterface(object): + """A network interface of your local host""" + + def __init__(self, data=None): + self.name = None + self.ip = None + self.mac = None + self.pcap_name = None + self.description = None + self.data = data + if data is not None: + self.update(data) + + def update(self, data): + """Update info about network interface according to given dnet dictionary""" + self.name = data["name"] + self.description = data['description'] + self.win_index = data['win_index'] + # Other attributes are optional + self._update_pcapdata() + try: + self.ip = socket.inet_ntoa(get_if_raw_addr(data['guid'])) + except (KeyError, AttributeError, NameError): + pass + try: + self.mac = data['mac'] + except KeyError: + pass + + def _update_pcapdata(self): + for i in winpcapy_get_if_list(): + if i.endswith(self.data['guid']): + self.pcap_name = i + return + + raise PcapNameNotFoundError + + def __repr__(self): + return "<%s: %s %s %s pcap_name=%s description=%s>" % (self.__class__.__name__, + self.name, self.ip, self.mac, self.pcap_name, self.description) + +from collections import UserDict + +class NetworkInterfaceDict(UserDict): + """Store information about network interfaces and convert between names""" + def load_from_powershell(self): + for i in get_windows_if_list(): + try: + interface = NetworkInterface(i) + self.data[interface.name] = interface + except (KeyError, PcapNameNotFoundError): + pass + if len(self.data) == 0: + log_loading.warning("No match between your pcap and windows network interfaces found. " + "You probably won't be able to send packets. " + "Deactivating unneeded interfaces and restarting Scapy might help." + "Check your winpcap and powershell installation, and access rights.") + + def pcap_name(self, devname): + """Return pcap device name for given Windows device name.""" + + try: + pcap_name = self.data[devname].pcap_name + except KeyError: + raise ValueError("Unknown network interface %r" % devname) + else: + return pcap_name + + def devname(self, pcap_name): + """Return Windows device name for given pcap device name.""" + + for devname, iface in self.items(): + if iface.pcap_name == pcap_name: + return iface.name + raise ValueError("Unknown pypcap network interface %r" % pcap_name) + + def devname_from_index(self, if_index): + """Return interface name from interface index""" + for devname, iface in self.items(): + if iface.win_index == if_index: + return iface.name + raise ValueError("Unknown network interface index %r" % if_index) + + def show(self, resolve_mac=True): + """Print list of available network interfaces in human readable form""" + + print("%s %s %s %s" % ("INDEX".ljust(5), "IFACE".ljust(35), "IP".ljust(15), "MAC")) + for iface_name in sorted(self.data.keys()): + dev = self.data[iface_name] + mac = dev.mac + if resolve_mac: + mac = conf.manufdb._resolve_MAC(mac) + print("%s %s %s %s" % (str(dev.win_index).ljust(5), str(dev.name).ljust(35), str(dev.ip).ljust(15), mac) ) + +ifaces = NetworkInterfaceDict() +ifaces.load_from_powershell() + +def pcap_name(devname): + """Return pypcap device name for given libdnet/Scapy device name""" + try: + pcap_name = ifaces.pcap_name(devname) + except ValueError: + # pcap.pcap() will choose a sensible default for sniffing if iface=None + pcap_name = None + return pcap_name + +def devname(pcap_name): + """Return libdnet/Scapy device name for given pypcap device name""" + return ifaces.devname(pcap_name) + +def devname_from_index(if_index): + """Return Windows adapter name for given Windows interface index""" + return ifaces.devname_from_index(if_index) + +def show_interfaces(resolve_mac=True): + """Print list of available network interfaces""" + return ifaces.show(resolve_mac) + +_orig_open_pcap = pcapdnet.open_pcap +pcapdnet.open_pcap = lambda iface,*args,**kargs: _orig_open_pcap(pcap_name(iface),*args,**kargs) + +_orig_get_if_raw_hwaddr = pcapdnet.get_if_raw_hwaddr +pcapdnet.get_if_raw_hwaddr = lambda iface,*args,**kargs: [ int(i, 16) for i in ifaces[iface].mac.split(':') ] +get_if_raw_hwaddr = pcapdnet.get_if_raw_hwaddr + +def read_routes(): + routes = [] + if_index = '(\d+)' + dest = '(\d+\.\d+\.\d+\.\d+)/(\d+)' + next_hop = '(\d+\.\d+\.\d+\.\d+)' + metric_pattern = "(\d+)" + delim = "\s+" # The columns are separated by whitespace + netstat_line = delim.join([if_index, dest, next_hop, metric_pattern]) + pattern = re.compile(netstat_line) + ps = sp.Popen(['powershell', 'Get-NetRoute', '-AddressFamily IPV4', '|', 'select ifIndex, DestinationPrefix, NextHop, RouteMetric'], stdout = sp.PIPE) + stdout, stdin = ps.communicate(timeout = 10) + for l in stdout.split(b'\r\n'): + match = re.search(pattern,l.decode('utf-8')) + if match: + try: + iface = devname_from_index(int(match.group(1))) + addr = ifaces[iface].ip + except: + continue + dest = atol(match.group(2)) + mask = itom(int(match.group(3))) + gw = match.group(4) + # try: + # intf = pcapdnet.dnet.intf().get_dst(pcapdnet.dnet.addr(type=2, addrtxt=dest)) + # except OSError: + # log_loading.warning("Building Scapy's routing table: Couldn't get outgoing interface for destination %s" % dest) + # continue + routes.append((dest, mask, gw, iface, addr)) + return routes + +def read_routes6(): + return [] + +try: + __IPYTHON__ +except NameError: + try: + import readline + console = readline.GetOutputFile() + except (ImportError, AttributeError): + log_loading.info("Could not get readline console. Will not interpret ANSI color codes.") + else: + conf.readfunc = readline.rl.readline + orig_stdout = sys.stdout + sys.stdout = console + +def sndrcv(pks, pkt, timeout = 2, inter = 0, verbose=None, chainCC=0, retry=0, multi=0): + if not isinstance(pkt, Gen): + pkt = SetGen(pkt) + + if verbose is None: + verbose = conf.verb + debug.recv = plist.PacketList([],"Unanswered") + debug.sent = plist.PacketList([],"Sent") + debug.match = plist.SndRcvList([]) + nbrecv=0 + ans = [] + # do it here to fix random fields, so that parent and child have the same + all_stimuli = tobesent = [p for p in pkt] + notans = len(tobesent) + + hsent={} + for i in tobesent: + h = i.hashret() + if h in hsent: + hsent[h].append(i) + else: + hsent[h] = [i] + if retry < 0: + retry = -retry + autostop=retry + else: + autostop=0 + + + while retry >= 0: + found=0 + + if timeout < 0: + timeout = None + + pid=1 + try: + if WINDOWS or pid == 0: + try: + try: + i = 0 + if verbose: + print("Begin emission:") + for p in tobesent: + pks.send(p) + i += 1 + time.sleep(inter) + if verbose: + print("Finished to send %i packets." % i) + except SystemExit: + pass + except KeyboardInterrupt: + pass + except: + log_runtime.exception("--- Error sending packets") + log_runtime.info("--- Error sending packets") + finally: + try: + sent_times = [p.sent_time for p in all_stimuli if p.sent_time] + except: + pass + if WINDOWS or pid > 0: + # Timeout starts after last packet is sent (as in Unix version) + if timeout: + stoptime = time.time()+timeout + else: + stoptime = 0 + remaintime = None + # inmask = [pks.ins.fd] + try: + try: + while 1: + if stoptime: + remaintime = stoptime-time.time() + if remaintime <= 0: + break + r = pks.recv(MTU) + if r is None: + continue + ok = 0 + h = r.hashret() + if h in hsent: + hlst = hsent[h] + for i in range(len(hlst)): + if r.answers(hlst[i]): + ans.append((hlst[i],r)) + if verbose > 1: + os.write(1, b"*") + ok = 1 + if not multi: + del(hlst[i]) + notans -= 1; + else: + if not hasattr(hlst[i], '_answered'): + notans -= 1; + hlst[i]._answered = 1; + break + if notans == 0 and not multi: + break + if not ok: + if verbose > 1: + os.write(1, b".") + nbrecv += 1 + if conf.debug_match: + debug.recv.append(r) + except KeyboardInterrupt: + if chainCC: + raise + finally: + if WINDOWS: + for p,t in zip(all_stimuli, sent_times): + p.sent_time = t + finally: + pass + + # remain = reduce(list.__add__, hsent.values(), []) + remain = list(itertools.chain(*[ i for i in hsent.values() ])) + + if multi: + #remain = filter(lambda p: not hasattr(p, '_answered'), remain); + remain = [ p for p in remain if not hasattr(p, '_answered')] + + if autostop and len(remain) > 0 and len(remain) != len(tobesent): + retry = autostop + + tobesent = remain + if len(tobesent) == 0: + break + retry -= 1 + + if conf.debug_match: + debug.sent=plist.PacketList(remain[:],"Sent") + debug.match=plist.SndRcvList(ans[:]) + + #clean the ans list to delete the field _answered + if (multi): + for s,r in ans: + if hasattr(s, '_answered'): + del(s._answered) + + if verbose: + print("\nReceived %i packets, got %i answers, remaining %i packets" % (nbrecv+len(ans), len(ans), notans)) + return plist.SndRcvList(ans),plist.PacketList(remain,"Unanswered") + + +import scapy.sendrecv +scapy.sendrecv.sndrcv = sndrcv + +def sniff(count=0, store=1, offline=None, prn = None, lfilter=None, L2socket=None, timeout=None, *arg, **karg): + """Sniff packets +sniff([count=0,] [prn=None,] [store=1,] [offline=None,] [lfilter=None,] + L2ListenSocket args) -> list of packets +Select interface to sniff by setting conf.iface. Use show_interfaces() to see interface names. + count: number of packets to capture. 0 means infinity + store: wether to store sniffed packets or discard them + prn: function to apply to each packet. If something is returned, + it is displayed. Ex: + ex: prn = lambda x: x.summary() +lfilter: python function applied to each packet to determine + if further action may be done + ex: lfilter = lambda x: x.haslayer(Padding) +offline: pcap file to read packets from, instead of sniffing them +timeout: stop sniffing after a given time (default: None) +L2socket: use the provided L2socket + """ + c = 0 + + if offline is None: + log_runtime.info('Sniffing on %s' % conf.iface) + if L2socket is None: + L2socket = conf.L2listen + s = L2socket(type=ETH_P_ALL, *arg, **karg) + else: + s = PcapReader(offline) + + lst = [] + if timeout is not None: + stoptime = time.time()+timeout + remain = None + while 1: + try: + if timeout is not None: + remain = stoptime-time.time() + if remain <= 0: + break + + try: + p = s.recv(MTU) + except PcapTimeoutElapsed: + continue + if p is None: + break + if lfilter and not lfilter(p): + continue + if store: + lst.append(p) + c += 1 + if prn: + r = prn(p) + if r is not None: + print(r) + if count > 0 and c >= count: + break + except KeyboardInterrupt: + break + s.close() + return plist.PacketList(lst,"Sniffed") + +import scapy.sendrecv +scapy.sendrecv.sniff = sniff + +# def get_if_list(): +# print('windows if_list') +# return sorted(ifaces.keys()) + +def get_working_if(): + try: + if 'Ethernet' in ifaces and ifaces['Ethernet'].ip != '0.0.0.0': + return 'Ethernet' + elif 'Wi-Fi' in ifaces and ifaces['Wi-Fi'].ip != '0.0.0.0': + return 'Wi-Fi' + elif len(ifaces) > 0: + return ifaces[list(ifaces.keys())[0]] + else: + return LOOPBACK_NAME + except: + return LOOPBACK_NAME + +conf.iface = get_working_if() |