diff options
Diffstat (limited to 'scripts/external_libs/scapy-2.3.1/scapy/utils.py')
-rw-r--r-- | scripts/external_libs/scapy-2.3.1/scapy/utils.py | 816 |
1 files changed, 0 insertions, 816 deletions
diff --git a/scripts/external_libs/scapy-2.3.1/scapy/utils.py b/scripts/external_libs/scapy-2.3.1/scapy/utils.py deleted file mode 100644 index c5ac2520..00000000 --- a/scripts/external_libs/scapy-2.3.1/scapy/utils.py +++ /dev/null @@ -1,816 +0,0 @@ -## This file is part of Scapy -## See http://www.secdev.org/projects/scapy for more informations -## Copyright (C) Philippe Biondi <phil@secdev.org> -## This program is published under a GPLv2 license - -""" -General utility functions. -""" - -import os,sys,socket,types -import random,time -import gzip,zlib,cPickle -import re,struct,array -import subprocess - -import warnings -warnings.filterwarnings("ignore","tempnam",RuntimeWarning, __name__) - -from config import conf -from data import MTU -from error import log_runtime,log_loading,log_interactive, Scapy_Exception -from base_classes import BasePacketList - -WINDOWS=sys.platform.startswith("win32") - -########### -## Tools ## -########### - -def get_temp_file(keep=False, autoext=""): - f = os.tempnam("","scapy") - if not keep: - conf.temp_files.append(f+autoext) - return f - -def sane_color(x): - r="" - for i in x: - j = ord(i) - if (j < 32) or (j >= 127): - r=r+conf.color_theme.not_printable(".") - else: - r=r+i - return r - -def sane(x): - r="" - for i in x: - j = ord(i) - if (j < 32) or (j >= 127): - r=r+"." - else: - r=r+i - return r - -def lhex(x): - if type(x) in (int,long): - return hex(x) - elif type(x) is tuple: - return "(%s)" % ", ".join(map(lhex, x)) - elif type(x) is list: - return "[%s]" % ", ".join(map(lhex, x)) - else: - return x - -@conf.commands.register -def hexdump(x): - x=str(x) - l = len(x) - i = 0 - while i < l: - print "%04x " % i, - for j in range(16): - if i+j < l: - print "%02X" % ord(x[i+j]), - else: - print " ", - if j%16 == 7: - print "", - print " ", - print sane_color(x[i:i+16]) - i += 16 - -@conf.commands.register -def linehexdump(x, onlyasc=0, onlyhex=0): - x = str(x) - l = len(x) - if not onlyasc: - for i in range(l): - print "%02X" % ord(x[i]), - print "", - if not onlyhex: - print sane_color(x) - -def chexdump(x): - x=str(x) - print ", ".join(map(lambda x: "%#04x"%ord(x), x)) - -def hexstr(x, onlyasc=0, onlyhex=0): - s = [] - if not onlyasc: - s.append(" ".join(map(lambda x:"%02x"%ord(x), x))) - if not onlyhex: - s.append(sane(x)) - return " ".join(s) - - -@conf.commands.register -def hexdiff(x,y): - """Show differences between 2 binary strings""" - x=str(x)[::-1] - y=str(y)[::-1] - SUBST=1 - INSERT=1 - d={} - d[-1,-1] = 0,(-1,-1) - for j in range(len(y)): - d[-1,j] = d[-1,j-1][0]+INSERT, (-1,j-1) - for i in range(len(x)): - d[i,-1] = d[i-1,-1][0]+INSERT, (i-1,-1) - - for j in range(len(y)): - for i in range(len(x)): - d[i,j] = min( ( d[i-1,j-1][0]+SUBST*(x[i] != y[j]), (i-1,j-1) ), - ( d[i-1,j][0]+INSERT, (i-1,j) ), - ( d[i,j-1][0]+INSERT, (i,j-1) ) ) - - - backtrackx = [] - backtracky = [] - i=len(x)-1 - j=len(y)-1 - while not (i == j == -1): - i2,j2 = d[i,j][1] - backtrackx.append(x[i2+1:i+1]) - backtracky.append(y[j2+1:j+1]) - i,j = i2,j2 - - - - x = y = i = 0 - colorize = { 0: lambda x:x, - -1: conf.color_theme.left, - 1: conf.color_theme.right } - - dox=1 - doy=0 - l = len(backtrackx) - while i < l: - separate=0 - linex = backtrackx[i:i+16] - liney = backtracky[i:i+16] - xx = sum(len(k) for k in linex) - yy = sum(len(k) for k in liney) - if dox and not xx: - dox = 0 - doy = 1 - if dox and linex == liney: - doy=1 - - if dox: - xd = y - j = 0 - while not linex[j]: - j += 1 - xd -= 1 - print colorize[doy-dox]("%04x" % xd), - x += xx - line=linex - else: - print " ", - if doy: - yd = y - j = 0 - while not liney[j]: - j += 1 - yd -= 1 - print colorize[doy-dox]("%04x" % yd), - y += yy - line=liney - else: - print " ", - - print " ", - - cl = "" - for j in range(16): - if i+j < l: - if line[j]: - col = colorize[(linex[j]!=liney[j])*(doy-dox)] - print col("%02X" % ord(line[j])), - if linex[j]==liney[j]: - cl += sane_color(line[j]) - else: - cl += col(sane(line[j])) - else: - print " ", - cl += " " - else: - print " ", - if j == 7: - print "", - - - print " ",cl - - if doy or not yy: - doy=0 - dox=1 - i += 16 - else: - if yy: - dox=0 - doy=1 - else: - i += 16 - - -crc32 = zlib.crc32 - -if struct.pack("H",1) == "\x00\x01": # big endian - def checksum(pkt): - if len(pkt) % 2 == 1: - pkt += "\0" - s = sum(array.array("H", pkt)) - s = (s >> 16) + (s & 0xffff) - s += s >> 16 - s = ~s - return s & 0xffff -else: - def checksum(pkt): - if len(pkt) % 2 == 1: - pkt += "\0" - s = sum(array.array("H", pkt)) - s = (s >> 16) + (s & 0xffff) - s += s >> 16 - s = ~s - return (((s>>8)&0xff)|s<<8) & 0xffff - -def warning(x): - log_runtime.warning(x) - -def mac2str(mac): - return "".join(map(lambda x: chr(int(x,16)), mac.split(":"))) - -def str2mac(s): - return ("%02x:"*6)[:-1] % tuple(map(ord, s)) - -def strxor(x,y): - return "".join(map(lambda x,y:chr(ord(x)^ord(y)),x,y)) - -# Workarround bug 643005 : https://sourceforge.net/tracker/?func=detail&atid=105470&aid=643005&group_id=5470 -try: - socket.inet_aton("255.255.255.255") -except socket.error: - def inet_aton(x): - if x == "255.255.255.255": - return "\xff"*4 - else: - return socket.inet_aton(x) -else: - inet_aton = socket.inet_aton - -inet_ntoa = socket.inet_ntoa -try: - inet_ntop = socket.inet_ntop - inet_pton = socket.inet_pton -except AttributeError: - from scapy.pton_ntop import * - log_loading.info("inet_ntop/pton functions not found. Python IPv6 support not present") - - -def atol(x): - try: - ip = inet_aton(x) - except socket.error: - ip = inet_aton(socket.gethostbyname(x)) - return struct.unpack("!I", ip)[0] -def ltoa(x): - return inet_ntoa(struct.pack("!I", x&0xffffffff)) - -def itom(x): - return (0xffffffff00000000L>>x)&0xffffffffL - -def do_graph(graph,prog=None,format=None,target=None,type=None,string=None,options=None): - """do_graph(graph, prog=conf.prog.dot, format="svg", - target="| conf.prog.display", options=None, [string=1]): - string: if not None, simply return the graph string - graph: GraphViz graph description - format: output type (svg, ps, gif, jpg, etc.), passed to dot's "-T" option - target: filename or redirect. Defaults pipe to Imagemagick's display program - prog: which graphviz program to use - options: options to be passed to prog""" - - if format is None: - if WINDOWS: - format = "png" # use common format to make sure a viewer is installed - else: - format = "svg" - if string: - return graph - if type is not None: - format=type - if prog is None: - prog = conf.prog.dot - start_viewer=False - if target is None: - if WINDOWS: - tempfile = os.tempnam("", "scapy") + "." + format - target = "> %s" % tempfile - start_viewer = True - else: - target = "| %s" % conf.prog.display - if format is not None: - format = "-T %s" % format - w,r = os.popen2("%s %s %s %s" % (prog,options or "", format or "", target)) - w.write(graph) - w.close() - if start_viewer: - # Workaround for file not found error: We wait until tempfile is written. - waiting_start = time.time() - while not os.path.exists(tempfile): - time.sleep(0.1) - if time.time() - waiting_start > 3: - warning("Temporary file '%s' could not be written. Graphic will not be displayed." % tempfile) - break - else: - if conf.prog.display == conf.prog._default: - os.startfile(tempfile) - else: - subprocess.Popen([conf.prog.display, tempfile]) - -_TEX_TR = { - "{":"{\\tt\\char123}", - "}":"{\\tt\\char125}", - "\\":"{\\tt\\char92}", - "^":"\\^{}", - "$":"\\$", - "#":"\\#", - "~":"\\~", - "_":"\\_", - "&":"\\&", - "%":"\\%", - "|":"{\\tt\\char124}", - "~":"{\\tt\\char126}", - "<":"{\\tt\\char60}", - ">":"{\\tt\\char62}", - } - -def tex_escape(x): - s = "" - for c in x: - s += _TEX_TR.get(c,c) - return s - -def colgen(*lstcol,**kargs): - """Returns a generator that mixes provided quantities forever - trans: a function to convert the three arguments into a color. lambda x,y,z:(x,y,z) by default""" - if len(lstcol) < 2: - lstcol *= 2 - trans = kargs.get("trans", lambda x,y,z: (x,y,z)) - while 1: - for i in range(len(lstcol)): - for j in range(len(lstcol)): - for k in range(len(lstcol)): - if i != j or j != k or k != i: - yield trans(lstcol[(i+j)%len(lstcol)],lstcol[(j+k)%len(lstcol)],lstcol[(k+i)%len(lstcol)]) - -def incremental_label(label="tag%05i", start=0): - while True: - yield label % start - start += 1 - -######################### -#### Enum management #### -######################### - -class EnumElement: - _value=None - def __init__(self, key, value): - self._key = key - self._value = value - def __repr__(self): - return "<%s %s[%r]>" % (self.__dict__.get("_name", self.__class__.__name__), self._key, self._value) - def __getattr__(self, attr): - return getattr(self._value, attr) - def __str__(self): - return self._key - def __eq__(self, other): - return self._value == int(other) - - -class Enum_metaclass(type): - element_class = EnumElement - def __new__(cls, name, bases, dct): - rdict={} - for k,v in dct.iteritems(): - if type(v) is int: - v = cls.element_class(k,v) - dct[k] = v - rdict[v] = k - dct["__rdict__"] = rdict - return super(Enum_metaclass, cls).__new__(cls, name, bases, dct) - def __getitem__(self, attr): - return self.__rdict__[attr] - def __contains__(self, val): - return val in self.__rdict__ - def get(self, attr, val=None): - return self._rdict__.get(attr, val) - def __repr__(self): - return "<%s>" % self.__dict__.get("name", self.__name__) - - - -################### -## Object saving ## -################### - - -def export_object(obj): - print gzip.zlib.compress(cPickle.dumps(obj,2),9).encode("base64") - -def import_object(obj=None): - if obj is None: - obj = sys.stdin.read() - return cPickle.loads(gzip.zlib.decompress(obj.strip().decode("base64"))) - - -def save_object(fname, obj): - cPickle.dump(obj,gzip.open(fname,"wb")) - -def load_object(fname): - return cPickle.load(gzip.open(fname,"rb")) - -@conf.commands.register -def corrupt_bytes(s, p=0.01, n=None): - """Corrupt a given percentage or number of bytes from a string""" - s = array.array("B",str(s)) - l = len(s) - if n is None: - n = max(1,int(l*p)) - for i in random.sample(xrange(l), n): - s[i] = (s[i]+random.randint(1,255))%256 - return s.tostring() - -@conf.commands.register -def corrupt_bits(s, p=0.01, n=None): - """Flip a given percentage or number of bits from a string""" - s = array.array("B",str(s)) - l = len(s)*8 - if n is None: - n = max(1,int(l*p)) - for i in random.sample(xrange(l), n): - s[i/8] ^= 1 << (i%8) - return s.tostring() - - - - -############################# -## pcap capture file stuff ## -############################# - -@conf.commands.register -def wrpcap(filename, pkt, *args, **kargs): - """Write a list of packets to a pcap file -gz: set to 1 to save a gzipped capture -linktype: force linktype value -endianness: "<" or ">", force endianness""" - PcapWriter(filename, *args, **kargs).write(pkt) - -@conf.commands.register -def rdpcap(filename, count=-1): - """Read a pcap file and return a packet list -count: read only <count> packets""" - return PcapReader(filename).read_all(count=count) - - - -class RawPcapReader: - """A stateful pcap reader. Each packet is returned as a string""" - - def __init__(self, filename): - self.filename = filename - try: - self.f = gzip.open(filename,"rb") - magic = self.f.read(4) - except IOError: - self.f = open(filename,"rb") - magic = self.f.read(4) - if magic == "\xa1\xb2\xc3\xd4": #big endian - self.endian = ">" - elif magic == "\xd4\xc3\xb2\xa1": #little endian - self.endian = "<" - else: - raise Scapy_Exception("Not a pcap capture file (bad magic)") - hdr = self.f.read(20) - if len(hdr)<20: - raise Scapy_Exception("Invalid pcap file (too short)") - vermaj,vermin,tz,sig,snaplen,linktype = struct.unpack(self.endian+"HHIIII",hdr) - - self.linktype = linktype - - - - def __iter__(self): - return self - - def next(self): - """impliment the iterator protocol on a set of packets in a pcap file""" - pkt = self.read_packet() - if pkt == None: - raise StopIteration - return pkt - - - def read_packet(self, size=MTU): - """return a single packet read from the file - - returns None when no more packets are available - """ - hdr = self.f.read(16) - if len(hdr) < 16: - return None - sec,usec,caplen,wirelen = struct.unpack(self.endian+"IIII", hdr) - s = self.f.read(caplen)[:MTU] - return s,(sec,usec,wirelen) # caplen = len(s) - - - def dispatch(self, callback): - """call the specified callback routine for each packet read - - This is just a convienience function for the main loop - that allows for easy launching of packet processing in a - thread. - """ - for p in self: - callback(p) - - def read_all(self,count=-1): - """return a list of all packets in the pcap file - """ - res=[] - while count != 0: - count -= 1 - p = self.read_packet() - if p is None: - break - res.append(p) - return res - - def recv(self, size=MTU): - """ Emulate a socket - """ - return self.read_packet(size)[0] - - def fileno(self): - return self.f.fileno() - - def close(self): - return self.f.close() - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, tracback): - pass - - -class PcapReader(RawPcapReader): - def __init__(self, filename): - RawPcapReader.__init__(self, filename) - try: - self.LLcls = conf.l2types[self.linktype] - except KeyError: - warning("PcapReader: unknown LL type [%i]/[%#x]. Using Raw packets" % (self.linktype,self.linktype)) - self.LLcls = conf.raw_layer - def read_packet(self, size=MTU): - rp = RawPcapReader.read_packet(self,size) - if rp is None: - return None - s,(sec,usec,wirelen) = rp - - try: - p = self.LLcls(s) - except KeyboardInterrupt: - raise - except: - if conf.debug_dissector: - raise - p = conf.raw_layer(s) - p.time = sec+0.000001*usec - return p - def read_all(self,count=-1): - res = RawPcapReader.read_all(self, count) - import plist - return plist.PacketList(res,name = os.path.basename(self.filename)) - def recv(self, size=MTU): - return self.read_packet(size) - - - -class RawPcapWriter: - """A stream PCAP writer with more control than wrpcap()""" - def __init__(self, filename, linktype=None, gz=False, endianness="", append=False, sync=False): - """ - linktype: force linktype to a given value. If None, linktype is taken - from the first writter packet - gz: compress the capture on the fly - endianness: force an endianness (little:"<", big:">"). Default is native - append: append packets to the capture file instead of truncating it - sync: do not bufferize writes to the capture file - """ - - self.linktype = linktype - self.header_present = 0 - self.append=append - self.gz = gz - self.endian = endianness - self.filename=filename - self.sync=sync - bufsz=4096 - if sync: - bufsz=0 - - self.f = [open,gzip.open][gz](filename,append and "ab" or "wb", gz and 9 or bufsz) - - def fileno(self): - return self.f.fileno() - - def _write_header(self, pkt): - self.header_present=1 - - if self.append: - # Even if prone to race conditions, this seems to be - # safest way to tell whether the header is already present - # because we have to handle compressed streams that - # are not as flexible as basic files - g = [open,gzip.open][self.gz](self.filename,"rb") - if g.read(16): - return - - self.f.write(struct.pack(self.endian+"IHHIIII", 0xa1b2c3d4L, - 2, 4, 0, 0, MTU, self.linktype)) - self.f.flush() - - - def write(self, pkt): - """accepts a either a single packet or a list of packets - to be written to the dumpfile - """ - if not self.header_present: - self._write_header(pkt) - if type(pkt) is str: - self._write_packet(pkt) - else: - for p in pkt: - self._write_packet(p) - - def _write_packet(self, packet, sec=None, usec=None, caplen=None, wirelen=None): - """writes a single packet to the pcap file - """ - if caplen is None: - caplen = len(packet) - if wirelen is None: - wirelen = caplen - if sec is None or usec is None: - t=time.time() - it = int(t) - if sec is None: - sec = it - if usec is None: - usec = int(round((t-it)*1000000)) - self.f.write(struct.pack(self.endian+"IIII", sec, usec, caplen, wirelen)) - self.f.write(packet) - if self.gz and self.sync: - self.f.flush() - - def flush(self): - return self.f.flush() - - def close(self): - return self.f.close() - - def __enter__(self): - return self - def __exit__(self, exc_type, exc_value, tracback): - self.flush() - - -class PcapWriter(RawPcapWriter): - def _write_header(self, pkt): - if self.linktype == None: - if type(pkt) is list or type(pkt) is tuple or isinstance(pkt,BasePacketList): - pkt = pkt[0] - try: - self.linktype = conf.l2types[pkt.__class__] - except KeyError: - warning("PcapWriter: unknown LL type for %s. Using type 1 (Ethernet)" % pkt.__class__.__name__) - self.linktype = 1 - RawPcapWriter._write_header(self, pkt) - - def _write_packet(self, packet): - sec = int(packet.time) - usec = int(round((packet.time-sec)*1000000)) - s = str(packet) - caplen = len(s) - RawPcapWriter._write_packet(self, s, sec, usec, caplen, caplen) - - -re_extract_hexcap = re.compile("^((0x)?[0-9a-fA-F]{2,}[ :\t]{,3}|) *(([0-9a-fA-F]{2} {,2}){,16})") - -def import_hexcap(): - p = "" - try: - while 1: - l = raw_input().strip() - try: - p += re_extract_hexcap.match(l).groups()[2] - except: - warning("Parsing error during hexcap") - continue - except EOFError: - pass - - p = p.replace(" ","") - return p.decode("hex") - - - -@conf.commands.register -def wireshark(pktlist): - """Run wireshark on a list of packets""" - f = get_temp_file() - wrpcap(f, pktlist) - subprocess.Popen([conf.prog.wireshark, "-r", f]) - -@conf.commands.register -def hexedit(x): - x = str(x) - f = get_temp_file() - open(f,"w").write(x) - subprocess.call([conf.prog.hexedit, f]) - x = open(f).read() - os.unlink(f) - return x - -def __make_table(yfmtfunc, fmtfunc, endline, list, fxyz, sortx=None, sorty=None, seplinefunc=None): - vx = {} - vy = {} - vz = {} - vxf = {} - vyf = {} - l = 0 - for e in list: - xx,yy,zz = map(str, fxyz(e)) - l = max(len(yy),l) - vx[xx] = max(vx.get(xx,0), len(xx), len(zz)) - vy[yy] = None - vz[(xx,yy)] = zz - - vxk = vx.keys() - vyk = vy.keys() - if sortx: - vxk.sort(sortx) - else: - try: - vxk.sort(lambda x,y:int(x)-int(y)) - except: - try: - vxk.sort(lambda x,y: cmp(atol(x),atol(y))) - except: - vxk.sort() - if sorty: - vyk.sort(sorty) - else: - try: - vyk.sort(lambda x,y:int(x)-int(y)) - except: - try: - vyk.sort(lambda x,y: cmp(atol(x),atol(y))) - except: - vyk.sort() - - - if seplinefunc: - sepline = seplinefunc(l, map(lambda x:vx[x],vxk)) - print sepline - - fmt = yfmtfunc(l) - print fmt % "", - for x in vxk: - vxf[x] = fmtfunc(vx[x]) - print vxf[x] % x, - print endline - if seplinefunc: - print sepline - for y in vyk: - print fmt % y, - for x in vxk: - print vxf[x] % vz.get((x,y), "-"), - print endline - if seplinefunc: - print sepline - -def make_table(*args, **kargs): - __make_table(lambda l:"%%-%is" % l, lambda l:"%%-%is" % l, "", *args, **kargs) - -def make_lined_table(*args, **kargs): - __make_table(lambda l:"%%-%is |" % l, lambda l:"%%-%is |" % l, "", - seplinefunc=lambda a,x:"+".join(map(lambda y:"-"*(y+2), [a-1]+x+[-2])), - *args, **kargs) - -def make_tex_table(*args, **kargs): - __make_table(lambda l: "%s", lambda l: "& %s", "\\\\", seplinefunc=lambda a,x:"\\hline", *args, **kargs) - |