diff options
Diffstat (limited to 'scripts/external_libs/scapy-python3-0.18/scapy/utils.py')
-rw-r--r-- | scripts/external_libs/scapy-python3-0.18/scapy/utils.py | 1054 |
1 files changed, 1054 insertions, 0 deletions
diff --git a/scripts/external_libs/scapy-python3-0.18/scapy/utils.py b/scripts/external_libs/scapy-python3-0.18/scapy/utils.py new file mode 100644 index 00000000..252109bb --- /dev/null +++ b/scripts/external_libs/scapy-python3-0.18/scapy/utils.py @@ -0,0 +1,1054 @@ +## This file is part of Scapy +## See http://www.secdev.org/projects/scapy for more informations +## Copyright (C) Philippe Biondi <phil@secdev.org> +## This program is published under a GPLv2 license + +""" +General utility functions. +""" + +import os,sys,socket,types +import random,time +import gzip,zlib +import re,struct,array,stat +import subprocess + +import warnings +warnings.filterwarnings("ignore","tempnam",RuntimeWarning, __name__) + +from .config import conf +from .data import MTU +from .error import log_runtime,log_loading,log_interactive, Scapy_Exception +from .base_classes import BasePacketList,BasePacket + + +WINDOWS=sys.platform.startswith("win32") + +########### +## Tools ## +########### + +def get_temp_file(keep=False, autoext=""): + import tempfile + fd, fname = tempfile.mkstemp(suffix = ".scapy" + autoext) + os.close(fd) + if not keep: + conf.temp_files.append(fname) + return fname + +def str2bytes(x): + """Convert input argument to bytes""" + if type(x) is bytes: + return x + elif type(x) is str: + return bytes([ ord(i) for i in x ]) + else: + return str2bytes(str(x)) + +def chb(x): + if type(x) is str: + return x + else: + return chr(x) + +def orb(x): + if type(x) is str: + return ord(x) + else: + return x + +def any2b(x): + if type(x) is not str and type(x) is not bytes: + try: + x=bytes(x) + except: + x = str(x) + if type(x) is str: + x = bytes([ ord(i) for i in x ]) + return x + +def sane_color(x): + r="" + for i in x: + j = orb(i) + if (j < 32) or (j >= 127): + r=r+conf.color_theme.not_printable(".") + else: + r=r+chb(i) + return r + +def sane(x): + r="" + for i in x: + if type(x) is str: + j = ord(i) + else: + j = i + if (j < 32) or (j >= 127): + r=r+"." + else: + r=r+chb(i) + return r + +def lhex(x): + if type(x) is int: + return hex(x) + elif type(x) is tuple: + return "(%s)" % ", ".join(map(lhex, x)) + elif type(x) is list: + return "[%s]" % ", ".join(map(lhex, x)) + else: + return x + +@conf.commands.register +def hexdump(x): + if type(x) is not str and type(x) is not bytes: + try: + x=bytes(x) + except: + x = str(x) + l = len(x) + i = 0 + while i < l: + print("%04x " % i,end = " ") + for j in range(16): + if i+j < l: + print("%02X" % orb(x[i+j]), end = " ") + else: + print(" ", end = " ") + if j%16 == 7: + print("", end = " ") + print(" ", end = " ") + print(sane_color(x[i:i+16])) + i += 16 + +@conf.commands.register +def linehexdump(x, onlyasc=0, onlyhex=0): + if type(x) is not str and type(x) is not bytes: + try: + x=bytes(x) + except: + x = str(x) + l = len(x) + if not onlyasc: + for i in range(l): + print("%02X" % orb(x[i]), end = " ") + print("", end = " ") + if not onlyhex: + print(sane_color(x)) + +def chexdump(x): + if type(x) is not str and type(x) is not bytes: + try: + x=bytes(x) + except: + x = str(x) + print(", ".join(map(lambda x: "%#04x"%orb(x), x))) + +def hexstr(x, onlyasc=0, onlyhex=0): + s = [] + if not onlyasc: + s.append(" ".join(map(lambda x:"%02x"%orb(x), x))) + if not onlyhex: + s.append(sane(x)) + return " ".join(s) + + +@conf.commands.register +def hexdiff(x,y): + """Show differences between 2 binary strings""" + x=any2b(x)[::-1] + y=any2b(y)[::-1] + SUBST=1 + INSERT=1 + d={} + d[-1,-1] = 0,(-1,-1) + for j in range(len(y)): + d[-1,j] = d[-1,j-1][0]+INSERT, (-1,j-1) + for i in range(len(x)): + d[i,-1] = d[i-1,-1][0]+INSERT, (i-1,-1) + + for j in range(len(y)): + for i in range(len(x)): + d[i,j] = min( ( d[i-1,j-1][0]+SUBST*(x[i] != y[j]), (i-1,j-1) ), + ( d[i-1,j][0]+INSERT, (i-1,j) ), + ( d[i,j-1][0]+INSERT, (i,j-1) ) ) + + + backtrackx = [] + backtracky = [] + i=len(x)-1 + j=len(y)-1 + while not (i == j == -1): + i2,j2 = d[i,j][1] + backtrackx.append(x[i2+1:i+1]) + backtracky.append(y[j2+1:j+1]) + i,j = i2,j2 + + + + x = y = i = 0 + colorize = { 0: lambda x:x, + -1: conf.color_theme.left, + 1: conf.color_theme.right } + + dox=1 + doy=0 + l = len(backtrackx) + while i < l: + separate=0 + linex = backtrackx[i:i+16] + liney = backtracky[i:i+16] + xx = sum(len(k) for k in linex) + yy = sum(len(k) for k in liney) + if dox and not xx: + dox = 0 + doy = 1 + if dox and linex == liney: + doy=1 + + if dox: + xd = y + j = 0 + while not linex[j]: + j += 1 + xd -= 1 + print(colorize[doy-dox]("%04x" % xd), end = " ") + x += xx + line=linex + else: + print(" ", end = " ") + if doy: + yd = y + j = 0 + while not liney[j]: + j += 1 + yd -= 1 + print(colorize[doy-dox]("%04x" % yd), end = " ") + y += yy + line=liney + else: + print(" ", end = " ") + + print(" ", end = " ") + + cl = "" + for j in range(16): + if i+j < l: + if line[j]: + col = colorize[(linex[j]!=liney[j])*(doy-dox)] + print(col("%02X" % line[j][0]), end = " ") + if linex[j]==liney[j]: + cl += sane_color(line[j]) + else: + cl += col(sane(line[j])) + else: + print(" ", end = " ") + cl += " " + else: + print(" ", end = " ") + if j == 7: + print("", end = " ") + + + print(" ",cl) + + if doy or not yy: + doy=0 + dox=1 + i += 16 + else: + if yy: + dox=0 + doy=1 + else: + i += 16 + + +crc32 = zlib.crc32 + +if struct.pack("H",1) == b"\x00\x01": # big endian + def checksum(pkt): + if len(pkt) % 2 == 1: + pkt += b"\0" + s = sum(array.array("H", pkt)) + s = (s >> 16) + (s & 0xffff) + s += s >> 16 + s = ~s + return s & 0xffff +else: + def checksum(pkt): + if len(pkt) % 2 == 1: + pkt += b"\0" + s = sum(array.array("H", pkt)) + s = (s >> 16) + (s & 0xffff) + s += s >> 16 + s = ~s + return (((s>>8)&0xff)|s<<8) & 0xffff + +def warning(x): + log_runtime.warning(x) + +def mac2str(mac): + #return "".join(map(lambda x: chr(int(x,16)), mac.split(":"))) + if type(mac) != str: + mac = mac.decode('ascii') + return b''.join([ bytes([int(i, 16)]) for i in mac.split(":") ]) + +def str2mac(s): + return ("%02x:"*6)[:-1] % tuple(s) + +def strxor(x,y): + #return "".join(map(lambda i,j:chr(ord(i)^ord(j)),x,y)) + return bytes([ i[0] ^ i[1] for i in zip(x,y) ] ) + +# Workarround bug 643005 : https://sourceforge.net/tracker/?func=detail&atid=105470&aid=643005&group_id=5470 +try: + socket.inet_aton("255.255.255.255") +except socket.error: + def inet_aton(x): + if x == "255.255.255.255": + return b"\xff"*4 + else: + return socket.inet_aton(x) +else: + inet_aton = socket.inet_aton + +inet_ntoa = socket.inet_ntoa +try: + inet_ntop = socket.inet_ntop + inet_pton = socket.inet_pton +except AttributeError: + from scapy.pton_ntop import * + log_loading.info("inet_ntop/pton functions not found. Python IPv6 support not present") + + +def atol(x): + try: + ip = inet_aton(x) + except socket.error: + ip = inet_aton(socket.gethostbyname(x)) + return struct.unpack("!I", ip)[0] +def ltoa(x): + return inet_ntoa(struct.pack("!I", x&0xffffffff)) + +def itom(x): + return (0xffffffff00000000>>x)&0xffffffff + +def do_graph(graph,prog=None,format='png',target=None,string=False,options=None, figsize = (12, 12), **kargs): + """do_graph(graph, prog=conf.prog.dot, format="png", + target=None, options=None, string=False): + if networkx library is available and graph is instance of Graph, use networkx.draw + + string: if not False, simply return the graph string + graph: GraphViz graph description + format: output type (svg, ps, gif, jpg, etc.), passed to dot's "-T" option. Ignored if target==None + target: filename. If None uses matplotlib to display + prog: which graphviz program to use + options: options to be passed to prog""" + + from scapy.arch import NETWORKX + if NETWORKX: + import networkx as nx + + if NETWORKX and isinstance(graph, nx.Graph): + nx.draw(graph, with_labels = True, edge_color = '0.75', **kargs) + else: # otherwise use dot as in scapy 2.x + if string: + return graph + if prog is None: + prog = conf.prog.dot + + if not target or not format: + format = 'png' + format = "-T %s" % format + + p = subprocess.Popen("%s %s %s" % (prog,options or "", format or ""), shell = True, stdin = subprocess.PIPE, stdout = subprocess.PIPE) + w, r = p.stdin, p.stdout + w.write(graph.encode('utf-8')) + w.close() + if target: + with open(target, 'wb') as f: + f.write(r.read()) + else: + try: + import matplotlib.image as mpimg + import matplotlib.pyplot as plt + plt.figure(figsize = figsize) + plt.axis('off') + return plt.imshow(mpimg.imread(r, format = format), **kargs) + + except ImportError: + warning('matplotlib.image required for interactive graph viewing. Use target option to write to a file') + +_TEX_TR = { + "{":"{\\tt\\char123}", + "}":"{\\tt\\char125}", + "\\":"{\\tt\\char92}", + "^":"\\^{}", + "$":"\\$", + "#":"\\#", + "~":"\\~", + "_":"\\_", + "&":"\\&", + "%":"\\%", + "|":"{\\tt\\char124}", + "~":"{\\tt\\char126}", + "<":"{\\tt\\char60}", + ">":"{\\tt\\char62}", + } + +def tex_escape(x): + s = "" + for c in x: + s += _TEX_TR.get(c,c) + return s + +def colgen(*lstcol,**kargs): + """Returns a generator that mixes provided quantities forever + trans: a function to convert the three arguments into a color. lambda x,y,z:(x,y,z) by default""" + if len(lstcol) < 2: + lstcol *= 2 + trans = kargs.get("trans", lambda x,y,z: (x,y,z)) + while 1: + for i in range(len(lstcol)): + for j in range(len(lstcol)): + for k in range(len(lstcol)): + if i != j or j != k or k != i: + yield trans(lstcol[(i+j)%len(lstcol)],lstcol[(j+k)%len(lstcol)],lstcol[(k+i)%len(lstcol)]) + +def incremental_label(label="tag%05i", start=0): + while True: + yield label % start + start += 1 + +######################### +#### Enum management #### +######################### + +class EnumElement: + _value=None + def __init__(self, key, value): + self._key = key + self._value = value + def __repr__(self): + return "<%s %s[%r]>" % (self.__dict__.get("_name", self.__class__.__name__), self._key, self._value) + def __getattr__(self, attr): + return getattr(self._value, attr) + def __str__(self): + return self._key + def __eq__(self, other): + #return self._value == int(other) + return self._value == hash(other) + def __hash__(self): + return self._value + + +class Enum_metaclass(type): + element_class = EnumElement + def __new__(cls, name, bases, dct): + rdict={} + for k,v in dct.items(): + if type(v) is int: + v = cls.element_class(k,v) + dct[k] = v + rdict[v] = k + dct["__rdict__"] = rdict + return super(Enum_metaclass, cls).__new__(cls, name, bases, dct) + def __getitem__(self, attr): + return self.__rdict__[attr] + def __contains__(self, val): + return val in self.__rdict__ + def get(self, attr, val=None): + return self._rdict__.get(attr, val) + def __repr__(self): + return "<%s>" % self.__dict__.get("name", self.__name__) + + + +################### +## Object saving ## +################### + + +def export_object(obj): + import dill as pickle + import base64 + return base64.b64encode(gzip.zlib.compress(pickle.dumps(obj,4),9)).decode('utf-8') + + +def import_object(obj): + import dill as pickle + import base64 +# if obj is None: +# obj = sys.stdin.read().strip().encode('utf-8') + if obj is str: + obj = obj.strip().encode('utf-8') + return pickle.loads(gzip.zlib.decompress(base64.b64decode(obj))) + + +def save_object(fname, obj): + import dill as pickle + pickle.dump(obj,gzip.open(fname,"wb")) + +def load_object(fname): + import dill as pickle + return pickle.load(gzip.open(fname,"rb")) + +@conf.commands.register +def corrupt_bytes(s, p=0.01, n=None): + """Corrupt a given percentage or number of bytes from bytes""" + s = str2bytes(s) + s = array.array("B",s) + l = len(s) + if n is None: + n = max(1,int(l*p)) + for i in random.sample(range(l), n): + s[i] = (s[i]+random.randint(1,255))%256 + return s.tobytes() + +@conf.commands.register +def corrupt_bits(s, p=0.01, n=None): + """Flip a given percentage or number of bits from bytes""" + s = str2bytes(s) + s = array.array("B",s) + l = len(s)*8 + if n is None: + n = max(1, int(l*p)) + for i in random.sample(range(l), n): + s[i//8] ^= 1 << (i%8) + return s.tobytes() + + +############################# +## pcap capture file stuff ## +############################# + +@conf.commands.register +def wrpcap(filename, pkt, *args, **kargs): + """Write a list of packets to a pcap file +gz: set to 1 to save a gzipped capture +linktype: force linktype value +endianness: "<" or ">", force endianness""" + with PcapWriter(filename, *args, **kargs) as pcap: + pcap.write(pkt) + +@conf.commands.register +def rdpcap(filename, count=-1): + """Read a pcap file and return a packet list +count: read only <count> packets""" + with PcapReader(filename) as pcap: + return pcap.read_all(count=count) + +class RawPcapReader: + """A stateful pcap reader. Each packet is returned as bytes""" + def __init__(self, filename): + self.filename = filename + try: + if not stat.S_ISREG(os.stat(filename).st_mode): + raise IOError("GZIP detection works only for regular files") + self.f = gzip.open(filename,"rb") + magic = self.f.read(4) + except IOError: + self.f = open(filename,"rb") + magic = self.f.read(4) + if magic == b"\xa1\xb2\xc3\xd4": #big endian + self.endian = ">" + self.reader = _RawPcapOldReader(self.f, self.endian) + elif magic == b"\xd4\xc3\xb2\xa1": #little endian + self.endian = "<" + self.reader = _RawPcapOldReader(self.f, self.endian) + elif magic == b"\x0a\x0d\x0d\x0a": #PcapNG + self.reader = _RawPcapNGReader(self.f) + else: + raise Scapy_Exception("Not a pcap capture file (bad magic)") + + def __enter__(self): + return self.reader + + def __exit__(self, exc_type, exc_value, tracback): + self.close() + + def __iter__(self): + return self.reader.__iter__() + + def dispatch(self, callback): + """call the specified callback routine for each packet read + + This is just a convienience function for the main loop + that allows for easy launching of packet processing in a + thread. + """ + for p in self: + callback(p) + + def read_all(self,count=-1): + """return a list of all packets in the pcap file + """ + res=[] + while count != 0: + count -= 1 + p = self.read_packet() + if p is None: + break + res.append(p) + return res + + def recv(self, size=MTU): + """ Emulate a socket + """ + return self.read_packet(size)[0] + + def fileno(self): + return self.f.fileno() + + def close(self): + return self.f.close() + + def read_packet(self, size = MTU): + return self.reader.read_packet(size) + +def align32(n): + return n + (4 - n % 4) % 4 + +class _RawPcapNGReader: + def __init__(self, filep): + self.filep = filep + self.filep.seek(0, 0) + self.endian = '<' + self.tsresol = 6 + self.linktype = None + + def __iter__(self): + return self + + def __next__(self): + """implement the iterator protocol on a set of packets in a pcapng file""" + pkt = self.read_packet() + if pkt == None: + raise StopIteration + return pkt + + def read_packet(self, size = MTU): + while True: + buf = self._read_bytes(4, check = False) + if len(buf) == 0: + return None + elif len(buf) != 4: + raise IOError("PacketNGReader: Premature end of file") + block_type, = struct.unpack(self.endian + 'i', buf) + if block_type == 168627466: #Section Header b'\x0a\x0d\x0d\x0a' + self.read_section_header() + elif block_type == 1: + self.read_interface_description() + elif block_type == 6: + return self.read_enhanced_packet(size) + else: + warning("PacketNGReader: Unparsed block type %d/#%x" % (block_type, block_type)) + self.read_generic_block() + + def _read_bytes(self, n, check = True): + buf = self.filep.read(n) + if check and len(buf) < n: + raise IOError("PacketNGReader: Premature end of file") + return buf + + def read_generic_block(self): + block_length, = struct.unpack(self.endian + 'I', self._read_bytes(4)) + self._read_bytes(block_length - 12) + self._check_length(block_length) + + def read_section_header(self): + buf = self._read_bytes(16) + if buf[4:8] == b'\x1a\x2b\x3c\x4d': + self.endian = '>' + elif buf[4:8] == b'\x4d\x3c\x2b\x1a': + self.endian = '<' + else: + raise Scapy_Exception('Cannot read byte order value') + block_length, _, major_version, minor_version, section_length = struct.unpack(self.endian + 'IIHHi', buf) + options = self._read_bytes(block_length - 24) + if options: + opt = self.parse_options(options) + for i in opt.keys(): + if not i & (0b1 << 15): + warning("PcapNGReader: Unparsed option %d/#%x in section header" % (i, i)) + self._check_length(block_length) + + def read_interface_description(self): + buf = self._read_bytes(12) + block_length, self.linktype, reserved, self.snaplen = struct.unpack(self.endian + 'IHHI', buf) + options = self._read_bytes(block_length - 20) + if options: + opt = self.parse_options(options) + for i in opt.keys(): + if 9 in opt: + self.tsresol = opt[9][0] + elif not i & (0b1 << 15): + warning("PcapNGReader: Unparsed option %d/#%x in enhanced packet block" % (i, i)) + try: + self.LLcls = conf.l2types[self.linktype] + except KeyError: + warning("RawPcapReader: unknown LL type [%i]/[%#x]. Using Raw packets" % (self.linktype,self.linktype)) + self.LLcls = conf.raw_layer + + self._check_length(block_length) + + def read_enhanced_packet(self, size = MTU): + buf = self._read_bytes(24) + block_length, interface, ts_high, ts_low, caplen, wirelen = struct.unpack(self.endian + 'IIIIII', buf) + timestamp = (ts_high << 32) + ts_low + + pkt = self._read_bytes(align32(caplen))[:caplen] + options = self._read_bytes(block_length - align32(caplen) - 32) + if options: + opt = self.parse_options(options) + for i in opt.keys(): + if not i & (0b1 << 15): + warning("PcapNGReader: Unparsed option %d/#%x in enhanced packet block" % (i, i)) + self._check_length(block_length) + return pkt[:MTU], (self.parse_sec(timestamp), self.parse_usec(timestamp), wirelen) + + def parse_sec(self, t): + if self.tsresol & 0b10000000: + return t >> self.tsresol + else: + return t // pow(10, self.tsresol) + + def parse_usec(self, t): + if self.tsresol & 0b10000000: + return t & (1 << self.tsresol) - 1 + else: + return t % pow(10, self.tsresol) + + def parse_options(self, opt): + buf = opt + options = {} + while buf: + opt_type, opt_len = struct.unpack(self.endian + 'HH', buf[:4]) + if opt_type == 0: + return options + options[opt_type] = buf[4:4 + opt_len] + buf = buf[ 4 + align32(opt_len):] + return options + + def _check_length(self, block_length): + check_length, = struct.unpack(self.endian + 'I', self._read_bytes(4)) + if check_length != block_length: + raise Scapy_Exception('Block length values are not equal') + +class _RawPcapOldReader: + def __init__(self, filep, endianness): + self.endian = endianness + self.f = filep + hdr = self.f.read(20) + if len(hdr)<20: + raise Scapy_Exception("Invalid pcap file (too short)") + vermaj,vermin,tz,sig,snaplen,linktype = struct.unpack(self.endian+"HHIIII",hdr) + + self.linktype = linktype + try: + self.LLcls = conf.l2types[self.linktype] + except KeyError: + warning("RawPcapReader: unknown LL type [%i]/[%#x]. Using Raw packets" % (self.linktype,self.linktype)) + self.LLcls = conf.raw_layer + + def __iter__(self): + return self + + def __next__(self): + """implement the iterator protocol on a set of packets in a pcap file""" + pkt = self.read_packet() + if pkt == None: + raise StopIteration + return pkt + + def read_packet(self, size=MTU): + """return a single packet read from the file + bytes, (sec, #timestamp seconds + usec, #timestamp microseconds + wirelen) #actual length of packet + returns None when no more packets are available + """ + hdr = self.f.read(16) + if len(hdr) < 16: + return None + sec,usec,caplen,wirelen = struct.unpack(self.endian+"IIII", hdr) + s = self.f.read(caplen)[:MTU] + return s,(sec,usec,wirelen) # caplen = len(s) + + +class PcapReader(RawPcapReader): + def __init__(self, filename): + RawPcapReader.__init__(self, filename) + def __enter__(self): + return self + def __iter__(self): + return self + def __next__(self): + """implement the iterator protocol on a set of packets in a pcap file""" + pkt = self.read_packet() + if pkt == None: + raise StopIteration + return pkt + def read_packet(self, size=MTU): + rp = RawPcapReader.read_packet(self,size) + if rp is None: + return None + s,(sec,usec,wirelen) = rp + + + try: + p = self.reader.LLcls(s) + except KeyboardInterrupt: + raise + except: + if conf.debug_dissector: + raise + p = conf.raw_layer(s) + p.time = sec+0.000001*usec + p.wirelen = wirelen + return p + + def read_all(self,count=-1): + res = RawPcapReader.read_all(self, count) + import scapy.plist + return scapy.plist.PacketList(res,name = os.path.basename(self.filename)) + def recv(self, size=MTU): + return self.read_packet(size) + + +class RawPcapWriter: + """A stream PCAP writer with more control than wrpcap()""" + def __init__(self, filename, linktype=None, gz=False, endianness="", append=False, sync=False): + """ + linktype: force linktype to a given value. If None, linktype is taken + from the first writter packet + gz: compress the capture on the fly + endianness: force an endianness (little:"<", big:">"). Default is native + append: append packets to the capture file instead of truncating it + sync: do not bufferize writes to the capture file + """ + + self.linktype = linktype + self.header_present = 0 + self.append=append + self.gz = gz + self.endian = endianness + self.filename=filename + self.sync=sync + bufsz=4096 + if sync: + bufsz=0 + + self.f = [open,gzip.open][gz](filename,append and "ab" or "wb", gz and 9 or bufsz) + + def fileno(self): + return self.f.fileno() + + def _write_header(self, pkt): + self.header_present=1 + + if self.append: + # Even if prone to race conditions, this seems to be + # safest way to tell whether the header is already present + # because we have to handle compressed streams that + # are not as flexible as basic files + g = [open,gzip.open][self.gz](self.filename,"rb") + if g.read(16): + return + + self.f.write(struct.pack(self.endian+"IHHIIII", 0xa1b2c3d4, + 2, 4, 0, 0, MTU, self.linktype)) + self.f.flush() + + + def write(self, pkt): + """accepts a either a single packet or a list of packets + to be written to the dumpfile + """ + if not self.header_present: + self._write_header(pkt) + if type(pkt) is bytes: + self._write_packet(pkt) + else: + for p in pkt: + self._write_packet(p) + + def _write_packet(self, packet, sec=None, usec=None, caplen=None, wirelen=None): + """writes a single packet to the pcap file + """ + if caplen is None: + caplen = len(packet) + if wirelen is None: + wirelen = caplen + if sec is None or usec is None: + t=time.time() + it = int(t) + if sec is None: + sec = it + if usec is None: + usec = int(round((t-it)*1000000)) + self.f.write(struct.pack(self.endian+"IIII", sec, usec, caplen, wirelen)) + self.f.write(packet) + if self.gz and self.sync: + self.f.flush() + + def flush(self): + return self.f.flush() + + def close(self): + return self.f.close() + + def __enter__(self): + return self + def __exit__(self, exc_type, exc_value, tracback): + self.flush() + self.close() + + +class PcapWriter(RawPcapWriter): + def _write_header(self, pkt): + if self.linktype == None: + if type(pkt) is list or type(pkt) is tuple or isinstance(pkt,BasePacketList): + pkt = pkt[0] + try: + self.linktype = conf.l2types[pkt.__class__] + except KeyError: + warning("PcapWriter: unknown LL type for %s. Using type 1 (Ethernet)" % pkt.__class__.__name__) + self.linktype = 1 + RawPcapWriter._write_header(self, pkt) + + def _write_packet(self, packet): + try: + sec = int(packet.time) + usec = int(round((packet.time-sec)*1000000)) + s = bytes(packet) + caplen = len(s) + RawPcapWriter._write_packet(self, s, sec, usec, caplen, caplen) + except Exception as e: + log_interactive.error(e) + def write(self, pkt): + """accepts a either a single packet or a list of packets + to be written to the dumpfile + """ + if not self.header_present: + self._write_header(pkt) + if isinstance(pkt, BasePacket): + self._write_packet(pkt) + else: + for p in pkt: + self._write_packet(p) + + +re_extract_hexcap = re.compile("^((0x)?[0-9a-fA-F]{2,}[ :\t]{,3}|) *(([0-9a-fA-F]{2} {,2}){,16})") + +def import_hexcap(): + p = "" + try: + while 1: + l = raw_input().strip() + try: + p += re_extract_hexcap.match(l).groups()[2] + except: + warning("Parsing error during hexcap") + continue + except EOFError: + pass + + p = p.replace(" ","") + return p.decode("hex") + + + +@conf.commands.register +def wireshark(pktlist, *args): + """Run wireshark on a list of packets""" + fname = get_temp_file() + wrpcap(fname, pktlist) + subprocess.Popen([conf.prog.wireshark, "-r", fname] + list(args)) + +@conf.commands.register +def tdecode(pkt, *args): + """Run tshark to decode and display the packet. If no args defined uses -V""" + if not args: + args = [ "-V" ] + fname = get_temp_file() + wrpcap(fname,[pkt]) + subprocess.call(["tshark", "-r", fname] + list(args)) + +@conf.commands.register +def hexedit(x): + """Run external hex editor on a packet or bytes. Set editor in conf.prog.hexedit""" + x = bytes(x) + fname = get_temp_file() + with open(fname,"wb") as f: + f.write(x) + subprocess.call([conf.prog.hexedit, fname]) + with open(fname, "rb") as f: + x = f.read() + return x + +def __make_table(yfmtfunc, fmtfunc, endline, items, fxyz, sortx=None, sorty=None, seplinefunc=None): + vx = {} + vy = {} + vz = {} + vxf = {} + vyf = {} + max_length = 0 + for record in items: + xx,yy,zz = map(str, fxyz(record[0], record[1])) + max_length = max(len(yy),max_length) + vx[xx] = max(vx.get(xx,0), len(xx), len(zz)) + vy[yy] = None + vz[(xx,yy)] = zz + + vxk = list(vx.keys()) + vyk = list(vy.keys()) + if sortx: + vxk.sort(sortx) + else: + try: + vxk.sort(key = lambda x: atol(x)) + except: + vxk.sort() + if sorty: + vyk.sort(sorty) + else: + try: + vyk.sort(key = lambda x: atol(x)) + except: + vyk.sort() + + + if seplinefunc: + sepline = seplinefunc(max_length, [vx[x] for x in vxk]) + print(sepline) + + fmt = yfmtfunc(max_length) + print(fmt % "", end = " ") + for x in vxk: + vxf[x] = fmtfunc(vx[x]) + print(vxf[x] % x, end = " ") + print(endline) + if seplinefunc: + print(sepline) + for y in vyk: + print(fmt % y, end = " ") + for x in vxk: + print(vxf[x] % vz.get((x,y), "-"), end = " ") + print(endline) + if seplinefunc: + print(sepline) + +def make_table(*args, **kargs): + __make_table(lambda l:"%%-%is" % l, lambda l:"%%-%is" % l, "", *args, **kargs) + +def make_lined_table(*args, **kargs): + __make_table(lambda l:"%%-%is |" % l, lambda l:"%%-%is |" % l, "", + seplinefunc=lambda max_length,x:"+".join([ "-"*(y+2) for y in [max_length-1]+x+[-2]]), + *args, **kargs) + +def make_tex_table(*args, **kargs): + __make_table(lambda l: "%s", lambda l: "& %s", "\\\\", seplinefunc=lambda a,x:"\\hline", *args, **kargs) + |