diff options
Diffstat (limited to 'scripts/external_libs/scapy-python3-0.18/scapy/plist.py')
-rw-r--r-- | scripts/external_libs/scapy-python3-0.18/scapy/plist.py | 517 |
1 files changed, 517 insertions, 0 deletions
diff --git a/scripts/external_libs/scapy-python3-0.18/scapy/plist.py b/scripts/external_libs/scapy-python3-0.18/scapy/plist.py new file mode 100644 index 00000000..bdf0b757 --- /dev/null +++ b/scripts/external_libs/scapy-python3-0.18/scapy/plist.py @@ -0,0 +1,517 @@ +## This file is part of Scapy +## See http://www.secdev.org/projects/scapy for more informations +## Copyright (C) Philippe Biondi <phil@secdev.org> +## This program is published under a GPLv2 license + +""" +PacketList: holds several packets and allows to do operations on them. +""" + + +import os,subprocess +from .config import conf +from .base_classes import BasePacket,BasePacketList +from collections import defaultdict + +from .utils import do_graph,hexdump,make_table,make_lined_table,make_tex_table,get_temp_file + +from scapy.arch import NETWORKX +if NETWORKX: + import networkx as nx + + +############# +## Results ## +############# + +class PacketList(BasePacketList): + res = [] + def __init__(self, res=None, name="PacketList", stats=None, vector_index = None): + """create a packet list from a list of packets + res: the list of packets + stats: a list of classes that will appear in the stats (defaults to [TCP,UDP,ICMP])""" + if stats is None: + stats = conf.stats_classic_protocols + self.stats = stats + if res is None: + res = [] + if isinstance(res, PacketList): + res = res.res + self.res = res + self.listname = name + self.vector_index = vector_index + def __len__(self): + return len(self.res) + def _elt2pkt(self, elt): + if self.vector_index == None: + return elt + else: + return elt[self.vector_index] + def _elt2sum(self, elt): + if self.vector_index == None: + return elt.summary() + else: + return "%s ==> %s" % (elt[0].summary(),elt[1].summary()) + + def _elt2show(self, elt): + return self._elt2sum(elt) + def __repr__(self): + stats=dict.fromkeys(self.stats,0) + other = 0 + for r in self.res: + f = 0 + for p in stats: + if self._elt2pkt(r).haslayer(p): + stats[p] += 1 + f = 1 + break + if not f: + other += 1 + s = "" + ct = conf.color_theme + for p in self.stats: + s += " %s%s%s" % (ct.packetlist_proto(p.name), + ct.punct(":"), + ct.packetlist_value(stats[p])) + s += " %s%s%s" % (ct.packetlist_proto("Other"), + ct.punct(":"), + ct.packetlist_value(other)) + return "%s%s%s%s%s" % (ct.punct("<"), + ct.packetlist_name(self.listname), + ct.punct(":"), + s, + ct.punct(">")) + def __getattr__(self, attr): + return getattr(self.res, attr) + def __getitem__(self, item): + if isinstance(item,type) and issubclass(item,BasePacket): + #return self.__class__(filter(lambda x: item in self._elt2pkt(x),self.res), + return self.__class__([ x for x in self.res if item in self._elt2pkt(x) ], + name="%s from %s"%(item.__name__,self.listname)) + if type(item) is slice: + return self.__class__(self.res.__getitem__(item), + name = "mod %s" % self.listname) + return self.res.__getitem__(item) + def __getslice__(self, *args, **kargs): + return self.__class__(self.res.__getslice__(*args, **kargs), + name="mod %s"%self.listname) + def __add__(self, other): + return self.__class__(self.res+other.res, + name="%s+%s"%(self.listname,other.listname)) + def summary(self, prn=None, lfilter=None): + """prints a summary of each packet +prn: function to apply to each packet instead of lambda x:x.summary() +lfilter: truth function to apply to each packet to decide whether it will be displayed""" + for r in self.res: + if lfilter is not None: + if not lfilter(r): + continue + if prn is None: + print(self._elt2sum(r)) + else: + print(prn(r)) + def nsummary(self,prn=None, lfilter=None): + """prints a summary of each packet with the packet's number +prn: function to apply to each packet instead of lambda x:x.summary() +lfilter: truth function to apply to each packet to decide whether it will be displayed""" + for i, p in enumerate(self.res): + if lfilter is not None: + if not lfilter(p): + continue + print(conf.color_theme.id(i,fmt="%04i"), end = " ") + if prn is None: + print(self._elt2sum(p)) + else: + print(prn(p)) + def display(self): # Deprecated. Use show() + """deprecated. is show()""" + self.show() + def show(self, *args, **kargs): + """Best way to display the packet list. Defaults to nsummary() method""" + return self.nsummary(*args, **kargs) + + def filter(self, func): + """Returns a packet list filtered by a truth function""" + return self.__class__(list(filter(func,self.res)), + name="filtered %s"%self.listname) + + def plot(self, f, lfilter=None,**kargs): + """Applies a function to each packet to get a value that will be plotted with matplotlib. A matplotlib object is returned + lfilter: a truth function that decides whether a packet must be ploted""" + + return plt.plot([ f(i) for i in self.res if not lfilter or lfilter(i) ], **kargs) + + def diffplot(self, f, delay=1, lfilter=None, **kargs): + """diffplot(f, delay=1, lfilter=None) + Applies a function to couples (l[i],l[i+delay])""" + + return plt.plot([ f(i, j) for i in self.res[:-delay] for j in self.res[delay:] if not lfilter or (lfilter(i) and lfilter(j))], + **kargs) + + def multiplot(self, f, lfilter=None, **kargs): + """Uses a function that returns a label and a value for this label, then plots all the values label by label""" + + d = defaultdict(list) + for i in self.res: + if lfilter and not lfilter(i): + continue + k, v = f(i) + d[k].append(v) + + figure = plt.figure() + ax = figure.add_axes(plt.axes()) + for i in d: + ax.plot(d[i], **kargs) + return figure + + + def rawhexdump(self): + """Prints an hexadecimal dump of each packet in the list""" + for p in self: + hexdump(self._elt2pkt(p)) + + def hexraw(self, lfilter=None): + """Same as nsummary(), except that if a packet has a Raw layer, it will be hexdumped + lfilter: a truth function that decides whether a packet must be displayed""" + for i,p in enumerate(self.res): + p1 = self._elt2pkt(p) + if lfilter is not None and not lfilter(p1): + continue + print("%s %s %s" % (conf.color_theme.id(i,fmt="%04i"), + p1.sprintf("%.time%"), + self._elt2sum(p))) + if p1.haslayer(conf.raw_layer): + hexdump(p1.getlayer(conf.raw_layer).load) + + def hexdump(self, lfilter=None): + """Same as nsummary(), except that packets are also hexdumped + lfilter: a truth function that decides whether a packet must be displayed""" + for i,p in enumerate(self.res): + p1 = self._elt2pkt(p) + if lfilter is not None and not lfilter(p1): + continue + print("%s %s %s" % (conf.color_theme.id(i,fmt="%04i"), + p1.sprintf("%.time%"), + self._elt2sum(p))) + hexdump(p1) + + def padding(self, lfilter=None): + """Same as hexraw(), for Padding layer""" + for i,p in enumerate(self.res): + p1 = self._elt2pkt(p) + if p1.haslayer(conf.padding_layer): + if lfilter is None or lfilter(p1): + print("%s %s %s" % (conf.color_theme.id(i,fmt="%04i"), + p1.sprintf("%.time%"), + self._elt2sum(p))) + hexdump(p1.getlayer(conf.padding_layer).load) + + def nzpadding(self, lfilter=None): + """Same as padding() but only non null padding""" + for i,p in enumerate(self.res): + p1 = self._elt2pkt(p) + if p1.haslayer(conf.padding_layer): + pad = p1.getlayer(conf.padding_layer).load + if pad == pad[0]*len(pad): + continue + if lfilter is None or lfilter(p1): + print("%s %s %s" % (conf.color_theme.id(i,fmt="%04i"), + p1.sprintf("%.time%"), + self._elt2sum(p))) + hexdump(p1.getlayer(conf.padding_layer).load) + + + def conversations(self, getsrcdst=None, draw = True, **kargs): + """Graphes a conversations between sources and destinations and display it + (using graphviz) + getsrcdst: a function that takes an element of the list and return the source and dest + by defaults, return source and destination IP + if networkx library is available returns a DiGraph, or draws it if draw = True otherwise graphviz is used + format: output type (svg, ps, gif, jpg, etc.), passed to dot's "-T" option + target: output filename. If None, matplotlib is used to display + prog: which graphviz program to use""" + if getsrcdst is None: + getsrcdst = lambda x:(x['IP'].src, x['IP'].dst) + conv = {} + for p in self.res: + p = self._elt2pkt(p) + try: + c = getsrcdst(p) + except: + #XXX warning() + continue + conv[c] = conv.get(c,0)+1 + + if NETWORKX: # networkx is available + gr = nx.DiGraph() + for s,d in conv: + if s not in gr: + gr.add_node(s) + if d not in gr: + gr.add_node(d) + gr.add_edge(s, d) + if draw: + return do_graph(gr, **kargs) + else: + return gr + else: + gr = 'digraph "conv" {\n' + for s,d in conv: + gr += '\t "%s" -> "%s"\n' % (s,d) + gr += "}\n" + return do_graph(gr, **kargs) + + def afterglow(self, src=None, event=None, dst=None, **kargs): + """Experimental clone attempt of http://sourceforge.net/projects/afterglow + each datum is reduced as src -> event -> dst and the data are graphed. + by default we have IP.src -> IP.dport -> IP.dst""" + if src is None: + src = lambda x: x['IP'].src + if event is None: + event = lambda x: x['IP'].dport + if dst is None: + dst = lambda x: x['IP'].dst + sl = {} + el = {} + dl = {} + for i in self.res: + try: + s,e,d = src(i),event(i),dst(i) + if s in sl: + n,l = sl[s] + n += 1 + if e not in l: + l.append(e) + sl[s] = (n,l) + else: + sl[s] = (1,[e]) + if e in el: + n,l = el[e] + n+=1 + if d not in l: + l.append(d) + el[e] = (n,l) + else: + el[e] = (1,[d]) + dl[d] = dl.get(d,0)+1 + except: + continue + + import math + def normalize(n): + return 2+math.log(n)/4.0 + + def minmax(x): + m,M = min(x),max(x) + if m == M: + m = 0 + if M == 0: + M = 1 + return m,M + + #mins,maxs = minmax(map(lambda (x,y): x, sl.values())) + mins,maxs = minmax([ a[0] for a in sl.values()]) + #mine,maxe = minmax(map(lambda (x,y): x, el.values())) + mine,maxe = minmax([ a[0] for a in el.values()]) + mind,maxd = minmax(dl.values()) + + gr = 'digraph "afterglow" {\n\tedge [len=2.5];\n' + + gr += "# src nodes\n" + for s in sl: + n,l = sl[s]; n = 1+(n-mins)/(maxs-mins) + gr += '"src.%s" [label = "%s", shape=box, fillcolor="#FF0000", style=filled, fixedsize=1, height=%.2f,width=%.2f];\n' % (repr(s),repr(s),n,n) + gr += "# event nodes\n" + for e in el: + n,l = el[e]; n = n = 1+(n-mine)/(maxe-mine) + gr += '"evt.%s" [label = "%s", shape=circle, fillcolor="#00FFFF", style=filled, fixedsize=1, height=%.2f, width=%.2f];\n' % (repr(e),repr(e),n,n) + for d in dl: + n = dl[d]; n = n = 1+(n-mind)/(maxd-mind) + gr += '"dst.%s" [label = "%s", shape=triangle, fillcolor="#0000ff", style=filled, fixedsize=1, height=%.2f, width=%.2f];\n' % (repr(d),repr(d),n,n) + + gr += "###\n" + for s in sl: + n,l = sl[s] + for e in l: + gr += ' "src.%s" -> "evt.%s";\n' % (repr(s),repr(e)) + for e in el: + n,l = el[e] + for d in l: + gr += ' "evt.%s" -> "dst.%s";\n' % (repr(e),repr(d)) + + gr += "}" + return do_graph(gr, **kargs) + + + def _dump_document(self, **kargs): + import pyx + d = pyx.document.document() + l = len(self.res) + for i in range(len(self.res)): + elt = self.res[i] + c = self._elt2pkt(elt).canvas_dump(**kargs) + cbb = c.bbox() + c.text(cbb.left(),cbb.top()+1,r"\font\cmssfont=cmss12\cmssfont{Frame %i/%i}" % (i,l),[pyx.text.size.LARGE]) + if conf.verb >= 2: + os.write(1,b".") + d.append(pyx.document.page(c, paperformat=pyx.document.paperformat.A4, + margin=1*pyx.unit.t_cm, + fittosize=1)) + return d + + + + def psdump(self, filename = None, **kargs): + """Creates a multipage poscript file with a psdump of every packet + filename: name of the file to write to. If empty, a temporary file is used and + conf.prog.psreader is called""" + d = self._dump_document(**kargs) + if filename is None: + filename = get_temp_file(autoext=".ps") + d.writePSfile(filename) + subprocess.Popen([conf.prog.psreader, filename+".ps"]) + else: + d.writePSfile(filename) + print + + def pdfdump(self, filename = None, **kargs): + """Creates a PDF file with a psdump of every packet + filename: name of the file to write to. If empty, a temporary file is used and + conf.prog.pdfreader is called""" + d = self._dump_document(**kargs) + if filename is None: + filename = get_temp_file(autoext=".pdf") + d.writePDFfile(filename) + subprocess.Popen([conf.prog.pdfreader, filename+".pdf"]) + else: + d.writePDFfile(filename) + print + + def sr(self,multi=0): + """sr([multi=1]) -> (SndRcvList, PacketList) + Matches packets in the list and return ( (matched couples), (unmatched packets) )""" + remain = self.res[:] + sr = [] + i = 0 + while i < len(remain): + s = remain[i] + j = i + while j < len(remain)-1: + j += 1 + r = remain[j] + if r.answers(s): + sr.append((s,r)) + if multi: + remain[i]._answered=1 + remain[j]._answered=2 + continue + del(remain[j]) + del(remain[i]) + i -= 1 + break + i += 1 + if multi: + remain = filter(lambda x:not hasattr(x,"_answered"), remain) + return SndRcvList(sr),PacketList(remain) + + def sessions(self, session_extractor=None): + if session_extractor is None: + def session_extractor(p): + sess = "Other" + if 'Ether' in p: + if 'IP' in p: + if 'TCP' in p: + sess = p.sprintf("TCP %IP.src%:%r,TCP.sport% > %IP.dst%:%r,TCP.dport%") + elif 'UDP' in p: + sess = p.sprintf("UDP %IP.src%:%r,UDP.sport% > %IP.dst%:%r,UDP.dport%") + elif 'ICMP' in p: + sess = p.sprintf("ICMP %IP.src% > %IP.dst% type=%r,ICMP.type% code=%r,ICMP.code% id=%ICMP.id%") + else: + sess = p.sprintf("IP %IP.src% > %IP.dst% proto=%IP.proto%") + elif 'ARP' in p: + sess = p.sprintf("ARP %ARP.psrc% > %ARP.pdst%") + else: + sess = p.sprintf("Ethernet type=%04xr,Ether.type%") + return sess + sessions = defaultdict(self.__class__) + for p in self.res: + sess = session_extractor(self._elt2pkt(p)) + sessions[sess].append(p) + return dict(sessions) + + def replace(self, *args, **kargs): + """ + lst.replace(<field>,[<oldvalue>,]<newvalue>) + lst.replace( (fld,[ov],nv),(fld,[ov,]nv),...) + if ov is None, all values are replaced + ex: + lst.replace( IP.src, "192.168.1.1", "10.0.0.1" ) + lst.replace( IP.ttl, 64 ) + lst.replace( (IP.ttl, 64), (TCP.sport, 666, 777), ) + """ + delete_checksums = kargs.get("delete_checksums",False) + x=PacketList(name="Replaced %s" % self.listname) + if type(args[0]) is not tuple: + args = (args,) + for p in self.res: + p = self._elt2pkt(p) + copied = False + for scheme in args: + fld = scheme[0] + old = scheme[1] # not used if len(scheme) == 2 + new = scheme[-1] + for o in fld.owners: + if o in p: + if len(scheme) == 2 or p[o].getfieldval(fld.name) == old: + if not copied: + p = p.copy() + if delete_checksums: + p.delete_checksums() + copied = True + setattr(p[o], fld.name, new) + x.append(p) + return x + + +class SndRcvList(PacketList): + def __init__(self, res=None, name="Results", stats=None): + PacketList.__init__(self, res, name, stats, vector_index = 1) + def summary(self, prn=None, lfilter=None): + """prints a summary of each SndRcv packet pair +prn: function to apply to each packet pair instead of lambda s, r: "%s ==> %s" % (s.summary(),r.summary()) +lfilter: truth function to apply to each packet pair to decide whether it will be displayed""" + for s, r in self.res: + if lfilter is not None: + if not lfilter(s, r): + continue + if prn is None: + print(self._elt2sum((s, r))) + else: + print(prn(s, r)) + def nsummary(self,prn=None, lfilter=None): + """prints a summary of each SndRcv packet pair with the pair's number +prn: function to apply to each packet pair instead of lambda s, r: "%s ==> %s" % (s.summary(),r.summary()) +lfilter: truth function to apply to each packet pair to decide whether it will be displayed""" + for i, (s, r) in enumerate(self.res): + if lfilter is not None: + if not lfilter(s, r): + continue + print(conf.color_theme.id(i,fmt="%04i"), end = " ") + if prn is None: + print(self._elt2sum((s, r))) + else: + print(prn(s, r)) + def filter(self, func): + """Returns a SndRcv list filtered by a truth function""" + return self.__class__( [ i for i in self.res if func(*i) ], name='filtered %s'%self.listname) + + def make_table(self, *args, **kargs): + """Prints a table using a function that returs for each packet its head column value, head row value and displayed value + ex: p.make_table(lambda s, r:(s[IP].dst, r[TCP].sport, s[TCP].sprintf("%flags%")) """ + return make_table(self.res, *args, **kargs) + def make_lined_table(self, *args, **kargs): + """Same as make_table, but print a table with lines""" + return make_lined_table(self.res, *args, **kargs) + def make_tex_table(self, *args, **kargs): + """Same as make_table, but print a table with LaTeX syntax""" + return make_tex_table(self.res, *args, **kargs) |