diff options
Diffstat (limited to 'scripts/external_libs/scapy-python3-0.18/scapy/utils.py')
-rw-r--r-- | scripts/external_libs/scapy-python3-0.18/scapy/utils.py | 1054 |
1 files changed, 0 insertions, 1054 deletions
diff --git a/scripts/external_libs/scapy-python3-0.18/scapy/utils.py b/scripts/external_libs/scapy-python3-0.18/scapy/utils.py deleted file mode 100644 index 252109bb..00000000 --- a/scripts/external_libs/scapy-python3-0.18/scapy/utils.py +++ /dev/null @@ -1,1054 +0,0 @@ -## This file is part of Scapy -## See http://www.secdev.org/projects/scapy for more informations -## Copyright (C) Philippe Biondi <phil@secdev.org> -## This program is published under a GPLv2 license - -""" -General utility functions. -""" - -import os,sys,socket,types -import random,time -import gzip,zlib -import re,struct,array,stat -import subprocess - -import warnings -warnings.filterwarnings("ignore","tempnam",RuntimeWarning, __name__) - -from .config import conf -from .data import MTU -from .error import log_runtime,log_loading,log_interactive, Scapy_Exception -from .base_classes import BasePacketList,BasePacket - - -WINDOWS=sys.platform.startswith("win32") - -########### -## Tools ## -########### - -def get_temp_file(keep=False, autoext=""): - import tempfile - fd, fname = tempfile.mkstemp(suffix = ".scapy" + autoext) - os.close(fd) - if not keep: - conf.temp_files.append(fname) - return fname - -def str2bytes(x): - """Convert input argument to bytes""" - if type(x) is bytes: - return x - elif type(x) is str: - return bytes([ ord(i) for i in x ]) - else: - return str2bytes(str(x)) - -def chb(x): - if type(x) is str: - return x - else: - return chr(x) - -def orb(x): - if type(x) is str: - return ord(x) - else: - return x - -def any2b(x): - if type(x) is not str and type(x) is not bytes: - try: - x=bytes(x) - except: - x = str(x) - if type(x) is str: - x = bytes([ ord(i) for i in x ]) - return x - -def sane_color(x): - r="" - for i in x: - j = orb(i) - if (j < 32) or (j >= 127): - r=r+conf.color_theme.not_printable(".") - else: - r=r+chb(i) - return r - -def sane(x): - r="" - for i in x: - if type(x) is str: - j = ord(i) - else: - j = i - if (j < 32) or (j >= 127): - r=r+"." - else: - r=r+chb(i) - return r - -def lhex(x): - if type(x) is int: - return hex(x) - elif type(x) is tuple: - return "(%s)" % ", ".join(map(lhex, x)) - elif type(x) is list: - return "[%s]" % ", ".join(map(lhex, x)) - else: - return x - -@conf.commands.register -def hexdump(x): - if type(x) is not str and type(x) is not bytes: - try: - x=bytes(x) - except: - x = str(x) - l = len(x) - i = 0 - while i < l: - print("%04x " % i,end = " ") - for j in range(16): - if i+j < l: - print("%02X" % orb(x[i+j]), end = " ") - else: - print(" ", end = " ") - if j%16 == 7: - print("", end = " ") - print(" ", end = " ") - print(sane_color(x[i:i+16])) - i += 16 - -@conf.commands.register -def linehexdump(x, onlyasc=0, onlyhex=0): - if type(x) is not str and type(x) is not bytes: - try: - x=bytes(x) - except: - x = str(x) - l = len(x) - if not onlyasc: - for i in range(l): - print("%02X" % orb(x[i]), end = " ") - print("", end = " ") - if not onlyhex: - print(sane_color(x)) - -def chexdump(x): - if type(x) is not str and type(x) is not bytes: - try: - x=bytes(x) - except: - x = str(x) - print(", ".join(map(lambda x: "%#04x"%orb(x), x))) - -def hexstr(x, onlyasc=0, onlyhex=0): - s = [] - if not onlyasc: - s.append(" ".join(map(lambda x:"%02x"%orb(x), x))) - if not onlyhex: - s.append(sane(x)) - return " ".join(s) - - -@conf.commands.register -def hexdiff(x,y): - """Show differences between 2 binary strings""" - x=any2b(x)[::-1] - y=any2b(y)[::-1] - SUBST=1 - INSERT=1 - d={} - d[-1,-1] = 0,(-1,-1) - for j in range(len(y)): - d[-1,j] = d[-1,j-1][0]+INSERT, (-1,j-1) - for i in range(len(x)): - d[i,-1] = d[i-1,-1][0]+INSERT, (i-1,-1) - - for j in range(len(y)): - for i in range(len(x)): - d[i,j] = min( ( d[i-1,j-1][0]+SUBST*(x[i] != y[j]), (i-1,j-1) ), - ( d[i-1,j][0]+INSERT, (i-1,j) ), - ( d[i,j-1][0]+INSERT, (i,j-1) ) ) - - - backtrackx = [] - backtracky = [] - i=len(x)-1 - j=len(y)-1 - while not (i == j == -1): - i2,j2 = d[i,j][1] - backtrackx.append(x[i2+1:i+1]) - backtracky.append(y[j2+1:j+1]) - i,j = i2,j2 - - - - x = y = i = 0 - colorize = { 0: lambda x:x, - -1: conf.color_theme.left, - 1: conf.color_theme.right } - - dox=1 - doy=0 - l = len(backtrackx) - while i < l: - separate=0 - linex = backtrackx[i:i+16] - liney = backtracky[i:i+16] - xx = sum(len(k) for k in linex) - yy = sum(len(k) for k in liney) - if dox and not xx: - dox = 0 - doy = 1 - if dox and linex == liney: - doy=1 - - if dox: - xd = y - j = 0 - while not linex[j]: - j += 1 - xd -= 1 - print(colorize[doy-dox]("%04x" % xd), end = " ") - x += xx - line=linex - else: - print(" ", end = " ") - if doy: - yd = y - j = 0 - while not liney[j]: - j += 1 - yd -= 1 - print(colorize[doy-dox]("%04x" % yd), end = " ") - y += yy - line=liney - else: - print(" ", end = " ") - - print(" ", end = " ") - - cl = "" - for j in range(16): - if i+j < l: - if line[j]: - col = colorize[(linex[j]!=liney[j])*(doy-dox)] - print(col("%02X" % line[j][0]), end = " ") - if linex[j]==liney[j]: - cl += sane_color(line[j]) - else: - cl += col(sane(line[j])) - else: - print(" ", end = " ") - cl += " " - else: - print(" ", end = " ") - if j == 7: - print("", end = " ") - - - print(" ",cl) - - if doy or not yy: - doy=0 - dox=1 - i += 16 - else: - if yy: - dox=0 - doy=1 - else: - i += 16 - - -crc32 = zlib.crc32 - -if struct.pack("H",1) == b"\x00\x01": # big endian - def checksum(pkt): - if len(pkt) % 2 == 1: - pkt += b"\0" - s = sum(array.array("H", pkt)) - s = (s >> 16) + (s & 0xffff) - s += s >> 16 - s = ~s - return s & 0xffff -else: - def checksum(pkt): - if len(pkt) % 2 == 1: - pkt += b"\0" - s = sum(array.array("H", pkt)) - s = (s >> 16) + (s & 0xffff) - s += s >> 16 - s = ~s - return (((s>>8)&0xff)|s<<8) & 0xffff - -def warning(x): - log_runtime.warning(x) - -def mac2str(mac): - #return "".join(map(lambda x: chr(int(x,16)), mac.split(":"))) - if type(mac) != str: - mac = mac.decode('ascii') - return b''.join([ bytes([int(i, 16)]) for i in mac.split(":") ]) - -def str2mac(s): - return ("%02x:"*6)[:-1] % tuple(s) - -def strxor(x,y): - #return "".join(map(lambda i,j:chr(ord(i)^ord(j)),x,y)) - return bytes([ i[0] ^ i[1] for i in zip(x,y) ] ) - -# Workarround bug 643005 : https://sourceforge.net/tracker/?func=detail&atid=105470&aid=643005&group_id=5470 -try: - socket.inet_aton("255.255.255.255") -except socket.error: - def inet_aton(x): - if x == "255.255.255.255": - return b"\xff"*4 - else: - return socket.inet_aton(x) -else: - inet_aton = socket.inet_aton - -inet_ntoa = socket.inet_ntoa -try: - inet_ntop = socket.inet_ntop - inet_pton = socket.inet_pton -except AttributeError: - from scapy.pton_ntop import * - log_loading.info("inet_ntop/pton functions not found. Python IPv6 support not present") - - -def atol(x): - try: - ip = inet_aton(x) - except socket.error: - ip = inet_aton(socket.gethostbyname(x)) - return struct.unpack("!I", ip)[0] -def ltoa(x): - return inet_ntoa(struct.pack("!I", x&0xffffffff)) - -def itom(x): - return (0xffffffff00000000>>x)&0xffffffff - -def do_graph(graph,prog=None,format='png',target=None,string=False,options=None, figsize = (12, 12), **kargs): - """do_graph(graph, prog=conf.prog.dot, format="png", - target=None, options=None, string=False): - if networkx library is available and graph is instance of Graph, use networkx.draw - - string: if not False, simply return the graph string - graph: GraphViz graph description - format: output type (svg, ps, gif, jpg, etc.), passed to dot's "-T" option. Ignored if target==None - target: filename. If None uses matplotlib to display - prog: which graphviz program to use - options: options to be passed to prog""" - - from scapy.arch import NETWORKX - if NETWORKX: - import networkx as nx - - if NETWORKX and isinstance(graph, nx.Graph): - nx.draw(graph, with_labels = True, edge_color = '0.75', **kargs) - else: # otherwise use dot as in scapy 2.x - if string: - return graph - if prog is None: - prog = conf.prog.dot - - if not target or not format: - format = 'png' - format = "-T %s" % format - - p = subprocess.Popen("%s %s %s" % (prog,options or "", format or ""), shell = True, stdin = subprocess.PIPE, stdout = subprocess.PIPE) - w, r = p.stdin, p.stdout - w.write(graph.encode('utf-8')) - w.close() - if target: - with open(target, 'wb') as f: - f.write(r.read()) - else: - try: - import matplotlib.image as mpimg - import matplotlib.pyplot as plt - plt.figure(figsize = figsize) - plt.axis('off') - return plt.imshow(mpimg.imread(r, format = format), **kargs) - - except ImportError: - warning('matplotlib.image required for interactive graph viewing. Use target option to write to a file') - -_TEX_TR = { - "{":"{\\tt\\char123}", - "}":"{\\tt\\char125}", - "\\":"{\\tt\\char92}", - "^":"\\^{}", - "$":"\\$", - "#":"\\#", - "~":"\\~", - "_":"\\_", - "&":"\\&", - "%":"\\%", - "|":"{\\tt\\char124}", - "~":"{\\tt\\char126}", - "<":"{\\tt\\char60}", - ">":"{\\tt\\char62}", - } - -def tex_escape(x): - s = "" - for c in x: - s += _TEX_TR.get(c,c) - return s - -def colgen(*lstcol,**kargs): - """Returns a generator that mixes provided quantities forever - trans: a function to convert the three arguments into a color. lambda x,y,z:(x,y,z) by default""" - if len(lstcol) < 2: - lstcol *= 2 - trans = kargs.get("trans", lambda x,y,z: (x,y,z)) - while 1: - for i in range(len(lstcol)): - for j in range(len(lstcol)): - for k in range(len(lstcol)): - if i != j or j != k or k != i: - yield trans(lstcol[(i+j)%len(lstcol)],lstcol[(j+k)%len(lstcol)],lstcol[(k+i)%len(lstcol)]) - -def incremental_label(label="tag%05i", start=0): - while True: - yield label % start - start += 1 - -######################### -#### Enum management #### -######################### - -class EnumElement: - _value=None - def __init__(self, key, value): - self._key = key - self._value = value - def __repr__(self): - return "<%s %s[%r]>" % (self.__dict__.get("_name", self.__class__.__name__), self._key, self._value) - def __getattr__(self, attr): - return getattr(self._value, attr) - def __str__(self): - return self._key - def __eq__(self, other): - #return self._value == int(other) - return self._value == hash(other) - def __hash__(self): - return self._value - - -class Enum_metaclass(type): - element_class = EnumElement - def __new__(cls, name, bases, dct): - rdict={} - for k,v in dct.items(): - if type(v) is int: - v = cls.element_class(k,v) - dct[k] = v - rdict[v] = k - dct["__rdict__"] = rdict - return super(Enum_metaclass, cls).__new__(cls, name, bases, dct) - def __getitem__(self, attr): - return self.__rdict__[attr] - def __contains__(self, val): - return val in self.__rdict__ - def get(self, attr, val=None): - return self._rdict__.get(attr, val) - def __repr__(self): - return "<%s>" % self.__dict__.get("name", self.__name__) - - - -################### -## Object saving ## -################### - - -def export_object(obj): - import dill as pickle - import base64 - return base64.b64encode(gzip.zlib.compress(pickle.dumps(obj,4),9)).decode('utf-8') - - -def import_object(obj): - import dill as pickle - import base64 -# if obj is None: -# obj = sys.stdin.read().strip().encode('utf-8') - if obj is str: - obj = obj.strip().encode('utf-8') - return pickle.loads(gzip.zlib.decompress(base64.b64decode(obj))) - - -def save_object(fname, obj): - import dill as pickle - pickle.dump(obj,gzip.open(fname,"wb")) - -def load_object(fname): - import dill as pickle - return pickle.load(gzip.open(fname,"rb")) - -@conf.commands.register -def corrupt_bytes(s, p=0.01, n=None): - """Corrupt a given percentage or number of bytes from bytes""" - s = str2bytes(s) - s = array.array("B",s) - l = len(s) - if n is None: - n = max(1,int(l*p)) - for i in random.sample(range(l), n): - s[i] = (s[i]+random.randint(1,255))%256 - return s.tobytes() - -@conf.commands.register -def corrupt_bits(s, p=0.01, n=None): - """Flip a given percentage or number of bits from bytes""" - s = str2bytes(s) - s = array.array("B",s) - l = len(s)*8 - if n is None: - n = max(1, int(l*p)) - for i in random.sample(range(l), n): - s[i//8] ^= 1 << (i%8) - return s.tobytes() - - -############################# -## pcap capture file stuff ## -############################# - -@conf.commands.register -def wrpcap(filename, pkt, *args, **kargs): - """Write a list of packets to a pcap file -gz: set to 1 to save a gzipped capture -linktype: force linktype value -endianness: "<" or ">", force endianness""" - with PcapWriter(filename, *args, **kargs) as pcap: - pcap.write(pkt) - -@conf.commands.register -def rdpcap(filename, count=-1): - """Read a pcap file and return a packet list -count: read only <count> packets""" - with PcapReader(filename) as pcap: - return pcap.read_all(count=count) - -class RawPcapReader: - """A stateful pcap reader. Each packet is returned as bytes""" - def __init__(self, filename): - self.filename = filename - try: - if not stat.S_ISREG(os.stat(filename).st_mode): - raise IOError("GZIP detection works only for regular files") - self.f = gzip.open(filename,"rb") - magic = self.f.read(4) - except IOError: - self.f = open(filename,"rb") - magic = self.f.read(4) - if magic == b"\xa1\xb2\xc3\xd4": #big endian - self.endian = ">" - self.reader = _RawPcapOldReader(self.f, self.endian) - elif magic == b"\xd4\xc3\xb2\xa1": #little endian - self.endian = "<" - self.reader = _RawPcapOldReader(self.f, self.endian) - elif magic == b"\x0a\x0d\x0d\x0a": #PcapNG - self.reader = _RawPcapNGReader(self.f) - else: - raise Scapy_Exception("Not a pcap capture file (bad magic)") - - def __enter__(self): - return self.reader - - def __exit__(self, exc_type, exc_value, tracback): - self.close() - - def __iter__(self): - return self.reader.__iter__() - - def dispatch(self, callback): - """call the specified callback routine for each packet read - - This is just a convienience function for the main loop - that allows for easy launching of packet processing in a - thread. - """ - for p in self: - callback(p) - - def read_all(self,count=-1): - """return a list of all packets in the pcap file - """ - res=[] - while count != 0: - count -= 1 - p = self.read_packet() - if p is None: - break - res.append(p) - return res - - def recv(self, size=MTU): - """ Emulate a socket - """ - return self.read_packet(size)[0] - - def fileno(self): - return self.f.fileno() - - def close(self): - return self.f.close() - - def read_packet(self, size = MTU): - return self.reader.read_packet(size) - -def align32(n): - return n + (4 - n % 4) % 4 - -class _RawPcapNGReader: - def __init__(self, filep): - self.filep = filep - self.filep.seek(0, 0) - self.endian = '<' - self.tsresol = 6 - self.linktype = None - - def __iter__(self): - return self - - def __next__(self): - """implement the iterator protocol on a set of packets in a pcapng file""" - pkt = self.read_packet() - if pkt == None: - raise StopIteration - return pkt - - def read_packet(self, size = MTU): - while True: - buf = self._read_bytes(4, check = False) - if len(buf) == 0: - return None - elif len(buf) != 4: - raise IOError("PacketNGReader: Premature end of file") - block_type, = struct.unpack(self.endian + 'i', buf) - if block_type == 168627466: #Section Header b'\x0a\x0d\x0d\x0a' - self.read_section_header() - elif block_type == 1: - self.read_interface_description() - elif block_type == 6: - return self.read_enhanced_packet(size) - else: - warning("PacketNGReader: Unparsed block type %d/#%x" % (block_type, block_type)) - self.read_generic_block() - - def _read_bytes(self, n, check = True): - buf = self.filep.read(n) - if check and len(buf) < n: - raise IOError("PacketNGReader: Premature end of file") - return buf - - def read_generic_block(self): - block_length, = struct.unpack(self.endian + 'I', self._read_bytes(4)) - self._read_bytes(block_length - 12) - self._check_length(block_length) - - def read_section_header(self): - buf = self._read_bytes(16) - if buf[4:8] == b'\x1a\x2b\x3c\x4d': - self.endian = '>' - elif buf[4:8] == b'\x4d\x3c\x2b\x1a': - self.endian = '<' - else: - raise Scapy_Exception('Cannot read byte order value') - block_length, _, major_version, minor_version, section_length = struct.unpack(self.endian + 'IIHHi', buf) - options = self._read_bytes(block_length - 24) - if options: - opt = self.parse_options(options) - for i in opt.keys(): - if not i & (0b1 << 15): - warning("PcapNGReader: Unparsed option %d/#%x in section header" % (i, i)) - self._check_length(block_length) - - def read_interface_description(self): - buf = self._read_bytes(12) - block_length, self.linktype, reserved, self.snaplen = struct.unpack(self.endian + 'IHHI', buf) - options = self._read_bytes(block_length - 20) - if options: - opt = self.parse_options(options) - for i in opt.keys(): - if 9 in opt: - self.tsresol = opt[9][0] - elif not i & (0b1 << 15): - warning("PcapNGReader: Unparsed option %d/#%x in enhanced packet block" % (i, i)) - try: - self.LLcls = conf.l2types[self.linktype] - except KeyError: - warning("RawPcapReader: unknown LL type [%i]/[%#x]. Using Raw packets" % (self.linktype,self.linktype)) - self.LLcls = conf.raw_layer - - self._check_length(block_length) - - def read_enhanced_packet(self, size = MTU): - buf = self._read_bytes(24) - block_length, interface, ts_high, ts_low, caplen, wirelen = struct.unpack(self.endian + 'IIIIII', buf) - timestamp = (ts_high << 32) + ts_low - - pkt = self._read_bytes(align32(caplen))[:caplen] - options = self._read_bytes(block_length - align32(caplen) - 32) - if options: - opt = self.parse_options(options) - for i in opt.keys(): - if not i & (0b1 << 15): - warning("PcapNGReader: Unparsed option %d/#%x in enhanced packet block" % (i, i)) - self._check_length(block_length) - return pkt[:MTU], (self.parse_sec(timestamp), self.parse_usec(timestamp), wirelen) - - def parse_sec(self, t): - if self.tsresol & 0b10000000: - return t >> self.tsresol - else: - return t // pow(10, self.tsresol) - - def parse_usec(self, t): - if self.tsresol & 0b10000000: - return t & (1 << self.tsresol) - 1 - else: - return t % pow(10, self.tsresol) - - def parse_options(self, opt): - buf = opt - options = {} - while buf: - opt_type, opt_len = struct.unpack(self.endian + 'HH', buf[:4]) - if opt_type == 0: - return options - options[opt_type] = buf[4:4 + opt_len] - buf = buf[ 4 + align32(opt_len):] - return options - - def _check_length(self, block_length): - check_length, = struct.unpack(self.endian + 'I', self._read_bytes(4)) - if check_length != block_length: - raise Scapy_Exception('Block length values are not equal') - -class _RawPcapOldReader: - def __init__(self, filep, endianness): - self.endian = endianness - self.f = filep - hdr = self.f.read(20) - if len(hdr)<20: - raise Scapy_Exception("Invalid pcap file (too short)") - vermaj,vermin,tz,sig,snaplen,linktype = struct.unpack(self.endian+"HHIIII",hdr) - - self.linktype = linktype - try: - self.LLcls = conf.l2types[self.linktype] - except KeyError: - warning("RawPcapReader: unknown LL type [%i]/[%#x]. Using Raw packets" % (self.linktype,self.linktype)) - self.LLcls = conf.raw_layer - - def __iter__(self): - return self - - def __next__(self): - """implement the iterator protocol on a set of packets in a pcap file""" - pkt = self.read_packet() - if pkt == None: - raise StopIteration - return pkt - - def read_packet(self, size=MTU): - """return a single packet read from the file - bytes, (sec, #timestamp seconds - usec, #timestamp microseconds - wirelen) #actual length of packet - returns None when no more packets are available - """ - hdr = self.f.read(16) - if len(hdr) < 16: - return None - sec,usec,caplen,wirelen = struct.unpack(self.endian+"IIII", hdr) - s = self.f.read(caplen)[:MTU] - return s,(sec,usec,wirelen) # caplen = len(s) - - -class PcapReader(RawPcapReader): - def __init__(self, filename): - RawPcapReader.__init__(self, filename) - def __enter__(self): - return self - def __iter__(self): - return self - def __next__(self): - """implement the iterator protocol on a set of packets in a pcap file""" - pkt = self.read_packet() - if pkt == None: - raise StopIteration - return pkt - def read_packet(self, size=MTU): - rp = RawPcapReader.read_packet(self,size) - if rp is None: - return None - s,(sec,usec,wirelen) = rp - - - try: - p = self.reader.LLcls(s) - except KeyboardInterrupt: - raise - except: - if conf.debug_dissector: - raise - p = conf.raw_layer(s) - p.time = sec+0.000001*usec - p.wirelen = wirelen - return p - - def read_all(self,count=-1): - res = RawPcapReader.read_all(self, count) - import scapy.plist - return scapy.plist.PacketList(res,name = os.path.basename(self.filename)) - def recv(self, size=MTU): - return self.read_packet(size) - - -class RawPcapWriter: - """A stream PCAP writer with more control than wrpcap()""" - def __init__(self, filename, linktype=None, gz=False, endianness="", append=False, sync=False): - """ - linktype: force linktype to a given value. If None, linktype is taken - from the first writter packet - gz: compress the capture on the fly - endianness: force an endianness (little:"<", big:">"). Default is native - append: append packets to the capture file instead of truncating it - sync: do not bufferize writes to the capture file - """ - - self.linktype = linktype - self.header_present = 0 - self.append=append - self.gz = gz - self.endian = endianness - self.filename=filename - self.sync=sync - bufsz=4096 - if sync: - bufsz=0 - - self.f = [open,gzip.open][gz](filename,append and "ab" or "wb", gz and 9 or bufsz) - - def fileno(self): - return self.f.fileno() - - def _write_header(self, pkt): - self.header_present=1 - - if self.append: - # Even if prone to race conditions, this seems to be - # safest way to tell whether the header is already present - # because we have to handle compressed streams that - # are not as flexible as basic files - g = [open,gzip.open][self.gz](self.filename,"rb") - if g.read(16): - return - - self.f.write(struct.pack(self.endian+"IHHIIII", 0xa1b2c3d4, - 2, 4, 0, 0, MTU, self.linktype)) - self.f.flush() - - - def write(self, pkt): - """accepts a either a single packet or a list of packets - to be written to the dumpfile - """ - if not self.header_present: - self._write_header(pkt) - if type(pkt) is bytes: - self._write_packet(pkt) - else: - for p in pkt: - self._write_packet(p) - - def _write_packet(self, packet, sec=None, usec=None, caplen=None, wirelen=None): - """writes a single packet to the pcap file - """ - if caplen is None: - caplen = len(packet) - if wirelen is None: - wirelen = caplen - if sec is None or usec is None: - t=time.time() - it = int(t) - if sec is None: - sec = it - if usec is None: - usec = int(round((t-it)*1000000)) - self.f.write(struct.pack(self.endian+"IIII", sec, usec, caplen, wirelen)) - self.f.write(packet) - if self.gz and self.sync: - self.f.flush() - - def flush(self): - return self.f.flush() - - def close(self): - return self.f.close() - - def __enter__(self): - return self - def __exit__(self, exc_type, exc_value, tracback): - self.flush() - self.close() - - -class PcapWriter(RawPcapWriter): - def _write_header(self, pkt): - if self.linktype == None: - if type(pkt) is list or type(pkt) is tuple or isinstance(pkt,BasePacketList): - pkt = pkt[0] - try: - self.linktype = conf.l2types[pkt.__class__] - except KeyError: - warning("PcapWriter: unknown LL type for %s. Using type 1 (Ethernet)" % pkt.__class__.__name__) - self.linktype = 1 - RawPcapWriter._write_header(self, pkt) - - def _write_packet(self, packet): - try: - sec = int(packet.time) - usec = int(round((packet.time-sec)*1000000)) - s = bytes(packet) - caplen = len(s) - RawPcapWriter._write_packet(self, s, sec, usec, caplen, caplen) - except Exception as e: - log_interactive.error(e) - def write(self, pkt): - """accepts a either a single packet or a list of packets - to be written to the dumpfile - """ - if not self.header_present: - self._write_header(pkt) - if isinstance(pkt, BasePacket): - self._write_packet(pkt) - else: - for p in pkt: - self._write_packet(p) - - -re_extract_hexcap = re.compile("^((0x)?[0-9a-fA-F]{2,}[ :\t]{,3}|) *(([0-9a-fA-F]{2} {,2}){,16})") - -def import_hexcap(): - p = "" - try: - while 1: - l = raw_input().strip() - try: - p += re_extract_hexcap.match(l).groups()[2] - except: - warning("Parsing error during hexcap") - continue - except EOFError: - pass - - p = p.replace(" ","") - return p.decode("hex") - - - -@conf.commands.register -def wireshark(pktlist, *args): - """Run wireshark on a list of packets""" - fname = get_temp_file() - wrpcap(fname, pktlist) - subprocess.Popen([conf.prog.wireshark, "-r", fname] + list(args)) - -@conf.commands.register -def tdecode(pkt, *args): - """Run tshark to decode and display the packet. If no args defined uses -V""" - if not args: - args = [ "-V" ] - fname = get_temp_file() - wrpcap(fname,[pkt]) - subprocess.call(["tshark", "-r", fname] + list(args)) - -@conf.commands.register -def hexedit(x): - """Run external hex editor on a packet or bytes. Set editor in conf.prog.hexedit""" - x = bytes(x) - fname = get_temp_file() - with open(fname,"wb") as f: - f.write(x) - subprocess.call([conf.prog.hexedit, fname]) - with open(fname, "rb") as f: - x = f.read() - return x - -def __make_table(yfmtfunc, fmtfunc, endline, items, fxyz, sortx=None, sorty=None, seplinefunc=None): - vx = {} - vy = {} - vz = {} - vxf = {} - vyf = {} - max_length = 0 - for record in items: - xx,yy,zz = map(str, fxyz(record[0], record[1])) - max_length = max(len(yy),max_length) - vx[xx] = max(vx.get(xx,0), len(xx), len(zz)) - vy[yy] = None - vz[(xx,yy)] = zz - - vxk = list(vx.keys()) - vyk = list(vy.keys()) - if sortx: - vxk.sort(sortx) - else: - try: - vxk.sort(key = lambda x: atol(x)) - except: - vxk.sort() - if sorty: - vyk.sort(sorty) - else: - try: - vyk.sort(key = lambda x: atol(x)) - except: - vyk.sort() - - - if seplinefunc: - sepline = seplinefunc(max_length, [vx[x] for x in vxk]) - print(sepline) - - fmt = yfmtfunc(max_length) - print(fmt % "", end = " ") - for x in vxk: - vxf[x] = fmtfunc(vx[x]) - print(vxf[x] % x, end = " ") - print(endline) - if seplinefunc: - print(sepline) - for y in vyk: - print(fmt % y, end = " ") - for x in vxk: - print(vxf[x] % vz.get((x,y), "-"), end = " ") - print(endline) - if seplinefunc: - print(sepline) - -def make_table(*args, **kargs): - __make_table(lambda l:"%%-%is" % l, lambda l:"%%-%is" % l, "", *args, **kargs) - -def make_lined_table(*args, **kargs): - __make_table(lambda l:"%%-%is |" % l, lambda l:"%%-%is |" % l, "", - seplinefunc=lambda max_length,x:"+".join([ "-"*(y+2) for y in [max_length-1]+x+[-2]]), - *args, **kargs) - -def make_tex_table(*args, **kargs): - __make_table(lambda l: "%s", lambda l: "& %s", "\\\\", seplinefunc=lambda a,x:"\\hline", *args, **kargs) - |