diff options
Diffstat (limited to 'scripts/external_libs/scapy-2.3.1/python2/scapy/utils.py')
-rw-r--r-- | scripts/external_libs/scapy-2.3.1/python2/scapy/utils.py | 817 |
1 files changed, 817 insertions, 0 deletions
diff --git a/scripts/external_libs/scapy-2.3.1/python2/scapy/utils.py b/scripts/external_libs/scapy-2.3.1/python2/scapy/utils.py new file mode 100644 index 00000000..b1efdf37 --- /dev/null +++ b/scripts/external_libs/scapy-2.3.1/python2/scapy/utils.py @@ -0,0 +1,817 @@ +## This file is part of Scapy +## See http://www.secdev.org/projects/scapy for more informations +## Copyright (C) Philippe Biondi <phil@secdev.org> +## This program is published under a GPLv2 license + +""" +General utility functions. +""" + +import os,sys,socket,types +import random,time +import gzip,zlib,cPickle +import re,struct,array +import subprocess + +import warnings +warnings.filterwarnings("ignore","tempnam",RuntimeWarning, __name__) + +from config import conf +from data import MTU +from error import log_runtime,log_loading,log_interactive, Scapy_Exception +from base_classes import BasePacketList + +WINDOWS=sys.platform.startswith("win32") + +########### +## Tools ## +########### + +def get_temp_file(keep=False, autoext=""): + f = os.tempnam("","scapy") + if not keep: + conf.temp_files.append(f+autoext) + return f + +def sane_color(x): + r="" + for i in x: + j = ord(i) + if (j < 32) or (j >= 127): + r=r+conf.color_theme.not_printable(".") + else: + r=r+i + return r + +def sane(x): + r="" + for i in x: + j = ord(i) + if (j < 32) or (j >= 127): + r=r+"." + else: + r=r+i + return r + +def lhex(x): + if type(x) in (int,long): + return hex(x) + elif type(x) is tuple: + return "(%s)" % ", ".join(map(lhex, x)) + elif type(x) is list: + return "[%s]" % ", ".join(map(lhex, x)) + else: + return x + +@conf.commands.register +def hexdump(x): + x=str(x) + l = len(x) + i = 0 + while i < l: + print "%04x " % i, + for j in range(16): + if i+j < l: + print "%02X" % ord(x[i+j]), + else: + print " ", + if j%16 == 7: + print "", + print " ", + print sane_color(x[i:i+16]) + i += 16 + +@conf.commands.register +def linehexdump(x, onlyasc=0, onlyhex=0): + x = str(x) + l = len(x) + if not onlyasc: + for i in range(l): + print "%02X" % ord(x[i]), + print "", + if not onlyhex: + print sane_color(x) + +def chexdump(x): + x=str(x) + print ", ".join(map(lambda x: "%#04x"%ord(x), x)) + +def hexstr(x, onlyasc=0, onlyhex=0): + s = [] + if not onlyasc: + s.append(" ".join(map(lambda x:"%02x"%ord(x), x))) + if not onlyhex: + s.append(sane(x)) + return " ".join(s) + + +@conf.commands.register +def hexdiff(x,y): + """Show differences between 2 binary strings""" + x=str(x)[::-1] + y=str(y)[::-1] + SUBST=1 + INSERT=1 + d={} + d[-1,-1] = 0,(-1,-1) + for j in range(len(y)): + d[-1,j] = d[-1,j-1][0]+INSERT, (-1,j-1) + for i in range(len(x)): + d[i,-1] = d[i-1,-1][0]+INSERT, (i-1,-1) + + for j in range(len(y)): + for i in range(len(x)): + d[i,j] = min( ( d[i-1,j-1][0]+SUBST*(x[i] != y[j]), (i-1,j-1) ), + ( d[i-1,j][0]+INSERT, (i-1,j) ), + ( d[i,j-1][0]+INSERT, (i,j-1) ) ) + + + backtrackx = [] + backtracky = [] + i=len(x)-1 + j=len(y)-1 + while not (i == j == -1): + i2,j2 = d[i,j][1] + backtrackx.append(x[i2+1:i+1]) + backtracky.append(y[j2+1:j+1]) + i,j = i2,j2 + + + + x = y = i = 0 + colorize = { 0: lambda x:x, + -1: conf.color_theme.left, + 1: conf.color_theme.right } + + dox=1 + doy=0 + l = len(backtrackx) + while i < l: + separate=0 + linex = backtrackx[i:i+16] + liney = backtracky[i:i+16] + xx = sum(len(k) for k in linex) + yy = sum(len(k) for k in liney) + if dox and not xx: + dox = 0 + doy = 1 + if dox and linex == liney: + doy=1 + + if dox: + xd = y + j = 0 + while not linex[j]: + j += 1 + xd -= 1 + print colorize[doy-dox]("%04x" % xd), + x += xx + line=linex + else: + print " ", + if doy: + yd = y + j = 0 + while not liney[j]: + j += 1 + yd -= 1 + print colorize[doy-dox]("%04x" % yd), + y += yy + line=liney + else: + print " ", + + print " ", + + cl = "" + for j in range(16): + if i+j < l: + if line[j]: + col = colorize[(linex[j]!=liney[j])*(doy-dox)] + print col("%02X" % ord(line[j])), + if linex[j]==liney[j]: + cl += sane_color(line[j]) + else: + cl += col(sane(line[j])) + else: + print " ", + cl += " " + else: + print " ", + if j == 7: + print "", + + + print " ",cl + + if doy or not yy: + doy=0 + dox=1 + i += 16 + else: + if yy: + dox=0 + doy=1 + else: + i += 16 + + +crc32 = zlib.crc32 + +if struct.pack("H",1) == "\x00\x01": # big endian + def checksum(pkt): + if len(pkt) % 2 == 1: + pkt += "\0" + s = sum(array.array("H", pkt)) + s = (s >> 16) + (s & 0xffff) + s += s >> 16 + s = ~s + return s & 0xffff +else: + def checksum(pkt): + if len(pkt) % 2 == 1: + pkt += "\0" + s = sum(array.array("H", pkt)) + s = (s >> 16) + (s & 0xffff) + s += s >> 16 + s = ~s + return (((s>>8)&0xff)|s<<8) & 0xffff + +def warning(x): + log_runtime.warning(x) + +def mac2str(mac): + return "".join(map(lambda x: chr(int(x,16)), mac.split(":"))) + +def str2mac(s): + return ("%02x:"*6)[:-1] % tuple(map(ord, s)) + +def strxor(x,y): + return "".join(map(lambda x,y:chr(ord(x)^ord(y)),x,y)) + +# Workarround bug 643005 : https://sourceforge.net/tracker/?func=detail&atid=105470&aid=643005&group_id=5470 +try: + socket.inet_aton("255.255.255.255") +except socket.error: + def inet_aton(x): + if x == "255.255.255.255": + return "\xff"*4 + else: + return socket.inet_aton(x) +else: + inet_aton = socket.inet_aton + +inet_ntoa = socket.inet_ntoa +try: + inet_ntop = socket.inet_ntop + inet_pton = socket.inet_pton +except AttributeError: + from scapy.pton_ntop import * + log_loading.info("inet_ntop/pton functions not found. Python IPv6 support not present") + + +def atol(x): + try: + ip = inet_aton(x) + except socket.error: + ip = inet_aton(socket.gethostbyname(x)) + return struct.unpack("!I", ip)[0] +def ltoa(x): + return inet_ntoa(struct.pack("!I", x&0xffffffff)) + +def itom(x): + return (0xffffffff00000000L>>x)&0xffffffffL + +def do_graph(graph,prog=None,format=None,target=None,type=None,string=None,options=None): + """do_graph(graph, prog=conf.prog.dot, format="svg", + target="| conf.prog.display", options=None, [string=1]): + string: if not None, simply return the graph string + graph: GraphViz graph description + format: output type (svg, ps, gif, jpg, etc.), passed to dot's "-T" option + target: filename or redirect. Defaults pipe to Imagemagick's display program + prog: which graphviz program to use + options: options to be passed to prog""" + + if format is None: + if WINDOWS: + format = "png" # use common format to make sure a viewer is installed + else: + format = "svg" + if string: + return graph + if type is not None: + format=type + if prog is None: + prog = conf.prog.dot + start_viewer=False + if target is None: + if WINDOWS: + tempfile = os.tempnam("", "scapy") + "." + format + target = "> %s" % tempfile + start_viewer = True + else: + target = "| %s" % conf.prog.display + if format is not None: + format = "-T %s" % format + w,r = os.popen2("%s %s %s %s" % (prog,options or "", format or "", target)) + w.write(graph) + w.close() + if start_viewer: + # Workaround for file not found error: We wait until tempfile is written. + waiting_start = time.time() + while not os.path.exists(tempfile): + time.sleep(0.1) + if time.time() - waiting_start > 3: + warning("Temporary file '%s' could not be written. Graphic will not be displayed." % tempfile) + break + else: + if conf.prog.display == conf.prog._default: + os.startfile(tempfile) + else: + subprocess.Popen([conf.prog.display, tempfile]) + +_TEX_TR = { + "{":"{\\tt\\char123}", + "}":"{\\tt\\char125}", + "\\":"{\\tt\\char92}", + "^":"\\^{}", + "$":"\\$", + "#":"\\#", + "~":"\\~", + "_":"\\_", + "&":"\\&", + "%":"\\%", + "|":"{\\tt\\char124}", + "~":"{\\tt\\char126}", + "<":"{\\tt\\char60}", + ">":"{\\tt\\char62}", + } + +def tex_escape(x): + s = "" + for c in x: + s += _TEX_TR.get(c,c) + return s + +def colgen(*lstcol,**kargs): + """Returns a generator that mixes provided quantities forever + trans: a function to convert the three arguments into a color. lambda x,y,z:(x,y,z) by default""" + if len(lstcol) < 2: + lstcol *= 2 + trans = kargs.get("trans", lambda x,y,z: (x,y,z)) + while 1: + for i in range(len(lstcol)): + for j in range(len(lstcol)): + for k in range(len(lstcol)): + if i != j or j != k or k != i: + yield trans(lstcol[(i+j)%len(lstcol)],lstcol[(j+k)%len(lstcol)],lstcol[(k+i)%len(lstcol)]) + +def incremental_label(label="tag%05i", start=0): + while True: + yield label % start + start += 1 + +######################### +#### Enum management #### +######################### + +class EnumElement: + _value=None + def __init__(self, key, value): + self._key = key + self._value = value + def __repr__(self): + return "<%s %s[%r]>" % (self.__dict__.get("_name", self.__class__.__name__), self._key, self._value) + def __getattr__(self, attr): + return getattr(self._value, attr) + def __str__(self): + return self._key + def __eq__(self, other): + return self._value == int(other) + + +class Enum_metaclass(type): + element_class = EnumElement + def __new__(cls, name, bases, dct): + rdict={} + for k,v in dct.iteritems(): + if type(v) is int: + v = cls.element_class(k,v) + dct[k] = v + rdict[v] = k + dct["__rdict__"] = rdict + return super(Enum_metaclass, cls).__new__(cls, name, bases, dct) + def __getitem__(self, attr): + return self.__rdict__[attr] + def __contains__(self, val): + return val in self.__rdict__ + def get(self, attr, val=None): + return self._rdict__.get(attr, val) + def __repr__(self): + return "<%s>" % self.__dict__.get("name", self.__name__) + + + +################### +## Object saving ## +################### + + +def export_object(obj): + print gzip.zlib.compress(cPickle.dumps(obj,2),9).encode("base64") + +def import_object(obj=None): + if obj is None: + obj = sys.stdin.read() + return cPickle.loads(gzip.zlib.decompress(obj.strip().decode("base64"))) + + +def save_object(fname, obj): + cPickle.dump(obj,gzip.open(fname,"wb")) + +def load_object(fname): + return cPickle.load(gzip.open(fname,"rb")) + +@conf.commands.register +def corrupt_bytes(s, p=0.01, n=None): + """Corrupt a given percentage or number of bytes from a string""" + s = array.array("B",str(s)) + l = len(s) + if n is None: + n = max(1,int(l*p)) + for i in random.sample(xrange(l), n): + s[i] = (s[i]+random.randint(1,255))%256 + return s.tostring() + +@conf.commands.register +def corrupt_bits(s, p=0.01, n=None): + """Flip a given percentage or number of bits from a string""" + s = array.array("B",str(s)) + l = len(s)*8 + if n is None: + n = max(1,int(l*p)) + for i in random.sample(xrange(l), n): + s[i/8] ^= 1 << (i%8) + return s.tostring() + + + + +############################# +## pcap capture file stuff ## +############################# + +@conf.commands.register +def wrpcap(filename, pkt, *args, **kargs): + """Write a list of packets to a pcap file +gz: set to 1 to save a gzipped capture +linktype: force linktype value +endianness: "<" or ">", force endianness""" + PcapWriter(filename, *args, **kargs).write(pkt) + +@conf.commands.register +def rdpcap(filename, count=-1): + """Read a pcap file and return a packet list +count: read only <count> packets""" + return PcapReader(filename).read_all(count=count) + + + +class RawPcapReader: + """A stateful pcap reader. Each packet is returned as a string""" + + def __init__(self, filename): + self.filename = filename + try: + self.f = gzip.open(filename,"rb") + magic = self.f.read(4) + except IOError: + self.f = open(filename,"rb") + magic = self.f.read(4) + if magic == "\xa1\xb2\xc3\xd4": #big endian + self.endian = ">" + elif magic == "\xd4\xc3\xb2\xa1": #little endian + self.endian = "<" + else: + raise Scapy_Exception("Not a pcap capture file (bad magic)") + hdr = self.f.read(20) + if len(hdr)<20: + raise Scapy_Exception("Invalid pcap file (too short)") + vermaj,vermin,tz,sig,snaplen,linktype = struct.unpack(self.endian+"HHIIII",hdr) + + self.linktype = linktype + + + + def __iter__(self): + return self + + def next(self): + """impliment the iterator protocol on a set of packets in a pcap file""" + pkt = self.read_packet() + if pkt == None: + raise StopIteration + return pkt + + + def read_packet(self, size=MTU): + """return a single packet read from the file + + returns None when no more packets are available + """ + hdr = self.f.read(16) + if len(hdr) < 16: + return None + sec,usec,caplen,wirelen = struct.unpack(self.endian+"IIII", hdr) + s = self.f.read(caplen)[:MTU] + return s,(sec,usec,wirelen) # caplen = len(s) + + + def dispatch(self, callback): + """call the specified callback routine for each packet read + + This is just a convienience function for the main loop + that allows for easy launching of packet processing in a + thread. + """ + for p in self: + callback(p) + + def read_all(self,count=-1): + """return a list of all packets in the pcap file + """ + res=[] + while count != 0: + count -= 1 + p = self.read_packet() + if p is None: + break + res.append(p) + return res + + def recv(self, size=MTU): + """ Emulate a socket + """ + return self.read_packet(size)[0] + + def fileno(self): + return self.f.fileno() + + def close(self): + return self.f.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, tracback): + pass + + +class PcapReader(RawPcapReader): + def __init__(self, filename): + RawPcapReader.__init__(self, filename) + try: + self.LLcls = conf.l2types[self.linktype] + except KeyError: + raise Scapy_Exception("Scapy PcapReader: unknown LL type [%i]/[%#x]" % (self.linktype,self.linktype)) + #warning("PcapReader: unknown LL type [%i]/[%#x]. Using Raw packets" % (self.linktype,self.linktype)) + self.LLcls = conf.raw_layer + def read_packet(self, size=MTU): + rp = RawPcapReader.read_packet(self,size) + if rp is None: + return None + s,(sec,usec,wirelen) = rp + + try: + p = self.LLcls(s) + except KeyboardInterrupt: + raise + except: + if conf.debug_dissector: + raise + p = conf.raw_layer(s) + p.time = sec+0.000001*usec + return p + def read_all(self,count=-1): + res = RawPcapReader.read_all(self, count) + import plist + return plist.PacketList(res,name = os.path.basename(self.filename)) + def recv(self, size=MTU): + return self.read_packet(size) + + + +class RawPcapWriter: + """A stream PCAP writer with more control than wrpcap()""" + def __init__(self, filename, linktype=None, gz=False, endianness="", append=False, sync=False): + """ + linktype: force linktype to a given value. If None, linktype is taken + from the first writter packet + gz: compress the capture on the fly + endianness: force an endianness (little:"<", big:">"). Default is native + append: append packets to the capture file instead of truncating it + sync: do not bufferize writes to the capture file + """ + + self.linktype = linktype + self.header_present = 0 + self.append=append + self.gz = gz + self.endian = endianness + self.filename=filename + self.sync=sync + bufsz=4096 + if sync: + bufsz=0 + + self.f = [open,gzip.open][gz](filename,append and "ab" or "wb", gz and 9 or bufsz) + + def fileno(self): + return self.f.fileno() + + def _write_header(self, pkt): + self.header_present=1 + + if self.append: + # Even if prone to race conditions, this seems to be + # safest way to tell whether the header is already present + # because we have to handle compressed streams that + # are not as flexible as basic files + g = [open,gzip.open][self.gz](self.filename,"rb") + if g.read(16): + return + + self.f.write(struct.pack(self.endian+"IHHIIII", 0xa1b2c3d4L, + 2, 4, 0, 0, MTU, self.linktype)) + self.f.flush() + + + def write(self, pkt): + """accepts a either a single packet or a list of packets + to be written to the dumpfile + """ + if not self.header_present: + self._write_header(pkt) + if type(pkt) is str: + self._write_packet(pkt) + else: + for p in pkt: + self._write_packet(p) + + def _write_packet(self, packet, sec=None, usec=None, caplen=None, wirelen=None): + """writes a single packet to the pcap file + """ + if caplen is None: + caplen = len(packet) + if wirelen is None: + wirelen = caplen + if sec is None or usec is None: + t=time.time() + it = int(t) + if sec is None: + sec = it + if usec is None: + usec = int(round((t-it)*1000000)) + self.f.write(struct.pack(self.endian+"IIII", sec, usec, caplen, wirelen)) + self.f.write(packet) + if self.gz and self.sync: + self.f.flush() + + def flush(self): + return self.f.flush() + + def close(self): + return self.f.close() + + def __enter__(self): + return self + def __exit__(self, exc_type, exc_value, tracback): + self.flush() + + +class PcapWriter(RawPcapWriter): + def _write_header(self, pkt): + if self.linktype == None: + if type(pkt) is list or type(pkt) is tuple or isinstance(pkt,BasePacketList): + pkt = pkt[0] + try: + self.linktype = conf.l2types[pkt.__class__] + except KeyError: + warning("PcapWriter: unknown LL type for %s. Using type 1 (Ethernet)" % pkt.__class__.__name__) + self.linktype = 1 + RawPcapWriter._write_header(self, pkt) + + def _write_packet(self, packet): + sec = int(packet.time) + usec = int(round((packet.time-sec)*1000000)) + s = str(packet) + caplen = len(s) + RawPcapWriter._write_packet(self, s, sec, usec, caplen, caplen) + + +re_extract_hexcap = re.compile("^((0x)?[0-9a-fA-F]{2,}[ :\t]{,3}|) *(([0-9a-fA-F]{2} {,2}){,16})") + +def import_hexcap(): + p = "" + try: + while 1: + l = raw_input().strip() + try: + p += re_extract_hexcap.match(l).groups()[2] + except: + warning("Parsing error during hexcap") + continue + except EOFError: + pass + + p = p.replace(" ","") + return p.decode("hex") + + + +@conf.commands.register +def wireshark(pktlist): + """Run wireshark on a list of packets""" + f = get_temp_file() + wrpcap(f, pktlist) + subprocess.Popen([conf.prog.wireshark, "-r", f]) + +@conf.commands.register +def hexedit(x): + x = str(x) + f = get_temp_file() + open(f,"w").write(x) + subprocess.call([conf.prog.hexedit, f]) + x = open(f).read() + os.unlink(f) + return x + +def __make_table(yfmtfunc, fmtfunc, endline, list, fxyz, sortx=None, sorty=None, seplinefunc=None): + vx = {} + vy = {} + vz = {} + vxf = {} + vyf = {} + l = 0 + for e in list: + xx,yy,zz = map(str, fxyz(e)) + l = max(len(yy),l) + vx[xx] = max(vx.get(xx,0), len(xx), len(zz)) + vy[yy] = None + vz[(xx,yy)] = zz + + vxk = vx.keys() + vyk = vy.keys() + if sortx: + vxk.sort(sortx) + else: + try: + vxk.sort(lambda x,y:int(x)-int(y)) + except: + try: + vxk.sort(lambda x,y: cmp(atol(x),atol(y))) + except: + vxk.sort() + if sorty: + vyk.sort(sorty) + else: + try: + vyk.sort(lambda x,y:int(x)-int(y)) + except: + try: + vyk.sort(lambda x,y: cmp(atol(x),atol(y))) + except: + vyk.sort() + + + if seplinefunc: + sepline = seplinefunc(l, map(lambda x:vx[x],vxk)) + print sepline + + fmt = yfmtfunc(l) + print fmt % "", + for x in vxk: + vxf[x] = fmtfunc(vx[x]) + print vxf[x] % x, + print endline + if seplinefunc: + print sepline + for y in vyk: + print fmt % y, + for x in vxk: + print vxf[x] % vz.get((x,y), "-"), + print endline + if seplinefunc: + print sepline + +def make_table(*args, **kargs): + __make_table(lambda l:"%%-%is" % l, lambda l:"%%-%is" % l, "", *args, **kargs) + +def make_lined_table(*args, **kargs): + __make_table(lambda l:"%%-%is |" % l, lambda l:"%%-%is |" % l, "", + seplinefunc=lambda a,x:"+".join(map(lambda y:"-"*(y+2), [a-1]+x+[-2])), + *args, **kargs) + +def make_tex_table(*args, **kargs): + __make_table(lambda l: "%s", lambda l: "& %s", "\\\\", seplinefunc=lambda a,x:"\\hline", *args, **kargs) + |